wallet-core: implement coin selection repair for p2p payments

This commit is contained in:
Florian Dold 2023-06-19 12:02:43 +02:00
parent ed01d407e7
commit bcff03949b
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
3 changed files with 277 additions and 80 deletions

View File

@ -158,6 +158,12 @@ export async function queryCoinInfosForSelection(
return infos; return infos;
} }
export interface PeerCoinRepair {
exchangeBaseUrl: string;
coinPubs: CoinPublicKeyString[];
contribs: AmountJson[];
}
export interface PeerCoinSelectionRequest { export interface PeerCoinSelectionRequest {
instructedAmount: AmountJson; instructedAmount: AmountJson;
@ -165,11 +171,7 @@ export interface PeerCoinSelectionRequest {
* Instruct the coin selection to repair this coin * Instruct the coin selection to repair this coin
* selection instead of selecting completely new coins. * selection instead of selecting completely new coins.
*/ */
repair?: { repair?: PeerCoinRepair;
exchangeBaseUrl: string;
coinPubs: CoinPublicKeyString[];
contribs: AmountJson[];
};
} }
export async function selectPeerCoins( export async function selectPeerCoins(

View File

@ -29,6 +29,7 @@ import {
TalerError, TalerError,
TalerErrorCode, TalerErrorCode,
TalerPreciseTimestamp, TalerPreciseTimestamp,
TalerProtocolViolationError,
TransactionAction, TransactionAction,
TransactionMajorState, TransactionMajorState,
TransactionMinorState, TransactionMinorState,
@ -44,7 +45,11 @@ import {
j2s, j2s,
parsePayPullUri, parsePayPullUri,
} from "@gnu-taler/taler-util"; } from "@gnu-taler/taler-util";
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; import {
HttpResponse,
readSuccessResponseJsonOrThrow,
readTalerErrorResponse,
} from "@gnu-taler/taler-util/http";
import { import {
InternalWalletState, InternalWalletState,
PeerPullDebitRecordStatus, PeerPullDebitRecordStatus,
@ -62,6 +67,7 @@ import {
} from "../util/retries.js"; } from "../util/retries.js";
import { runOperationWithErrorReporting, spendCoins } from "./common.js"; import { runOperationWithErrorReporting, spendCoins } from "./common.js";
import { import {
PeerCoinRepair,
codecForExchangePurseStatus, codecForExchangePurseStatus,
getTotalPeerPaymentCost, getTotalPeerPaymentCost,
queryCoinInfosForSelection, queryCoinInfosForSelection,
@ -76,6 +82,84 @@ import { checkLogicInvariant } from "../util/invariants.js";
const logger = new Logger("pay-peer-pull-debit.ts"); const logger = new Logger("pay-peer-pull-debit.ts");
async function handlePurseCreationConflict(
ws: InternalWalletState,
peerPullInc: PeerPullPaymentIncomingRecord,
resp: HttpResponse,
): Promise<OperationAttemptResult> {
const pursePub = peerPullInc.pursePub;
const errResp = await readTalerErrorResponse(resp);
if (errResp.code !== TalerErrorCode.EXCHANGE_GENERIC_INSUFFICIENT_FUNDS) {
await failPeerPullDebitTransaction(ws, pursePub);
return OperationAttemptResult.finishedEmpty();
}
// FIXME: Properly parse!
const brokenCoinPub = (errResp as any).coin_pub;
logger.trace(`excluded broken coin pub=${brokenCoinPub}`);
if (!brokenCoinPub) {
// FIXME: Details!
throw new TalerProtocolViolationError();
}
const instructedAmount = Amounts.parseOrThrow(
peerPullInc.contractTerms.amount,
);
const sel = peerPullInc.coinSel;
if (!sel) {
throw Error("invalid state (coin selection expected)");
}
const repair: PeerCoinRepair = {
coinPubs: sel.coinPubs,
contribs: sel.contributions.map((x) => Amounts.parseOrThrow(x)),
exchangeBaseUrl: peerPullInc.exchangeBaseUrl,
};
const coinSelRes = await selectPeerCoins(ws, { instructedAmount, repair });
if (coinSelRes.type == "failure") {
// FIXME: Details!
throw Error(
"insufficient balance to re-select coins to repair double spending",
);
}
const totalAmount = await getTotalPeerPaymentCost(
ws,
coinSelRes.result.coins,
);
await ws.db
.mktx((x) => [x.peerPullPaymentIncoming])
.runReadWrite(async (tx) => {
const myPpi = await tx.peerPullPaymentIncoming.get(
peerPullInc.peerPullPaymentIncomingId,
);
if (!myPpi) {
return;
}
switch (myPpi.status) {
case PeerPullDebitRecordStatus.PendingDeposit:
case PeerPullDebitRecordStatus.SuspendedDeposit: {
const sel = coinSelRes.result;
myPpi.coinSel = {
coinPubs: sel.coins.map((x) => x.coinPub),
contributions: sel.coins.map((x) => x.contribution),
totalCost: Amounts.stringify(totalAmount),
};
break;
}
default:
return;
}
await tx.peerPullPaymentIncoming.put(myPpi);
});
return OperationAttemptResult.finishedEmpty();
}
async function processPeerPullDebitPendingDeposit( async function processPeerPullDebitPendingDeposit(
ws: InternalWalletState, ws: InternalWalletState,
peerPullInc: PeerPullPaymentIncomingRecord, peerPullInc: PeerPullPaymentIncomingRecord,
@ -118,81 +202,98 @@ async function processPeerPullDebitPendingDeposit(
method: "POST", method: "POST",
body: depositPayload, body: depositPayload,
}); });
if (httpResp.status === HttpStatusCode.Gone) { switch (httpResp.status) {
const transitionInfo = await ws.db case HttpStatusCode.Ok: {
.mktx((x) => [ const resp = await readSuccessResponseJsonOrThrow(
x.peerPullPaymentIncoming, httpResp,
x.refreshGroups, codecForAny(),
x.denominations, );
x.coinAvailability, logger.trace(`purse deposit response: ${j2s(resp)}`);
x.coins,
])
.runReadWrite(async (tx) => {
const pi = await tx.peerPullPaymentIncoming.get(
peerPullPaymentIncomingId,
);
if (!pi) {
throw Error("peer pull payment not found anymore");
}
if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) {
return;
}
const oldTxState = computePeerPullDebitTransactionState(pi);
const currency = Amounts.currencyOf(pi.totalCostEstimated); const transitionInfo = await ws.db
const coinPubs: CoinRefreshRequest[] = []; .mktx((x) => [x.peerPullPaymentIncoming])
.runReadWrite(async (tx) => {
const pi = await tx.peerPullPaymentIncoming.get(
peerPullPaymentIncomingId,
);
if (!pi) {
throw Error("peer pull payment not found anymore");
}
if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) {
return;
}
const oldTxState = computePeerPullDebitTransactionState(pi);
pi.status = PeerPullDebitRecordStatus.DonePaid;
const newTxState = computePeerPullDebitTransactionState(pi);
await tx.peerPullPaymentIncoming.put(pi);
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
break;
}
case HttpStatusCode.Gone: {
const transitionInfo = await ws.db
.mktx((x) => [
x.peerPullPaymentIncoming,
x.refreshGroups,
x.denominations,
x.coinAvailability,
x.coins,
])
.runReadWrite(async (tx) => {
const pi = await tx.peerPullPaymentIncoming.get(
peerPullPaymentIncomingId,
);
if (!pi) {
throw Error("peer pull payment not found anymore");
}
if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) {
return;
}
const oldTxState = computePeerPullDebitTransactionState(pi);
if (!pi.coinSel) { const currency = Amounts.currencyOf(pi.totalCostEstimated);
throw Error("invalid db state"); const coinPubs: CoinRefreshRequest[] = [];
}
for (let i = 0; i < pi.coinSel.coinPubs.length; i++) { if (!pi.coinSel) {
coinPubs.push({ throw Error("invalid db state");
amount: pi.coinSel.contributions[i], }
coinPub: pi.coinSel.coinPubs[i],
});
}
const refresh = await createRefreshGroup( for (let i = 0; i < pi.coinSel.coinPubs.length; i++) {
ws, coinPubs.push({
tx, amount: pi.coinSel.contributions[i],
currency, coinPub: pi.coinSel.coinPubs[i],
coinPubs, });
RefreshReason.AbortPeerPushDebit, }
);
pi.status = PeerPullDebitRecordStatus.AbortingRefresh; const refresh = await createRefreshGroup(
pi.abortRefreshGroupId = refresh.refreshGroupId; ws,
const newTxState = computePeerPullDebitTransactionState(pi); tx,
await tx.peerPullPaymentIncoming.put(pi); currency,
return { oldTxState, newTxState }; coinPubs,
}); RefreshReason.AbortPeerPushDebit,
notifyTransition(ws, transactionId, transitionInfo); );
} else {
const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny());
logger.trace(`purse deposit response: ${j2s(resp)}`);
const transitionInfo = await ws.db pi.status = PeerPullDebitRecordStatus.AbortingRefresh;
.mktx((x) => [x.peerPullPaymentIncoming]) pi.abortRefreshGroupId = refresh.refreshGroupId;
.runReadWrite(async (tx) => { const newTxState = computePeerPullDebitTransactionState(pi);
const pi = await tx.peerPullPaymentIncoming.get( await tx.peerPullPaymentIncoming.put(pi);
peerPullPaymentIncomingId, return { oldTxState, newTxState };
); });
if (!pi) { notifyTransition(ws, transactionId, transitionInfo);
throw Error("peer pull payment not found anymore"); break;
} }
if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) { case HttpStatusCode.Conflict: {
return; return handlePurseCreationConflict(ws, peerPullInc, httpResp);
} }
const oldTxState = computePeerPullDebitTransactionState(pi); default: {
pi.status = PeerPullDebitRecordStatus.DonePaid; const errResp = await readTalerErrorResponse(httpResp);
const newTxState = computePeerPullDebitTransactionState(pi); return {
await tx.peerPullPaymentIncoming.put(pi); type: OperationAttemptResultType.Error,
return { oldTxState, newTxState }; errorDetail: errResp,
}); };
notifyTransition(ws, transactionId, transitionInfo); }
} }
return { return {
type: OperationAttemptResultType.Finished, type: OperationAttemptResultType.Finished,
result: undefined, result: undefined,
@ -434,7 +535,7 @@ export async function preparePeerPullDebit(
const getPurseUrl = new URL(`purses/${pursePub}/merge`, exchangeBaseUrl); const getPurseUrl = new URL(`purses/${pursePub}/merge`, exchangeBaseUrl);
const purseHttpResp = await ws.http.get(getPurseUrl.href); const purseHttpResp = await ws.http.fetch(getPurseUrl.href);
const purseStatus = await readSuccessResponseJsonOrThrow( const purseStatus = await readSuccessResponseJsonOrThrow(
purseHttpResp, purseHttpResp,

View File

@ -28,6 +28,7 @@ import {
TalerError, TalerError,
TalerErrorCode, TalerErrorCode,
TalerPreciseTimestamp, TalerPreciseTimestamp,
TalerProtocolViolationError,
TalerUriAction, TalerUriAction,
TransactionAction, TransactionAction,
TransactionMajorState, TransactionMajorState,
@ -47,8 +48,13 @@ import {
getTotalPeerPaymentCost, getTotalPeerPaymentCost,
codecForExchangePurseStatus, codecForExchangePurseStatus,
queryCoinInfosForSelection, queryCoinInfosForSelection,
PeerCoinRepair,
} from "./pay-peer-common.js"; } from "./pay-peer-common.js";
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; import {
HttpResponse,
readSuccessResponseJsonOrThrow,
readTalerErrorResponse,
} from "@gnu-taler/taler-util/http";
import { import {
PeerPushPaymentInitiationRecord, PeerPushPaymentInitiationRecord,
PeerPushPaymentInitiationStatus, PeerPushPaymentInitiationStatus,
@ -97,6 +103,73 @@ export async function checkPeerPushDebit(
}; };
} }
async function handlePurseCreationConflict(
ws: InternalWalletState,
peerPushInitiation: PeerPushPaymentInitiationRecord,
resp: HttpResponse,
): Promise<OperationAttemptResult> {
const pursePub = peerPushInitiation.pursePub;
const errResp = await readTalerErrorResponse(resp);
if (errResp.code !== TalerErrorCode.EXCHANGE_GENERIC_INSUFFICIENT_FUNDS) {
await failPeerPushDebitTransaction(ws, pursePub);
return OperationAttemptResult.finishedEmpty();
}
// FIXME: Properly parse!
const brokenCoinPub = (errResp as any).coin_pub;
logger.trace(`excluded broken coin pub=${brokenCoinPub}`);
if (!brokenCoinPub) {
// FIXME: Details!
throw new TalerProtocolViolationError();
}
const instructedAmount = Amounts.parseOrThrow(peerPushInitiation.amount);
const repair: PeerCoinRepair = {
coinPubs: peerPushInitiation.coinSel.coinPubs,
contribs: peerPushInitiation.coinSel.contributions.map((x) =>
Amounts.parseOrThrow(x),
),
exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl,
};
const coinSelRes = await selectPeerCoins(ws, { instructedAmount, repair });
if (coinSelRes.type == "failure") {
// FIXME: Details!
throw Error(
"insufficient balance to re-select coins to repair double spending",
);
}
await ws.db
.mktx((x) => [x.peerPushPaymentInitiations])
.runReadWrite(async (tx) => {
const myPpi = await tx.peerPushPaymentInitiations.get(
peerPushInitiation.pursePub,
);
if (!myPpi) {
return;
}
switch (myPpi.status) {
case PeerPushPaymentInitiationStatus.PendingCreatePurse:
case PeerPushPaymentInitiationStatus.SuspendedCreatePurse: {
const sel = coinSelRes.result;
myPpi.coinSel = {
coinPubs: sel.coins.map((x) => x.coinPub),
contributions: sel.coins.map((x) => x.contribution),
}
break;
}
default:
return;
}
await tx.peerPushPaymentInitiations.put(myPpi);
});
return OperationAttemptResult.finishedEmpty();
}
async function processPeerPushDebitCreateReserve( async function processPeerPushDebitCreateReserve(
ws: InternalWalletState, ws: InternalWalletState,
peerPushInitiation: PeerPushPaymentInitiationRecord, peerPushInitiation: PeerPushPaymentInitiationRecord,
@ -175,6 +248,27 @@ async function processPeerPushDebitCreateReserve(
logger.info(`resp: ${j2s(resp)}`); logger.info(`resp: ${j2s(resp)}`);
switch (httpResp.status) {
case HttpStatusCode.Ok:
break;
case HttpStatusCode.Forbidden: {
// FIXME: Store this error!
await failPeerPushDebitTransaction(ws, pursePub);
return OperationAttemptResult.finishedEmpty();
}
case HttpStatusCode.Conflict: {
// Handle double-spending
return handlePurseCreationConflict(ws, peerPushInitiation, resp);
}
default: {
const errResp = await readTalerErrorResponse(resp);
return {
type: OperationAttemptResultType.Error,
errorDetail: errResp,
};
}
}
if (httpResp.status !== HttpStatusCode.Ok) { if (httpResp.status !== HttpStatusCode.Ok) {
// FIXME: do proper error reporting // FIXME: do proper error reporting
throw Error("got error response from exchange"); throw Error("got error response from exchange");
@ -710,17 +804,17 @@ export async function failPeerPushDebitTransaction(
switch (pushDebitRec.status) { switch (pushDebitRec.status) {
case PeerPushPaymentInitiationStatus.AbortingRefresh: case PeerPushPaymentInitiationStatus.AbortingRefresh:
case PeerPushPaymentInitiationStatus.SuspendedAbortingRefresh: case PeerPushPaymentInitiationStatus.SuspendedAbortingRefresh:
// FIXME: We also need to abort the refresh group! // FIXME: What to do about the refresh group?
newStatus = PeerPushPaymentInitiationStatus.Aborted; newStatus = PeerPushPaymentInitiationStatus.Failed;
break; break;
case PeerPushPaymentInitiationStatus.AbortingDeletePurse: case PeerPushPaymentInitiationStatus.AbortingDeletePurse:
case PeerPushPaymentInitiationStatus.SuspendedAbortingDeletePurse: case PeerPushPaymentInitiationStatus.SuspendedAbortingDeletePurse:
newStatus = PeerPushPaymentInitiationStatus.Aborted;
break;
case PeerPushPaymentInitiationStatus.PendingReady: case PeerPushPaymentInitiationStatus.PendingReady:
case PeerPushPaymentInitiationStatus.SuspendedReady: case PeerPushPaymentInitiationStatus.SuspendedReady:
case PeerPushPaymentInitiationStatus.SuspendedCreatePurse: case PeerPushPaymentInitiationStatus.SuspendedCreatePurse:
case PeerPushPaymentInitiationStatus.PendingCreatePurse: case PeerPushPaymentInitiationStatus.PendingCreatePurse:
newStatus = PeerPushPaymentInitiationStatus.Failed;
break;
case PeerPushPaymentInitiationStatus.Done: case PeerPushPaymentInitiationStatus.Done:
case PeerPushPaymentInitiationStatus.Aborted: case PeerPushPaymentInitiationStatus.Aborted:
case PeerPushPaymentInitiationStatus.Failed: case PeerPushPaymentInitiationStatus.Failed: