wallet-core: fix busy wait when long-polling for manual withdrawal

This commit is contained in:
Florian Dold 2022-10-07 14:23:23 +02:00
parent 1256c8704b
commit a93a0cae13
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
3 changed files with 82 additions and 39 deletions

View File

@ -41,7 +41,28 @@ import { RetryTags } from "../util/retries.js";
import { Wallet } from "../wallet.js"; import { Wallet } from "../wallet.js";
import { GlobalIDB } from "@gnu-taler/idb-bridge"; import { GlobalIDB } from "@gnu-taler/idb-bridge";
function getPendingCommon(
ws: InternalWalletState,
opTag: string,
timestampDue: AbsoluteTime,
): {
id: string;
isDue: boolean;
timestampDue: AbsoluteTime;
isLongpolling: boolean;
} {
const isDue =
AbsoluteTime.isExpired(timestampDue) && !ws.activeLongpoll[opTag];
return {
id: opTag,
isDue,
timestampDue,
isLongpolling: !!ws.activeLongpoll[opTag],
};
}
async function gatherExchangePending( async function gatherExchangePending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{ tx: GetReadOnlyAccess<{
exchanges: typeof WalletStoresV1.exchanges; exchanges: typeof WalletStoresV1.exchanges;
exchangeDetails: typeof WalletStoresV1.exchangeDetails; exchangeDetails: typeof WalletStoresV1.exchangeDetails;
@ -54,12 +75,12 @@ async function gatherExchangePending(
await tx.exchanges.iter().forEachAsync(async (exch) => { await tx.exchanges.iter().forEachAsync(async (exch) => {
const opTag = RetryTags.forExchangeUpdate(exch); const opTag = RetryTags.forExchangeUpdate(exch);
let opr = await tx.operationRetries.get(opTag); let opr = await tx.operationRetries.get(opTag);
const timestampDue =
opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate);
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.ExchangeUpdate, type: PendingTaskType.ExchangeUpdate,
id: opTag, ...getPendingCommon(ws, opTag, timestampDue),
givesLifeness: false, givesLifeness: false,
timestampDue:
opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate),
exchangeBaseUrl: exch.baseUrl, exchangeBaseUrl: exch.baseUrl,
lastError: opr?.lastError, lastError: opr?.lastError,
}); });
@ -69,7 +90,7 @@ async function gatherExchangePending(
if (!opr?.lastError) { if (!opr?.lastError) {
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.ExchangeCheckRefresh, type: PendingTaskType.ExchangeCheckRefresh,
id: RetryTags.forExchangeCheckRefresh(exch), ...getPendingCommon(ws, opTag, timestampDue),
timestampDue: AbsoluteTime.fromTimestamp(exch.nextRefreshCheck), timestampDue: AbsoluteTime.fromTimestamp(exch.nextRefreshCheck),
givesLifeness: false, givesLifeness: false,
exchangeBaseUrl: exch.baseUrl, exchangeBaseUrl: exch.baseUrl,
@ -79,6 +100,7 @@ async function gatherExchangePending(
} }
async function gatherRefreshPending( async function gatherRefreshPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{ tx: GetReadOnlyAccess<{
refreshGroups: typeof WalletStoresV1.refreshGroups; refreshGroups: typeof WalletStoresV1.refreshGroups;
operationRetries: typeof WalletStoresV1.operationRetries; operationRetries: typeof WalletStoresV1.operationRetries;
@ -99,11 +121,12 @@ async function gatherRefreshPending(
const opId = RetryTags.forRefresh(r); const opId = RetryTags.forRefresh(r);
const retryRecord = await tx.operationRetries.get(opId); const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.Refresh, type: PendingTaskType.Refresh,
id: opId, ...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true, givesLifeness: true,
timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
refreshGroupId: r.refreshGroupId, refreshGroupId: r.refreshGroupId,
finishedPerCoin: r.statusPerCoin.map( finishedPerCoin: r.statusPerCoin.map(
(x) => x === RefreshCoinStatus.Finished, (x) => x === RefreshCoinStatus.Finished,
@ -114,6 +137,7 @@ async function gatherRefreshPending(
} }
async function gatherWithdrawalPending( async function gatherWithdrawalPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{ tx: GetReadOnlyAccess<{
withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
planchets: typeof WalletStoresV1.planchets; planchets: typeof WalletStoresV1.planchets;
@ -147,9 +171,12 @@ async function gatherWithdrawalPending(
} }
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.Withdraw, type: PendingTaskType.Withdraw,
id: opTag, ...getPendingCommon(
ws,
opTag,
opr.retryInfo?.nextRetry ?? AbsoluteTime.now(),
),
givesLifeness: true, givesLifeness: true,
timestampDue: opr.retryInfo?.nextRetry ?? AbsoluteTime.now(),
withdrawalGroupId: wsr.withdrawalGroupId, withdrawalGroupId: wsr.withdrawalGroupId,
lastError: opr.lastError, lastError: opr.lastError,
retryInfo: opr.retryInfo, retryInfo: opr.retryInfo,
@ -158,6 +185,7 @@ async function gatherWithdrawalPending(
} }
async function gatherProposalPending( async function gatherProposalPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{ tx: GetReadOnlyAccess<{
proposals: typeof WalletStoresV1.proposals; proposals: typeof WalletStoresV1.proposals;
operationRetries: typeof WalletStoresV1.operationRetries; operationRetries: typeof WalletStoresV1.operationRetries;
@ -175,9 +203,8 @@ async function gatherProposalPending(
retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now(); retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.ProposalDownload, type: PendingTaskType.ProposalDownload,
id: opId, ...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true, givesLifeness: true,
timestampDue,
merchantBaseUrl: proposal.merchantBaseUrl, merchantBaseUrl: proposal.merchantBaseUrl,
orderId: proposal.orderId, orderId: proposal.orderId,
proposalId: proposal.proposalId, proposalId: proposal.proposalId,
@ -190,6 +217,7 @@ async function gatherProposalPending(
} }
async function gatherDepositPending( async function gatherDepositPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{ tx: GetReadOnlyAccess<{
depositGroups: typeof WalletStoresV1.depositGroups; depositGroups: typeof WalletStoresV1.depositGroups;
operationRetries: typeof WalletStoresV1.operationRetries; operationRetries: typeof WalletStoresV1.operationRetries;
@ -209,9 +237,8 @@ async function gatherDepositPending(
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.Deposit, type: PendingTaskType.Deposit,
id: opId, ...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true, givesLifeness: true,
timestampDue,
depositGroupId: dg.depositGroupId, depositGroupId: dg.depositGroupId,
lastError: retryRecord?.lastError, lastError: retryRecord?.lastError,
retryInfo: retryRecord?.retryInfo, retryInfo: retryRecord?.retryInfo,
@ -220,6 +247,7 @@ async function gatherDepositPending(
} }
async function gatherTipPending( async function gatherTipPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{ tx: GetReadOnlyAccess<{
tips: typeof WalletStoresV1.tips; tips: typeof WalletStoresV1.tips;
operationRetries: typeof WalletStoresV1.operationRetries; operationRetries: typeof WalletStoresV1.operationRetries;
@ -234,10 +262,11 @@ async function gatherTipPending(
} }
const opId = RetryTags.forTipPickup(tip); const opId = RetryTags.forTipPickup(tip);
const retryRecord = await tx.operationRetries.get(opId); const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
if (tip.acceptedTimestamp) { if (tip.acceptedTimestamp) {
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.TipPickup, type: PendingTaskType.TipPickup,
id: opId, ...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true, givesLifeness: true,
timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(), timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
merchantBaseUrl: tip.merchantBaseUrl, merchantBaseUrl: tip.merchantBaseUrl,
@ -249,6 +278,7 @@ async function gatherTipPending(
} }
async function gatherPurchasePending( async function gatherPurchasePending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{ tx: GetReadOnlyAccess<{
purchases: typeof WalletStoresV1.purchases; purchases: typeof WalletStoresV1.purchases;
operationRetries: typeof WalletStoresV1.operationRetries; operationRetries: typeof WalletStoresV1.operationRetries;
@ -270,9 +300,8 @@ async function gatherPurchasePending(
payRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); payRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.Pay, type: PendingTaskType.Pay,
id: payOpId, ...getPendingCommon(ws, payOpId, timestampDue),
givesLifeness: true, givesLifeness: true,
timestampDue,
isReplay: false, isReplay: false,
proposalId: pr.proposalId, proposalId: pr.proposalId,
retryInfo: payRetryRecord?.retryInfo, retryInfo: payRetryRecord?.retryInfo,
@ -284,12 +313,12 @@ async function gatherPurchasePending(
const refundQueryRetryRecord = await tx.operationRetries.get( const refundQueryRetryRecord = await tx.operationRetries.get(
refundQueryOpId, refundQueryOpId,
); );
const timestampDue =
refundQueryRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.RefundQuery, type: PendingTaskType.RefundQuery,
id: refundQueryOpId, ...getPendingCommon(ws, refundQueryOpId, timestampDue),
givesLifeness: true, givesLifeness: true,
timestampDue:
refundQueryRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
proposalId: pr.proposalId, proposalId: pr.proposalId,
retryInfo: refundQueryRetryRecord?.retryInfo, retryInfo: refundQueryRetryRecord?.retryInfo,
lastError: refundQueryRetryRecord?.lastError, lastError: refundQueryRetryRecord?.lastError,
@ -299,6 +328,7 @@ async function gatherPurchasePending(
} }
async function gatherRecoupPending( async function gatherRecoupPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{ tx: GetReadOnlyAccess<{
recoupGroups: typeof WalletStoresV1.recoupGroups; recoupGroups: typeof WalletStoresV1.recoupGroups;
operationRetries: typeof WalletStoresV1.operationRetries; operationRetries: typeof WalletStoresV1.operationRetries;
@ -312,11 +342,11 @@ async function gatherRecoupPending(
} }
const opId = RetryTags.forRecoup(rg); const opId = RetryTags.forRecoup(rg);
const retryRecord = await tx.operationRetries.get(opId); const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.Recoup, type: PendingTaskType.Recoup,
id: opId, ...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true, givesLifeness: true,
timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
recoupGroupId: rg.recoupGroupId, recoupGroupId: rg.recoupGroupId,
retryInfo: retryRecord?.retryInfo, retryInfo: retryRecord?.retryInfo,
lastError: retryRecord?.lastError, lastError: retryRecord?.lastError,
@ -325,6 +355,7 @@ async function gatherRecoupPending(
} }
async function gatherBackupPending( async function gatherBackupPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{ tx: GetReadOnlyAccess<{
backupProviders: typeof WalletStoresV1.backupProviders; backupProviders: typeof WalletStoresV1.backupProviders;
operationRetries: typeof WalletStoresV1.operationRetries; operationRetries: typeof WalletStoresV1.operationRetries;
@ -336,20 +367,23 @@ async function gatherBackupPending(
const opId = RetryTags.forBackup(bp); const opId = RetryTags.forBackup(bp);
const retryRecord = await tx.operationRetries.get(opId); const retryRecord = await tx.operationRetries.get(opId);
if (bp.state.tag === BackupProviderStateTag.Ready) { if (bp.state.tag === BackupProviderStateTag.Ready) {
const timestampDue = AbsoluteTime.fromTimestamp(
bp.state.nextBackupTimestamp,
);
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.Backup, type: PendingTaskType.Backup,
id: opId, ...getPendingCommon(ws, opId, timestampDue),
givesLifeness: false, givesLifeness: false,
timestampDue: AbsoluteTime.fromTimestamp(bp.state.nextBackupTimestamp),
backupProviderBaseUrl: bp.baseUrl, backupProviderBaseUrl: bp.baseUrl,
lastError: undefined, lastError: undefined,
}); });
} else if (bp.state.tag === BackupProviderStateTag.Retrying) { } else if (bp.state.tag === BackupProviderStateTag.Retrying) {
const timestampDue =
retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.Backup, type: PendingTaskType.Backup,
id: opId, ...getPendingCommon(ws, opId, timestampDue),
givesLifeness: false, givesLifeness: false,
timestampDue: retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now(),
backupProviderBaseUrl: bp.baseUrl, backupProviderBaseUrl: bp.baseUrl,
retryInfo: retryRecord?.retryInfo, retryInfo: retryRecord?.retryInfo,
lastError: retryRecord?.lastError, lastError: retryRecord?.lastError,
@ -382,15 +416,15 @@ export async function getPendingOperations(
const resp: PendingOperationsResponse = { const resp: PendingOperationsResponse = {
pendingOperations: [], pendingOperations: [],
}; };
await gatherExchangePending(tx, now, resp); await gatherExchangePending(ws, tx, now, resp);
await gatherRefreshPending(tx, now, resp); await gatherRefreshPending(ws, tx, now, resp);
await gatherWithdrawalPending(tx, now, resp); await gatherWithdrawalPending(ws, tx, now, resp);
await gatherProposalPending(tx, now, resp); await gatherProposalPending(ws, tx, now, resp);
await gatherDepositPending(tx, now, resp); await gatherDepositPending(ws, tx, now, resp);
await gatherTipPending(tx, now, resp); await gatherTipPending(ws, tx, now, resp);
await gatherPurchasePending(tx, now, resp); await gatherPurchasePending(ws, tx, now, resp);
await gatherRecoupPending(tx, now, resp); await gatherRecoupPending(ws, tx, now, resp);
await gatherBackupPending(tx, now, resp); await gatherBackupPending(ws, tx, now, resp);
return resp; return resp;
}); });
} }

View File

@ -26,7 +26,6 @@
*/ */
import { import {
TalerErrorDetail, TalerErrorDetail,
BalancesResponse,
AbsoluteTime, AbsoluteTime,
TalerProtocolTimestamp, TalerProtocolTimestamp,
} from "@gnu-taler/taler-util"; } from "@gnu-taler/taler-util";
@ -203,6 +202,16 @@ export interface PendingTaskInfoCommon {
*/ */
givesLifeness: boolean; givesLifeness: boolean;
/**
* Operation is active and waiting for a longpoll result.
*/
isLongpolling: boolean;
/**
* Operation is waiting to be executed.
*/
isDue: boolean;
/** /**
* Timestamp when the pending operation should be executed next. * Timestamp when the pending operation should be executed next.
*/ */

View File

@ -512,14 +512,14 @@ async function runTaskLoop(
); );
continue; continue;
} }
minDue = AbsoluteTime.min(minDue, p.timestampDue);
if (AbsoluteTime.isExpired(p.timestampDue) && !ws.activeLongpoll[p.id]) {
numDue++;
}
if (p.givesLifeness) { if (p.givesLifeness) {
numGivingLiveness++; numGivingLiveness++;
} }
if (!p.isDue) {
continue;
}
minDue = AbsoluteTime.min(minDue, p.timestampDue);
numDue++;
} }
if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) { if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) {