wallet-core: use status consistently for querying for pending tasks

This commit is contained in:
Florian Dold 2023-06-06 16:47:32 +02:00
parent 2d243b67c8
commit f9c33136b4
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
2 changed files with 100 additions and 84 deletions

View File

@ -119,7 +119,7 @@ export const CURRENT_DB_CONFIG_KEY = "currentMainDbName";
* backwards-compatible way or object stores and indices
* are added.
*/
export const WALLET_DB_MINOR_VERSION = 7;
export const WALLET_DB_MINOR_VERSION = 8;
/**
* Ranges for operation status fields.
@ -909,10 +909,11 @@ export enum RefreshOperationStatus {
}
export enum DepositGroupOperationStatus {
Finished = 50 /* DORMANT_START */,
Failed = 51 /* DORMANT_START + 1 */,
Pending = 10 /* ACTIVE_START */,
AbortingWithRefresh = 11 /* ACTIVE_START + 1 */,
Pending = 10,
AbortingWithRefresh = 11,
Finished = 50,
Failed = 51,
}
/**
@ -2437,6 +2438,9 @@ export const WalletStoresV1 = {
"merchantTipId",
"merchantBaseUrl",
]),
byStatus: describeIndex("byStatus", "status", {
versionAdded: 8,
}),
},
),
withdrawalGroups: describeStore(

View File

@ -26,12 +26,14 @@ import {
WalletStoresV1,
BackupProviderStateTag,
RefreshCoinStatus,
OperationStatus,
OperationStatusRange,
PeerPushPaymentInitiationStatus,
PeerPullDebitRecordStatus,
PeerPushPaymentIncomingStatus,
PeerPullPaymentInitiationStatus,
WithdrawalGroupStatus,
DepositGroupOperationStatus,
TipRecordStatus,
} from "../db.js";
import {
PendingOperationsResponse,
@ -79,7 +81,8 @@ async function gatherExchangePending(
const opTag = TaskIdentifiers.forExchangeUpdate(exch);
let opr = await tx.operationRetries.get(opTag);
const timestampDue =
opr?.retryInfo.nextRetry ?? AbsoluteTime.fromPreciseTimestamp(exch.nextUpdate);
opr?.retryInfo.nextRetry ??
AbsoluteTime.fromPreciseTimestamp(exch.nextUpdate);
resp.pendingOperations.push({
type: PendingTaskType.ExchangeUpdate,
...getPendingCommon(ws, opTag, timestampDue),
@ -150,16 +153,12 @@ async function gatherWithdrawalPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
const wsrs = await tx.withdrawalGroups.indexes.byStatus.getAll(
GlobalIDB.KeyRange.bound(
OperationStatusRange.ACTIVE_START,
OperationStatusRange.ACTIVE_END,
),
const range = GlobalIDB.KeyRange.bound(
WithdrawalGroupStatus.PendingRegisteringBank,
WithdrawalGroupStatus.PendingAml,
);
const wsrs = await tx.withdrawalGroups.indexes.byStatus.getAll(range);
for (const wsr of wsrs) {
if (wsr.timestampFinish) {
return;
}
const opTag = TaskIdentifiers.forWithdrawal(wsr);
let opr = await tx.operationRetries.get(opTag);
const now = AbsoluteTime.now();
@ -199,8 +198,8 @@ async function gatherDepositPending(
): Promise<void> {
const dgs = await tx.depositGroups.indexes.byStatus.getAll(
GlobalIDB.KeyRange.bound(
OperationStatusRange.ACTIVE_START,
OperationStatusRange.ACTIVE_END,
DepositGroupOperationStatus.Pending,
DepositGroupOperationStatus.AbortingWithRefresh,
),
);
for (const dg of dgs) {
@ -239,7 +238,11 @@ async function gatherTipPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.tips.iter().forEachAsync(async (tip) => {
const range = GlobalIDB.KeyRange.bound(
TipRecordStatus.PendingPickup,
TipRecordStatus.PendingPickup,
);
await tx.tips.indexes.byStatus.iter(range).forEachAsync(async (tip) => {
// FIXME: The tip record needs a proper status field!
if (tip.pickedUpTimestamp) {
return;
@ -271,8 +274,8 @@ async function gatherPurchasePending(
resp: PendingOperationsResponse,
): Promise<void> {
const keyRange = GlobalIDB.KeyRange.bound(
OperationStatusRange.ACTIVE_START,
OperationStatusRange.ACTIVE_END,
PurchaseStatus.PendingDownloadingProposal,
PurchaseStatus.PendingAcceptRefund,
);
await tx.purchases.indexes.byStatus
.iter(keyRange)
@ -302,6 +305,7 @@ async function gatherRecoupPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
// FIXME: Have a status field!
await tx.recoupGroups.iter().forEachAsync(async (rg) => {
if (rg.timestampFinished) {
return;
@ -367,21 +371,25 @@ async function gatherPeerPullInitiationPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.peerPullPaymentInitiations.iter().forEachAsync(async (pi) => {
if (pi.status === PeerPullPaymentInitiationStatus.DonePurseDeposited) {
return;
}
const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.PeerPullCredit,
...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
pursePub: pi.pursePub,
const keyRange = GlobalIDB.KeyRange.bound(
PeerPullPaymentInitiationStatus.PendingReady,
PeerPullPaymentInitiationStatus.AbortingDeletePurse,
);
await tx.peerPullPaymentInitiations.indexes.byStatus
.iter(keyRange)
.forEachAsync(async (pi) => {
const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.PeerPullCredit,
...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
pursePub: pi.pursePub,
});
});
});
}
async function gatherPeerPullDebitPending(
@ -393,26 +401,25 @@ async function gatherPeerPullDebitPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.peerPullPaymentIncoming.iter().forEachAsync(async (pi) => {
switch (pi.status) {
case PeerPullDebitRecordStatus.DonePaid:
return;
case PeerPullDebitRecordStatus.DialogProposed:
return;
case PeerPullDebitRecordStatus.PendingDeposit:
break;
}
const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.PeerPullDebit,
...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
peerPullPaymentIncomingId: pi.peerPullPaymentIncomingId,
const keyRange = GlobalIDB.KeyRange.bound(
PeerPullDebitRecordStatus.PendingDeposit,
PeerPullDebitRecordStatus.AbortingRefresh,
);
await tx.peerPullPaymentIncoming.indexes.byStatus
.iter(keyRange)
.forEachAsync(async (pi) => {
const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.PeerPullDebit,
...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
peerPullPaymentIncomingId: pi.peerPullPaymentIncomingId,
});
});
});
}
async function gatherPeerPushInitiationPending(
@ -424,21 +431,25 @@ async function gatherPeerPushInitiationPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.peerPushPaymentInitiations.iter().forEachAsync(async (pi) => {
if (pi.status === PeerPushPaymentInitiationStatus.Done) {
return;
}
const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.PeerPushDebit,
...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
pursePub: pi.pursePub,
const keyRange = GlobalIDB.KeyRange.bound(
PeerPushPaymentInitiationStatus.PendingCreatePurse,
PeerPushPaymentInitiationStatus.AbortingRefresh,
);
await tx.peerPushPaymentInitiations.indexes.byStatus
.iter(keyRange)
.forEachAsync(async (pi) => {
const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.PeerPushDebit,
...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
pursePub: pi.pursePub,
});
});
});
}
async function gatherPeerPushCreditPending(
@ -450,24 +461,25 @@ async function gatherPeerPushCreditPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.peerPushPaymentIncoming.iter().forEachAsync(async (pi) => {
switch (pi.status) {
case PeerPushPaymentIncomingStatus.DialogProposed:
return;
case PeerPushPaymentIncomingStatus.Done:
return;
}
const opId = TaskIdentifiers.forPeerPushCredit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.PeerPushCredit,
...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
peerPushPaymentIncomingId: pi.peerPushPaymentIncomingId,
const keyRange = GlobalIDB.KeyRange.bound(
PeerPushPaymentIncomingStatus.PendingMerge,
PeerPushPaymentIncomingStatus.PendingWithdrawing,
);
await tx.peerPushPaymentIncoming.indexes.byStatus
.iter(keyRange)
.forEachAsync(async (pi) => {
const opId = TaskIdentifiers.forPeerPushCredit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.PeerPushCredit,
...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
peerPushPaymentIncomingId: pi.peerPushPaymentIncomingId,
});
});
});
}
export async function getPendingOperations(