wallet-core: support long-polling for peer push credit

This commit is contained in:
Florian Dold 2023-02-20 21:26:08 +01:00
parent e8b5f26ab6
commit a49959d2c8
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
5 changed files with 216 additions and 78 deletions

View File

@ -1774,6 +1774,16 @@ export interface PeerPushPaymentInitiationRecord {
status: PeerPushPaymentInitiationStatus;
}
export enum PeerPullPaymentInitiationStatus {
Initial = 10 /* ACTIVE_START */,
/**
* Purse created, waiting for the other party to accept the
* invoice and deposit money into it.
*/
PurseCreated = 11 /* ACTIVE_START + 1 */,
PurseDeposited = 50 /* DORMANT_START */,
}
export interface PeerPullPaymentInitiationRecord {
/**
* What exchange are we using for the payment request?
@ -1817,7 +1827,7 @@ export interface PeerPullPaymentInitiationRecord {
/**
* Status of the peer pull payment initiation.
*/
status: OperationStatus;
status: PeerPullPaymentInitiationStatus;
withdrawalGroupId: string | undefined;
}

View File

@ -21,11 +21,13 @@ import {
AgeRestriction,
AmountJson,
Amounts,
CancellationToken,
CoinRefreshRequest,
CoinStatus,
ExchangeEntryStatus,
ExchangeListItem,
ExchangeTosStatus,
getErrorDetailFromException,
j2s,
Logger,
OperationErrorInfo,
@ -453,3 +455,42 @@ export function makeExchangeListItem(
lastUpdateErrorInfo,
};
}
export interface LongpollResult {
ready: boolean;
}
export function runLongpollAsync(
ws: InternalWalletState,
retryTag: string,
reqFn: (ct: CancellationToken) => Promise<LongpollResult>,
): void {
const asyncFn = async () => {
if (ws.stopped) {
logger.trace("not long-polling reserve, wallet already stopped");
await storeOperationPending(ws, retryTag);
return;
}
const cts = CancellationToken.create();
let res: { ready: boolean } | undefined = undefined;
try {
ws.activeLongpoll[retryTag] = {
cancel: () => {
logger.trace("cancel of reserve longpoll requested");
cts.cancel();
},
};
res = await reqFn(cts.token);
} catch (e) {
await storeOperationError(ws, retryTag, getErrorDetailFromException(e));
return;
} finally {
delete ws.activeLongpoll[retryTag];
}
if (!res.ready) {
await storeOperationPending(ws, retryTag);
}
ws.latch.trigger();
};
asyncFn();
}

View File

