diff options
author | Florian Dold <florian@dold.me> | 2023-07-11 15:41:48 +0200 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2023-08-22 08:01:13 +0200 |
commit | b2d0ad57ddf251a109d536cdc49fb6505dbdc50c (patch) | |
tree | 7eaeca3ad8ec97a9c1970c1004feda2d61c3441b /packages/taler-wallet-core/src/operations | |
parent | 58fdf9dc091b076787a9746c405fe6a9366f5da6 (diff) |
sqlite3 backend for idb-bridge / wallet-core
Diffstat (limited to 'packages/taler-wallet-core/src/operations')
-rw-r--r-- | packages/taler-wallet-core/src/operations/pending.ts | 350 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/operations/testing.ts | 3 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/operations/transactions.ts | 31 |
3 files changed, 286 insertions, 98 deletions
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts index cc9217d67..6c6546f83 100644 --- a/packages/taler-wallet-core/src/operations/pending.ts +++ b/packages/taler-wallet-core/src/operations/pending.ts @@ -34,13 +34,24 @@ import { WithdrawalGroupStatus, RewardRecordStatus, DepositOperationStatus, + RefreshGroupRecord, + WithdrawalGroupRecord, + DepositGroupRecord, + RewardRecord, + PurchaseRecord, + PeerPullPaymentInitiationRecord, + PeerPullPaymentIncomingRecord, + PeerPushPaymentInitiationRecord, + PeerPushPaymentIncomingRecord, + RefundGroupRecord, + RefundGroupStatus, } from "../db.js"; import { PendingOperationsResponse, PendingTaskType, TaskId, } from "../pending-types.js"; -import { AbsoluteTime } from "@gnu-taler/taler-util"; +import { AbsoluteTime, TransactionRecordFilter } from "@gnu-taler/taler-util"; import { InternalWalletState } from "../internal-wallet-state.js"; import { GetReadOnlyAccess } from "../util/query.js"; import { GlobalIDB } from "@gnu-taler/idb-bridge"; @@ -105,6 +116,32 @@ async function gatherExchangePending( }); } +/** + * Iterate refresh records based on a filter. + */ +export async function iterRecordsForRefresh( + tx: GetReadOnlyAccess<{ + refreshGroups: typeof WalletStoresV1.refreshGroups; + }>, + filter: TransactionRecordFilter, + f: (r: RefreshGroupRecord) => Promise<void>, +): Promise<void> { + let refreshGroups: RefreshGroupRecord[]; + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + OperationStatusRange.ACTIVE_START, + OperationStatusRange.ACTIVE_END, + ); + refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(keyRange); + } else { + refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(); + } + + for (const r of refreshGroups) { + await f(r); + } +} + async function gatherRefreshPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ @@ -114,22 +151,13 @@ async function gatherRefreshPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const keyRange = GlobalIDB.KeyRange.bound( - OperationStatusRange.ACTIVE_START, - OperationStatusRange.ACTIVE_END, - ); - const refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll( - keyRange, - ); - for (const r of refreshGroups) { + await iterRecordsForRefresh(tx, { onlyState: "nonfinal" }, async (r) => { if (r.timestampFinished) { return; } const opId = TaskIdentifiers.forRefresh(r); const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); - resp.pendingOperations.push({ type: PendingTaskType.Refresh, ...getPendingCommon(ws, opId, timestampDue), @@ -140,6 +168,30 @@ async function gatherRefreshPending( ), retryInfo: retryRecord?.retryInfo, }); + }); +} + +export async function iterRecordsForWithdrawal( + tx: GetReadOnlyAccess<{ + withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; + }>, + filter: TransactionRecordFilter, + f: (r: WithdrawalGroupRecord) => Promise<void>, +): Promise<void> { + let withdrawalGroupRecords: WithdrawalGroupRecord[]; + if (filter.onlyState === "nonfinal") { + const range = GlobalIDB.KeyRange.bound( + WithdrawalGroupStatus.PendingRegisteringBank, + WithdrawalGroupStatus.PendingAml, + ); + withdrawalGroupRecords = + await tx.withdrawalGroups.indexes.byStatus.getAll(range); + } else { + withdrawalGroupRecords = + await tx.withdrawalGroups.indexes.byStatus.getAll(); + } + for (const wgr of withdrawalGroupRecords) { + await f(wgr); } } @@ -153,12 +205,7 @@ async function gatherWithdrawalPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const range = GlobalIDB.KeyRange.bound( - WithdrawalGroupStatus.PendingRegisteringBank, - WithdrawalGroupStatus.PendingAml, - ); - const wsrs = await tx.withdrawalGroups.indexes.byStatus.getAll(range); - for (const wsr of wsrs) { + await iterRecordsForWithdrawal(tx, { onlyState: "nonfinal" }, async (wsr) => { const opTag = TaskIdentifiers.forWithdrawal(wsr); let opr = await tx.operationRetries.get(opTag); const now = AbsoluteTime.now(); @@ -184,6 +231,30 @@ async function gatherWithdrawalPending( lastError: opr.lastError, retryInfo: opr.retryInfo, }); + }); +} + +export async function iterRecordsForDeposit( + tx: GetReadOnlyAccess<{ + depositGroups: typeof WalletStoresV1.depositGroups; + }>, + filter: TransactionRecordFilter, + f: (r: DepositGroupRecord) => Promise<void>, +): Promise<void> { + let dgs: DepositGroupRecord[]; + if (filter.onlyState === "nonfinal") { + dgs = await tx.depositGroups.indexes.byStatus.getAll( + GlobalIDB.KeyRange.bound( + DepositOperationStatus.PendingDeposit, + DepositOperationStatus.PendingKyc, + ), + ); + } else { + dgs = await tx.depositGroups.indexes.byStatus.getAll(); + } + + for (const dg of dgs) { + await f(dg); } } @@ -196,16 +267,7 @@ async function gatherDepositPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const dgs = await tx.depositGroups.indexes.byStatus.getAll( - GlobalIDB.KeyRange.bound( - DepositOperationStatus.PendingDeposit, - DepositOperationStatus.PendingKyc, - ), - ); - for (const dg of dgs) { - if (dg.timestampFinished) { - return; - } + await iterRecordsForDeposit(tx, { onlyState: "nonfinal" }, async (dg) => { let deposited = true; for (const d of dg.depositedPerCoin) { if (!d) { @@ -226,10 +288,28 @@ async function gatherDepositPending( lastError: retryRecord?.lastError, retryInfo: retryRecord?.retryInfo, }); + }); +} + +export async function iterRecordsForReward( + tx: GetReadOnlyAccess<{ + rewards: typeof WalletStoresV1.rewards; + }>, + filter: TransactionRecordFilter, + f: (r: RewardRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const range = GlobalIDB.KeyRange.bound( + RewardRecordStatus.PendingPickup, + RewardRecordStatus.PendingPickup, + ); + await tx.rewards.indexes.byStatus.iter(range).forEachAsync(f); + } else { + await tx.rewards.indexes.byStatus.iter().forEachAsync(f); } } -async function gatherTipPending( +async function gatherRewardPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ rewards: typeof WalletStoresV1.rewards; @@ -238,15 +318,7 @@ async function gatherTipPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const range = GlobalIDB.KeyRange.bound( - RewardRecordStatus.PendingPickup, - RewardRecordStatus.PendingPickup, - ); - await tx.rewards.indexes.byStatus.iter(range).forEachAsync(async (tip) => { - // FIXME: The tip record needs a proper status field! - if (tip.pickedUpTimestamp) { - return; - } + await iterRecordsForReward(tx, { onlyState: "nonfinal" }, async (tip) => { const opId = TaskIdentifiers.forTipPickup(tip); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); @@ -264,6 +336,43 @@ async function gatherTipPending( }); } +export async function iterRecordsForRefund( + tx: GetReadOnlyAccess<{ + refundGroups: typeof WalletStoresV1.refundGroups; + }>, + filter: TransactionRecordFilter, + f: (r: RefundGroupRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.only( + RefundGroupStatus.Pending + ); + await tx.refundGroups.indexes.byStatus + .iter(keyRange) + .forEachAsync(f); + } else { + await tx.refundGroups.iter().forEachAsync(f); + } +} + +export async function iterRecordsForPurchase( + tx: GetReadOnlyAccess<{ + purchases: typeof WalletStoresV1.purchases; + }>, + filter: TransactionRecordFilter, + f: (r: PurchaseRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + PurchaseStatus.PendingDownloadingProposal, + PurchaseStatus.PendingAcceptRefund, + ); + await tx.purchases.indexes.byStatus.iter(keyRange).forEachAsync(f); + } else { + await tx.purchases.indexes.byStatus.iter().forEachAsync(f); + } +} + async function gatherPurchasePending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ @@ -273,27 +382,20 @@ async function gatherPurchasePending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const keyRange = GlobalIDB.KeyRange.bound( - PurchaseStatus.PendingDownloadingProposal, - PurchaseStatus.PendingAcceptRefund, - ); - await tx.purchases.indexes.byStatus - .iter(keyRange) - .forEachAsync(async (pr) => { - const opId = TaskIdentifiers.forPay(pr); - const retryRecord = await tx.operationRetries.get(opId); - const timestampDue = - retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); - resp.pendingOperations.push({ - type: PendingTaskType.Purchase, - ...getPendingCommon(ws, opId, timestampDue), - givesLifeness: true, - statusStr: PurchaseStatus[pr.purchaseStatus], - proposalId: pr.proposalId, - retryInfo: retryRecord?.retryInfo, - lastError: retryRecord?.lastError, - }); + await iterRecordsForPurchase(tx, { onlyState: "nonfinal" }, async (pr) => { + const opId = TaskIdentifiers.forPay(pr); + const retryRecord = await tx.operationRetries.get(opId); + const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); + resp.pendingOperations.push({ + type: PendingTaskType.Purchase, + ...getPendingCommon(ws, opId, timestampDue), + givesLifeness: true, + statusStr: PurchaseStatus[pr.purchaseStatus], + proposalId: pr.proposalId, + retryInfo: retryRecord?.retryInfo, + lastError: retryRecord?.lastError, }); + }); } async function gatherRecoupPending( @@ -362,6 +464,26 @@ async function gatherBackupPending( }); } +export async function iterRecordsForPeerPullInitiation( + tx: GetReadOnlyAccess<{ + peerPullPaymentInitiations: typeof WalletStoresV1.peerPullPaymentInitiations; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPullPaymentInitiationRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + PeerPullPaymentInitiationStatus.PendingCreatePurse, + PeerPullPaymentInitiationStatus.AbortingDeletePurse, + ); + await tx.peerPullPaymentInitiations.indexes.byStatus + .iter(keyRange) + .forEachAsync(f); + } else { + await tx.peerPullPaymentInitiations.indexes.byStatus.iter().forEachAsync(f); + } +} + async function gatherPeerPullInitiationPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ @@ -371,13 +493,10 @@ async function gatherPeerPullInitiationPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const keyRange = GlobalIDB.KeyRange.bound( - PeerPullPaymentInitiationStatus.PendingCreatePurse, - PeerPullPaymentInitiationStatus.AbortingDeletePurse, - ); - await tx.peerPullPaymentInitiations.indexes.byStatus - .iter(keyRange) - .forEachAsync(async (pi) => { + await iterRecordsForPeerPullInitiation( + tx, + { onlyState: "nonfinal" }, + async (pi) => { const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = @@ -389,7 +508,28 @@ async function gatherPeerPullInitiationPending( retryInfo: retryRecord?.retryInfo, pursePub: pi.pursePub, }); - }); + }, + ); +} + +export async function iterRecordsForPeerPullDebit( + tx: GetReadOnlyAccess<{ + peerPullPaymentIncoming: typeof WalletStoresV1.peerPullPaymentIncoming; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPullPaymentIncomingRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + PeerPullDebitRecordStatus.PendingDeposit, + PeerPullDebitRecordStatus.AbortingRefresh, + ); + await tx.peerPullPaymentIncoming.indexes.byStatus + .iter(keyRange) + .forEachAsync(f); + } else { + await tx.peerPullPaymentIncoming.indexes.byStatus.iter().forEachAsync(f); + } } async function gatherPeerPullDebitPending( @@ -401,13 +541,10 @@ async function gatherPeerPullDebitPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const keyRange = GlobalIDB.KeyRange.bound( - PeerPullDebitRecordStatus.PendingDeposit, - PeerPullDebitRecordStatus.AbortingRefresh, - ); - await tx.peerPullPaymentIncoming.indexes.byStatus - .iter(keyRange) - .forEachAsync(async (pi) => { + await iterRecordsForPeerPullDebit( + tx, + { onlyState: "nonfinal" }, + async (pi) => { const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = @@ -419,7 +556,28 @@ async function gatherPeerPullDebitPending( retryInfo: retryRecord?.retryInfo, peerPullPaymentIncomingId: pi.peerPullPaymentIncomingId, }); - }); + }, + ); +} + +export async function iterRecordsForPeerPushInitiation( + tx: GetReadOnlyAccess<{ + peerPushPaymentInitiations: typeof WalletStoresV1.peerPushPaymentInitiations; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPushPaymentInitiationRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + PeerPushPaymentInitiationStatus.PendingCreatePurse, + PeerPushPaymentInitiationStatus.AbortingRefresh, + ); + await tx.peerPushPaymentInitiations.indexes.byStatus + .iter(keyRange) + .forEachAsync(f); + } else { + await tx.peerPushPaymentInitiations.indexes.byStatus.iter().forEachAsync(f); + } } async function gatherPeerPushInitiationPending( @@ -431,13 +589,10 @@ async function gatherPeerPushInitiationPending( now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - const keyRange = GlobalIDB.KeyRange.bound( - PeerPushPaymentInitiationStatus.PendingCreatePurse, - PeerPushPaymentInitiationStatus.AbortingRefresh, - ); - await tx.peerPushPaymentInitiations.indexes.byStatus - .iter(keyRange) - .forEachAsync(async (pi) => { + await iterRecordsForPeerPushInitiation( + tx, + { onlyState: "nonfinal" }, + async (pi) => { const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = @@ -449,7 +604,28 @@ async function gatherPeerPushInitiationPending( retryInfo: retryRecord?.retryInfo, pursePub: pi.pursePub, }); - }); + }, + ); +} + +export async function iterRecordsForPeerPushCredit( + tx: GetReadOnlyAccess<{ + peerPushPaymentIncoming: typeof WalletStoresV1.peerPushPaymentIncoming; + }>, + filter: TransactionRecordFilter, + f: (r: PeerPushPaymentIncomingRecord) => Promise<void>, +): Promise<void> { + if (filter.onlyState === "nonfinal") { + const keyRange = GlobalIDB.KeyRange.bound( + PeerPushPaymentIncomingStatus.PendingMerge, + PeerPushPaymentIncomingStatus.PendingWithdrawing, + ); + await tx.peerPushPaymentIncoming.indexes.byStatus + .iter(keyRange) + .forEachAsync(f); + } else { + await tx.peerPushPaymentIncoming.indexes.byStatus.iter().forEachAsync(f); + } } async function gatherPeerPushCreditPending( @@ -465,9 +641,10 @@ async function gatherPeerPushCreditPending( PeerPushPaymentIncomingStatus.PendingMerge, PeerPushPaymentIncomingStatus.PendingWithdrawing, ); - await tx.peerPushPaymentIncoming.indexes.byStatus - .iter(keyRange) - .forEachAsync(async (pi) => { + await iterRecordsForPeerPushCredit( + tx, + { onlyState: "nonfinal" }, + async (pi) => { const opId = TaskIdentifiers.forPeerPushCredit(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = @@ -479,7 +656,8 @@ async function gatherPeerPushCreditPending( retryInfo: retryRecord?.retryInfo, peerPushPaymentIncomingId: pi.peerPushPaymentIncomingId, }); - }); + }, + ); } export async function getPendingOperations( @@ -513,7 +691,7 @@ export async function getPendingOperations( await gatherRefreshPending(ws, tx, now, resp); await gatherWithdrawalPending(ws, tx, now, resp); await gatherDepositPending(ws, tx, now, resp); - await gatherTipPending(ws, tx, now, resp); + await gatherRewardPending(ws, tx, now, resp); await gatherPurchasePending(ws, tx, now, resp); await gatherRecoupPending(ws, tx, now, resp); await gatherBackupPending(ws, tx, now, resp); diff --git a/packages/taler-wallet-core/src/operations/testing.ts b/packages/taler-wallet-core/src/operations/testing.ts index ea373e914..3090549d5 100644 --- a/packages/taler-wallet-core/src/operations/testing.ts +++ b/packages/taler-wallet-core/src/operations/testing.ts @@ -472,12 +472,15 @@ export async function waitUntilDone(ws: InternalWalletState): Promise<void> { p = openPromise(); const txs = await getTransactions(ws, { includeRefreshes: true, + filterByState: "nonfinal", }); let finished = true; for (const tx of txs.transactions) { switch (tx.txState.major) { case TransactionMajorState.Pending: case TransactionMajorState.Aborting: + case TransactionMajorState.Suspended: + case TransactionMajorState.SuspendedAborting: finished = false; logger.info( `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`, diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts index a16809b36..af04cb161 100644 --- a/packages/taler-wallet-core/src/operations/transactions.ts +++ b/packages/taler-wallet-core/src/operations/transactions.ts @@ -36,6 +36,7 @@ import { TransactionByIdRequest, TransactionIdStr, TransactionMajorState, + TransactionRecordFilter, TransactionsRequest, TransactionsResponse, TransactionState, @@ -153,6 +154,7 @@ import { resumePeerPushDebitTransaction, abortPeerPushDebitTransaction, } from "./pay-peer-push-debit.js"; +import { iterRecordsForDeposit, iterRecordsForPeerPullDebit, iterRecordsForPeerPullInitiation, iterRecordsForPeerPushCredit, iterRecordsForPeerPushInitiation, iterRecordsForPurchase, iterRecordsForRefresh, iterRecordsForRefund, iterRecordsForReward, iterRecordsForWithdrawal } from "./pending.js"; const logger = new Logger("taler-wallet-core:transactions.ts"); @@ -929,6 +931,11 @@ export async function getTransactions( ): Promise<TransactionsResponse> { const transactions: Transaction[] = []; + const filter: TransactionRecordFilter = {}; + if (transactionsRequest?.filterByState) { + filter.onlyState = transactionsRequest.filterByState; + } + await ws.db .mktx((x) => [ x.coins, @@ -952,7 +959,7 @@ export async function getTransactions( x.refundGroups, ]) .runReadOnly(async (tx) => { - tx.peerPushPaymentInitiations.iter().forEachAsync(async (pi) => { + await iterRecordsForPeerPushInitiation(tx, filter, async (pi) => { const amount = Amounts.parseOrThrow(pi.amount); if (shouldSkipCurrency(transactionsRequest, amount.currency)) { @@ -968,7 +975,7 @@ export async function getTransactions( ); }); - tx.peerPullPaymentIncoming.iter().forEachAsync(async (pi) => { + await iterRecordsForPeerPullDebit(tx, filter, async (pi) => { const amount = Amounts.parseOrThrow(pi.contractTerms.amount); if (shouldSkipCurrency(transactionsRequest, amount.currency)) { return; @@ -986,7 +993,7 @@ export async function getTransactions( transactions.push(buildTransactionForPullPaymentDebit(pi)); }); - tx.peerPushPaymentIncoming.iter().forEachAsync(async (pi) => { + await iterRecordsForPeerPushCredit(tx, filter, async (pi) => { if (!pi.currency) { // Legacy transaction return; @@ -1026,8 +1033,8 @@ export async function getTransactions( ), ); }); - - tx.peerPullPaymentInitiations.iter().forEachAsync(async (pi) => { + + await iterRecordsForPeerPullInitiation(tx, filter, async (pi) => { const currency = Amounts.currencyOf(pi.amount); if (shouldSkipCurrency(transactionsRequest, currency)) { return; @@ -1060,7 +1067,7 @@ export async function getTransactions( ); }); - tx.refundGroups.iter().forEachAsync(async (refundGroup) => { + await iterRecordsForRefund(tx, filter, async (refundGroup) => { const currency = Amounts.currencyOf(refundGroup.amountRaw); if (shouldSkipCurrency(transactionsRequest, currency)) { return; @@ -1071,8 +1078,8 @@ export async function getTransactions( ); transactions.push(buildTransactionForRefund(refundGroup, contractData)); }); - - tx.refreshGroups.iter().forEachAsync(async (rg) => { + + await iterRecordsForRefresh(tx, filter, async (rg) => { if (shouldSkipCurrency(transactionsRequest, rg.currency)) { return; } @@ -1092,7 +1099,7 @@ export async function getTransactions( } }); - tx.withdrawalGroups.iter().forEachAsync(async (wsr) => { + await iterRecordsForWithdrawal(tx, filter ,async (wsr) => { if ( shouldSkipCurrency( transactionsRequest, @@ -1146,7 +1153,7 @@ export async function getTransactions( } }); - tx.depositGroups.iter().forEachAsync(async (dg) => { + await iterRecordsForDeposit(tx, filter, async (dg) => { const amount = Amounts.parseOrThrow(dg.contractTermsRaw.amount); if (shouldSkipCurrency(transactionsRequest, amount.currency)) { return; @@ -1157,7 +1164,7 @@ export async function getTransactions( transactions.push(buildTransactionForDeposit(dg, retryRecord)); }); - tx.purchases.iter().forEachAsync(async (purchase) => { + await iterRecordsForPurchase(tx, filter, async (purchase) => { const download = purchase.download; if (!download) { return; @@ -1200,7 +1207,7 @@ export async function getTransactions( ); }); - tx.rewards.iter().forEachAsync(async (tipRecord) => { + await iterRecordsForReward(tx, filter, async (tipRecord) => { if ( shouldSkipCurrency( transactionsRequest, |