wallet-core: get rid of internal runUntilDone usages

This commit is contained in:
Florian Dold 2023-07-01 00:52:14 +02:00
parent 7a18e12a17
commit f93ab03a1b
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
7 changed files with 92 additions and 68 deletions

View File

@ -145,6 +145,8 @@ export interface ActiveLongpollInfo {
};
}
export type CancelFn = () => void;
/**
* Internal, shared wallet state that is used by the implementation
* of wallet operations.
@ -206,7 +208,7 @@ export interface InternalWalletState {
notify(n: WalletNotification): void;
addNotificationListener(f: (n: WalletNotification) => void): void;
addNotificationListener(f: (n: WalletNotification) => void): CancelFn;
/**
* Stop ongoing processing.
@ -219,8 +221,6 @@ export interface InternalWalletState {
*/
runSequentialized<T>(tokens: string[], f: () => Promise<T>): Promise<T>;
runUntilDone(req?: { maxRetries?: number }): Promise<void>;
/**
* Ensure that a task loop is currently running.
* Starts one if no task loop is running.

View File

@ -64,9 +64,7 @@ import { checkLogicInvariant } from "../util/invariants.js";
import {
TaskRunResult,
TaskRunResultType,
TaskIdentifiers,
constructTaskIdentifier,
runTaskWithErrorReporting,
spendCoins,
} from "./common.js";
import {

View File

@ -26,36 +26,34 @@
*/
import {
Amounts,
CoinStatus,
Logger,
RefreshReason,
TalerPreciseTimestamp,
URL,
codecForRecoupConfirmation,
codecForReserveStatus,
CoinStatus,
encodeCrock,
getRandomBytes,
j2s,
Logger,
NotificationType,
RefreshReason,
TalerProtocolTimestamp,
TalerPreciseTimestamp,
URL,
} from "@gnu-taler/taler-util";
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import {
CoinRecord,
CoinSourceType,
RecoupGroupRecord,
RefreshCoinSource,
WalletStoresV1,
WithdrawCoinSource,
WithdrawalGroupStatus,
WithdrawalRecordType,
WithdrawCoinSource,
} from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import { checkDbInvariant } from "../util/invariants.js";
import { GetReadWriteAccess } from "../util/query.js";
import { createRefreshGroup, processRefreshGroup } from "./refresh.js";
import { internalCreateWithdrawalGroup } from "./withdraw.js";
import { TaskRunResult } from "./common.js";
import { createRefreshGroup } from "./refresh.js";
import { internalCreateWithdrawalGroup } from "./withdraw.js";
const logger = new Logger("operations/recoup.ts");
@ -402,9 +400,6 @@ export async function processRecoupGroup(
rg2.scheduleRefreshCoins,
RefreshReason.Recoup,
);
processRefreshGroup(ws, refreshGroupId.refreshGroupId).catch((e) => {
logger.error(`error while refreshing after recoup ${e}`);
});
}
await tx.recoupGroups.put(rg2);
});

View File

