wallet-core: retries for peer pull payments

This commit is contained in:
Florian Dold 2023-01-12 16:57:51 +01:00
parent 24694eae73
commit 1e378e4499
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
8 changed files with 249 additions and 86 deletions

View File

@ -60,6 +60,7 @@ import {
hashCoinPub,
hashDenomPub,
hashTruncate32,
j2s,
kdf,
kdfKw,
keyExchangeEcdhEddsa,
@ -447,11 +448,11 @@ export interface SignPurseCreationRequest {
export interface SpendCoinDetails {
coinPub: string;
coinPriv: string;
contribution: AmountString;
denomPubHash: string;
denomSig: UnblindedSignature;
ageCommitmentProof: AgeCommitmentProof | undefined;
coinPriv: string;
contribution: AmountString;
denomPubHash: string;
denomSig: UnblindedSignature;
ageCommitmentProof: AgeCommitmentProof | undefined;
}
export interface SignPurseDepositsRequest {
@ -1453,7 +1454,6 @@ export const nativeCryptoR: TalerCryptoInterfaceR = {
tci: TalerCryptoInterfaceR,
req: EncryptContractRequest,
): Promise<EncryptContractResponse> {
const enc = await encryptContractForMerge(
decodeCrock(req.pursePub),
decodeCrock(req.contractPriv),
@ -1491,24 +1491,22 @@ export const nativeCryptoR: TalerCryptoInterfaceR = {
tci: TalerCryptoInterfaceR,
req: EncryptContractForDepositRequest,
): Promise<EncryptContractForDepositResponse> {
const contractKeyPair = await this.createEddsaKeypair(tci, {});
const enc = await encryptContractForDeposit(
decodeCrock(req.pursePub),
decodeCrock(contractKeyPair.priv),
decodeCrock(req.contractPriv),
req.contractTerms,
);
const sigBlob = buildSigPS(TalerSignaturePurpose.WALLET_PURSE_ECONTRACT)
.put(hash(enc))
.put(decodeCrock(contractKeyPair.pub))
.put(decodeCrock(req.contractPub))
.build();
const sig = eddsaSign(sigBlob, decodeCrock(req.pursePriv));
return {
econtract: {
contract_pub: contractKeyPair.pub,
contract_pub: req.contractPub,
econtract: encodeCrock(enc),
econtract_sig: encodeCrock(sig),
},
contractPriv: contractKeyPair.priv,
};
},
async decryptContractForDeposit(

View File

@ -190,14 +190,15 @@ export interface EncryptContractResponse {
export interface EncryptContractForDepositRequest {
contractTerms: any;
contractPriv: string;
contractPub: string;
pursePub: string;
pursePriv: string;
}
export interface EncryptContractForDepositResponse {
econtract: EncryptedContract;
contractPriv: string;
}
export interface DecryptContractRequest {

View File

@ -1780,6 +1780,18 @@ export interface PeerPullPaymentInitiationRecord {
*/
contractTermsHash: string;
mergePub: string;
mergePriv: string;
contractPub: string;
contractPriv: string;
contractTerms: PeerContractTerms;
mergeTimestamp: TalerProtocolTimestamp;
mergeReserveRowId: number;
/**
* Status of the peer pull payment initiation.
*/

View File

@ -340,7 +340,7 @@ export async function preparePeerPushPayment(
};
}
export async function processPeerPushOutgoing(
export async function processPeerPushInitiation(
ws: InternalWalletState,
pursePub: string,
): Promise<OperationAttemptResult> {
@ -417,6 +417,7 @@ export async function processPeerPushOutgoing(
return;
}
ppi.status = PeerPushPaymentInitiationStatus.PurseCreated;
await tx.peerPushPaymentInitiations.put(ppi);
});
return {
@ -428,7 +429,7 @@ export async function processPeerPushOutgoing(
/**
* Initiate sending a peer-to-peer push payment.
*/
export async function initiatePeerToPeerPush(
export async function initiatePeerPushPayment(
ws: InternalWalletState,
req: InitiatePeerPushPaymentRequest,
): Promise<InitiatePeerPushPaymentResponse> {
@ -513,7 +514,7 @@ export async function initiatePeerToPeerPush(
ws,
RetryTags.byPeerPushPaymentInitiationPursePub(pursePair.pub),
async () => {
return await processPeerPushOutgoing(ws, pursePair.pub);
return await processPeerPushInitiation(ws, pursePair.pub);
},
);
@ -935,6 +936,115 @@ export async function checkPeerPullPayment(
};
}
export async function processPeerPullInitiation(
ws: InternalWalletState,
pursePub: string,
): Promise<OperationAttemptResult> {
const pullIni = await ws.db
.mktx((x) => [x.peerPullPaymentInitiations])
.runReadOnly(async (tx) => {
return tx.peerPullPaymentInitiations.get(pursePub);
});
if (!pullIni) {
throw Error("peer pull payment initiation not found in database");
}
if (pullIni.status === OperationStatus.Finished) {
logger.warn("peer pull payment initiation is already finished");
return {
type: OperationAttemptResultType.Finished,
result: undefined,
}
}
const mergeReserve = await ws.db
.mktx((x) => [x.reserves])
.runReadOnly(async (tx) => {
return tx.reserves.get(pullIni.mergeReserveRowId);
});
if (!mergeReserve) {
throw Error("merge reserve for peer pull payment not found in database");
}
const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount));
const reservePayto = talerPaytoFromExchangeReserve(
pullIni.exchangeBaseUrl,
mergeReserve.reservePub,
);
const econtractResp = await ws.cryptoApi.encryptContractForDeposit({
contractPriv: pullIni.contractPriv,
contractPub: pullIni.contractPub,
contractTerms: pullIni.contractTerms,
pursePriv: pullIni.pursePriv,
pursePub: pullIni.pursePub,
});
const purseExpiration = pullIni.contractTerms.purse_expiration;
const sigRes = await ws.cryptoApi.signReservePurseCreate({
contractTermsHash: pullIni.contractTermsHash,
flags: WalletAccountMergeFlags.CreateWithPurseFee,
mergePriv: pullIni.mergePriv,
mergeTimestamp: pullIni.mergeTimestamp,
purseAmount: pullIni.contractTerms.amount,
purseExpiration: purseExpiration,
purseFee: purseFee,
pursePriv: pullIni.pursePriv,
pursePub: pullIni.pursePub,
reservePayto,
reservePriv: mergeReserve.reservePriv,
});
const reservePurseReqBody: ExchangeReservePurseRequest = {
merge_sig: sigRes.mergeSig,
merge_timestamp: pullIni.mergeTimestamp,
h_contract_terms: pullIni.contractTermsHash,
merge_pub: pullIni.mergePub,
min_age: 0,
purse_expiration: purseExpiration,
purse_fee: purseFee,
purse_pub: pullIni.pursePub,
purse_sig: sigRes.purseSig,
purse_value: pullIni.contractTerms.amount,
reserve_sig: sigRes.accountSig,
econtract: econtractResp.econtract,
};
logger.info(`reserve purse request: ${j2s(reservePurseReqBody)}`);
const reservePurseMergeUrl = new URL(
`reserves/${mergeReserve.reservePub}/purse`,
pullIni.exchangeBaseUrl,
);
const httpResp = await ws.http.postJson(
reservePurseMergeUrl.href,
reservePurseReqBody,
);
const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny());
logger.info(`reserve merge response: ${j2s(resp)}`);
await ws.db
.mktx((x) => [x.peerPullPaymentInitiations])
.runReadWrite(async (tx) => {
const pi2 = await tx.peerPullPaymentInitiations.get(pursePub);
if (!pi2) {
return;
}
pi2.status = OperationStatus.Finished;
await tx.peerPullPaymentInitiations.put(pi2);
});
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
export async function preparePeerPullPayment(
ws: InternalWalletState,
req: PreparePeerPullPaymentRequest,
@ -967,39 +1077,14 @@ export async function initiatePeerPullPayment(
const instructedAmount = Amounts.parseOrThrow(
req.partialContractTerms.amount,
);
const purseExpiration = req.partialContractTerms.purse_expiration;
const contractTerms = req.partialContractTerms;
const reservePayto = talerPaytoFromExchangeReserve(
req.exchangeBaseUrl,
mergeReserveInfo.reservePub,
);
const econtractResp = await ws.cryptoApi.encryptContractForDeposit({
contractTerms,
pursePriv: pursePair.priv,
pursePub: pursePair.pub,
});
const hContractTerms = ContractTermsUtil.hashContractTerms(contractTerms);
const purseFee = Amounts.stringify(
Amounts.zeroOfCurrency(instructedAmount.currency),
);
const contractKeyPair = await ws.cryptoApi.createEddsaKeypair({});
const sigRes = await ws.cryptoApi.signReservePurseCreate({
contractTermsHash: hContractTerms,
flags: WalletAccountMergeFlags.CreateWithPurseFee,
mergePriv: mergePair.priv,
mergeTimestamp: mergeTimestamp,
purseAmount: req.partialContractTerms.amount,
purseExpiration: purseExpiration,
purseFee: purseFee,
pursePriv: pursePair.priv,
pursePub: pursePair.pub,
reservePayto,
reservePriv: mergeReserveInfo.reservePriv,
});
const mergeReserveRowId = mergeReserveInfo.rowId;
checkDbInvariant(!!mergeReserveRowId);
await ws.db
.mktx((x) => [x.peerPullPaymentInitiations, x.contractTerms])
@ -1010,7 +1095,14 @@ export async function initiatePeerPullPayment(
exchangeBaseUrl: req.exchangeBaseUrl,
pursePriv: pursePair.priv,
pursePub: pursePair.pub,
status: OperationStatus.Finished,
mergePriv: mergePair.priv,
mergePub: mergePair.pub,
status: OperationStatus.Pending,
contractTerms: contractTerms,
mergeTimestamp,
mergeReserveRowId: mergeReserveRowId,
contractPriv: contractKeyPair.priv,
contractPub: contractKeyPair.pub,
});
await tx.contractTerms.put({
contractTermsRaw: contractTerms,
@ -1018,43 +1110,24 @@ export async function initiatePeerPullPayment(
});
});
const reservePurseReqBody: ExchangeReservePurseRequest = {
merge_sig: sigRes.mergeSig,
merge_timestamp: mergeTimestamp,
h_contract_terms: hContractTerms,
merge_pub: mergePair.pub,
min_age: 0,
purse_expiration: purseExpiration,
purse_fee: purseFee,
purse_pub: pursePair.pub,
purse_sig: sigRes.purseSig,
purse_value: req.partialContractTerms.amount,
reserve_sig: sigRes.accountSig,
econtract: econtractResp.econtract,
};
// FIXME: Should we somehow signal to the client
// whether purse creation has failed, or does the client/
// check this asynchronously from the transaction status?
logger.info(`reserve purse request: ${j2s(reservePurseReqBody)}`);
const reservePurseMergeUrl = new URL(
`reserves/${mergeReserveInfo.reservePub}/purse`,
req.exchangeBaseUrl,
await runOperationWithErrorReporting(
ws,
RetryTags.byPeerPullPaymentInitiationPursePub(pursePair.pub),
async () => {
return processPeerPullInitiation(ws, pursePair.pub);
},
);
const httpResp = await ws.http.postJson(
reservePurseMergeUrl.href,
reservePurseReqBody,
);
const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny());
logger.info(`reserve merge response: ${j2s(resp)}`);
const wg = await internalCreateWithdrawalGroup(ws, {
amount: instructedAmount,
wgInfo: {
withdrawalType: WithdrawalRecordType.PeerPullCredit,
contractTerms,
contractPriv: econtractResp.contractPriv,
contractPriv: contractKeyPair.priv,
},
exchangeBaseUrl: req.exchangeBaseUrl,
reserveStatus: WithdrawalGroupStatus.QueryingStatus,
@ -1067,7 +1140,7 @@ export async function initiatePeerPullPayment(
return {
talerUri: constructPayPullUri({
exchangeBaseUrl: req.exchangeBaseUrl,
contractPriv: econtractResp.contractPriv,
contractPriv: contractKeyPair.priv,
}),
transactionId: makeTransactionId(
TransactionType.PeerPullCredit,

View File

@ -28,6 +28,7 @@ import {
RefreshCoinStatus,
OperationStatus,
OperationStatusRange,
PeerPushPaymentInitiationStatus,
} from "../db.js";
import {
PendingOperationsResponse,
@ -341,6 +342,58 @@ async function gatherBackupPending(
});
}
async function gatherPeerPullInitiationPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
peerPullPaymentInitiations: typeof WalletStoresV1.peerPullPaymentInitiations;
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.peerPullPaymentInitiations.iter().forEachAsync(async (pi) => {
if (pi.status === OperationStatus.Finished) {
return;
}
const opId = RetryTags.forPeerPullPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.PeerPullInitiation,
...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
pursePub: pi.pursePub,
});
});
}
async function gatherPeerPushInitiationPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
peerPushPaymentInitiations: typeof WalletStoresV1.peerPushPaymentInitiations;
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.peerPushPaymentInitiations.iter().forEachAsync(async (pi) => {
if (pi.status === PeerPushPaymentInitiationStatus.PurseCreated) {
return;
}
const opId = RetryTags.forPeerPushPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.PeerPushInitiation,
...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
pursePub: pi.pursePub,
});
});
}
export async function getPendingOperations(
ws: InternalWalletState,
): Promise<PendingOperationsResponse> {
@ -359,6 +412,8 @@ export async function getPendingOperations(
x.depositGroups,
x.recoupGroups,
x.operationRetries,
x.peerPullPaymentInitiations,
x.peerPushPaymentInitiations,
])
.runReadWrite(async (tx) => {
const resp: PendingOperationsResponse = {
@ -372,6 +427,8 @@ export async function getPendingOperations(
await gatherPurchasePending(ws, tx, now, resp);
await gatherRecoupPending(ws, tx, now, resp);
await gatherBackupPending(ws, tx, now, resp);
await gatherPeerPushInitiationPending(ws, tx, now, resp);
await gatherPeerPullInitiationPending(ws, tx, now, resp);
return resp;
});
}

View File

@ -37,7 +37,8 @@ export enum PendingTaskType {
Withdraw = "withdraw",
Deposit = "deposit",
Backup = "backup",
PeerPushOutgoing = "peer-push-outgoing",
PeerPushInitiation = "peer-push-initiation",
PeerPullInitiation = "peer-pull\-initiation",
}
/**
@ -54,7 +55,8 @@ export type PendingTaskInfo = PendingTaskInfoCommon &
| PendingRecoupTask
| PendingDepositTask
| PendingBackupTask
| PendingPeerPushOutgoingTask
| PendingPeerPushInitiationTask
| PendingPeerPullInitiationTask
);
export interface PendingBackupTask {
@ -75,8 +77,16 @@ export interface PendingExchangeUpdateTask {
/**
* The wallet wants to send a peer push payment.
*/
export interface PendingPeerPushOutgoingTask {
type: PendingTaskType.PeerPushOutgoing;
export interface PendingPeerPushInitiationTask {
type: PendingTaskType.PeerPushInitiation;
pursePub: string;
}
/**
* The wallet wants to send a peer pull payment.
*/
export interface PendingPeerPullInitiationTask {
type: PendingTaskType.PeerPullInitiation;
pursePub: string;
}

View File

@ -30,6 +30,7 @@ import {
BackupProviderRecord,
DepositGroupRecord,
ExchangeRecord,
PeerPullPaymentInitiationRecord,
PeerPushPaymentInitiationRecord,
PurchaseRecord,
RecoupGroupRecord,
@ -204,13 +205,21 @@ export namespace RetryTags {
export function forPeerPushPaymentInitiation(
ppi: PeerPushPaymentInitiationRecord,
): string {
return `${PendingTaskType.PeerPushOutgoing}:${ppi.pursePub}`;
return `${PendingTaskType.PeerPushInitiation}:${ppi.pursePub}`;
}
export function forPeerPullPaymentInitiation(
ppi: PeerPullPaymentInitiationRecord,
): string {
return `${PendingTaskType.PeerPullInitiation}:${ppi.pursePub}`;
}
export function byPaymentProposalId(proposalId: string): string {
return `${PendingTaskType.Purchase}:${proposalId}`;
}
export function byPeerPushPaymentInitiationPursePub(pursePub: string): string {
return `${PendingTaskType.PeerPushOutgoing}:${pursePub}`;
return `${PendingTaskType.PeerPushInitiation}:${pursePub}`;
}
export function byPeerPullPaymentInitiationPursePub(pursePub: string): string {
return `${PendingTaskType.PeerPullInitiation}:${pursePub}`;
}
}

View File

@ -195,10 +195,11 @@ import {
checkPeerPullPayment,
checkPeerPushPayment,
initiatePeerPullPayment,
initiatePeerToPeerPush,
initiatePeerPushPayment,
preparePeerPullPayment,
preparePeerPushPayment,
processPeerPushOutgoing,
processPeerPullInitiation,
processPeerPushInitiation,
} from "./operations/pay-peer.js";
import { getPendingOperations } from "./operations/pending.js";
import {
@ -318,8 +319,10 @@ async function callOperationHandler(
}
case PendingTaskType.Backup:
return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
case PendingTaskType.PeerPushOutgoing:
return await processPeerPushOutgoing(ws, pending.pursePub);
case PendingTaskType.PeerPushInitiation:
return await processPeerPushInitiation(ws, pending.pursePub);
case PendingTaskType.PeerPullInitiation:
return await processPeerPullInitiation(ws, pending.pursePub);
default:
return assertUnreachable(pending);
}
@ -1381,7 +1384,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
}
case WalletApiOperation.InitiatePeerPushPayment: {
const req = codecForInitiatePeerPushPaymentRequest().decode(payload);
return await initiatePeerToPeerPush(ws, req);
return await initiatePeerPushPayment(ws, req);
}
case WalletApiOperation.CheckPeerPushPayment: {
const req = codecForCheckPeerPushPaymentRequest().decode(payload);