wallet-core: towards event-based waiting in runIntegrationTestV2

This commit is contained in:
Florian Dold 2023-06-30 23:01:48 +02:00
parent 86e9799ffd
commit 7a18e12a17
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
4 changed files with 124 additions and 24 deletions

View File

@ -184,6 +184,8 @@ export interface InternalWalletState {
merchantOps: MerchantOperations; merchantOps: MerchantOperations;
refreshOps: RefreshOperations; refreshOps: RefreshOperations;
isTaskLoopRunning: boolean;
getTransactionState( getTransactionState(
ws: InternalWalletState, ws: InternalWalletState,
tx: GetReadOnlyAccess<typeof WalletStoresV1>, tx: GetReadOnlyAccess<typeof WalletStoresV1>,
@ -218,4 +220,10 @@ export interface InternalWalletState {
runSequentialized<T>(tokens: string[], f: () => Promise<T>): Promise<T>; runSequentialized<T>(tokens: string[], f: () => Promise<T>): Promise<T>;
runUntilDone(req?: { maxRetries?: number }): Promise<void>; runUntilDone(req?: { maxRetries?: number }): Promise<void>;
/**
* Ensure that a task loop is currently running.
* Starts one if no task loop is running.
*/
ensureTaskLoopRunning(): void;
} }

View File

@ -746,31 +746,37 @@ export async function initiatePeerPullPayment(
undefined, undefined,
); );
await ws.db const transitionInfo = await ws.db
.mktx((x) => [x.peerPullPaymentInitiations, x.contractTerms]) .mktx((x) => [x.peerPullPaymentInitiations, x.contractTerms])
.runReadWrite(async (tx) => { .runReadWrite(async (tx) => {
await tx.peerPullPaymentInitiations.put({ const ppi: PeerPullPaymentInitiationRecord = {
amount: req.partialContractTerms.amount, amount: req.partialContractTerms.amount,
contractTermsHash: hContractTerms, contractTermsHash: hContractTerms,
exchangeBaseUrl: exchangeBaseUrl, exchangeBaseUrl: exchangeBaseUrl,
pursePriv: pursePair.priv, pursePriv: pursePair.priv,
pursePub: pursePair.pub, pursePub: pursePair.pub,
mergePriv: mergePair.priv, mergePriv: mergePair.priv,
mergePub: mergePair.pub, mergePub: mergePair.pub,
status: PeerPullPaymentInitiationStatus.PendingCreatePurse, status: PeerPullPaymentInitiationStatus.PendingCreatePurse,
contractTerms: contractTerms, contractTerms: contractTerms,
mergeTimestamp, mergeTimestamp,
contractEncNonce, contractEncNonce,
mergeReserveRowId: mergeReserveRowId, mergeReserveRowId: mergeReserveRowId,
contractPriv: contractKeyPair.priv, contractPriv: contractKeyPair.priv,
contractPub: contractKeyPair.pub, contractPub: contractKeyPair.pub,
withdrawalGroupId, withdrawalGroupId,
estimatedAmountEffective: wi.withdrawalAmountEffective, estimatedAmountEffective: wi.withdrawalAmountEffective,
}); }
await tx.peerPullPaymentInitiations.put(ppi);
const oldTxState: TransactionState = {
major: TransactionMajorState.None,
};
const newTxState = computePeerPullCreditTransactionState(ppi);
await tx.contractTerms.put({ await tx.contractTerms.put({
contractTermsRaw: contractTerms, contractTermsRaw: contractTerms,
h: hContractTerms, h: hContractTerms,
}); });
return { oldTxState, newTxState };
}); });
const transactionId = constructTransactionIdentifier({ const transactionId = constructTransactionIdentifier({
@ -781,6 +787,10 @@ export async function initiatePeerPullPayment(
// The pending-incoming balance has changed. // The pending-incoming balance has changed.
ws.notify({ type: NotificationType.BalanceChange }); ws.notify({ type: NotificationType.BalanceChange });
notifyTransition(ws, transactionId, transitionInfo);
ws.workAvailable.trigger();
return { return {
talerUri: stringifyTalerUri({ talerUri: stringifyTalerUri({
type: TalerUriAction.PayPull, type: TalerUriAction.PayPull,

View File

@ -27,6 +27,8 @@ import {
NotificationType, NotificationType,
stringToBytes, stringToBytes,
TestPayResult, TestPayResult,
TransactionMajorState,
TransactionMinorState,
WithdrawTestBalanceRequest, WithdrawTestBalanceRequest,
} from "@gnu-taler/taler-util"; } from "@gnu-taler/taler-util";
import { import {
@ -66,6 +68,7 @@ import {
} from "./pay-peer-push-credit.js"; } from "./pay-peer-push-credit.js";
import { initiatePeerPushDebit } from "./pay-peer-push-debit.js"; import { initiatePeerPushDebit } from "./pay-peer-push-debit.js";
import { OpenedPromise, openPromise } from "../index.js"; import { OpenedPromise, openPromise } from "../index.js";
import { getTransactionById } from "./transactions.js";
const logger = new Logger("operations/testing.ts"); const logger = new Logger("operations/testing.ts");
@ -459,10 +462,45 @@ async function waitUntilDone(ws: InternalWalletState): Promise<void> {
}); });
} }
async function waitUntilPendingReady(
ws: InternalWalletState,
transactionId: string,
): Promise<void> {
logger.info(`starting waiting for ${transactionId} to be in pending(ready)`);
ws.ensureTaskLoopRunning();
let p: OpenedPromise<void> | undefined = undefined;
ws.addNotificationListener((notif) => {
if (!p) {
return;
}
if (notif.type === NotificationType.TransactionStateTransition) {
p.resolve();
}
});
while (1) {
p = openPromise();
const tx = await getTransactionById(ws, {
transactionId,
});
if (
tx.txState.major == TransactionMajorState.Pending &&
tx.txState.minor === TransactionMinorState.Ready
) {
break;
}
// Wait until transaction state changed
await p.promise;
}
logger.info(`done waiting for ${transactionId} to be in pending(ready)`);
// FIXME: Remove listener!
}
export async function runIntegrationTest2( export async function runIntegrationTest2(
ws: InternalWalletState, ws: InternalWalletState,
args: IntegrationTestV2Args, args: IntegrationTestV2Args,
): Promise<void> { ): Promise<void> {
// FIXME: Make sure that a task look is running, since we're
// waiting for notifications.
logger.info("running test with arguments", args); logger.info("running test with arguments", args);
const exchangeInfo = await updateExchangeFromUrl(ws, args.exchangeBaseUrl); const exchangeInfo = await updateExchangeFromUrl(ws, args.exchangeBaseUrl);
@ -565,6 +603,8 @@ export async function runIntegrationTest2(
}, },
}); });
await waitUntilPendingReady(ws, peerPushInit.transactionId);
const peerPushCredit = await preparePeerPushCredit(ws, { const peerPushCredit = await preparePeerPushCredit(ws, {
talerUri: peerPushInit.talerUri, talerUri: peerPushInit.talerUri,
}); });
@ -586,6 +626,8 @@ export async function runIntegrationTest2(
}, },
}); });
await waitUntilPendingReady(ws, peerPullInit.transactionId);
const peerPullInc = await preparePeerPullDebit(ws, { const peerPullInc = await preparePeerPullDebit(ws, {
talerUri: peerPullInit.talerUri, talerUri: peerPullInit.talerUri,
}); });
@ -594,6 +636,8 @@ export async function runIntegrationTest2(
peerPullPaymentIncomingId: peerPullInc.peerPullPaymentIncomingId, peerPullPaymentIncomingId: peerPullInc.peerPullPaymentIncomingId,
}); });
await ws.runUntilDone();
logger.trace("integration test: all done!"); logger.trace("integration test: all done!");
} }

