wallet-core: fix peer-(push,pull)-debit withdrawal states

This commit is contained in:
Florian Dold 2023-06-19 16:03:06 +02:00
parent ffa68ce8dd
commit 54f0c82999
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
10 changed files with 564 additions and 252 deletions

View File

@ -2179,6 +2179,20 @@ export class WalletService {
return unixPath; return unixPath;
} }
get dbPath() {
return path.join(
this.globalState.testDir,
`walletdb-${this.opts.name}.json`,
);
}
async stop(): Promise<void> {
if (this.walletProc) {
this.walletProc.proc.kill("SIGTERM");
await this.walletProc.wait();
}
}
async start(): Promise<void> { async start(): Promise<void> {
let dbPath: string; let dbPath: string;
if (this.opts.useInMemoryDb) { if (this.opts.useInMemoryDb) {
@ -2190,7 +2204,7 @@ export class WalletService {
); );
} }
const unixPath = this.socketPath; const unixPath = this.socketPath;
this.globalState.spawnService( this.walletProc = this.globalState.spawnService(
"taler-wallet-cli", "taler-wallet-cli",
[ [
"--wallet-db", "--wallet-db",

View File

@ -331,6 +331,7 @@ export async function createSimpleTestkudosEnvironmentV2(
export interface CreateWalletArgs { export interface CreateWalletArgs {
handleNotification?(wn: WalletNotification): void; handleNotification?(wn: WalletNotification): void;
name: string; name: string;
persistent?: boolean;
} }
export async function createWalletDaemonWithClient( export async function createWalletDaemonWithClient(
@ -338,8 +339,8 @@ export async function createWalletDaemonWithClient(
args: CreateWalletArgs, args: CreateWalletArgs,
): Promise<{ walletClient: WalletClient; walletService: WalletService }> { ): Promise<{ walletClient: WalletClient; walletService: WalletService }> {
const walletService = new WalletService(t, { const walletService = new WalletService(t, {
name: "wallet", name: args.name,
useInMemoryDb: true, useInMemoryDb: !args.persistent,
}); });
await walletService.start(); await walletService.start();
await walletService.pingUntilAvailable(); await walletService.pingUntilAvailable();

View File

@ -1881,7 +1881,7 @@ export enum PeerPullPaymentInitiationStatus {
SuspendedWithdrawing = 33, SuspendedWithdrawing = 33,
SuspendedAbortingDeletePurse = 34, SuspendedAbortingDeletePurse = 34,
DonePurseDeposited = 50 /* DORMANT_START */, Done = 50 /* DORMANT_START */,
Failed = 51, Failed = 51,
Aborted = 52, Aborted = 52,
} }

View File

@ -474,3 +474,16 @@ export function constructTombstone(p: ParsedTombstone): TombstoneIdStr {
return `tmb:${p.tag}:${p.refundGroupId}` as TombstoneIdStr; return `tmb:${p.tag}:${p.refundGroupId}` as TombstoneIdStr;
} }
} }
/**
* Uniform interface for a particular wallet transaction.
*/
export interface TransactionManager {
get taskId(): TaskId;
get transactionId(): TransactionIdStr;
fail(): Promise<void>;
abort(): Promise<void>;
suspend(): Promise<void>;
resume(): Promise<void>;
process(): Promise<OperationAttemptResult>;
}

View File

@ -91,7 +91,7 @@ import {
const logger = new Logger("pay-peer-pull-credit.ts"); const logger = new Logger("pay-peer-pull-credit.ts");
export async function queryPurseForPeerPullCredit( async function queryPurseForPeerPullCredit(
ws: InternalWalletState, ws: InternalWalletState,
pullIni: PeerPullPaymentInitiationRecord, pullIni: PeerPullPaymentInitiationRecord,
cancellationToken: CancellationToken, cancellationToken: CancellationToken,
@ -102,7 +102,7 @@ export async function queryPurseForPeerPullCredit(
); );
purseDepositUrl.searchParams.set("timeout_ms", "30000"); purseDepositUrl.searchParams.set("timeout_ms", "30000");
logger.info(`querying purse status via ${purseDepositUrl.href}`); logger.info(`querying purse status via ${purseDepositUrl.href}`);
const resp = await ws.http.get(purseDepositUrl.href, { const resp = await ws.http.fetch(purseDepositUrl.href, {
timeout: { d_ms: 60000 }, timeout: { d_ms: 60000 },
cancellationToken, cancellationToken,
}); });
@ -153,8 +153,11 @@ export async function queryPurseForPeerPullCredit(
pub: reserve.reservePub, pub: reserve.reservePub,
}, },
}); });
const transactionId = constructTransactionIdentifier({
await ws.db tag: TransactionType.PeerPullCredit,
pursePub: pullIni.pursePub,
});
const transitionInfo = await ws.db
.mktx((x) => [x.peerPullPaymentInitiations]) .mktx((x) => [x.peerPullPaymentInitiations])
.runReadWrite(async (tx) => { .runReadWrite(async (tx) => {
const finPi = await tx.peerPullPaymentInitiations.get(pullIni.pursePub); const finPi = await tx.peerPullPaymentInitiations.get(pullIni.pursePub);
@ -162,11 +165,15 @@ export async function queryPurseForPeerPullCredit(
logger.warn("peerPullPaymentInitiation not found anymore"); logger.warn("peerPullPaymentInitiation not found anymore");
return; return;
} }
const oldTxState = computePeerPullCreditTransactionState(finPi);
if (finPi.status === PeerPullPaymentInitiationStatus.PendingReady) { if (finPi.status === PeerPullPaymentInitiationStatus.PendingReady) {
finPi.status = PeerPullPaymentInitiationStatus.DonePurseDeposited; finPi.status = PeerPullPaymentInitiationStatus.PendingWithdrawing;
} }
await tx.peerPullPaymentInitiations.put(finPi); await tx.peerPullPaymentInitiations.put(finPi);
const newTxState = computePeerPullCreditTransactionState(finPi);
return { oldTxState, newTxState };
}); });
notifyTransition(ws, transactionId, transitionInfo);
return { return {
ready: true, ready: true,
}; };
@ -293,91 +300,68 @@ async function processPeerPullCreditAbortingDeletePurse(
return OperationAttemptResult.pendingEmpty(); return OperationAttemptResult.pendingEmpty();
} }
export async function processPeerPullCredit( async function handlePeerPullCreditWithdrawing(
ws: InternalWalletState, ws: InternalWalletState,
pursePub: string, pullIni: PeerPullPaymentInitiationRecord,
): Promise<OperationAttemptResult> { ): Promise<OperationAttemptResult> {
const pullIni = await ws.db if (!pullIni.withdrawalGroupId) {
.mktx((x) => [x.peerPullPaymentInitiations]) throw Error("invalid db state (withdrawing, but no withdrawal group ID");
.runReadOnly(async (tx) => { }
return tx.peerPullPaymentInitiations.get(pursePub); const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPullCredit,
pursePub: pullIni.pursePub,
}); });
if (!pullIni) { const wgId = pullIni.withdrawalGroupId;
throw Error("peer pull payment initiation not found in database"); let finished: boolean = false;
} const transitionInfo = await ws.db
.mktx((x) => [x.peerPullPaymentInitiations, x.withdrawalGroups])
const retryTag = constructTaskIdentifier({ .runReadWrite(async (tx) => {
tag: PendingTaskType.PeerPullCredit, const ppi = await tx.peerPullPaymentInitiations.get(
pursePub, pullIni.pursePub,
});
// We're already running!
if (ws.activeLongpoll[retryTag]) {
logger.info("peer-pull-credit already in long-polling, returning!");
return {
type: OperationAttemptResultType.Longpoll,
};
}
logger.trace(`processing ${retryTag}, status=${pullIni.status}`);
switch (pullIni.status) {
case PeerPullPaymentInitiationStatus.DonePurseDeposited: {
// We implement this case so that the "retry" action on a peer-pull-credit transaction
// also retries the withdrawal task.
logger.warn(
"peer pull payment initiation is already finished, retrying withdrawal",
);
const withdrawalGroupId = pullIni.withdrawalGroupId;
if (withdrawalGroupId) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Withdraw,
withdrawalGroupId,
});
stopLongpolling(ws, taskId);
await resetOperationTimeout(ws, taskId);
await runOperationWithErrorReporting(ws, taskId, () =>
processWithdrawalGroup(ws, withdrawalGroupId),
); );
if (!ppi) {
finished = true;
return;
} }
return { if (ppi.status !== PeerPullPaymentInitiationStatus.PendingWithdrawing) {
type: OperationAttemptResultType.Finished, finished = true;
result: undefined, return;
};
} }
case PeerPullPaymentInitiationStatus.PendingReady: const oldTxState = computePeerPullCreditTransactionState(ppi);
runLongpollAsync(ws, retryTag, async (cancellationToken) => const wg = await tx.withdrawalGroups.get(wgId);
queryPurseForPeerPullCredit(ws, pullIni, cancellationToken), if (!wg) {
); // FIXME: Fail the operation instead?
logger.trace( return undefined;
"returning early from processPeerPullCredit for long-polling in background",
);
return {
type: OperationAttemptResultType.Longpoll,
};
case PeerPullPaymentInitiationStatus.PendingMergeKycRequired: {
if (!pullIni.kycInfo) {
throw Error("invalid state, kycInfo required");
} }
return await longpollKycStatus( switch (wg.status) {
ws, case WithdrawalGroupStatus.Finished:
pursePub, finished = true;
pullIni.exchangeBaseUrl, ppi.status = PeerPullPaymentInitiationStatus.Done;
pullIni.kycInfo,
"individual",
);
}
case PeerPullPaymentInitiationStatus.PendingCreatePurse:
break; break;
case PeerPullPaymentInitiationStatus.AbortingDeletePurse: // FIXME: Also handle other final states!
return await processPeerPullCreditAbortingDeletePurse(ws, pullIni); }
default: await tx.peerPullPaymentInitiations.put(ppi);
throw Error(`unknown PeerPullPaymentInitiationStatus ${pullIni.status}`); const newTxState = computePeerPullCreditTransactionState(ppi);
return {
oldTxState,
newTxState,
};
});
notifyTransition(ws, transactionId, transitionInfo);
if (finished) {
return OperationAttemptResult.finishedEmpty();
} else {
// FIXME: Return indicator that we depend on the other operation!
return OperationAttemptResult.pendingEmpty();
}
} }
async function handlePeerPullCreditCreatePurse(
ws: InternalWalletState,
pullIni: PeerPullPaymentInitiationRecord,
): Promise<OperationAttemptResult> {
const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount));
const pursePub = pullIni.pursePub;
const mergeReserve = await ws.db const mergeReserve = await ws.db
.mktx((x) => [x.reserves]) .mktx((x) => [x.reserves])
.runReadOnly(async (tx) => { .runReadOnly(async (tx) => {
@ -388,8 +372,6 @@ export async function processPeerPullCredit(
throw Error("merge reserve for peer pull payment not found in database"); throw Error("merge reserve for peer pull payment not found in database");
} }
const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount));
const reservePayto = talerPaytoFromExchangeReserve( const reservePayto = talerPaytoFromExchangeReserve(
pullIni.exchangeBaseUrl, pullIni.exchangeBaseUrl,
mergeReserve.reservePub, mergeReserve.reservePub,
@ -474,6 +456,104 @@ export async function processPeerPullCredit(
}; };
} }
export async function processPeerPullCredit(
ws: InternalWalletState,
pursePub: string,
): Promise<OperationAttemptResult> {
const pullIni = await ws.db
.mktx((x) => [x.peerPullPaymentInitiations])
.runReadOnly(async (tx) => {
return tx.peerPullPaymentInitiations.get(pursePub);
});
if (!pullIni) {
throw Error("peer pull payment initiation not found in database");
}
const retryTag = constructTaskIdentifier({
tag: PendingTaskType.PeerPullCredit,
pursePub,
});
// We're already running!
if (ws.activeLongpoll[retryTag]) {
logger.info("peer-pull-credit already in long-polling, returning!");
return {
type: OperationAttemptResultType.Longpoll,
};
}
logger.trace(`processing ${retryTag}, status=${pullIni.status}`);
switch (pullIni.status) {
case PeerPullPaymentInitiationStatus.Done: {
// We implement this case so that the "retry" action on a peer-pull-credit transaction
// also retries the withdrawal task.
logger.warn(
"peer pull payment initiation is already finished, retrying withdrawal",
);
const withdrawalGroupId = pullIni.withdrawalGroupId;
if (withdrawalGroupId) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Withdraw,
withdrawalGroupId,
});
stopLongpolling(ws, taskId);
await resetOperationTimeout(ws, taskId);
await runOperationWithErrorReporting(ws, taskId, () =>
processWithdrawalGroup(ws, withdrawalGroupId),
);
}
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
case PeerPullPaymentInitiationStatus.PendingReady:
runLongpollAsync(ws, retryTag, async (cancellationToken) =>
queryPurseForPeerPullCredit(ws, pullIni, cancellationToken),
);
logger.trace(
"returning early from processPeerPullCredit for long-polling in background",
);
return {
type: OperationAttemptResultType.Longpoll,
};
case PeerPullPaymentInitiationStatus.PendingMergeKycRequired: {
if (!pullIni.kycInfo) {
throw Error("invalid state, kycInfo required");
}
return await longpollKycStatus(
ws,
pursePub,
pullIni.exchangeBaseUrl,
pullIni.kycInfo,
"individual",
);
}
case PeerPullPaymentInitiationStatus.PendingCreatePurse:
return handlePeerPullCreditCreatePurse(ws, pullIni);
case PeerPullPaymentInitiationStatus.AbortingDeletePurse:
return await processPeerPullCreditAbortingDeletePurse(ws, pullIni);
case PeerPullPaymentInitiationStatus.PendingWithdrawing:
return handlePeerPullCreditWithdrawing(ws, pullIni);
case PeerPullPaymentInitiationStatus.Aborted:
case PeerPullPaymentInitiationStatus.Failed:
case PeerPullPaymentInitiationStatus.SuspendedAbortingDeletePurse:
case PeerPullPaymentInitiationStatus.SuspendedCreatePurse:
case PeerPullPaymentInitiationStatus.SuspendedMergeKycRequired:
case PeerPullPaymentInitiationStatus.SuspendedReady:
case PeerPullPaymentInitiationStatus.SuspendedWithdrawing:
break;
default:
assertUnreachable(pullIni.status);
}
return OperationAttemptResult.finishedEmpty();
}
async function processPeerPullCreditKycRequired( async function processPeerPullCreditKycRequired(
ws: InternalWalletState, ws: InternalWalletState,
peerIni: PeerPullPaymentInitiationRecord, peerIni: PeerPullPaymentInitiationRecord,
@ -789,7 +869,7 @@ export async function suspendPeerPullCreditTransaction(
newStatus = newStatus =
PeerPullPaymentInitiationStatus.SuspendedAbortingDeletePurse; PeerPullPaymentInitiationStatus.SuspendedAbortingDeletePurse;
break; break;
case PeerPullPaymentInitiationStatus.DonePurseDeposited: case PeerPullPaymentInitiationStatus.Done:
case PeerPullPaymentInitiationStatus.SuspendedCreatePurse: case PeerPullPaymentInitiationStatus.SuspendedCreatePurse:
case PeerPullPaymentInitiationStatus.SuspendedMergeKycRequired: case PeerPullPaymentInitiationStatus.SuspendedMergeKycRequired:
case PeerPullPaymentInitiationStatus.SuspendedReady: case PeerPullPaymentInitiationStatus.SuspendedReady:
@ -848,7 +928,7 @@ export async function abortPeerPullCreditTransaction(
case PeerPullPaymentInitiationStatus.PendingReady: case PeerPullPaymentInitiationStatus.PendingReady:
newStatus = PeerPullPaymentInitiationStatus.AbortingDeletePurse; newStatus = PeerPullPaymentInitiationStatus.AbortingDeletePurse;
break; break;
case PeerPullPaymentInitiationStatus.DonePurseDeposited: case PeerPullPaymentInitiationStatus.Done:
case PeerPullPaymentInitiationStatus.SuspendedCreatePurse: case PeerPullPaymentInitiationStatus.SuspendedCreatePurse:
case PeerPullPaymentInitiationStatus.SuspendedMergeKycRequired: case PeerPullPaymentInitiationStatus.SuspendedMergeKycRequired:
case PeerPullPaymentInitiationStatus.SuspendedReady: case PeerPullPaymentInitiationStatus.SuspendedReady:
@ -903,7 +983,7 @@ export async function failPeerPullCreditTransaction(
case PeerPullPaymentInitiationStatus.PendingMergeKycRequired: case PeerPullPaymentInitiationStatus.PendingMergeKycRequired:
case PeerPullPaymentInitiationStatus.PendingWithdrawing: case PeerPullPaymentInitiationStatus.PendingWithdrawing:
case PeerPullPaymentInitiationStatus.PendingReady: case PeerPullPaymentInitiationStatus.PendingReady:
case PeerPullPaymentInitiationStatus.DonePurseDeposited: case PeerPullPaymentInitiationStatus.Done:
case PeerPullPaymentInitiationStatus.SuspendedCreatePurse: case PeerPullPaymentInitiationStatus.SuspendedCreatePurse:
case PeerPullPaymentInitiationStatus.SuspendedMergeKycRequired: case PeerPullPaymentInitiationStatus.SuspendedMergeKycRequired:
case PeerPullPaymentInitiationStatus.SuspendedReady: case PeerPullPaymentInitiationStatus.SuspendedReady:
@ -961,7 +1041,7 @@ export async function resumePeerPullCreditTransaction(
case PeerPullPaymentInitiationStatus.PendingWithdrawing: case PeerPullPaymentInitiationStatus.PendingWithdrawing:
case PeerPullPaymentInitiationStatus.PendingReady: case PeerPullPaymentInitiationStatus.PendingReady:
case PeerPullPaymentInitiationStatus.AbortingDeletePurse: case PeerPullPaymentInitiationStatus.AbortingDeletePurse:
case PeerPullPaymentInitiationStatus.DonePurseDeposited: case PeerPullPaymentInitiationStatus.Done:
case PeerPullPaymentInitiationStatus.Failed: case PeerPullPaymentInitiationStatus.Failed:
case PeerPullPaymentInitiationStatus.Aborted: case PeerPullPaymentInitiationStatus.Aborted:
break; break;
@ -1018,7 +1098,7 @@ export function computePeerPullCreditTransactionState(
major: TransactionMajorState.Pending, major: TransactionMajorState.Pending,
minor: TransactionMinorState.Ready, minor: TransactionMinorState.Ready,
}; };
case PeerPullPaymentInitiationStatus.DonePurseDeposited: case PeerPullPaymentInitiationStatus.Done:
return { return {
major: TransactionMajorState.Done, major: TransactionMajorState.Done,
}; };
@ -1078,7 +1158,7 @@ export function computePeerPullCreditTransactionActions(
return [TransactionAction.Abort, TransactionAction.Suspend]; return [TransactionAction.Abort, TransactionAction.Suspend];
case PeerPullPaymentInitiationStatus.PendingReady: case PeerPullPaymentInitiationStatus.PendingReady:
return [TransactionAction.Abort, TransactionAction.Suspend]; return [TransactionAction.Abort, TransactionAction.Suspend];
case PeerPullPaymentInitiationStatus.DonePurseDeposited: case PeerPullPaymentInitiationStatus.Done:
return [TransactionAction.Delete]; return [TransactionAction.Delete];
case PeerPullPaymentInitiationStatus.PendingWithdrawing: case PeerPullPaymentInitiationStatus.PendingWithdrawing:
return [TransactionAction.Abort, TransactionAction.Suspend]; return [TransactionAction.Abort, TransactionAction.Suspend];

View File

@ -113,11 +113,18 @@ async function handlePurseCreationConflict(
} }
const repair: PeerCoinRepair = { const repair: PeerCoinRepair = {
coinPubs: sel.coinPubs, coinPubs: [],
contribs: sel.contributions.map((x) => Amounts.parseOrThrow(x)), contribs: [],
exchangeBaseUrl: peerPullInc.exchangeBaseUrl, exchangeBaseUrl: peerPullInc.exchangeBaseUrl,
}; };
for (let i = 0; i < sel.coinPubs.length; i++) {
if (sel.coinPubs[i] != brokenCoinPub) {
repair.coinPubs.push(sel.coinPubs[i]);
repair.contribs.push(Amounts.parseOrThrow(sel.contributions[i]));
}
}
const coinSelRes = await selectPeerCoins(ws, { instructedAmount, repair }); const coinSelRes = await selectPeerCoins(ws, { instructedAmount, repair });
if (coinSelRes.type == "failure") { if (coinSelRes.type == "failure") {

View File

@ -15,76 +15,74 @@
*/ */
import { import {
PreparePeerPushCredit,
PreparePeerPushCreditResponse,
parsePayPushUri,
codecForPeerContractTerms,
TransactionType,
encodeCrock,
eddsaGetPublic,
decodeCrock,
codecForExchangeGetContractResponse,
getRandomBytes,
ContractTermsUtil,
Amounts,
TalerPreciseTimestamp,
AcceptPeerPushPaymentResponse, AcceptPeerPushPaymentResponse,
Amounts,
ConfirmPeerPushCreditRequest, ConfirmPeerPushCreditRequest,
ContractTermsUtil,
ExchangePurseMergeRequest, ExchangePurseMergeRequest,
HttpStatusCode, HttpStatusCode,
PeerContractTerms,
TalerProtocolTimestamp,
WalletAccountMergeFlags,
codecForAny,
codecForWalletKycUuid,
j2s,
Logger, Logger,
ExchangePurseDeposits, PeerContractTerms,
PreparePeerPushCredit,
PreparePeerPushCreditResponse,
TalerErrorCode,
TalerPreciseTimestamp,
TalerProtocolTimestamp,
TransactionAction, TransactionAction,
TransactionMajorState, TransactionMajorState,
TransactionMinorState, TransactionMinorState,
TransactionState, TransactionState,
TalerError, TransactionType,
TalerErrorCode, WalletAccountMergeFlags,
WalletKycUuid, WalletKycUuid,
codecForAny,
codecForExchangeGetContractResponse,
codecForPeerContractTerms,
codecForWalletKycUuid,
decodeCrock,
eddsaGetPublic,
encodeCrock,
getRandomBytes,
j2s,
makeErrorDetail, makeErrorDetail,
parsePayPushUri,
} from "@gnu-taler/taler-util"; } from "@gnu-taler/taler-util";
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import { import {
InternalWalletState, InternalWalletState,
KycPendingInfo, KycPendingInfo,
KycUserType, KycUserType,
PeerPullDebitRecordStatus,
PeerPushPaymentIncomingRecord, PeerPushPaymentIncomingRecord,
PeerPushPaymentIncomingStatus, PeerPushPaymentIncomingStatus,
PendingTaskType, PendingTaskType,
WithdrawalGroupStatus, WithdrawalGroupStatus,
WithdrawalRecordType, WithdrawalRecordType,
} from "../index.js"; } from "../index.js";
import { updateExchangeFromUrl } from "./exchanges.js"; import { assertUnreachable } from "../util/assertUnreachable.js";
import {
codecForExchangePurseStatus,
getMergeReserveInfo,
queryCoinInfosForSelection,
talerPaytoFromExchangeReserve,
} from "./pay-peer-common.js";
import {
constructTransactionIdentifier,
notifyTransition,
stopLongpolling,
} from "./transactions.js";
import {
getExchangeWithdrawalInfo,
internalCreateWithdrawalGroup,
} from "./withdraw.js";
import { checkDbInvariant } from "../util/invariants.js"; import { checkDbInvariant } from "../util/invariants.js";
import { import {
OperationAttemptResult, OperationAttemptResult,
OperationAttemptResultType, OperationAttemptResultType,
constructTaskIdentifier, constructTaskIdentifier,
} from "../util/retries.js"; } from "../util/retries.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import { runLongpollAsync } from "./common.js"; import { runLongpollAsync } from "./common.js";
import { updateExchangeFromUrl } from "./exchanges.js";
import {
codecForExchangePurseStatus,
getMergeReserveInfo,
talerPaytoFromExchangeReserve,
} from "./pay-peer-common.js";
import {
TransitionInfo,
constructTransactionIdentifier,
notifyTransition,
stopLongpolling,
} from "./transactions.js";
import {
getExchangeWithdrawalInfo,
internalPerformCreateWithdrawalGroup,
internalPrepareCreateWithdrawalGroup,
} from "./withdraw.js";
const logger = new Logger("pay-peer-push-credit.ts"); const logger = new Logger("pay-peer-push-credit.ts");
@ -148,7 +146,7 @@ export async function preparePeerPushCredit(
const getContractUrl = new URL(`contracts/${contractPub}`, exchangeBaseUrl); const getContractUrl = new URL(`contracts/${contractPub}`, exchangeBaseUrl);
const contractHttpResp = await ws.http.get(getContractUrl.href); const contractHttpResp = await ws.http.fetch(getContractUrl.href);
const contractResp = await readSuccessResponseJsonOrThrow( const contractResp = await readSuccessResponseJsonOrThrow(
contractHttpResp, contractHttpResp,
@ -375,51 +373,19 @@ async function processPeerPushCreditKycRequired(
} }
} }
export async function processPeerPushCredit( async function handlePendingMerge(
ws: InternalWalletState, ws: InternalWalletState,
peerPushPaymentIncomingId: string, peerInc: PeerPushPaymentIncomingRecord,
contractTerms: PeerContractTerms,
): Promise<OperationAttemptResult> { ): Promise<OperationAttemptResult> {
let peerInc: PeerPushPaymentIncomingRecord | undefined; const { peerPushPaymentIncomingId } = peerInc;
let contractTerms: PeerContractTerms | undefined; const transactionId = constructTransactionIdentifier({
await ws.db tag: TransactionType.PeerPushCredit,
.mktx((x) => [x.contractTerms, x.peerPushPaymentIncoming]) peerPushPaymentIncomingId,
.runReadWrite(async (tx) => {
peerInc = await tx.peerPushPaymentIncoming.get(peerPushPaymentIncomingId);
if (!peerInc) {
return;
}
const ctRec = await tx.contractTerms.get(peerInc.contractTermsHash);
if (ctRec) {
contractTerms = ctRec.contractTermsRaw;
}
await tx.peerPushPaymentIncoming.put(peerInc);
}); });
if (!peerInc) {
throw Error(
`can't accept unknown incoming p2p push payment (${peerPushPaymentIncomingId})`,
);
}
checkDbInvariant(!!contractTerms);
const amount = Amounts.parseOrThrow(contractTerms.amount); const amount = Amounts.parseOrThrow(contractTerms.amount);
if (
peerInc.status === PeerPushPaymentIncomingStatus.PendingMergeKycRequired
) {
if (!peerInc.kycInfo) {
throw Error("invalid state, kycInfo required");
}
return await longpollKycStatus(
ws,
peerPushPaymentIncomingId,
peerInc.exchangeBaseUrl,
peerInc.kycInfo,
"individual",
);
}
const mergeReserveInfo = await getMergeReserveInfo(ws, { const mergeReserveInfo = await getMergeReserveInfo(ws, {
exchangeBaseUrl: peerInc.exchangeBaseUrl, exchangeBaseUrl: peerInc.exchangeBaseUrl,
}); });
@ -475,7 +441,7 @@ export async function processPeerPushCredit(
); );
logger.trace(`merge response: ${j2s(res)}`); logger.trace(`merge response: ${j2s(res)}`);
await internalCreateWithdrawalGroup(ws, { const withdrawalGroupPrep = await internalPrepareCreateWithdrawalGroup(ws, {
amount, amount,
wgInfo: { wgInfo: {
withdrawalType: WithdrawalRecordType.PeerPushCredit, withdrawalType: WithdrawalRecordType.PeerPushCredit,
@ -490,23 +456,51 @@ export async function processPeerPushCredit(
}, },
}); });
await ws.db const txRes = await ws.db
.mktx((x) => [x.contractTerms, x.peerPushPaymentIncoming]) .mktx((x) => [
x.contractTerms,
x.peerPushPaymentIncoming,
x.withdrawalGroups,
x.reserves,
x.exchanges,
x.exchangeDetails,
x.exchangeTrust,
])
.runReadWrite(async (tx) => { .runReadWrite(async (tx) => {
const peerInc = await tx.peerPushPaymentIncoming.get( const peerInc = await tx.peerPushPaymentIncoming.get(
peerPushPaymentIncomingId, peerPushPaymentIncomingId,
); );
if (!peerInc) { if (!peerInc) {
return; return undefined;
}
let withdrawalTransition: TransitionInfo | undefined;
const oldTxState = computePeerPushCreditTransactionState(peerInc);
switch (peerInc.status) {
case PeerPushPaymentIncomingStatus.PendingMerge:
case PeerPushPaymentIncomingStatus.PendingMergeKycRequired: {
peerInc.status = PeerPushPaymentIncomingStatus.PendingWithdrawing;
const wgRes = await internalPerformCreateWithdrawalGroup(
ws,
tx,
withdrawalGroupPrep,
);
peerInc.withdrawalGroupId = wgRes.withdrawalGroup.withdrawalGroupId;
break;
} }
if (
peerInc.status === PeerPushPaymentIncomingStatus.PendingMerge ||
peerInc.status === PeerPushPaymentIncomingStatus.PendingMergeKycRequired
) {
peerInc.status = PeerPushPaymentIncomingStatus.Done;
} }
await tx.peerPushPaymentIncoming.put(peerInc); await tx.peerPushPaymentIncoming.put(peerInc);
const newTxState = computePeerPushCreditTransactionState(peerInc);
return {
peerPushCreditTransition: { oldTxState, newTxState },
withdrawalTransition,
};
}); });
notifyTransition(
ws,
withdrawalGroupPrep.transactionId,
txRes?.withdrawalTransition,
);
notifyTransition(ws, transactionId, txRes?.peerPushCreditTransition);
return { return {
type: OperationAttemptResultType.Finished, type: OperationAttemptResultType.Finished,
@ -514,6 +508,115 @@ export async function processPeerPushCredit(
}; };
} }
async function handlePendingWithdrawing(
ws: InternalWalletState,
peerInc: PeerPushPaymentIncomingRecord,
): Promise<OperationAttemptResult> {
if (!peerInc.withdrawalGroupId) {
throw Error("invalid db state (withdrawing, but no withdrawal group ID");
}
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPushCredit,
peerPushPaymentIncomingId: peerInc.peerPushPaymentIncomingId,
});
const wgId = peerInc.withdrawalGroupId;
let finished: boolean = false;
const transitionInfo = await ws.db
.mktx((x) => [x.peerPushPaymentIncoming, x.withdrawalGroups])
.runReadWrite(async (tx) => {
const ppi = await tx.peerPushPaymentIncoming.get(
peerInc.peerPushPaymentIncomingId,
);
if (!ppi) {
finished = true;
return;
}
if (ppi.status !== PeerPushPaymentIncomingStatus.PendingWithdrawing) {
finished = true;
return;
}
const oldTxState = computePeerPushCreditTransactionState(ppi);
const wg = await tx.withdrawalGroups.get(wgId);
if (!wg) {
// FIXME: Fail the operation instead?
return undefined;
}
switch (wg.status) {
case WithdrawalGroupStatus.Finished:
finished = true;
ppi.status = PeerPushPaymentIncomingStatus.Done;
break;
// FIXME: Also handle other final states!
}
await tx.peerPushPaymentIncoming.put(ppi);
const newTxState = computePeerPushCreditTransactionState(ppi);
return {
oldTxState,
newTxState,
};
});
notifyTransition(ws, transactionId, transitionInfo);
if (finished) {
return OperationAttemptResult.finishedEmpty();
} else {
// FIXME: Return indicator that we depend on the other operation!
return OperationAttemptResult.pendingEmpty();
}
}
export async function processPeerPushCredit(
ws: InternalWalletState,
peerPushPaymentIncomingId: string,
): Promise<OperationAttemptResult> {
let peerInc: PeerPushPaymentIncomingRecord | undefined;
let contractTerms: PeerContractTerms | undefined;
await ws.db
.mktx((x) => [x.contractTerms, x.peerPushPaymentIncoming])
.runReadWrite(async (tx) => {
peerInc = await tx.peerPushPaymentIncoming.get(peerPushPaymentIncomingId);
if (!peerInc) {
return;
}
const ctRec = await tx.contractTerms.get(peerInc.contractTermsHash);
if (ctRec) {
contractTerms = ctRec.contractTermsRaw;
}
await tx.peerPushPaymentIncoming.put(peerInc);
});
checkDbInvariant(!!contractTerms);
if (!peerInc) {
throw Error(
`can't accept unknown incoming p2p push payment (${peerPushPaymentIncomingId})`,
);
}
switch (peerInc.status) {
case PeerPushPaymentIncomingStatus.PendingMergeKycRequired: {
if (!peerInc.kycInfo) {
throw Error("invalid state, kycInfo required");
}
return await longpollKycStatus(
ws,
peerPushPaymentIncomingId,
peerInc.exchangeBaseUrl,
peerInc.kycInfo,
"individual",
);
}
case PeerPushPaymentIncomingStatus.PendingMerge:
return handlePendingMerge(ws, peerInc, contractTerms);
case PeerPushPaymentIncomingStatus.PendingWithdrawing:
return handlePendingWithdrawing(ws, peerInc);
default:
return OperationAttemptResult.finishedEmpty();
}
}
export async function confirmPeerPushCredit( export async function confirmPeerPushCredit(
ws: InternalWalletState, ws: InternalWalletState,
req: ConfirmPeerPushCreditRequest, req: ConfirmPeerPushCreditRequest,

View File

@ -125,15 +125,21 @@ async function handlePurseCreationConflict(
} }
const instructedAmount = Amounts.parseOrThrow(peerPushInitiation.amount); const instructedAmount = Amounts.parseOrThrow(peerPushInitiation.amount);
const sel = peerPushInitiation.coinSel;
const repair: PeerCoinRepair = { const repair: PeerCoinRepair = {
coinPubs: peerPushInitiation.coinSel.coinPubs, coinPubs: [],
contribs: peerPushInitiation.coinSel.contributions.map((x) => contribs: [],
Amounts.parseOrThrow(x),
),
exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl, exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl,
}; };
for (let i = 0; i < sel.coinPubs.length; i++) {
if (sel.coinPubs[i] != brokenCoinPub) {
repair.coinPubs.push(sel.coinPubs[i]);
repair.contribs.push(Amounts.parseOrThrow(sel.contributions[i]));
}
}
const coinSelRes = await selectPeerCoins(ws, { instructedAmount, repair }); const coinSelRes = await selectPeerCoins(ws, { instructedAmount, repair });
if (coinSelRes.type == "failure") { if (coinSelRes.type == "failure") {
@ -244,9 +250,10 @@ async function processPeerPushDebitCreateReserve(
body: reqBody, body: reqBody,
}); });
{
const resp = await httpResp.json(); const resp = await httpResp.json();
logger.info(`resp: ${j2s(resp)}`); logger.info(`resp: ${j2s(resp)}`);
}
switch (httpResp.status) { switch (httpResp.status) {
case HttpStatusCode.Ok: case HttpStatusCode.Ok:
@ -258,10 +265,10 @@ async function processPeerPushDebitCreateReserve(
} }
case HttpStatusCode.Conflict: { case HttpStatusCode.Conflict: {
// Handle double-spending // Handle double-spending
return handlePurseCreationConflict(ws, peerPushInitiation, resp); return handlePurseCreationConflict(ws, peerPushInitiation, httpResp);
} }
default: { default: {
const errResp = await readTalerErrorResponse(resp); const errResp = await readTalerErrorResponse(httpResp);
return { return {
type: OperationAttemptResultType.Error, type: OperationAttemptResultType.Error,
errorDetail: errResp, errorDetail: errResp,

View File

@ -1887,19 +1887,19 @@ export interface TransitionInfo {
export function notifyTransition( export function notifyTransition(
ws: InternalWalletState, ws: InternalWalletState,
transactionId: string, transactionId: string,
ti: TransitionInfo | undefined, transitionInfo: TransitionInfo | undefined,
): void { ): void {
if ( if (
ti && transitionInfo &&
!( !(
ti.oldTxState.major === ti.newTxState.major && transitionInfo.oldTxState.major === transitionInfo.newTxState.major &&
ti.oldTxState.minor === ti.newTxState.minor transitionInfo.oldTxState.minor === transitionInfo.newTxState.minor
) )
) { ) {
ws.notify({ ws.notify({
type: NotificationType.TransactionStateTransition, type: NotificationType.TransactionStateTransition,
oldTxState: ti.oldTxState, oldTxState: transitionInfo.oldTxState,
newTxState: ti.newTxState, newTxState: transitionInfo.newTxState,
transactionId, transactionId,
}); });
} }

View File

@ -109,7 +109,11 @@ import {
checkLogicInvariant, checkLogicInvariant,
InvariantViolatedError, InvariantViolatedError,
} from "../util/invariants.js"; } from "../util/invariants.js";
import { DbAccess, GetReadOnlyAccess } from "../util/query.js"; import {
DbAccess,
GetReadOnlyAccess,
GetReadWriteAccess,
} from "../util/query.js";
import { import {
OperationAttemptResult, OperationAttemptResult,
OperationAttemptResultType, OperationAttemptResultType,
@ -130,8 +134,13 @@ import {
selectForcedWithdrawalDenominations, selectForcedWithdrawalDenominations,
selectWithdrawalDenominations, selectWithdrawalDenominations,
} from "../util/coinSelection.js"; } from "../util/coinSelection.js";
import { PendingTaskType, isWithdrawableDenom } from "../index.js";
import { import {
ExchangeDetailsRecord,
PendingTaskType,
isWithdrawableDenom,
} from "../index.js";
import {
TransitionInfo,
constructTransactionIdentifier, constructTransactionIdentifier,
notifyTransition, notifyTransition,
stopLongpolling, stopLongpolling,
@ -2202,15 +2211,19 @@ async function processReserveBankStatus(
} }
} }
/** export interface PrepareCreateWithdrawalGroupResult {
* Create a withdrawal group. withdrawalGroup: WithdrawalGroupRecord;
* transactionId: string;
* If a forcedWithdrawalGroupId is given and a creationInfo?: {
* withdrawal group with this ID already exists, isTrusted: boolean;
* the existing one is returned. No conflict checking isAudited: boolean;
* of the other arguments is done in that case. amount: AmountJson;
*/ canonExchange: string;
export async function internalCreateWithdrawalGroup( exchangeDetails: ExchangeDetailsRecord;
};
}
export async function internalPrepareCreateWithdrawalGroup(
ws: InternalWalletState, ws: InternalWalletState,
args: { args: {
reserveStatus: WithdrawalGroupStatus; reserveStatus: WithdrawalGroupStatus;
@ -2222,7 +2235,7 @@ export async function internalCreateWithdrawalGroup(
restrictAge?: number; restrictAge?: number;
wgInfo: WgInfo; wgInfo: WgInfo;
}, },
): Promise<WithdrawalGroupRecord> { ): Promise<PrepareCreateWithdrawalGroupResult> {
const reserveKeyPair = const reserveKeyPair =
args.reserveKeyPair ?? (await ws.cryptoApi.createEddsaKeypair({})); args.reserveKeyPair ?? (await ws.cryptoApi.createEddsaKeypair({}));
const now = AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now()); const now = AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now());
@ -2240,18 +2253,18 @@ export async function internalCreateWithdrawalGroup(
.runReadOnly(async (tx) => { .runReadOnly(async (tx) => {
return tx.withdrawalGroups.get(wgId); return tx.withdrawalGroups.get(wgId);
}); });
if (existingWg) { if (existingWg) {
return existingWg; const transactionId = constructTransactionIdentifier({
tag: TransactionType.Withdrawal,
withdrawalGroupId: existingWg.withdrawalGroupId,
});
return { withdrawalGroup: existingWg, transactionId };
} }
} else { } else {
withdrawalGroupId = encodeCrock(getRandomBytes(32)); withdrawalGroupId = encodeCrock(getRandomBytes(32));
} }
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Withdrawal,
withdrawalGroupId,
});
await updateWithdrawalDenoms(ws, canonExchange); await updateWithdrawalDenoms(ws, canonExchange);
const denoms = await getCandidateWithdrawalDenoms(ws, canonExchange); const denoms = await getCandidateWithdrawalDenoms(ws, canonExchange);
@ -2302,16 +2315,50 @@ export async function internalCreateWithdrawalGroup(
ws, ws,
exchangeInfo.exchange, exchangeInfo.exchange,
); );
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Withdrawal,
withdrawalGroupId: withdrawalGroup.withdrawalGroupId,
});
return {
withdrawalGroup,
transactionId,
creationInfo: {
isAudited,
isTrusted,
canonExchange,
amount,
exchangeDetails,
},
};
}
export interface PerformCreateWithdrawalGroupResult {
withdrawalGroup: WithdrawalGroupRecord;
transitionInfo: TransitionInfo | undefined;
}
export async function internalPerformCreateWithdrawalGroup(
ws: InternalWalletState,
tx: GetReadWriteAccess<{
withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
reserves: typeof WalletStoresV1.reserves;
exchanges: typeof WalletStoresV1.exchanges;
exchangeTrust: typeof WalletStoresV1.exchangeTrust;
}>,
prep: PrepareCreateWithdrawalGroupResult,
): Promise<PerformCreateWithdrawalGroupResult> {
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Withdrawal,
withdrawalGroupId: prep.withdrawalGroup.withdrawalGroupId,
});
const { withdrawalGroup } = prep;
if (!prep.creationInfo) {
return { withdrawalGroup, transitionInfo: undefined };
}
const { isAudited, isTrusted, amount, canonExchange, exchangeDetails } =
prep.creationInfo;
const transitionInfo = await ws.db
.mktx((x) => [
x.withdrawalGroups,
x.reserves,
x.exchanges,
x.exchangeDetails,
x.exchangeTrust,
])
.runReadWrite(async (tx) => {
await tx.withdrawalGroups.add(withdrawalGroup); await tx.withdrawalGroups.add(withdrawalGroup);
await tx.reserves.put({ await tx.reserves.put({
reservePub: withdrawalGroup.reservePub, reservePub: withdrawalGroup.reservePub,
@ -2335,17 +2382,57 @@ export async function internalCreateWithdrawalGroup(
const oldTxState = { const oldTxState = {
major: TransactionMajorState.None, major: TransactionMajorState.None,
minor: undefined,
}; };
const newTxState = computeWithdrawalTransactionStatus(withdrawalGroup); const newTxState = computeWithdrawalTransactionStatus(withdrawalGroup);
return { const transitionInfo = {
oldTxState, oldTxState,
newTxState, newTxState,
}; };
});
notifyTransition(ws, transactionId, transitionInfo); notifyTransition(ws, transactionId, transitionInfo);
return withdrawalGroup; return { withdrawalGroup, transitionInfo };
}
/**
* Create a withdrawal group.
*
* If a forcedWithdrawalGroupId is given and a
* withdrawal group with this ID already exists,
* the existing one is returned. No conflict checking
* of the other arguments is done in that case.
*/
export async function internalCreateWithdrawalGroup(
ws: InternalWalletState,
args: {
reserveStatus: WithdrawalGroupStatus;
amount: AmountJson;
exchangeBaseUrl: string;
forcedWithdrawalGroupId?: string;
forcedDenomSel?: ForcedDenomSel;
reserveKeyPair?: EddsaKeypair;
restrictAge?: number;
wgInfo: WgInfo;
},
): Promise<WithdrawalGroupRecord> {
const prep = await internalPrepareCreateWithdrawalGroup(ws, args);
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Withdrawal,
withdrawalGroupId: prep.withdrawalGroup.withdrawalGroupId,
});
const res = await ws.db
.mktx((x) => [
x.withdrawalGroups,
x.reserves,
x.exchanges,
x.exchangeDetails,
x.exchangeTrust,
])
.runReadWrite(async (tx) => {
return await internalPerformCreateWithdrawalGroup(ws, tx, prep);
});
notifyTransition(ws, transactionId, res.transitionInfo);
return res.withdrawalGroup;
} }
export async function acceptWithdrawalFromUri( export async function acceptWithdrawalFromUri(