simplify task loop, test coin suspension

This commit is contained in:
Florian Dold 2021-06-22 13:52:28 +02:00
parent 7383b89cab
commit e35c2f581b
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
7 changed files with 115 additions and 102 deletions

View File

@ -32,7 +32,6 @@ import {
Headers, Headers,
WALLET_EXCHANGE_PROTOCOL_VERSION, WALLET_EXCHANGE_PROTOCOL_VERSION,
WALLET_MERCHANT_PROTOCOL_VERSION, WALLET_MERCHANT_PROTOCOL_VERSION,
runRetryLoop,
Wallet, Wallet,
} from "@gnu-taler/taler-wallet-core"; } from "@gnu-taler/taler-wallet-core";

View File

@ -33,7 +33,6 @@ import {
codecForList, codecForList,
codecForString, codecForString,
Logger, Logger,
WithdrawalType,
} from "@gnu-taler/taler-util"; } from "@gnu-taler/taler-util";
import { import {
NodeHttpLib, NodeHttpLib,
@ -45,10 +44,6 @@ import {
NodeThreadCryptoWorkerFactory, NodeThreadCryptoWorkerFactory,
CryptoApi, CryptoApi,
walletCoreDebugFlags, walletCoreDebugFlags,
handleCoreApiRequest,
runPending,
runUntilDone,
getClientFromWalletState,
WalletApiOperation, WalletApiOperation,
WalletCoreApiClient, WalletCoreApiClient,
Wallet, Wallet,
@ -314,8 +309,9 @@ walletCli
.maybeOption("maxRetries", ["--max-retries"], clk.INT) .maybeOption("maxRetries", ["--max-retries"], clk.INT)
.action(async (args) => { .action(async (args) => {
await withWallet(args, async (wallet) => { await withWallet(args, async (wallet) => {
await wallet.ws.runUntilDone({ await wallet.ws.runTaskLoop({
maxRetries: args.finishPendingOpt.maxRetries, maxRetries: args.finishPendingOpt.maxRetries,
stopWhenDone: true,
}); });
wallet.ws.stop(); wallet.ws.stop();
}); });

View File

@ -22,8 +22,9 @@
/** /**
* Imports. * Imports.
*/ */
import { Amounts } from "@gnu-taler/taler-util";
import { WalletApiOperation } from "@gnu-taler/taler-wallet-core"; import { WalletApiOperation } from "@gnu-taler/taler-wallet-core";
import { CoinConfig, defaultCoinConfig } from "./denomStructures"; import { CoinConfig, defaultCoinConfig } from "./denomStructures.js";
import { import {
BankService, BankService,
ExchangeService, ExchangeService,
@ -31,8 +32,8 @@ import {
MerchantService, MerchantService,
setupDb, setupDb,
WalletCli, WalletCli,
} from "./harness"; } from "./harness.js";
import { SimpleTestEnvironment } from "./helpers"; import { SimpleTestEnvironment } from "./helpers.js";
const merchantAuthToken = "secret-token:sandbox"; const merchantAuthToken = "secret-token:sandbox";
@ -162,6 +163,63 @@ export async function runWallettestingTest(t: GlobalTestState) {
t.assertDeepEqual(txTypes, ["withdrawal", "payment"]); t.assertDeepEqual(txTypes, ["withdrawal", "payment"]);
wallet.deleteDatabase();
await wallet.client.call(WalletApiOperation.WithdrawTestBalance, {
amount: "TESTKUDOS:10",
bankBaseUrl: bank.baseUrl,
exchangeBaseUrl: exchange.baseUrl,
});
await wallet.runUntilDone();
const coinDump = await wallet.client.call(WalletApiOperation.DumpCoins, {});
console.log("coin dump:", JSON.stringify(coinDump, undefined, 2));
let susp: string | undefined;
{
for (const c of coinDump.coins) {
if (0 === Amounts.cmp(c.remaining_value, "TESTKUDOS:8")) {
susp = c.coin_pub;
}
}
}
t.assertTrue(susp !== undefined);
console.log("suspending coin");
await wallet.client.call(WalletApiOperation.SetCoinSuspended, {
coinPub: susp,
suspended: true,
});
// This should fail, as we've suspended a coin that we need
// to pay.
await t.assertThrowsAsync(async () => {
await wallet.client.call(WalletApiOperation.TestPay, {
amount: "TESTKUDOS:5",
merchantAuthToken: merchantAuthToken,
merchantBaseUrl: merchant.makeInstanceBaseUrl(),
summary: "foo",
});
});
console.log("unsuspending coin");
await wallet.client.call(WalletApiOperation.SetCoinSuspended, {
coinPub: susp,
suspended: false,
});
await wallet.client.call(WalletApiOperation.TestPay, {
amount: "TESTKUDOS:5",
merchantAuthToken: merchantAuthToken,
merchantBaseUrl: merchant.makeInstanceBaseUrl(),
summary: "foo",
});
await t.shutdown(); await t.shutdown();
} }

View File

@ -116,9 +116,15 @@ export interface InternalWalletState {
cryptoApi: CryptoApi; cryptoApi: CryptoApi;
timerGroup: TimerGroup; timerGroup: TimerGroup;
latch: AsyncCondition;
stopped: boolean; stopped: boolean;
memoRunRetryLoop: AsyncOpMemoSingle<void>;
/**
* Asynchronous condition to interrupt the sleep of the
* retry loop.
*
* Used to allow processing of new work faster.
*/
latch: AsyncCondition;
listeners: NotificationListener[]; listeners: NotificationListener[];

View File

@ -205,10 +205,7 @@ export async function getEffectiveDepositAmount(
return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount; return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount;
} }
export function isSpendableCoin( function isSpendableCoin(coin: CoinRecord, denom: DenominationRecord): boolean {
coin: CoinRecord,
denom: DenominationRecord,
): boolean {
if (coin.suspended) { if (coin.suspended) {
return false; return false;
} }
@ -721,7 +718,9 @@ async function processDownloadProposalImpl(
); );
if (!isWellFormed) { if (!isWellFormed) {
logger.trace(`malformed contract terms: ${j2s(proposalResp.contract_terms)}`); logger.trace(
`malformed contract terms: ${j2s(proposalResp.contract_terms)}`,
);
const err = makeErrorDetails( const err = makeErrorDetails(
TalerErrorCode.WALLET_CONTRACT_TERMS_MALFORMED, TalerErrorCode.WALLET_CONTRACT_TERMS_MALFORMED,
"validation for well-formedness failed", "validation for well-formedness failed",

View File

@ -276,81 +276,31 @@ export async function runPending(
} }
} }
export interface RetryLoopOpts {
/** /**
* Run the wallet until there are no more pending operations that give * Stop when the number of retries is exceeded for any pending
* liveness left. The wallet will be in a stopped state when this function * operation.
* returns without resolving to an exception.
*/ */
export async function runUntilDone(
ws: InternalWalletState,
req: {
maxRetries?: number; maxRetries?: number;
} = {},
): Promise<void> { /**
let done = false; * Stop the retry loop when all lifeness-giving pending operations
const p = new Promise<void>((resolve, reject) => { * are done.
// Monitor for conditions that means we're done or we *
// should quit with an error (due to exceeded retries). * Defaults to false.
ws.addNotificationListener((n) => { */
if (done) { stopWhenDone?: boolean;
return;
}
if (
n.type === NotificationType.WaitingForRetry &&
n.numGivingLiveness == 0
) {
done = true;
logger.trace("no liveness-giving operations left");
resolve();
}
const maxRetries = req.maxRetries;
if (!maxRetries) {
return;
}
getPendingOperations(ws)
.then((pending) => {
for (const p of pending.pendingOperations) {
if (p.retryInfo && p.retryInfo.retryCounter > maxRetries) {
console.warn(
`stopping, as ${maxRetries} retries are exceeded in an operation of type ${p.type}`,
);
ws.stop();
done = true;
resolve();
}
}
})
.catch((e) => {
logger.error(e);
reject(e);
});
});
// Run this asynchronously
runRetryLoop(ws).catch((e) => {
logger.error("exception in wallet retry loop");
reject(e);
});
});
await p;
} }
/** /**
* Process pending operations and wait for scheduled operations in * Main retry loop of the wallet.
* a loop until the wallet is stopped explicitly. *
* Looks up pending operations from the wallet, runs them, repeat.
*/ */
export async function runRetryLoop(ws: InternalWalletState): Promise<void> { async function runTaskLoop(
// Make sure we only run one main loop at a time. ws: InternalWalletState,
return ws.memoRunRetryLoop.memo(async () => { opts: RetryLoopOpts = {},
try { ): Promise<void> {
await runRetryLoopImpl(ws);
} catch (e) {
console.error("error during retry loop execution", e);
throw e;
}
});
}
async function runRetryLoopImpl(ws: InternalWalletState): Promise<void> {
for (let iteration = 0; !ws.stopped; iteration++) { for (let iteration = 0; !ws.stopped; iteration++) {
const pending = await getPendingOperations(ws); const pending = await getPendingOperations(ws);
logger.trace(`pending operations: ${j2s(pending)}`); logger.trace(`pending operations: ${j2s(pending)}`);
@ -365,7 +315,22 @@ async function runRetryLoopImpl(ws: InternalWalletState): Promise<void> {
if (p.givesLifeness) { if (p.givesLifeness) {
numGivingLiveness++; numGivingLiveness++;
} }
const maxRetries = opts.maxRetries;
if (maxRetries && p.retryInfo && p.retryInfo.retryCounter > maxRetries) {
logger.warn(
`stopping, as ${maxRetries} retries are exceeded in an operation of type ${p.type}`,
);
return;
} }
}
if (opts.stopWhenDone && numGivingLiveness === 0) {
logger.warn(`stopping, as no pending operations have lifeness`);
return;
}
// Make sure that we run tasks that don't give lifeness at least // Make sure that we run tasks that don't give lifeness at least
// one time. // one time.
if (iteration !== 0 && numDue === 0) { if (iteration !== 0 && numDue === 0) {
@ -993,19 +958,15 @@ export class Wallet {
} }
runRetryLoop(): Promise<void> { runRetryLoop(): Promise<void> {
return runRetryLoop(this.ws); return runTaskLoop(this.ws);
} }
runPending(forceNow: boolean = false) { runPending(forceNow: boolean = false) {
return runPending(this.ws, forceNow); return runPending(this.ws, forceNow);
} }
runUntilDone( runTaskLoop(opts: RetryLoopOpts) {
req: { return runTaskLoop(this.ws, opts);
maxRetries?: number;
} = {},
) {
return runUntilDone(this.ws, req);
} }
handleCoreApiRequest( handleCoreApiRequest(
@ -1035,7 +996,6 @@ class InternalWalletStateImpl implements InternalWalletState {
timerGroup: TimerGroup = new TimerGroup(); timerGroup: TimerGroup = new TimerGroup();
latch = new AsyncCondition(); latch = new AsyncCondition();
stopped = false; stopped = false;
memoRunRetryLoop = new AsyncOpMemoSingle<void>();
listeners: NotificationListener[] = []; listeners: NotificationListener[] = [];
@ -1102,7 +1062,7 @@ class InternalWalletStateImpl implements InternalWalletState {
maxRetries?: number; maxRetries?: number;
} = {}, } = {},
): Promise<void> { ): Promise<void> {
runUntilDone(this, req); await runTaskLoop(this, { ...req, stopWhenDone: true });
} }
/** /**

View File

@ -33,12 +33,7 @@ import {
deleteTalerDatabase, deleteTalerDatabase,
DbAccess, DbAccess,
WalletStoresV1, WalletStoresV1,
handleCoreApiRequest,
runRetryLoop,
handleNotifyReserve,
InternalWalletState,
Wallet, Wallet,
WalletApiOperation,
} from "@gnu-taler/taler-wallet-core"; } from "@gnu-taler/taler-wallet-core";
import { import {
classifyTalerUri, classifyTalerUri,