wallet-core: handle Gone in peer-pull-debit

This commit is contained in:
Florian Dold 2023-06-05 18:38:17 +02:00
parent bdb67c83a9
commit da927b5e48
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
8 changed files with 150 additions and 71 deletions

View File

@ -2052,6 +2052,8 @@ export interface PeerPullPaymentIncomingRecord {
*/
totalCostEstimated: AmountString;
abortRefreshGroupId?: string;
coinSel?: PeerPullPaymentCoinSelection;
}

View File

@ -134,13 +134,6 @@ export interface RecoupOperations {
exchangeBaseUrl: string,
coinPubs: string[],
): Promise<string>;
processRecoupGroup(
ws: InternalWalletState,
recoupGroupId: string,
options?: {
forceNow?: boolean;
},
): Promise<void>;
}
export type NotificationListener = (n: WalletNotification) => void;

View File

@ -863,9 +863,7 @@ export async function updateExchangeFromUrlHandler(
if (recoupGroupId) {
// Asynchronously start recoup. This doesn't need to finish
// for the exchange update to be considered finished.
ws.recoupOps.processRecoupGroup(ws, recoupGroupId).catch((e) => {
logger.error("error while recouping coins:", e);
});
ws.workAvailable.trigger();
}
if (!updated) {

View File

@ -28,6 +28,7 @@ import {
Logger,
TalerErrorCode,
TalerPreciseTimestamp,
TalerUriAction,
TransactionAction,
TransactionMajorState,
TransactionMinorState,
@ -37,11 +38,11 @@ import {
WalletKycUuid,
codecForAny,
codecForWalletKycUuid,
constructPayPullUri,
encodeCrock,
getRandomBytes,
j2s,
makeErrorDetail,
stringifyTalerUri,
} from "@gnu-taler/taler-util";
import {
readSuccessResponseJsonOrErrorCode,
@ -741,7 +742,8 @@ export async function initiatePeerPullPayment(
});
return {
talerUri: constructPayPullUri({
talerUri: stringifyTalerUri({
type: TalerUriAction.PayPull,
exchangeBaseUrl: exchangeBaseUrl,
contractPriv: contractKeyPair.priv,
}),

View File

@ -17,8 +17,10 @@
import {
AcceptPeerPullPaymentResponse,
Amounts,
CoinRefreshRequest,
ConfirmPeerPullDebitRequest,
ExchangePurseDeposits,
HttpStatusCode,
Logger,
PeerContractTerms,
PreparePeerPullDebitRequest,
@ -48,6 +50,8 @@ import {
PeerPullDebitRecordStatus,
PeerPullPaymentIncomingRecord,
PendingTaskType,
RefreshOperationStatus,
createRefreshGroup,
} from "../index.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import {
@ -68,6 +72,7 @@ import {
notifyTransition,
stopLongpolling,
} from "./transactions.js";
import { checkLogicInvariant } from "../util/invariants.js";
const logger = new Logger("pay-peer-pull-debit.ts");
@ -104,24 +109,89 @@ async function processPeerPullDebitPendingDeposit(
logger.trace(`purse deposit payload: ${j2s(depositPayload)}`);
}
const httpResp = await ws.http.postJson(purseDepositUrl.href, depositPayload);
const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny());
logger.trace(`purse deposit response: ${j2s(resp)}`);
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPullDebit,
peerPullPaymentIncomingId,
});
await ws.db
.mktx((x) => [x.peerPullPaymentIncoming])
.runReadWrite(async (tx) => {
const pi = await tx.peerPullPaymentIncoming.get(
peerPullPaymentIncomingId,
);
if (!pi) {
throw Error("peer pull payment not found anymore");
}
if (pi.status === PeerPullDebitRecordStatus.PendingDeposit) {
const httpResp = await ws.http.fetch(purseDepositUrl.href, {
method: "POST",
body: depositPayload,
});
if (httpResp.status === HttpStatusCode.Gone) {
const transitionInfo = await ws.db
.mktx((x) => [
x.peerPullPaymentIncoming,
x.refreshGroups,
x.denominations,
x.coinAvailability,
x.coins,
])
.runReadWrite(async (tx) => {
const pi = await tx.peerPullPaymentIncoming.get(
peerPullPaymentIncomingId,
);
if (!pi) {
throw Error("peer pull payment not found anymore");
}
if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) {
return;
}
const oldTxState = computePeerPullDebitTransactionState(pi);
const currency = Amounts.currencyOf(pi.totalCostEstimated);
const coinPubs: CoinRefreshRequest[] = [];
if (!pi.coinSel) {
throw Error("invalid db state");
}
for (let i = 0; i < pi.coinSel.coinPubs.length; i++) {
coinPubs.push({
amount: pi.coinSel.contributions[i],
coinPub: pi.coinSel.coinPubs[i],
});
}
const refresh = await createRefreshGroup(
ws,
tx,
currency,
coinPubs,
RefreshReason.AbortPeerPushDebit,
);
pi.status = PeerPullDebitRecordStatus.AbortingRefresh;
pi.abortRefreshGroupId = refresh.refreshGroupId;
const newTxState = computePeerPullDebitTransactionState(pi);
await tx.peerPullPaymentIncoming.put(pi);
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
} else {
const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny());
logger.trace(`purse deposit response: ${j2s(resp)}`);
const transitionInfo = await ws.db
.mktx((x) => [x.peerPullPaymentIncoming])
.runReadWrite(async (tx) => {
const pi = await tx.peerPullPaymentIncoming.get(
peerPullPaymentIncomingId,
);
if (!pi) {
throw Error("peer pull payment not found anymore");
}
if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) {
return;
}
const oldTxState = computePeerPullDebitTransactionState(pi);
pi.status = PeerPullDebitRecordStatus.DonePaid;
}
await tx.peerPullPaymentIncoming.put(pi);
});
const newTxState = computePeerPullDebitTransactionState(pi);
await tx.peerPullPaymentIncoming.put(pi);
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);
}
return {
type: OperationAttemptResultType.Finished,
@ -133,7 +203,50 @@ async function processPeerPullDebitAbortingRefresh(
ws: InternalWalletState,
peerPullInc: PeerPullPaymentIncomingRecord,
): Promise<OperationAttemptResult> {
throw Error("not implemented");
const peerPullPaymentIncomingId = peerPullInc.peerPullPaymentIncomingId;
const abortRefreshGroupId = peerPullInc.abortRefreshGroupId;
checkLogicInvariant(!!abortRefreshGroupId);
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPullDebit,
peerPullPaymentIncomingId,
});
const transitionInfo = await ws.db
.mktx((x) => [x.refreshGroups, x.peerPullPaymentIncoming])
.runReadWrite(async (tx) => {
const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
let newOpState: PeerPullDebitRecordStatus | undefined;
if (!refreshGroup) {
// Maybe it got manually deleted? Means that we should
// just go into failed.
logger.warn("no aborting refresh group found for deposit group");
newOpState = PeerPullDebitRecordStatus.Failed;
} else {
if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) {
newOpState = PeerPullDebitRecordStatus.Aborted;
} else if (
refreshGroup.operationStatus === RefreshOperationStatus.Failed
) {
newOpState = PeerPullDebitRecordStatus.Failed;
}
}
if (newOpState) {
const newDg = await tx.peerPullPaymentIncoming.get(
peerPullPaymentIncomingId,
);
if (!newDg) {
return;
}
const oldTxState = computePeerPullDebitTransactionState(newDg);
newDg.status = newOpState;
const newTxState = computePeerPullDebitTransactionState(newDg);
await tx.peerPullPaymentIncoming.put(newDg);
return { oldTxState, newTxState };
}
return undefined;
});
notifyTransition(ws, transactionId, transitionInfo);
// FIXME: Shouldn't this be finished in some cases?!
return OperationAttemptResult.pendingEmpty();
}
export async function processPeerPullDebit(
@ -158,7 +271,7 @@ export async function processPeerPullDebit(
return {
type: OperationAttemptResultType.Finished,
result: undefined,
}
};
}
export async function confirmPeerPullDebit(

View File

@ -304,24 +304,7 @@ async function recoupRefreshCoin(
export async function processRecoupGroup(
ws: InternalWalletState,
recoupGroupId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
await unwrapOperationHandlerResultOrThrow(
await processRecoupGroupHandler(ws, recoupGroupId, options),
);
return;
}
export async function processRecoupGroupHandler(
ws: InternalWalletState,
recoupGroupId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<OperationAttemptResult> {
const forceNow = options.forceNow ?? false;
let recoupGroup = await ws.db
.mktx((x) => [x.recoupGroups])
.runReadOnly(async (tx) => {

View File

@ -1273,7 +1273,6 @@ export interface WithdrawalGroupContext {
export async function processWithdrawalGroup(
ws: InternalWalletState,
withdrawalGroupId: string,
options: {} = {},
): Promise<OperationAttemptResult> {
logger.trace("processing withdrawal group", withdrawalGroupId);
const withdrawalGroup = await ws.db
@ -1303,9 +1302,8 @@ export async function processWithdrawalGroup(
switch (withdrawalGroup.status) {
case WithdrawalGroupStatus.PendingRegisteringBank:
await processReserveBankStatus(ws, withdrawalGroupId);
return await processWithdrawalGroup(ws, withdrawalGroupId, {
forceNow: true,
});
// FIXME: This will get called by the main task loop, why call it here?!
return await processWithdrawalGroup(ws, withdrawalGroupId);
case WithdrawalGroupStatus.PendingQueryingStatus: {
runLongpollAsync(ws, retryTag, (ct) => {
return queryReserve(ws, withdrawalGroupId, ct);

View File

@ -219,9 +219,7 @@ import {
} from "./operations/pay-peer-push-debit.js";
import { getPendingOperations } from "./operations/pending.js";
import {
createRecoupGroup,
processRecoupGroup,
processRecoupGroupHandler,
createRecoupGroup, processRecoupGroup,
} from "./operations/recoup.js";
import {
autoRefresh,
@ -295,27 +293,20 @@ const logger = new Logger("wallet.ts");
async function callOperationHandler(
ws: InternalWalletState,
pending: PendingTaskInfo,
forceNow = false,
): Promise<OperationAttemptResult> {
switch (pending.type) {
case PendingTaskType.ExchangeUpdate:
return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl, {
forceNow,
});
return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl);
case PendingTaskType.Refresh:
return await processRefreshGroup(ws, pending.refreshGroupId);
case PendingTaskType.Withdraw:
return await processWithdrawalGroup(ws, pending.withdrawalGroupId, {
forceNow,
});
return await processWithdrawalGroup(ws, pending.withdrawalGroupId);
case PendingTaskType.TipPickup:
return await processTip(ws, pending.tipId);
case PendingTaskType.Purchase:
return await processPurchase(ws, pending.proposalId);
case PendingTaskType.Recoup:
return await processRecoupGroupHandler(ws, pending.recoupGroupId, {
forceNow,
});
return await processRecoupGroup(ws, pending.recoupGroupId);
case PendingTaskType.ExchangeCheckRefresh:
return await autoRefresh(ws, pending.exchangeBaseUrl);
case PendingTaskType.Deposit: {
@ -342,16 +333,15 @@ async function callOperationHandler(
*/
export async function runPending(
ws: InternalWalletState,
forceNow = false,
): Promise<void> {
const pendingOpsResponse = await getPendingOperations(ws);
for (const p of pendingOpsResponse.pendingOperations) {
if (!forceNow && !AbsoluteTime.isExpired(p.timestampDue)) {
if (!AbsoluteTime.isExpired(p.timestampDue)) {
continue;
}
await runOperationWithErrorReporting(ws, p.id, async () => {
logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
return await callOperationHandler(ws, p, forceNow);
return await callOperationHandler(ws, p);
});
}
}
@ -1168,7 +1158,8 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
return getContractTermsDetails(ws, req.proposalId);
}
case WalletApiOperation.RetryPendingNow: {
await runPending(ws, true);
// FIXME: Should we reset all operation retries here?
await runPending(ws);
return {};
}
case WalletApiOperation.PreparePayForUri: {
@ -1624,8 +1615,8 @@ export class Wallet {
this.ws.stop();
}
runPending(forceNow = false): Promise<void> {
return runPending(this.ws, forceNow);
runPending(): Promise<void> {
return runPending(this.ws);
}
runTaskLoop(opts?: RetryLoopOpts): Promise<TaskLoopResult> {
@ -1673,7 +1664,6 @@ class InternalWalletStateImpl implements InternalWalletState {
recoupOps: RecoupOperations = {
createRecoupGroup,
processRecoupGroup,
};
merchantOps: MerchantOperations = {