From 7a18e12a175856b3d17d2bb70ec549004c281ff5 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Fri, 30 Jun 2023 23:01:48 +0200 Subject: [PATCH] wallet-core: towards event-based waiting in runIntegrationTestV2 --- .../src/internal-wallet-state.ts | 8 ++++ .../src/operations/pay-peer-pull-credit.ts | 48 +++++++++++-------- .../src/operations/testing.ts | 44 +++++++++++++++++ packages/taler-wallet-core/src/wallet.ts | 48 +++++++++++++++++-- 4 files changed, 124 insertions(+), 24 deletions(-) diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts index d97703dc1..a2ca34a86 100644 --- a/packages/taler-wallet-core/src/internal-wallet-state.ts +++ b/packages/taler-wallet-core/src/internal-wallet-state.ts @@ -184,6 +184,8 @@ export interface InternalWalletState { merchantOps: MerchantOperations; refreshOps: RefreshOperations; + isTaskLoopRunning: boolean; + getTransactionState( ws: InternalWalletState, tx: GetReadOnlyAccess, @@ -218,4 +220,10 @@ export interface InternalWalletState { runSequentialized(tokens: string[], f: () => Promise): Promise; runUntilDone(req?: { maxRetries?: number }): Promise; + + /** + * Ensure that a task loop is currently running. + * Starts one if no task loop is running. + */ + ensureTaskLoopRunning(): void; } diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts index 4c00ed592..c7e13754f 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts @@ -746,31 +746,37 @@ export async function initiatePeerPullPayment( undefined, ); - await ws.db + const transitionInfo = await ws.db .mktx((x) => [x.peerPullPaymentInitiations, x.contractTerms]) .runReadWrite(async (tx) => { - await tx.peerPullPaymentInitiations.put({ - amount: req.partialContractTerms.amount, - contractTermsHash: hContractTerms, - exchangeBaseUrl: exchangeBaseUrl, - pursePriv: pursePair.priv, - pursePub: pursePair.pub, - mergePriv: mergePair.priv, - mergePub: mergePair.pub, - status: PeerPullPaymentInitiationStatus.PendingCreatePurse, - contractTerms: contractTerms, - mergeTimestamp, - contractEncNonce, - mergeReserveRowId: mergeReserveRowId, - contractPriv: contractKeyPair.priv, - contractPub: contractKeyPair.pub, - withdrawalGroupId, - estimatedAmountEffective: wi.withdrawalAmountEffective, - }); + const ppi: PeerPullPaymentInitiationRecord = { + amount: req.partialContractTerms.amount, + contractTermsHash: hContractTerms, + exchangeBaseUrl: exchangeBaseUrl, + pursePriv: pursePair.priv, + pursePub: pursePair.pub, + mergePriv: mergePair.priv, + mergePub: mergePair.pub, + status: PeerPullPaymentInitiationStatus.PendingCreatePurse, + contractTerms: contractTerms, + mergeTimestamp, + contractEncNonce, + mergeReserveRowId: mergeReserveRowId, + contractPriv: contractKeyPair.priv, + contractPub: contractKeyPair.pub, + withdrawalGroupId, + estimatedAmountEffective: wi.withdrawalAmountEffective, + } + await tx.peerPullPaymentInitiations.put(ppi); + const oldTxState: TransactionState = { + major: TransactionMajorState.None, + }; + const newTxState = computePeerPullCreditTransactionState(ppi); await tx.contractTerms.put({ contractTermsRaw: contractTerms, h: hContractTerms, }); + return { oldTxState, newTxState }; }); const transactionId = constructTransactionIdentifier({ @@ -781,6 +787,10 @@ export async function initiatePeerPullPayment( // The pending-incoming balance has changed. ws.notify({ type: NotificationType.BalanceChange }); + notifyTransition(ws, transactionId, transitionInfo); + + ws.workAvailable.trigger(); + return { talerUri: stringifyTalerUri({ type: TalerUriAction.PayPull, diff --git a/packages/taler-wallet-core/src/operations/testing.ts b/packages/taler-wallet-core/src/operations/testing.ts index 77e218cd7..2a3584a0a 100644 --- a/packages/taler-wallet-core/src/operations/testing.ts +++ b/packages/taler-wallet-core/src/operations/testing.ts @@ -27,6 +27,8 @@ import { NotificationType, stringToBytes, TestPayResult, + TransactionMajorState, + TransactionMinorState, WithdrawTestBalanceRequest, } from "@gnu-taler/taler-util"; import { @@ -66,6 +68,7 @@ import { } from "./pay-peer-push-credit.js"; import { initiatePeerPushDebit } from "./pay-peer-push-debit.js"; import { OpenedPromise, openPromise } from "../index.js"; +import { getTransactionById } from "./transactions.js"; const logger = new Logger("operations/testing.ts"); @@ -459,10 +462,45 @@ async function waitUntilDone(ws: InternalWalletState): Promise { }); } +async function waitUntilPendingReady( + ws: InternalWalletState, + transactionId: string, +): Promise { + logger.info(`starting waiting for ${transactionId} to be in pending(ready)`); + ws.ensureTaskLoopRunning(); + let p: OpenedPromise | undefined = undefined; + ws.addNotificationListener((notif) => { + if (!p) { + return; + } + if (notif.type === NotificationType.TransactionStateTransition) { + p.resolve(); + } + }); + while (1) { + p = openPromise(); + const tx = await getTransactionById(ws, { + transactionId, + }); + if ( + tx.txState.major == TransactionMajorState.Pending && + tx.txState.minor === TransactionMinorState.Ready + ) { + break; + } + // Wait until transaction state changed + await p.promise; + } + logger.info(`done waiting for ${transactionId} to be in pending(ready)`); + // FIXME: Remove listener! +} + export async function runIntegrationTest2( ws: InternalWalletState, args: IntegrationTestV2Args, ): Promise { + // FIXME: Make sure that a task look is running, since we're + // waiting for notifications. logger.info("running test with arguments", args); const exchangeInfo = await updateExchangeFromUrl(ws, args.exchangeBaseUrl); @@ -565,6 +603,8 @@ export async function runIntegrationTest2( }, }); + await waitUntilPendingReady(ws, peerPushInit.transactionId); + const peerPushCredit = await preparePeerPushCredit(ws, { talerUri: peerPushInit.talerUri, }); @@ -586,6 +626,8 @@ export async function runIntegrationTest2( }, }); + await waitUntilPendingReady(ws, peerPullInit.transactionId); + const peerPullInc = await preparePeerPullDebit(ws, { talerUri: peerPullInit.talerUri, }); @@ -594,6 +636,8 @@ export async function runIntegrationTest2( peerPullPaymentIncomingId: peerPullInc.peerPullPaymentIncomingId, }); + await ws.runUntilDone(); + logger.trace("integration test: all done!"); } diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 583dc33d4..1b355a32c 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -287,9 +287,7 @@ import { GetReadOnlyAccess, GetReadWriteAccess, } from "./util/query.js"; -import { - TaskIdentifiers, -} from "./operations/common.js"; +import { TaskIdentifiers } from "./operations/common.js"; import { TimerAPI, TimerGroup } from "./util/timer.js"; import { WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, @@ -404,6 +402,12 @@ async function runTaskLoop( opts: RetryLoopOpts = {}, ): Promise { logger.info(`running task loop opts=${j2s(opts)}`); + if (ws.isTaskLoopRunning) { + logger.warn( + "task loop already running, nesting the wallet-core task loop is deprecated and should be avoided", + ); + } + ws.isTaskLoopRunning = true; let retriesExceeded = false; for (let iteration = 0; !ws.stopped; iteration++) { const pending = await getPendingOperations(ws); @@ -434,6 +438,14 @@ async function runTaskLoop( if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) { logger.warn(`stopping, as no pending operations have lifeness`); + ws.isTaskLoopRunning = false; + return { + retriesExceeded, + }; + } + + if (ws.stopped) { + ws.isTaskLoopRunning = false; return { retriesExceeded, }; @@ -468,16 +480,24 @@ async function runTaskLoop( } await runTaskWithErrorReporting(ws, p.id, async () => { logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`); + ws.isTaskLoopRunning = false; return await callOperationHandler(ws, p); }); ws.notify({ type: NotificationType.PendingOperationProcessed, id: p.id, }); + if (ws.stopped) { + ws.isTaskLoopRunning = false; + return { + retriesExceeded, + }; + } } } } - logger.trace("exiting wallet retry loop"); + logger.trace("exiting wallet task loop"); + ws.isTaskLoopRunning = false; return { retriesExceeded, }; @@ -1575,7 +1595,9 @@ export async function handleCoreApiRequest( }; } catch (e: any) { const err = getErrorDetailFromException(e); - logger.info(`finished wallet core request ${operation} with error: ${j2s(err)}`); + logger.info( + `finished wallet core request ${operation} with error: ${j2s(err)}`, + ); return { type: "error", operation, @@ -1737,6 +1759,8 @@ class InternalWalletStateImpl implements InternalWalletState { */ private resourceLocks: Set = new Set(); + isTaskLoopRunning: boolean = false; + config: Readonly; constructor( @@ -1948,6 +1972,20 @@ class InternalWalletStateImpl implements InternalWalletState { } } } + + ensureTaskLoopRunning(): void { + if (this.isTaskLoopRunning) { + return; + } + runTaskLoop(this) + .catch((e) => { + logger.error("error running task loop"); + logger.error(`err: ${e}`); + }) + .then(() => { + logger.info("done running task loop"); + }); + } } /**