@ -69,12 +69,17 @@ import {
TransactionType,
UnblindedSignature,
WalletAccountMergeFlags,
codecOptional,
codecForTimestamp,
CancellationToken,
} from "@gnu-taler/taler-util";
import { SpendCoinDetails } from "../crypto/cryptoImplementation.js";
import {
DenominationRecord,
OperationStatus,
PeerPullPaymentIncomingStatus,
PeerPullPaymentInitiationRecord,
PeerPullPaymentInitiationStatus,
PeerPushPaymentCoinSelection,
PeerPushPaymentIncomingRecord,
PeerPushPaymentIncomingStatus,
@ -86,12 +91,19 @@ import {
import { TalerError } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
LongpollResult,
makeTransactionId,
resetOperationTimeout,
runLongpollAsync,
runOperationWithErrorReporting,
spendCoins,
storeOperationPending,
} from "../operations/common.js";
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import {
readSuccessResponseJsonOrErrorCode,
readSuccessResponseJsonOrThrow,
throwUnexpectedRequestError,
} from "@gnu-taler/taler-util/http";
import { checkDbInvariant } from "../util/invariants.js";
import {
constructTaskIdentifier,
@ -622,11 +634,13 @@ export async function initiatePeerPushPayment(
interface ExchangePurseStatus {
balance: AmountString;
deposit_timestamp?: TalerProtocolTimestamp;
}
export const codecForExchangePurseStatus = (): Codec<ExchangePurseStatus> =>
buildCodecForObject<ExchangePurseStatus>()
.property("balance", codecForAmountString())
.property("deposit_timestamp", codecOptional(codecForTimestamp))
.build("ExchangePurseStatus");
export async function preparePeerPushCredit(
@ -1255,6 +1269,87 @@ export async function preparePeerPullCredit(
};
}
export async function queryPurseForPeerPullCredit(
ws: InternalWalletState,
pullIni: PeerPullPaymentInitiationRecord,
cancellationToken: CancellationToken,
): Promise<LongpollResult> {
const purseDepositUrl = new URL(
`purses/${pullIni.pursePub}/merge`,
pullIni.exchangeBaseUrl,
);
purseDepositUrl.searchParams.set("timeout_ms", "30000");
logger.info(`querying purse status via ${purseDepositUrl.href}`);
const resp = await ws.http.get(purseDepositUrl.href, {
timeout: { d_ms: 60000 },
cancellationToken,
});
logger.info(`purse status code: HTTP ${resp.status}`);
const result = await readSuccessResponseJsonOrErrorCode(
resp,
codecForExchangePurseStatus(),
);
if (result.isError) {
logger.info(`got purse status error, EC=${result.talerErrorResponse.code}`);
if (resp.status === 404) {
return { ready: false };
} else {
throwUnexpectedRequestError(resp, result.talerErrorResponse);
}
}
if (!result.response.deposit_timestamp) {
logger.info("purse not ready yet (no deposit)");
return { ready: false };
}
const reserve = await ws.db
.mktx((x) => [x.reserves])
.runReadOnly(async (tx) => {
return await tx.reserves.get(pullIni.mergeReserveRowId);
});
if (!reserve) {
throw Error("reserve for peer pull credit not found in wallet DB");
}
await internalCreateWithdrawalGroup(ws, {
amount: Amounts.parseOrThrow(pullIni.amount),
wgInfo: {
withdrawalType: WithdrawalRecordType.PeerPullCredit,
contractTerms: pullIni.contractTerms,
contractPriv: pullIni.contractPriv,
},
forcedWithdrawalGroupId: pullIni.withdrawalGroupId,
exchangeBaseUrl: pullIni.exchangeBaseUrl,
reserveStatus: WithdrawalGroupStatus.QueryingStatus,
reserveKeyPair: {
priv: reserve.reservePriv,
pub: reserve.reservePub,
},
});
await ws.db
.mktx((x) => [x.peerPullPaymentInitiations])
.runReadWrite(async (tx) => {
const finPi = await tx.peerPullPaymentInitiations.get(pullIni.pursePub);
if (!finPi) {
logger.warn("peerPullPaymentInitiation not found anymore");
return;
}
if (finPi.status === PeerPullPaymentInitiationStatus.PurseCreated) {
finPi.status = PeerPullPaymentInitiationStatus.PurseDeposited;
}
await tx.peerPullPaymentInitiations.put(finPi);
});
return {
ready: true,
};
}
export async function processPeerPullCredit(
ws: InternalWalletState,
pursePub: string,
@ -1268,28 +1363,52 @@ export async function processPeerPullCredit(
throw Error("peer pull payment initiation not found in database");
}
if (pullIni.status === OperationStatus.Finished) {
logger.warn(
"peer pull payment initiation is already finished, retrying withdrawal",
);
const retryTag = constructTaskIdentifier({
tag: PendingTaskType.PeerPullInitiation,
pursePub,
});
const withdrawalGroupId = pullIni.withdrawalGroupId;
switch (pullIni.status) {
case PeerPullPaymentInitiationStatus.PurseDeposited: {
// We implement this case so that the "retry" action on a peer-pull-credit transaction
// also retries the withdrawal task.
if (withdrawalGroupId) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Withdraw,
withdrawalGroupId,
});
stopLongpolling(ws, taskId);
await resetOperationTimeout(ws, taskId);
await runOperationWithErrorReporting(ws, taskId, () =>
processWithdrawalGroup(ws, withdrawalGroupId),
logger.warn(
"peer pull payment initiation is already finished, retrying withdrawal",
);
const withdrawalGroupId = pullIni.withdrawalGroupId;
if (withdrawalGroupId) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Withdraw,
withdrawalGroupId,
});
stopLongpolling(ws, taskId);
await resetOperationTimeout(ws, taskId);
await runOperationWithErrorReporting(ws, taskId, () =>
processWithdrawalGroup(ws, withdrawalGroupId),
);
}
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
case PeerPullPaymentInitiationStatus.PurseCreated:
runLongpollAsync(ws, retryTag, async (cancellationToken) =>
queryPurseForPeerPullCredit(ws, pullIni, cancellationToken),
);
logger.trace(
"returning early from processPeerPullCredit for long-polling in background",
);
return {
type: OperationAttemptResultType.Longpoll,
};
case PeerPullPaymentInitiationStatus.Initial:
break;
default:
throw Error(`unknown PeerPullPaymentInitiationStatus ${pullIni.status}`);
}
const mergeReserve = await ws.db
@ -1370,7 +1489,7 @@ export async function processPeerPullCredit(
if (!pi2) {
return;
}
pi2.status = OperationStatus.Finished;
pi2.status = PeerPullPaymentInitiationStatus.PurseCreated;
await tx.peerPullPaymentInitiations.put(pi2);
});
@ -1518,7 +1637,7 @@ export async function initiatePeerPullPayment(
pursePub: pursePair.pub,
mergePriv: mergePair.priv,
mergePub: mergePair.pub,
status: OperationStatus.Pending,
status: PeerPullPaymentInitiationStatus.Initial,
contractTerms: contractTerms,
mergeTimestamp,
mergeReserveRowId: mergeReserveRowId,
@ -1545,27 +1664,6 @@ export async function initiatePeerPullPayment(
return processPeerPullCredit(ws, pursePair.pub);
});
// FIXME: Why do we create this only here?
// What if the previous operation didn't succeed?
// We actually should create it once we know the
// money arrived (via long-polling).
await internalCreateWithdrawalGroup(ws, {
amount: instructedAmount,
wgInfo: {
withdrawalType: WithdrawalRecordType.PeerPullCredit,
contractTerms,
contractPriv: contractKeyPair.priv,
},
forcedWithdrawalGroupId: withdrawalGroupId,
exchangeBaseUrl: exchangeBaseUrl,
reserveStatus: WithdrawalGroupStatus.QueryingStatus,
reserveKeyPair: {
priv: mergeReserveInfo.reservePriv,
pub: mergeReserveInfo.reservePub,
},
});
return {
talerUri: constructPayPullUri({
exchangeBaseUrl: exchangeBaseUrl,

View File

@ -31,6 +31,7 @@ import {
PeerPushPaymentInitiationStatus,
PeerPullPaymentIncomingStatus,
PeerPushPaymentIncomingStatus,
PeerPullPaymentInitiationStatus,
} from "../db.js";
import {
PendingOperationsResponse,
@ -363,7 +364,7 @@ async function gatherPeerPullInitiationPending(
resp: PendingOperationsResponse,
): Promise<void> {
await tx.peerPullPaymentInitiations.iter().forEachAsync(async (pi) => {
if (pi.status === OperationStatus.Finished) {
if (pi.status === PeerPullPaymentInitiationStatus.PurseDeposited) {
return;
}
const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);

View File

@ -90,6 +90,7 @@ import { InternalWalletState } from "../internal-wallet-state.js";
import {
makeCoinAvailable,
makeExchangeListItem,
runLongpollAsync,
runOperationWithErrorReporting,
} from "../operations/common.js";
import { walletCoreDebugFlags } from "../util/debugFlags.js";
@ -1022,8 +1023,7 @@ export interface WithdrawalGroupContext {
export async function processWithdrawalGroup(
ws: InternalWalletState,
withdrawalGroupId: string,
options: {
} = {},
options: {} = {},
): Promise<OperationAttemptResult> {
logger.trace("processing withdrawal group", withdrawalGroupId);
const withdrawalGroup = await ws.db
@ -1053,38 +1053,9 @@ export async function processWithdrawalGroup(
forceNow: true,
});
case WithdrawalGroupStatus.QueryingStatus: {
const doQueryAsync = async () => {
if (ws.stopped) {
logger.trace("not long-polling reserve, wallet already stopped");
await storeOperationPending(ws, retryTag);
return;
}
const cts = CancellationToken.create();
let res: { ready: boolean } | undefined = undefined;
try {
ws.activeLongpoll[retryTag] = {
cancel: () => {
logger.trace("cancel of reserve longpoll requested");
cts.cancel();
},
};
res = await queryReserve(ws, withdrawalGroupId, cts.token);
} catch (e) {
await storeOperationError(
ws,
retryTag,
getErrorDetailFromException(e),
);
return;
} finally {
delete ws.activeLongpoll[retryTag];
}
if (!res.ready) {
await storeOperationPending(ws, retryTag);
}
ws.latch.trigger();
};
doQueryAsync();
runLongpollAsync(ws, retryTag, (ct) => {
return queryReserve(ws, withdrawalGroupId, ct);
});
logger.trace(
"returning early from withdrawal for long-polling in background",
);
@ -1832,6 +1803,14 @@ async function processReserveBankStatus(
}
}
/**
* Create a withdrawal group.
*
* If a forcedWithdrawalGroupId is given and a
* withdrawal group with this ID already exists,
* the existing one is returned. No conflict checking
* of the other arguments is done in that case.
*/
export async function internalCreateWithdrawalGroup(
ws: InternalWalletState,
args: {
@ -1856,6 +1835,15 @@ export async function internalCreateWithdrawalGroup(
if (args.forcedWithdrawalGroupId) {
withdrawalGroupId = args.forcedWithdrawalGroupId;
const wgId = withdrawalGroupId;
const existingWg = await ws.db
.mktx((x) => [x.withdrawalGroups])
.runReadOnly(async (tx) => {
return tx.withdrawalGroups.get(wgId);
});
if (existingWg) {
return existingWg;
}
} else {
withdrawalGroupId = encodeCrock(getRandomBytes(32));
}