@ -810,9 +810,9 @@ export async function processRefreshGroup(
}),
);
try {
logger.trace("waiting for refreshes");
logger.info("waiting for refreshes");
await Promise.all(ps);
logger.trace("refresh finished");
logger.info("refresh group finished");
} catch (e) {
logger.warn("process refresh sessions got exception");
logger.warn(`exception: ${e}`);
@ -1066,13 +1066,6 @@ export async function createRefreshGroup(
logger.info(`created refresh group ${refreshGroupId}`);
processRefreshGroup(ws, refreshGroupId).catch((e) => {
if (e instanceof CryptoApiStoppedError) {
return;
}
logger.warn(`processing refresh group ${refreshGroupId} failed: ${e}`);
});
return {
refreshGroupId,
};

View File

@ -68,7 +68,7 @@ import {
} from "./pay-peer-push-credit.js";
import { initiatePeerPushDebit } from "./pay-peer-push-debit.js";
import { OpenedPromise, openPromise } from "../index.js";
import { getTransactionById } from "./transactions.js";
import { getTransactionById, getTransactions } from "./transactions.js";
const logger = new Logger("operations/testing.ts");
@ -378,7 +378,7 @@ export async function runIntegrationTest(
bankAccessApiBaseUrl: args.bankAccessApiBaseUrl,
exchangeBaseUrl: args.exchangeBaseUrl,
});
await ws.runUntilDone();
await waitUntilDone(ws);
logger.info("done withdrawing test balance");
const balance = await getBalances(ws);
@ -393,7 +393,7 @@ export async function runIntegrationTest(
await makePayment(ws, myMerchant, args.amountToSpend, "hello world");
// Wait until the refresh is done
await ws.runUntilDone();
await waitUntilDone(ws);
logger.trace("withdrawing test balance for refund");
const withdrawAmountTwo = Amounts.parseOrThrow(`${currency}:18`);
@ -408,7 +408,7 @@ export async function runIntegrationTest(
});
// Wait until the withdraw is done
await ws.runUntilDone();
await waitUntilDone(ws);
const { orderId: refundOrderId } = await makePayment(
ws,
@ -432,7 +432,7 @@ export async function runIntegrationTest(
logger.trace("integration test: applied refund");
// Wait until the refund is done
await ws.runUntilDone();
await waitUntilDone(ws);
logger.trace("integration test: making payment after refund");
@ -445,21 +445,52 @@ export async function runIntegrationTest(
logger.trace("integration test: make payment done");
await ws.runUntilDone();
await waitUntilDone(ws);
logger.trace("integration test: all done!");
}
async function waitUntilDone(ws: InternalWalletState): Promise<void> {
logger.info("waiting until all transactions are in a final state");
ws.ensureTaskLoopRunning();
let p: OpenedPromise<void> | undefined = undefined;
ws.addNotificationListener((notif) => {
const cancelNotifs = ws.addNotificationListener((notif) => {
if (!p) {
return;
}
if (notif.type === NotificationType.TransactionStateTransition) {
p.resolve();
}
// Work-around, refresh transactions don't properly emit transition notifications yet.
if (notif.type === NotificationType.PendingOperationProcessed) {
p.resolve();
}
});
while (1) {
p = openPromise();
const txs = await getTransactions(ws, {
includeRefreshes: true,
});
let finished = true;
for (const tx of txs.transactions) {
switch (tx.txState.major) {
case TransactionMajorState.Pending:
case TransactionMajorState.Aborting:
finished = false;
logger.info(
`continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
);
break;
}
}
if (finished) {
break;
}
// Wait until transaction state changed
await p.promise;
}
cancelNotifs();
logger.info("done waiting until all transactions are in a final state");
}
async function waitUntilPendingReady(
@ -469,7 +500,7 @@ async function waitUntilPendingReady(
logger.info(`starting waiting for ${transactionId} to be in pending(ready)`);
ws.ensureTaskLoopRunning();
let p: OpenedPromise<void> | undefined = undefined;
ws.addNotificationListener((notif) => {
const cancelNotifs = ws.addNotificationListener((notif) => {
if (!p) {
return;
}
@ -492,7 +523,7 @@ async function waitUntilPendingReady(
await p.promise;
}
logger.info(`done waiting for ${transactionId} to be in pending(ready)`);
// FIXME: Remove listener!
cancelNotifs();
}
export async function runIntegrationTest2(
@ -516,7 +547,7 @@ export async function runIntegrationTest2(
bankAccessApiBaseUrl: args.bankAccessApiBaseUrl,
exchangeBaseUrl: args.exchangeBaseUrl,
});
await ws.runUntilDone();
await waitUntilDone(ws);
logger.info("done withdrawing test balance");
const balance = await getBalances(ws);
@ -536,7 +567,7 @@ export async function runIntegrationTest2(
);
// Wait until the refresh is done
await ws.runUntilDone();
await waitUntilDone(ws);
logger.trace("withdrawing test balance for refund");
const withdrawAmountTwo = Amounts.parseOrThrow(`${currency}:18`);
@ -551,7 +582,7 @@ export async function runIntegrationTest2(
});
// Wait until the withdraw is done
await ws.runUntilDone();
await waitUntilDone(ws);
const { orderId: refundOrderId } = await makePayment(
ws,
@ -575,7 +606,7 @@ export async function runIntegrationTest2(
logger.trace("integration test: applied refund");
// Wait until the refund is done
await ws.runUntilDone();
await waitUntilDone(ws);
logger.trace("integration test: making payment after refund");
@ -588,7 +619,7 @@ export async function runIntegrationTest2(
logger.trace("integration test: make payment done");
await ws.runUntilDone();
await waitUntilDone(ws);
const peerPushInit = await initiatePeerPushDebit(ws, {
partialContractTerms: {
@ -636,7 +667,7 @@ export async function runIntegrationTest2(
peerPullPaymentIncomingId: peerPullInc.peerPullPaymentIncomingId,
});
await ws.runUntilDone();
await waitUntilDone(ws);
logger.trace("integration test: all done!");
}

View File

@ -64,7 +64,6 @@ import {
codecForAddKnownBankAccounts,
codecForAny,
codecForApplyDevExperiment,
codecForFailTransactionRequest,
codecForCheckPeerPullPaymentRequest,
codecForCheckPeerPushDebitRequest,
codecForConfirmPayRequest,
@ -72,13 +71,13 @@ import {
codecForConvertAmountRequest,
codecForCreateDepositGroupRequest,
codecForDeleteTransactionRequest,
codecForFailTransactionRequest,
codecForForceRefreshRequest,
codecForForgetKnownBankAccounts,
codecForGetAmountRequest,
codecForGetBalanceDetailRequest,
codecForGetContractTermsDetails,
codecForGetExchangeTosRequest,
codecForGetPlanForOperationRequest,
codecForGetWithdrawalDetailsForAmountRequest,
codecForGetWithdrawalDetailsForUri,
codecForImportDbRequest,
@ -141,6 +140,7 @@ import {
import { DevExperimentHttpLib, applyDevExperiment } from "./dev-experiments.js";
import {
ActiveLongpollInfo,
CancelFn,
ExchangeOperations,
InternalWalletState,
MerchantInfo,
@ -171,6 +171,7 @@ import {
import { setWalletDeviceId } from "./operations/backup/state.js";
import { getBalanceDetail, getBalances } from "./operations/balance.js";
import {
TaskIdentifiers,
TaskRunResult,
getExchangeTosStatus,
makeExchangeListItem,
@ -196,7 +197,6 @@ import {
} from "./operations/exchanges.js";
import { getMerchantInfo } from "./operations/merchants.js";
import {
computePayMerchantTransactionActions,
computePayMerchantTransactionState,
computeRefundTransactionState,
confirmPay,
@ -271,6 +271,13 @@ import {
} from "./operations/withdraw.js";
import { PendingTaskInfo, PendingTaskType } from "./pending-types.js";
import { assertUnreachable } from "./util/assertUnreachable.js";
import {
convertDepositAmount,
convertPeerPushAmount,
convertWithdrawalAmount,
getMaxDepositAmount,
getMaxPeerPushAmount,
} from "./util/coinSelection.js";
import {
createTimeline,
selectBestForOverlappingDenominations,
@ -287,7 +294,6 @@ import {
GetReadOnlyAccess,
GetReadWriteAccess,
} from "./util/query.js";
import { TaskIdentifiers } from "./operations/common.js";
import { TimerAPI, TimerGroup } from "./util/timer.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
@ -301,13 +307,6 @@ import {
WalletCoreApiClient,
WalletCoreResponseType,
} from "./wallet-api-types.js";
import {
convertDepositAmount,
convertPeerPushAmount,
convertWithdrawalAmount,
getMaxDepositAmount,
getMaxPeerPushAmount,
} from "./util/coinSelection.js";
const logger = new Logger("wallet.ts");
@ -478,9 +477,8 @@ async function runTaskLoop(
if (!AbsoluteTime.isExpired(p.timestampDue)) {
continue;
}
logger.info(`running task ${p.id}`);
await runTaskWithErrorReporting(ws, p.id, async () => {
logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
ws.isTaskLoopRunning = false;
return await callOperationHandler(ws, p);
});
ws.notify({
@ -1679,7 +1677,7 @@ export class Wallet {
return deepMerge(Wallet.defaultConfig, param ?? {});
}
addNotificationListener(f: (n: WalletNotification) => void): void {
addNotificationListener(f: (n: WalletNotification) => void): CancelFn {
return this.ws.addNotificationListener(f);
}
@ -1906,8 +1904,14 @@ class InternalWalletStateImpl implements InternalWalletState {
}
}
addNotificationListener(f: (n: WalletNotification) => void): void {
addNotificationListener(f: (n: WalletNotification) => void): CancelFn {
this.listeners.push(f);
return () => {
const idx = this.listeners.indexOf(f);
if (idx >= 0) {
this.listeners.splice(idx, 1);
}
};
}
/**
@ -1925,14 +1929,6 @@ class InternalWalletStateImpl implements InternalWalletState {
}
}
async runUntilDone(
req: {
maxRetries?: number;
} = {},
): Promise<void> {
await runTaskLoop(this, { ...req, stopWhenDone: true });
}
/**
* Run an async function after acquiring a list of locks, identified
* by string tokens.

View File

@ -214,7 +214,13 @@ export function installNativeWalletListener(): void {
globalThis.installNativeWalletListener = installNativeWalletListener;
export async function testWithGv() {
const w = await createNativeWalletHost2();
const w = await createNativeWalletHost2({
config: {
features: {
allowHttp: true,
},
},
});
await w.wallet.client.call(WalletApiOperation.InitWallet, {});
await w.wallet.client.call(WalletApiOperation.RunIntegrationTest, {
amountToSpend: "KUDOS:1",
@ -233,13 +239,18 @@ export async function testWithLocal() {
console.log("running local test");
const w = await createNativeWalletHost2({
persistentStoragePath: "walletdb.json",
config: {
features: {
allowHttp: true,
},
},
});
console.log("created wallet");
await w.wallet.client.call(WalletApiOperation.InitWallet, {
skipDefaults: true,
});
console.log("initialized wallet");
await w.wallet.client.call(WalletApiOperation.RunIntegrationTest, {
await w.wallet.client.call(WalletApiOperation.RunIntegrationTestV2, {
amountToSpend: "TESTKUDOS:1",
amountToWithdraw: "TESTKUDOS:3",
bankAccessApiBaseUrl: "http://localhost:8082/taler-bank-access/",