diff options
Diffstat (limited to 'src/wallet-impl/reserves.ts')
-rw-r--r-- | src/wallet-impl/reserves.ts | 110 |
1 files changed, 74 insertions, 36 deletions
diff --git a/src/wallet-impl/reserves.ts b/src/wallet-impl/reserves.ts index d70f02576..f00956b46 100644 --- a/src/wallet-impl/reserves.ts +++ b/src/wallet-impl/reserves.ts @@ -20,6 +20,7 @@ import { getTimestampNow, ConfirmReserveRequest, OperationError, + NotificationType, } from "../walletTypes"; import { canonicalizeBaseUrl } from "../util/helpers"; import { InternalWalletState } from "./state"; @@ -29,6 +30,8 @@ import { CurrencyRecord, Stores, WithdrawalSessionRecord, + initRetryInfo, + updateRetryInfoTimeout, } from "../dbTypes"; import { oneShotMutate, @@ -42,13 +45,13 @@ import * as Amounts from "../util/amounts"; import { updateExchangeFromUrl, getExchangeTrust } from "./exchanges"; import { WithdrawOperationStatusResponse, ReserveStatus } from "../talerTypes"; import { assertUnreachable } from "../util/assertUnreachable"; -import { OperationFailedAndReportedError } from "../wallet"; import { encodeCrock } from "../crypto/talerCrypto"; import { randomBytes } from "../crypto/primitives/nacl-fast"; import { getVerifiedWithdrawDenomList, processWithdrawSession, } from "./withdraw"; +import { guardOperationException, OperationFailedAndReportedError } from "./errors"; const logger = new Logger("reserves.ts"); @@ -91,7 +94,9 @@ export async function createReserve( bankWithdrawStatusUrl: req.bankWithdrawStatusUrl, exchangeWire: req.exchangeWire, reserveStatus, - lastStatusQuery: undefined, + lastSuccessfulStatusQuery: undefined, + retryInfo: initRetryInfo(), + lastError: undefined, }; const senderWire = req.senderWire; @@ -171,7 +176,7 @@ export async function createReserve( // Asynchronously process the reserve, but return // to the caller already. - processReserve(ws, resp.reservePub).catch(e => { + processReserve(ws, resp.reservePub, true).catch(e => { console.error("Processing reserve failed:", e); }); @@ -188,18 +193,19 @@ export async function createReserve( export async function processReserve( ws: InternalWalletState, reservePub: string, + forceNow: boolean = false, ): Promise<void> { - const p = ws.memoProcessReserve.find(reservePub); - if (p) { - return p; - } else { - return ws.memoProcessReserve.put( - reservePub, - processReserveImpl(ws, reservePub), + return ws.memoProcessReserve.memo(reservePub, async () => { + const onOpError = (err: OperationError) => + incrementReserveRetry(ws, reservePub, err); + await guardOperationException( + () => processReserveImpl(ws, reservePub, forceNow), + onOpError, ); - } + }); } + async function registerReserveWithBank( ws: InternalWalletState, reservePub: string, @@ -235,6 +241,7 @@ async function registerReserveWithBank( } r.timestampReserveInfoPosted = getTimestampNow(); r.reserveStatus = ReserveRecordStatus.WAIT_CONFIRM_BANK; + r.retryInfo = initRetryInfo(); return r; }); return processReserveBankStatus(ws, reservePub); @@ -244,6 +251,18 @@ export async function processReserveBankStatus( ws: InternalWalletState, reservePub: string, ): Promise<void> { + const onOpError = (err: OperationError) => + incrementReserveRetry(ws, reservePub, err); + await guardOperationException( + () => processReserveBankStatusImpl(ws, reservePub), + onOpError, + ); +} + +async function processReserveBankStatusImpl( + ws: InternalWalletState, + reservePub: string, +): Promise<void> { let reserve = await oneShotGet(ws.db, Stores.reserves, reservePub); switch (reserve?.reserveStatus) { case ReserveRecordStatus.WAIT_CONFIRM_BANK: @@ -287,9 +306,10 @@ export async function processReserveBankStatus( const now = getTimestampNow(); r.timestampConfirmed = now; r.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; + r.retryInfo = initRetryInfo(); return r; }); - await processReserveImpl(ws, reservePub); + await processReserveImpl(ws, reservePub, true); } else { await oneShotMutate(ws.db, Stores.reserves, reservePub, r => { switch (r.reserveStatus) { @@ -304,16 +324,24 @@ export async function processReserveBankStatus( } } -async function setReserveError( +async function incrementReserveRetry( ws: InternalWalletState, reservePub: string, - err: OperationError, + err: OperationError | undefined, ): Promise<void> { - const mut = (reserve: ReserveRecord) => { - reserve.lastError = err; - return reserve; - }; - await oneShotMutate(ws.db, Stores.reserves, reservePub, mut); + await runWithWriteTransaction(ws.db, [Stores.reserves], async tx => { + const r = await tx.get(Stores.reserves, reservePub); + if (!r) { + return; + } + if (!r.retryInfo) { + return; + } + r.retryInfo.retryCounter++; + updateRetryInfoTimeout(r.retryInfo); + r.lastError = err; + await tx.put(Stores.reserves, r); + }); } /** @@ -345,15 +373,11 @@ async function updateReserve( } catch (e) { if (e.response?.status === 404) { const m = "The exchange does not know about this reserve (yet)."; - await setReserveError(ws, reservePub, { - type: "waiting", - details: {}, - message: "The exchange does not know about this reserve (yet).", - }); - throw new OperationFailedAndReportedError(m); + await incrementReserveRetry(ws, reservePub, undefined); + return; } else { const m = e.message; - await setReserveError(ws, reservePub, { + await incrementReserveRetry(ws, reservePub, { type: "network", details: {}, message: m, @@ -369,7 +393,7 @@ async function updateReserve( } // FIXME: check / compare history! - if (!r.lastStatusQuery) { + if (!r.lastSuccessfulStatusQuery) { // FIXME: check if this matches initial expectations r.withdrawRemainingAmount = balance; } else { @@ -392,22 +416,31 @@ async function updateReserve( // We're missing some money. } } - r.lastStatusQuery = getTimestampNow(); + r.lastSuccessfulStatusQuery = getTimestampNow(); r.reserveStatus = ReserveRecordStatus.WITHDRAWING; + r.retryInfo = initRetryInfo(); return r; }); - ws.notifier.notify(); + ws.notify( { type: NotificationType.ReserveUpdated }); } async function processReserveImpl( ws: InternalWalletState, reservePub: string, + forceNow: boolean = false, ): Promise<void> { const reserve = await oneShotGet(ws.db, Stores.reserves, reservePub); if (!reserve) { console.log("not processing reserve: reserve does not exist"); return; } + if (!forceNow) { + const now = getTimestampNow(); + if (reserve.retryInfo.nextRetry.t_ms > now.t_ms) { + logger.trace("processReserve retry not due yet"); + return; + } + } logger.trace( `Processing reserve ${reservePub} with status ${reserve.reserveStatus}`, ); @@ -417,10 +450,10 @@ async function processReserveImpl( break; case ReserveRecordStatus.REGISTERING_BANK: await processReserveBankStatus(ws, reservePub); - return processReserveImpl(ws, reservePub); + return processReserveImpl(ws, reservePub, true); case ReserveRecordStatus.QUERYING_STATUS: await updateReserve(ws, reservePub); - return processReserveImpl(ws, reservePub); + return processReserveImpl(ws, reservePub, true); case ReserveRecordStatus.WITHDRAWING: await depleteReserve(ws, reservePub); break; @@ -448,12 +481,13 @@ export async function confirmReserve( } reserve.timestampConfirmed = now; reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; + reserve.retryInfo = initRetryInfo(); return reserve; }); - ws.notifier.notify(); + ws.notify({ type: NotificationType.ReserveUpdated }); - processReserve(ws, req.reservePub).catch(e => { + processReserve(ws, req.reservePub, true).catch(e => { console.log("processing reserve failed:", e); }); } @@ -489,7 +523,7 @@ async function depleteReserve( logger.trace(`got denom list`); if (denomsForWithdraw.length === 0) { const m = `Unable to withdraw from reserve, no denominations are available to withdraw.`; - await setReserveError(ws, reserve.reservePub, { + await incrementReserveRetry(ws, reserve.reservePub, { type: "internal", message: m, details: {}, @@ -502,7 +536,8 @@ async function depleteReserve( const withdrawalSessionId = encodeCrock(randomBytes(32)); - const totalCoinValue = Amounts.sum(denomsForWithdraw.map(x => x.value)).amount; + const totalCoinValue = Amounts.sum(denomsForWithdraw.map(x => x.value)) + .amount; const withdrawalRecord: WithdrawalSessionRecord = { withdrawSessionId: withdrawalSessionId, @@ -517,6 +552,9 @@ async function depleteReserve( withdrawn: denomsForWithdraw.map(x => false), planchets: denomsForWithdraw.map(x => undefined), totalCoinValue, + retryInfo: initRetryInfo(), + lastCoinErrors: denomsForWithdraw.map(x => undefined), + lastError: undefined, }; const totalCoinWithdrawFee = Amounts.sum( @@ -545,7 +583,7 @@ async function depleteReserve( r.withdrawRemainingAmount = remaining.amount; r.withdrawAllocatedAmount = allocated.amount; r.reserveStatus = ReserveRecordStatus.DORMANT; - + r.retryInfo = initRetryInfo(false); return r; } |