View File

@ -287,9 +287,7 @@ import {
GetReadOnlyAccess, GetReadOnlyAccess,
GetReadWriteAccess, GetReadWriteAccess,
} from "./util/query.js"; } from "./util/query.js";
import { import { TaskIdentifiers } from "./operations/common.js";
TaskIdentifiers,
} from "./operations/common.js";
import { TimerAPI, TimerGroup } from "./util/timer.js"; import { TimerAPI, TimerGroup } from "./util/timer.js";
import { import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
@ -404,6 +402,12 @@ async function runTaskLoop(
opts: RetryLoopOpts = {}, opts: RetryLoopOpts = {},
): Promise<TaskLoopResult> { ): Promise<TaskLoopResult> {
logger.info(`running task loop opts=${j2s(opts)}`); logger.info(`running task loop opts=${j2s(opts)}`);
if (ws.isTaskLoopRunning) {
logger.warn(
"task loop already running, nesting the wallet-core task loop is deprecated and should be avoided",
);
}
ws.isTaskLoopRunning = true;
let retriesExceeded = false; let retriesExceeded = false;
for (let iteration = 0; !ws.stopped; iteration++) { for (let iteration = 0; !ws.stopped; iteration++) {
const pending = await getPendingOperations(ws); const pending = await getPendingOperations(ws);
@ -434,6 +438,14 @@ async function runTaskLoop(
if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) { if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) {
logger.warn(`stopping, as no pending operations have lifeness`); logger.warn(`stopping, as no pending operations have lifeness`);
ws.isTaskLoopRunning = false;
return {
retriesExceeded,
};
}
if (ws.stopped) {
ws.isTaskLoopRunning = false;
return { return {
retriesExceeded, retriesExceeded,
}; };
@ -468,16 +480,24 @@ async function runTaskLoop(
} }
await runTaskWithErrorReporting(ws, p.id, async () => { await runTaskWithErrorReporting(ws, p.id, async () => {
logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`); logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
ws.isTaskLoopRunning = false;
return await callOperationHandler(ws, p); return await callOperationHandler(ws, p);
}); });
ws.notify({ ws.notify({
type: NotificationType.PendingOperationProcessed, type: NotificationType.PendingOperationProcessed,
id: p.id, id: p.id,
}); });
if (ws.stopped) {
ws.isTaskLoopRunning = false;
return {
retriesExceeded,
};
}
} }
} }
} }
logger.trace("exiting wallet retry loop"); logger.trace("exiting wallet task loop");
ws.isTaskLoopRunning = false;
return { return {
retriesExceeded, retriesExceeded,
}; };
@ -1575,7 +1595,9 @@ export async function handleCoreApiRequest(
}; };
} catch (e: any) { } catch (e: any) {
const err = getErrorDetailFromException(e); const err = getErrorDetailFromException(e);
logger.info(`finished wallet core request ${operation} with error: ${j2s(err)}`); logger.info(
`finished wallet core request ${operation} with error: ${j2s(err)}`,
);
return { return {
type: "error", type: "error",
operation, operation,
@ -1737,6 +1759,8 @@ class InternalWalletStateImpl implements InternalWalletState {
*/ */
private resourceLocks: Set<string> = new Set(); private resourceLocks: Set<string> = new Set();
isTaskLoopRunning: boolean = false;
config: Readonly<WalletConfig>; config: Readonly<WalletConfig>;
constructor( constructor(
@ -1948,6 +1972,20 @@ class InternalWalletStateImpl implements InternalWalletState {
} }
} }
} }
ensureTaskLoopRunning(): void {
if (this.isTaskLoopRunning) {
return;
}
runTaskLoop(this)
.catch((e) => {
logger.error("error running task loop");
logger.error(`err: ${e}`);
})
.then(() => {
logger.info("done running task loop");
});
}
} }
/** /**