732 lines
22 KiB
TypeScript
732 lines
22 KiB
TypeScript
/*
|
|
This file is part of GNU Taler
|
|
(C) 2019 GNUnet e.V.
|
|
|
|
GNU Taler is free software; you can redistribute it and/or modify it under the
|
|
terms of the GNU General Public License as published by the Free Software
|
|
Foundation; either version 3, or (at your option) any later version.
|
|
|
|
GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
|
|
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
|
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License along with
|
|
GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
|
|
*/
|
|
|
|
/**
|
|
* Derive pending tasks from the wallet database.
|
|
*/
|
|
|
|
/**
|
|
* Imports.
|
|
*/
|
|
import { GlobalIDB } from "@gnu-taler/idb-bridge";
|
|
import { AbsoluteTime, TransactionRecordFilter } from "@gnu-taler/taler-util";
|
|
import {
|
|
BackupProviderStateTag,
|
|
DepositElementStatus,
|
|
DepositGroupRecord,
|
|
DepositOperationStatus,
|
|
ExchangeEntryDbUpdateStatus,
|
|
PeerPullCreditRecord,
|
|
PeerPullDebitRecordStatus,
|
|
PeerPullPaymentCreditStatus,
|
|
PeerPullPaymentIncomingRecord,
|
|
PeerPushCreditStatus,
|
|
PeerPushDebitRecord,
|
|
PeerPushDebitStatus,
|
|
PeerPushPaymentIncomingRecord,
|
|
PurchaseRecord,
|
|
PurchaseStatus,
|
|
RefreshCoinStatus,
|
|
RefreshGroupRecord,
|
|
RefreshOperationStatus,
|
|
RefundGroupRecord,
|
|
RefundGroupStatus,
|
|
RewardRecord,
|
|
RewardRecordStatus,
|
|
WalletStoresV1,
|
|
WithdrawalGroupRecord,
|
|
WithdrawalGroupStatus,
|
|
timestampAbsoluteFromDb,
|
|
timestampOptionalAbsoluteFromDb,
|
|
timestampPreciseFromDb,
|
|
timestampPreciseToDb,
|
|
} from "../db.js";
|
|
import { InternalWalletState } from "../internal-wallet-state.js";
|
|
import {
|
|
PendingOperationsResponse,
|
|
PendingTaskType,
|
|
TaskId,
|
|
} from "../pending-types.js";
|
|
import { GetReadOnlyAccess } from "../util/query.js";
|
|
import { TaskIdentifiers } from "./common.js";
|
|
|
|
function getPendingCommon(
|
|
ws: InternalWalletState,
|
|
opTag: TaskId,
|
|
timestampDue: AbsoluteTime,
|
|
): {
|
|
id: TaskId;
|
|
isDue: boolean;
|
|
timestampDue: AbsoluteTime;
|
|
isLongpolling: boolean;
|
|
} {
|
|
const isDue =
|
|
AbsoluteTime.isExpired(timestampDue) && !ws.activeLongpoll[opTag];
|
|
return {
|
|
id: opTag,
|
|
isDue,
|
|
timestampDue,
|
|
isLongpolling: !!ws.activeLongpoll[opTag],
|
|
};
|
|
}
|
|
|
|
async function gatherExchangePending(
|
|
ws: InternalWalletState,
|
|
tx: GetReadOnlyAccess<{
|
|
exchanges: typeof WalletStoresV1.exchanges;
|
|
operationRetries: typeof WalletStoresV1.operationRetries;
|
|
}>,
|
|
now: AbsoluteTime,
|
|
resp: PendingOperationsResponse,
|
|
): Promise<void> {
|
|
// FIXME: We should do a range query here based on the update time
|
|
// and/or the entry state.
|
|
await tx.exchanges.iter().forEachAsync(async (exch) => {
|
|
switch (exch.updateStatus) {
|
|
case ExchangeEntryDbUpdateStatus.Initial:
|
|
case ExchangeEntryDbUpdateStatus.Suspended:
|
|
case ExchangeEntryDbUpdateStatus.Failed:
|
|
return;
|
|
}
|
|
const opTag = TaskIdentifiers.forExchangeUpdate(exch);
|
|
let opr = await tx.operationRetries.get(opTag);
|
|
const timestampDue = opr?.retryInfo.nextRetry ?? exch.nextRefreshCheckStamp;
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.ExchangeUpdate,
|
|
...getPendingCommon(
|
|
ws,
|
|
opTag,
|
|
AbsoluteTime.fromPreciseTimestamp(timestampPreciseFromDb(timestampDue)),
|
|
),
|
|
givesLifeness: false,
|
|
exchangeBaseUrl: exch.baseUrl,
|
|
lastError: opr?.lastError,
|
|
});
|
|
|
|
// We only schedule a check for auto-refresh if the exchange update
|
|
// was successful.
|
|
if (!opr?.lastError) {
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.ExchangeCheckRefresh,
|
|
...getPendingCommon(
|
|
ws,
|
|
opTag,
|
|
AbsoluteTime.fromPreciseTimestamp(
|
|
timestampPreciseFromDb(timestampDue),
|
|
),
|
|
),
|
|
timestampDue: AbsoluteTime.fromPreciseTimestamp(
|
|
timestampPreciseFromDb(exch.nextRefreshCheckStamp),
|
|
),
|
|
givesLifeness: false,
|
|
exchangeBaseUrl: exch.baseUrl,
|
|
});
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Iterate refresh records based on a filter.
|
|
*/
|
|
export async function iterRecordsForRefresh(
|
|
tx: GetReadOnlyAccess<{
|
|
refreshGroups: typeof WalletStoresV1.refreshGroups;
|
|
}>,
|
|
filter: TransactionRecordFilter,
|
|
f: (r: RefreshGroupRecord) => Promise<void>,
|
|
): Promise<void> {
|
|
let refreshGroups: RefreshGroupRecord[];
|
|
if (filter.onlyState === "nonfinal") {
|
|
const keyRange = GlobalIDB.KeyRange.bound(
|
|
RefreshOperationStatus.Pending,
|
|
RefreshOperationStatus.Suspended,
|
|
);
|
|
refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(keyRange);
|
|
} else {
|
|
refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll();
|
|
}
|
|
|
|
for (const r of refreshGroups) {
|
|
await f(r);
|
|
}
|
|
}
|
|
|
|
async function gatherRefreshPending(
|
|
ws: InternalWalletState,
|
|
tx: GetReadOnlyAccess<{
|
|
refreshGroups: typeof WalletStoresV1.refreshGroups;
|
|
operationRetries: typeof WalletStoresV1.operationRetries;
|
|
}>,
|
|
now: AbsoluteTime,
|
|
resp: PendingOperationsResponse,
|
|
): Promise<void> {
|
|
await iterRecordsForRefresh(tx, { onlyState: "nonfinal" }, async (r) => {
|
|
if (r.timestampFinished) {
|
|
return;
|
|
}
|
|
const opId = TaskIdentifiers.forRefresh(r);
|
|
const retryRecord = await tx.operationRetries.get(opId);
|
|
const timestampDue =
|
|
timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
|
|
AbsoluteTime.now();
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.Refresh,
|
|
...getPendingCommon(ws, opId, timestampDue),
|
|
givesLifeness: true,
|
|
refreshGroupId: r.refreshGroupId,
|
|
finishedPerCoin: r.statusPerCoin.map(
|
|
(x) => x === RefreshCoinStatus.Finished,
|
|
),
|
|
retryInfo: retryRecord?.retryInfo,
|
|
});
|
|
});
|
|
}
|
|
|
|
export async function iterRecordsForWithdrawal(
|
|
tx: GetReadOnlyAccess<{
|
|
withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
|
|
}>,
|
|
filter: TransactionRecordFilter,
|
|
f: (r: WithdrawalGroupRecord) => Promise<void>,
|
|
): Promise<void> {
|
|
let withdrawalGroupRecords: WithdrawalGroupRecord[];
|
|
if (filter.onlyState === "nonfinal") {
|
|
const range = GlobalIDB.KeyRange.bound(
|
|
WithdrawalGroupStatus.PendingRegisteringBank,
|
|
WithdrawalGroupStatus.PendingAml,
|
|
);
|
|
withdrawalGroupRecords = await tx.withdrawalGroups.indexes.byStatus.getAll(
|
|
range,
|
|
);
|
|
} else {
|
|
withdrawalGroupRecords =
|
|
await tx.withdrawalGroups.indexes.byStatus.getAll();
|
|
}
|
|
for (const wgr of withdrawalGroupRecords) {
|
|
await f(wgr);
|
|
}
|
|
}
|
|
|
|
async function gatherWithdrawalPending(
|
|
ws: InternalWalletState,
|
|
tx: GetReadOnlyAccess<{
|
|
withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
|
|
planchets: typeof WalletStoresV1.planchets;
|
|
operationRetries: typeof WalletStoresV1.operationRetries;
|
|
}>,
|
|
now: AbsoluteTime,
|
|
resp: PendingOperationsResponse,
|
|
): Promise<void> {
|
|
await iterRecordsForWithdrawal(tx, { onlyState: "nonfinal" }, async (wsr) => {
|
|
const opTag = TaskIdentifiers.forWithdrawal(wsr);
|
|
let opr = await tx.operationRetries.get(opTag);
|
|
const now = AbsoluteTime.now();
|
|
if (!opr) {
|
|
opr = {
|
|
id: opTag,
|
|
retryInfo: {
|
|
firstTry: timestampPreciseToDb(AbsoluteTime.toPreciseTimestamp(now)),
|
|
nextRetry: timestampPreciseToDb(AbsoluteTime.toPreciseTimestamp(now)),
|
|
retryCounter: 0,
|
|
},
|
|
};
|
|
}
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.Withdraw,
|
|
...getPendingCommon(
|
|
ws,
|
|
opTag,
|
|
timestampOptionalAbsoluteFromDb(opr.retryInfo?.nextRetry) ??
|
|
AbsoluteTime.now(),
|
|
),
|
|
givesLifeness: true,
|
|
withdrawalGroupId: wsr.withdrawalGroupId,
|
|
lastError: opr.lastError,
|
|
retryInfo: opr.retryInfo,
|
|
});
|
|
});
|
|
}
|
|
|
|
export async function iterRecordsForDeposit(
|
|
tx: GetReadOnlyAccess<{
|
|
depositGroups: typeof WalletStoresV1.depositGroups;
|
|
}>,
|
|
filter: TransactionRecordFilter,
|
|
f: (r: DepositGroupRecord) => Promise<void>,
|
|
): Promise<void> {
|
|
let dgs: DepositGroupRecord[];
|
|
if (filter.onlyState === "nonfinal") {
|
|
dgs = await tx.depositGroups.indexes.byStatus.getAll(
|
|
GlobalIDB.KeyRange.bound(
|
|
DepositOperationStatus.PendingDeposit,
|
|
DepositOperationStatus.PendingKyc,
|
|
),
|
|
);
|
|
} else {
|
|
dgs = await tx.depositGroups.indexes.byStatus.getAll();
|
|
}
|
|
|
|
for (const dg of dgs) {
|
|
await f(dg);
|
|
}
|
|
}
|
|
|
|
async function gatherDepositPending(
|
|
ws: InternalWalletState,
|
|
tx: GetReadOnlyAccess<{
|
|
depositGroups: typeof WalletStoresV1.depositGroups;
|
|
operationRetries: typeof WalletStoresV1.operationRetries;
|
|
}>,
|
|
now: AbsoluteTime,
|
|
resp: PendingOperationsResponse,
|
|
): Promise<void> {
|
|
await iterRecordsForDeposit(tx, { onlyState: "nonfinal" }, async (dg) => {
|
|
let deposited = true;
|
|
for (const d of dg.statusPerCoin) {
|
|
if (d === DepositElementStatus.DepositPending) {
|
|
deposited = false;
|
|
}
|
|
}
|
|
const opId = TaskIdentifiers.forDeposit(dg);
|
|
const retryRecord = await tx.operationRetries.get(opId);
|
|
const timestampDue =
|
|
timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
|
|
AbsoluteTime.now();
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.Deposit,
|
|
...getPendingCommon(ws, opId, timestampDue),
|
|
// Fully deposited operations don't give lifeness,
|
|
// because there is no reason to wait on the
|
|
// deposit tracking status.
|
|
givesLifeness: !deposited,
|
|
depositGroupId: dg.depositGroupId,
|
|
lastError: retryRecord?.lastError,
|
|
retryInfo: retryRecord?.retryInfo,
|
|
});
|
|
});
|
|
}
|
|
|
|
export async function iterRecordsForReward(
|
|
tx: GetReadOnlyAccess<{
|
|
rewards: typeof WalletStoresV1.rewards;
|
|
}>,
|
|
filter: TransactionRecordFilter,
|
|
f: (r: RewardRecord) => Promise<void>,
|
|
): Promise<void> {
|
|
if (filter.onlyState === "nonfinal") {
|
|
const range = GlobalIDB.KeyRange.bound(
|
|
RewardRecordStatus.PendingPickup,
|
|
RewardRecordStatus.PendingPickup,
|
|
);
|
|
await tx.rewards.indexes.byStatus.iter(range).forEachAsync(f);
|
|
} else {
|
|
await tx.rewards.indexes.byStatus.iter().forEachAsync(f);
|
|
}
|
|
}
|
|
|
|
async function gatherRewardPending(
|
|
ws: InternalWalletState,
|
|
tx: GetReadOnlyAccess<{
|
|
rewards: typeof WalletStoresV1.rewards;
|
|
operationRetries: typeof WalletStoresV1.operationRetries;
|
|
}>,
|
|
now: AbsoluteTime,
|
|
resp: PendingOperationsResponse,
|
|
): Promise<void> {
|
|
await iterRecordsForReward(tx, { onlyState: "nonfinal" }, async (tip) => {
|
|
const opId = TaskIdentifiers.forTipPickup(tip);
|
|
const retryRecord = await tx.operationRetries.get(opId);
|
|
const timestampDue =
|
|
timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
|
|
AbsoluteTime.now();
|
|
if (tip.acceptedTimestamp) {
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.RewardPickup,
|
|
...getPendingCommon(ws, opId, timestampDue),
|
|
givesLifeness: true,
|
|
timestampDue,
|
|
merchantBaseUrl: tip.merchantBaseUrl,
|
|
tipId: tip.walletRewardId,
|
|
merchantTipId: tip.merchantRewardId,
|
|
});
|
|
}
|
|
});
|
|
}
|
|
|
|
export async function iterRecordsForRefund(
|
|
tx: GetReadOnlyAccess<{
|
|
refundGroups: typeof WalletStoresV1.refundGroups;
|
|
}>,
|
|
filter: TransactionRecordFilter,
|
|
f: (r: RefundGroupRecord) => Promise<void>,
|
|
): Promise<void> {
|
|
if (filter.onlyState === "nonfinal") {
|
|
const keyRange = GlobalIDB.KeyRange.only(RefundGroupStatus.Pending);
|
|
await tx.refundGroups.indexes.byStatus.iter(keyRange).forEachAsync(f);
|
|
} else {
|
|
await tx.refundGroups.iter().forEachAsync(f);
|
|
}
|
|
}
|
|
|
|
export async function iterRecordsForPurchase(
|
|
tx: GetReadOnlyAccess<{
|
|
purchases: typeof WalletStoresV1.purchases;
|
|
}>,
|
|
filter: TransactionRecordFilter,
|
|
f: (r: PurchaseRecord) => Promise<void>,
|
|
): Promise<void> {
|
|
if (filter.onlyState === "nonfinal") {
|
|
const keyRange = GlobalIDB.KeyRange.bound(
|
|
PurchaseStatus.PendingDownloadingProposal,
|
|
PurchaseStatus.PendingAcceptRefund,
|
|
);
|
|
await tx.purchases.indexes.byStatus.iter(keyRange).forEachAsync(f);
|
|
} else {
|
|
await tx.purchases.indexes.byStatus.iter().forEachAsync(f);
|
|
}
|
|
}
|
|
|
|
async function gatherPurchasePending(
|
|
ws: InternalWalletState,
|
|
tx: GetReadOnlyAccess<{
|
|
purchases: typeof WalletStoresV1.purchases;
|
|
operationRetries: typeof WalletStoresV1.operationRetries;
|
|
}>,
|
|
now: AbsoluteTime,
|
|
resp: PendingOperationsResponse,
|
|
): Promise<void> {
|
|
await iterRecordsForPurchase(tx, { onlyState: "nonfinal" }, async (pr) => {
|
|
const opId = TaskIdentifiers.forPay(pr);
|
|
const retryRecord = await tx.operationRetries.get(opId);
|
|
const timestampDue =
|
|
timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
|
|
AbsoluteTime.now();
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.Purchase,
|
|
...getPendingCommon(ws, opId, timestampDue),
|
|
givesLifeness: true,
|
|
statusStr: PurchaseStatus[pr.purchaseStatus],
|
|
proposalId: pr.proposalId,
|
|
retryInfo: retryRecord?.retryInfo,
|
|
lastError: retryRecord?.lastError,
|
|
});
|
|
});
|
|
}
|
|
|
|
async function gatherRecoupPending(
|
|
ws: InternalWalletState,
|
|
tx: GetReadOnlyAccess<{
|
|
recoupGroups: typeof WalletStoresV1.recoupGroups;
|
|
operationRetries: typeof WalletStoresV1.operationRetries;
|
|
}>,
|
|
now: AbsoluteTime,
|
|
resp: PendingOperationsResponse,
|
|
): Promise<void> {
|
|
// FIXME: Have a status field!
|
|
await tx.recoupGroups.iter().forEachAsync(async (rg) => {
|
|
if (rg.timestampFinished) {
|
|
return;
|
|
}
|
|
const opId = TaskIdentifiers.forRecoup(rg);
|
|
const retryRecord = await tx.operationRetries.get(opId);
|
|
const timestampDue =
|
|
timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
|
|
AbsoluteTime.now();
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.Recoup,
|
|
...getPendingCommon(ws, opId, timestampDue),
|
|
givesLifeness: true,
|
|
recoupGroupId: rg.recoupGroupId,
|
|
retryInfo: retryRecord?.retryInfo,
|
|
lastError: retryRecord?.lastError,
|
|
});
|
|
});
|
|
}
|
|
|
|
async function gatherBackupPending(
|
|
ws: InternalWalletState,
|
|
tx: GetReadOnlyAccess<{
|
|
backupProviders: typeof WalletStoresV1.backupProviders;
|
|
operationRetries: typeof WalletStoresV1.operationRetries;
|
|
}>,
|
|
now: AbsoluteTime,
|
|
resp: PendingOperationsResponse,
|
|
): Promise<void> {
|
|
await tx.backupProviders.iter().forEachAsync(async (bp) => {
|
|
const opId = TaskIdentifiers.forBackup(bp);
|
|
const retryRecord = await tx.operationRetries.get(opId);
|
|
if (bp.state.tag === BackupProviderStateTag.Ready) {
|
|
const timestampDue = timestampAbsoluteFromDb(
|
|
bp.state.nextBackupTimestamp,
|
|
);
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.Backup,
|
|
...getPendingCommon(ws, opId, timestampDue),
|
|
givesLifeness: false,
|
|
backupProviderBaseUrl: bp.baseUrl,
|
|
lastError: undefined,
|
|
});
|
|
} else if (bp.state.tag === BackupProviderStateTag.Retrying) {
|
|
const timestampDue =
|
|
timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo?.nextRetry) ??
|
|
AbsoluteTime.now();
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.Backup,
|
|
...getPendingCommon(ws, opId, timestampDue),
|
|
givesLifeness: false,
|
|
backupProviderBaseUrl: bp.baseUrl,
|
|
retryInfo: retryRecord?.retryInfo,
|
|
lastError: retryRecord?.lastError,
|
|
});
|
|
}
|
|
});
|
|
}
|
|
|
|
export async function iterRecordsForPeerPullInitiation(
|
|
tx: GetReadOnlyAccess<{
|
|
peerPullCredit: typeof WalletStoresV1.peerPullCredit;
|
|
}>,
|
|
filter: TransactionRecordFilter,
|
|
f: (r: PeerPullCreditRecord) => Promise<void>,
|
|
): Promise<void> {
|
|
if (filter.onlyState === "nonfinal") {
|
|
const keyRange = GlobalIDB.KeyRange.bound(
|
|
PeerPullPaymentCreditStatus.PendingCreatePurse,
|
|
PeerPullPaymentCreditStatus.AbortingDeletePurse,
|
|
);
|
|
await tx.peerPullCredit.indexes.byStatus.iter(keyRange).forEachAsync(f);
|
|
} else {
|
|
await tx.peerPullCredit.indexes.byStatus.iter().forEachAsync(f);
|
|
}
|
|
}
|
|
|
|
async function gatherPeerPullInitiationPending(
|
|
ws: InternalWalletState,
|
|
tx: GetReadOnlyAccess<{
|
|
peerPullCredit: typeof WalletStoresV1.peerPullCredit;
|
|
operationRetries: typeof WalletStoresV1.operationRetries;
|
|
}>,
|
|
now: AbsoluteTime,
|
|
resp: PendingOperationsResponse,
|
|
): Promise<void> {
|
|
await iterRecordsForPeerPullInitiation(
|
|
tx,
|
|
{ onlyState: "nonfinal" },
|
|
async (pi) => {
|
|
const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
|
|
const retryRecord = await tx.operationRetries.get(opId);
|
|
const timestampDue =
|
|
timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
|
|
AbsoluteTime.now();
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.PeerPullCredit,
|
|
...getPendingCommon(ws, opId, timestampDue),
|
|
givesLifeness: true,
|
|
retryInfo: retryRecord?.retryInfo,
|
|
pursePub: pi.pursePub,
|
|
});
|
|
},
|
|
);
|
|
}
|
|
|
|
export async function iterRecordsForPeerPullDebit(
|
|
tx: GetReadOnlyAccess<{
|
|
peerPullDebit: typeof WalletStoresV1.peerPullDebit;
|
|
}>,
|
|
filter: TransactionRecordFilter,
|
|
f: (r: PeerPullPaymentIncomingRecord) => Promise<void>,
|
|
): Promise<void> {
|
|
if (filter.onlyState === "nonfinal") {
|
|
const keyRange = GlobalIDB.KeyRange.bound(
|
|
PeerPullDebitRecordStatus.PendingDeposit,
|
|
PeerPullDebitRecordStatus.AbortingRefresh,
|
|
);
|
|
await tx.peerPullDebit.indexes.byStatus.iter(keyRange).forEachAsync(f);
|
|
} else {
|
|
await tx.peerPullDebit.indexes.byStatus.iter().forEachAsync(f);
|
|
}
|
|
}
|
|
|
|
async function gatherPeerPullDebitPending(
|
|
ws: InternalWalletState,
|
|
tx: GetReadOnlyAccess<{
|
|
peerPullDebit: typeof WalletStoresV1.peerPullDebit;
|
|
operationRetries: typeof WalletStoresV1.operationRetries;
|
|
}>,
|
|
now: AbsoluteTime,
|
|
resp: PendingOperationsResponse,
|
|
): Promise<void> {
|
|
await iterRecordsForPeerPullDebit(
|
|
tx,
|
|
{ onlyState: "nonfinal" },
|
|
async (pi) => {
|
|
const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi);
|
|
const retryRecord = await tx.operationRetries.get(opId);
|
|
const timestampDue =
|
|
timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
|
|
AbsoluteTime.now();
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.PeerPullDebit,
|
|
...getPendingCommon(ws, opId, timestampDue),
|
|
givesLifeness: true,
|
|
retryInfo: retryRecord?.retryInfo,
|
|
peerPullDebitId: pi.peerPullDebitId,
|
|
});
|
|
},
|
|
);
|
|
}
|
|
|
|
export async function iterRecordsForPeerPushInitiation(
|
|
tx: GetReadOnlyAccess<{
|
|
peerPushDebit: typeof WalletStoresV1.peerPushDebit;
|
|
}>,
|
|
filter: TransactionRecordFilter,
|
|
f: (r: PeerPushDebitRecord) => Promise<void>,
|
|
): Promise<void> {
|
|
if (filter.onlyState === "nonfinal") {
|
|
const keyRange = GlobalIDB.KeyRange.bound(
|
|
PeerPushDebitStatus.PendingCreatePurse,
|
|
PeerPushDebitStatus.AbortingRefresh,
|
|
);
|
|
await tx.peerPushDebit.indexes.byStatus.iter(keyRange).forEachAsync(f);
|
|
} else {
|
|
await tx.peerPushDebit.indexes.byStatus.iter().forEachAsync(f);
|
|
}
|
|
}
|
|
|
|
async function gatherPeerPushInitiationPending(
|
|
ws: InternalWalletState,
|
|
tx: GetReadOnlyAccess<{
|
|
peerPushDebit: typeof WalletStoresV1.peerPushDebit;
|
|
operationRetries: typeof WalletStoresV1.operationRetries;
|
|
}>,
|
|
now: AbsoluteTime,
|
|
resp: PendingOperationsResponse,
|
|
): Promise<void> {
|
|
await iterRecordsForPeerPushInitiation(
|
|
tx,
|
|
{ onlyState: "nonfinal" },
|
|
async (pi) => {
|
|
const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi);
|
|
const retryRecord = await tx.operationRetries.get(opId);
|
|
const timestampDue =
|
|
timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
|
|
AbsoluteTime.now();
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.PeerPushDebit,
|
|
...getPendingCommon(ws, opId, timestampDue),
|
|
givesLifeness: true,
|
|
retryInfo: retryRecord?.retryInfo,
|
|
pursePub: pi.pursePub,
|
|
});
|
|
},
|
|
);
|
|
}
|
|
|
|
export async function iterRecordsForPeerPushCredit(
|
|
tx: GetReadOnlyAccess<{
|
|
peerPushCredit: typeof WalletStoresV1.peerPushCredit;
|
|
}>,
|
|
filter: TransactionRecordFilter,
|
|
f: (r: PeerPushPaymentIncomingRecord) => Promise<void>,
|
|
): Promise<void> {
|
|
if (filter.onlyState === "nonfinal") {
|
|
const keyRange = GlobalIDB.KeyRange.bound(
|
|
PeerPushCreditStatus.PendingMerge,
|
|
PeerPushCreditStatus.PendingWithdrawing,
|
|
);
|
|
await tx.peerPushCredit.indexes.byStatus.iter(keyRange).forEachAsync(f);
|
|
} else {
|
|
await tx.peerPushCredit.indexes.byStatus.iter().forEachAsync(f);
|
|
}
|
|
}
|
|
|
|
async function gatherPeerPushCreditPending(
|
|
ws: InternalWalletState,
|
|
tx: GetReadOnlyAccess<{
|
|
peerPushCredit: typeof WalletStoresV1.peerPushCredit;
|
|
operationRetries: typeof WalletStoresV1.operationRetries;
|
|
}>,
|
|
now: AbsoluteTime,
|
|
resp: PendingOperationsResponse,
|
|
): Promise<void> {
|
|
const keyRange = GlobalIDB.KeyRange.bound(
|
|
PeerPushCreditStatus.PendingMerge,
|
|
PeerPushCreditStatus.PendingWithdrawing,
|
|
);
|
|
await iterRecordsForPeerPushCredit(
|
|
tx,
|
|
{ onlyState: "nonfinal" },
|
|
async (pi) => {
|
|
const opId = TaskIdentifiers.forPeerPushCredit(pi);
|
|
const retryRecord = await tx.operationRetries.get(opId);
|
|
const timestampDue =
|
|
timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
|
|
AbsoluteTime.now();
|
|
resp.pendingOperations.push({
|
|
type: PendingTaskType.PeerPushCredit,
|
|
...getPendingCommon(ws, opId, timestampDue),
|
|
givesLifeness: true,
|
|
retryInfo: retryRecord?.retryInfo,
|
|
peerPushCreditId: pi.peerPushCreditId,
|
|
});
|
|
},
|
|
);
|
|
}
|
|
|
|
export async function getPendingOperations(
|
|
ws: InternalWalletState,
|
|
): Promise<PendingOperationsResponse> {
|
|
const now = AbsoluteTime.now();
|
|
return await ws.db
|
|
.mktx((x) => [
|
|
x.backupProviders,
|
|
x.exchanges,
|
|
x.exchangeDetails,
|
|
x.refreshGroups,
|
|
x.coins,
|
|
x.withdrawalGroups,
|
|
x.rewards,
|
|
x.purchases,
|
|
x.planchets,
|
|
x.depositGroups,
|
|
x.recoupGroups,
|
|
x.operationRetries,
|
|
x.peerPullCredit,
|
|
x.peerPushDebit,
|
|
x.peerPullDebit,
|
|
x.peerPushCredit,
|
|
])
|
|
.runReadWrite(async (tx) => {
|
|
const resp: PendingOperationsResponse = {
|
|
pendingOperations: [],
|
|
};
|
|
await gatherExchangePending(ws, tx, now, resp);
|
|
await gatherRefreshPending(ws, tx, now, resp);
|
|
await gatherWithdrawalPending(ws, tx, now, resp);
|
|
await gatherDepositPending(ws, tx, now, resp);
|
|
await gatherRewardPending(ws, tx, now, resp);
|
|
await gatherPurchasePending(ws, tx, now, resp);
|
|
await gatherRecoupPending(ws, tx, now, resp);
|
|
await gatherBackupPending(ws, tx, now, resp);
|
|
await gatherPeerPushInitiationPending(ws, tx, now, resp);
|
|
await gatherPeerPullInitiationPending(ws, tx, now, resp);
|
|
await gatherPeerPullDebitPending(ws, tx, now, resp);
|
|
await gatherPeerPushCreditPending(ws, tx, now, resp);
|
|
return resp;
|
|
});
|
|
}
|