wallet-core: pending operation for peer push credit, save withdrawalGroupId

This commit is contained in:
Florian Dold 2023-02-20 00:36:02 +01:00
parent e6ed901626
commit 30b3949d2b
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
7 changed files with 232 additions and 31 deletions

View File

@ -1818,6 +1818,18 @@ export interface PeerPullPaymentInitiationRecord {
* Status of the peer pull payment initiation.
*/
status: OperationStatus;
withdrawalGroupId: string | undefined;
}
export enum PeerPushPaymentIncomingStatus {
Proposed = 30 /* USER_ATTENTION_START */,
Accepted = 10 /* ACTIVE_START */,
/**
* Merge was successful and withdrawal group has been created, now
* everything is in the hand of the withdrawal group.
*/
WithdrawalCreated = 50 /* DORMANT_START */,
}
/**
@ -1847,7 +1859,12 @@ export interface PeerPushPaymentIncomingRecord {
/**
* Status of the peer push payment incoming initiation.
*/
status: OperationStatus;
status: PeerPushPaymentIncomingStatus;
/**
* Associated withdrawal group.
*/
withdrawalGroupId: string | undefined;
}
export enum PeerPullPaymentIncomingStatus {
@ -2260,6 +2277,21 @@ export const WalletStoresV1 = {
"exchangeBaseUrl",
"pursePub",
]),
byExchangeAndContractPriv: describeIndex(
"byExchangeAndContractPriv",
["exchangeBaseUrl", "contractPriv"],
{
versionAdded: 4,
unique: true,
},
),
byWithdrawalGroupId: describeIndex(
"byWithdrawalGroupId",
"withdrawalGroupId",
{
versionAdded: 4,
},
),
byStatus: describeIndex("byStatus", "status"),
},
),
@ -2291,6 +2323,13 @@ export const WalletStoresV1 = {
}),
{
byStatus: describeIndex("byStatus", "status"),
byWithdrawalGroupId: describeIndex(
"byWithdrawalGroupId",
"withdrawalGroupId",
{
versionAdded: 4,
},
),
},
),
peerPushPaymentInitiations: describeStore(

View File

@ -77,6 +77,7 @@ import {
PeerPullPaymentIncomingStatus,
PeerPushPaymentCoinSelection,
PeerPushPaymentIncomingRecord,
PeerPushPaymentIncomingStatus,
PeerPushPaymentInitiationStatus,
ReserveRecord,
WithdrawalGroupStatus,
@ -619,18 +620,50 @@ export const codecForExchangePurseStatus = (): Codec<ExchangePurseStatus> =>
.property("balance", codecForAmountString())
.build("ExchangePurseStatus");
export async function checkPeerPushPayment(
export async function preparePeerPushCredit(
ws: InternalWalletState,
req: CheckPeerPushPaymentRequest,
): Promise<CheckPeerPushPaymentResponse> {
// FIXME: Check if existing record exists!
const uri = parsePayPushUri(req.talerUri);
if (!uri) {
throw Error("got invalid taler://pay-push URI");
}
const existing = await ws.db
.mktx((x) => [x.contractTerms, x.peerPushPaymentIncoming])
.runReadOnly(async (tx) => {
const existingPushInc =
await tx.peerPushPaymentIncoming.indexes.byExchangeAndContractPriv.get([
uri.exchangeBaseUrl,
uri.contractPriv,
]);
if (!existingPushInc) {
return;
}
const existingContractTermsRec = await tx.contractTerms.get(
existingPushInc.contractTermsHash,
);
if (!existingContractTermsRec) {
throw Error(
"contract terms for peer push payment credit not found in database",
);
}
const existingContractTerms = codecForPeerContractTerms().decode(
existingContractTermsRec.contractTermsRaw,
);
return { existingPushInc, existingContractTerms };
});
if (existing) {
return {
amount: existing.existingContractTerms.amount,
contractTerms: existing.existingContractTerms,
peerPushPaymentIncomingId:
existing.existingPushInc.peerPushPaymentIncomingId,
};
}
const exchangeBaseUrl = uri.exchangeBaseUrl;
await updateExchangeFromUrl(ws, exchangeBaseUrl);
@ -670,6 +703,8 @@ export async function checkPeerPushPayment(
dec.contractTerms,
);
const withdrawalGroupId = encodeCrock(getRandomBytes(32));
await ws.db
.mktx((x) => [x.contractTerms, x.peerPushPaymentIncoming])
.runReadWrite(async (tx) => {
@ -681,7 +716,8 @@ export async function checkPeerPushPayment(
pursePub: pursePub,
timestamp: TalerProtocolTimestamp.now(),
contractTermsHash,
status: OperationStatus.Finished,
status: PeerPushPaymentIncomingStatus.Proposed,
withdrawalGroupId,
});
await tx.contractTerms.put({
@ -754,18 +790,16 @@ async function getMergeReserveInfo(
return mergeReserveRecord;
}
export async function acceptPeerPushPayment(
export async function processPeerPushCredit(
ws: InternalWalletState,
req: AcceptPeerPushPaymentRequest,
): Promise<AcceptPeerPushPaymentResponse> {
peerPushPaymentIncomingId: string,
): Promise<OperationAttemptResult> {
let peerInc: PeerPushPaymentIncomingRecord | undefined;
let contractTerms: PeerContractTerms | undefined;
await ws.db
.mktx((x) => [x.contractTerms, x.peerPushPaymentIncoming])
.runReadOnly(async (tx) => {
peerInc = await tx.peerPushPaymentIncoming.get(
req.peerPushPaymentIncomingId,
);
.runReadWrite(async (tx) => {
peerInc = await tx.peerPushPaymentIncoming.get(peerPushPaymentIncomingId);
if (!peerInc) {
return;
}
@ -773,18 +807,17 @@ export async function acceptPeerPushPayment(
if (ctRec) {
contractTerms = ctRec.contractTermsRaw;
}
await tx.peerPushPaymentIncoming.put(peerInc);
});
if (!peerInc) {
throw Error(
`can't accept unknown incoming p2p push payment (${req.peerPushPaymentIncomingId})`,
`can't accept unknown incoming p2p push payment (${peerPushPaymentIncomingId})`,
);
}
checkDbInvariant(!!contractTerms);
await updateExchangeFromUrl(ws, peerInc.exchangeBaseUrl);
const amount = Amounts.parseOrThrow(contractTerms.amount);
const mergeReserveInfo = await getMergeReserveInfo(ws, {
@ -825,16 +858,17 @@ export async function acceptPeerPushPayment(
const mergeHttpReq = await ws.http.postJson(mergePurseUrl.href, mergeReq);
logger.info(`merge request: ${j2s(mergeReq)}`);
logger.trace(`merge request: ${j2s(mergeReq)}`);
const res = await readSuccessResponseJsonOrThrow(mergeHttpReq, codecForAny());
logger.info(`merge response: ${j2s(res)}`);
logger.trace(`merge response: ${j2s(res)}`);
const wg = await internalCreateWithdrawalGroup(ws, {
await internalCreateWithdrawalGroup(ws, {
amount,
wgInfo: {
withdrawalType: WithdrawalRecordType.PeerPushCredit,
contractTerms,
},
forcedWithdrawalGroupId: peerInc.withdrawalGroupId,
exchangeBaseUrl: peerInc.exchangeBaseUrl,
reserveStatus: WithdrawalGroupStatus.QueryingStatus,
reserveKeyPair: {
@ -843,10 +877,72 @@ export async function acceptPeerPushPayment(
},
});
await ws.db
.mktx((x) => [x.contractTerms, x.peerPushPaymentIncoming])
.runReadWrite(async (tx) => {
const peerInc = await tx.peerPushPaymentIncoming.get(
peerPushPaymentIncomingId,
);
if (!peerInc) {
return;
}
if (peerInc.status === PeerPushPaymentIncomingStatus.Accepted) {
peerInc.status = PeerPushPaymentIncomingStatus.WithdrawalCreated;
}
await tx.peerPushPaymentIncoming.put(peerInc);
});
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
export async function acceptPeerPushPayment(
ws: InternalWalletState,
req: AcceptPeerPushPaymentRequest,
): Promise<AcceptPeerPushPaymentResponse> {
let peerInc: PeerPushPaymentIncomingRecord | undefined;
let contractTerms: PeerContractTerms | undefined;
await ws.db
.mktx((x) => [x.contractTerms, x.peerPushPaymentIncoming])
.runReadWrite(async (tx) => {
peerInc = await tx.peerPushPaymentIncoming.get(
req.peerPushPaymentIncomingId,
);
if (!peerInc) {
return;
}
const ctRec = await tx.contractTerms.get(peerInc.contractTermsHash);
if (ctRec) {
contractTerms = ctRec.contractTermsRaw;
}
if (peerInc.status === PeerPushPaymentIncomingStatus.Proposed) {
peerInc.status = PeerPushPaymentIncomingStatus.Accepted;
}
await tx.peerPushPaymentIncoming.put(peerInc);
});
if (!peerInc) {
throw Error(
`can't accept unknown incoming p2p push payment (${req.peerPushPaymentIncomingId})`,
);
}
checkDbInvariant(!!contractTerms);
await updateExchangeFromUrl(ws, peerInc.exchangeBaseUrl);
const retryTag = RetryTags.forPeerPushCredit(peerInc);
await runOperationWithErrorReporting(ws, retryTag, () =>
processPeerPushCredit(ws, req.peerPushPaymentIncomingId),
);
return {
transactionId: makeTransactionId(
TransactionType.PeerPushCredit,
wg.withdrawalGroupId,
req.peerPushPaymentIncomingId,
),
};
}
@ -1017,7 +1113,7 @@ export async function acceptIncomingPeerPullPayment(
* Look up information about an incoming peer pull payment.
* Store the results in the wallet DB.
*/
export async function prepareIncomingPeerPullPayment(
export async function preparePeerPullCredit(
ws: InternalWalletState,
req: CheckPeerPullPaymentRequest,
): Promise<CheckPeerPullPaymentResponse> {
@ -1135,7 +1231,7 @@ export async function prepareIncomingPeerPullPayment(
};
}
export async function processPeerPullInitiation(
export async function processPeerPullCredit(
ws: InternalWalletState,
pursePub: string,
): Promise<OperationAttemptResult> {
@ -1359,6 +1455,8 @@ export async function initiatePeerPullPayment(
const contractKeyPair = await ws.cryptoApi.createEddsaKeypair({});
const withdrawalGroupId = encodeCrock(getRandomBytes(32));
const mergeReserveRowId = mergeReserveInfo.rowId;
checkDbInvariant(!!mergeReserveRowId);
@ -1379,6 +1477,7 @@ export async function initiatePeerPullPayment(
mergeReserveRowId: mergeReserveRowId,
contractPriv: contractKeyPair.priv,
contractPub: contractKeyPair.pub,
withdrawalGroupId,
});
await tx.contractTerms.put({
contractTermsRaw: contractTerms,
@ -1394,20 +1493,24 @@ export async function initiatePeerPullPayment(
ws,
RetryTags.byPeerPullPaymentInitiationPursePub(pursePair.pub),
async () => {
return processPeerPullInitiation(ws, pursePair.pub);
return processPeerPullCredit(ws, pursePair.pub);
},
);
// FIXME: Why do we create this only here?
// What if the previous operation didn't succeed?
const wg = await internalCreateWithdrawalGroup(ws, {
// FIXME: Use a pre-computed withdrawal group ID
// so we don't create it multiple times.
await internalCreateWithdrawalGroup(ws, {
amount: instructedAmount,
wgInfo: {
withdrawalType: WithdrawalRecordType.PeerPullCredit,
contractTerms,
contractPriv: contractKeyPair.priv,
},
forcedWithdrawalGroupId: withdrawalGroupId,
exchangeBaseUrl: exchangeBaseUrl,
reserveStatus: WithdrawalGroupStatus.QueryingStatus,
reserveKeyPair: {
@ -1423,7 +1526,7 @@ export async function initiatePeerPullPayment(
}),
transactionId: makeTransactionId(
TransactionType.PeerPullCredit,
wg.withdrawalGroupId,
pursePair.pub,
),
};
}

View File

@ -30,6 +30,7 @@ import {
OperationStatusRange,
PeerPushPaymentInitiationStatus,
PeerPullPaymentIncomingStatus,
PeerPushPaymentIncomingStatus,
} from "../db.js";
import {
PendingOperationsResponse,
@ -430,6 +431,35 @@ async function gatherPeerPushInitiationPending(
});
}
async function gatherPeerPushCreditPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
peerPushPaymentIncoming: typeof WalletStoresV1.peerPushPaymentIncoming;
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.peerPushPaymentIncoming.iter().forEachAsync(async (pi) => {
switch (pi.status) {
case PeerPushPaymentIncomingStatus.Accepted:
return;
case PeerPushPaymentIncomingStatus.WithdrawalCreated:
return;
}
const opId = RetryTags.forPeerPushCredit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.PeerPushCredit,
...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
peerPushPaymentIncomingId: pi.peerPushPaymentIncomingId,
});
});
}
export async function getPendingOperations(
ws: InternalWalletState,
): Promise<PendingOperationsResponse> {
@ -451,6 +481,7 @@ export async function getPendingOperations(
x.peerPullPaymentInitiations,
x.peerPushPaymentInitiations,
x.peerPullPaymentIncoming,
x.peerPushPaymentIncoming,
])
.runReadWrite(async (tx) => {
const resp: PendingOperationsResponse = {
@ -467,6 +498,7 @@ export async function getPendingOperations(
await gatherPeerPushInitiationPending(ws, tx, now, resp);
await gatherPeerPullInitiationPending(ws, tx, now, resp);
await gatherPeerPullDebitPending(ws, tx, now, resp);
await gatherPeerPushCreditPending(ws, tx, now, resp);
return resp;
});
}

View File

@ -1839,6 +1839,7 @@ export async function internalCreateWithdrawalGroup(
reserveStatus: WithdrawalGroupStatus;
amount: AmountJson;
exchangeBaseUrl: string;
forcedWithdrawalGroupId?: string;
forcedDenomSel?: ForcedDenomSel;
reserveKeyPair?: EddsaKeypair;
restrictAge?: number;
@ -1850,9 +1851,16 @@ export async function internalCreateWithdrawalGroup(
const now = AbsoluteTime.toTimestamp(AbsoluteTime.now());
const secretSeed = encodeCrock(getRandomBytes(32));
const canonExchange = canonicalizeBaseUrl(args.exchangeBaseUrl);
const withdrawalGroupId = encodeCrock(getRandomBytes(32));
const amount = args.amount;
let withdrawalGroupId;
if (args.forcedWithdrawalGroupId) {
withdrawalGroupId = args.forcedWithdrawalGroupId;
} else {
withdrawalGroupId = encodeCrock(getRandomBytes(32));
}
await updateWithdrawalDenoms(ws, canonExchange);
const denoms = await getCandidateWithdrawalDenoms(ws, canonExchange);

View File

@ -37,8 +37,10 @@ export enum PendingTaskType {
Withdraw = "withdraw",
Deposit = "deposit",
Backup = "backup",
// FIXME: Rename to peer-push-debit and peer-pull-debit
PeerPushInitiation = "peer-push-initiation",
PeerPullInitiation = "peer-pull-initiation",
PeerPushCredit = "peer-push-credit",
PeerPullDebit = "peer-pull-debit",
}
@ -59,6 +61,7 @@ export type PendingTaskInfo = PendingTaskInfoCommon &
| PendingPeerPushInitiationTask
| PendingPeerPullInitiationTask
| PendingPeerPullDebitTask
| PendingPeerPushCreditTask
);
export interface PendingBackupTask {
@ -100,6 +103,13 @@ export interface PendingPeerPullDebitTask {
peerPullPaymentIncomingId: string;
}
/**
*/
export interface PendingPeerPushCreditTask {
type: PendingTaskType.PeerPushCredit;
peerPushPaymentIncomingId: string;
}
/**
* The wallet should check whether coins from this exchange
* need to be auto-refreshed.

View File

@ -33,6 +33,7 @@ import {
ExchangeRecord,
PeerPullPaymentIncomingRecord,
PeerPullPaymentInitiationRecord,
PeerPushPaymentIncomingRecord,
PeerPushPaymentInitiationRecord,
PurchaseRecord,
RecoupGroupRecord,
@ -221,6 +222,11 @@ export namespace RetryTags {
): string {
return `${PendingTaskType.PeerPullDebit}:${ppi.pursePub}`;
}
export function forPeerPushCredit(
ppi: PeerPushPaymentIncomingRecord,
): string {
return `${PendingTaskType.PeerPushCredit}:${ppi.pursePub}`;
}
export function byPaymentProposalId(proposalId: string): string {
return `${PendingTaskType.Purchase}:${proposalId}`;
}

View File

@ -197,15 +197,16 @@ import {
import {
acceptIncomingPeerPullPayment,
acceptPeerPushPayment,
prepareIncomingPeerPullPayment,
checkPeerPushPayment,
preparePeerPullCredit,
preparePeerPushCredit,
initiatePeerPullPayment,
initiatePeerPushPayment,
checkPeerPullPaymentInitiation,
preparePeerPushPayment,
processPeerPullInitiation,
processPeerPullCredit,
processPeerPushInitiation,
processPeerPullDebit,
processPeerPushCredit,
} from "./operations/pay-peer.js";
import { getPendingOperations } from "./operations/pending.js";
import {
@ -328,9 +329,11 @@ async function callOperationHandler(
case PendingTaskType.PeerPushInitiation:
return await processPeerPushInitiation(ws, pending.pursePub);
case PendingTaskType.PeerPullInitiation:
return await processPeerPullInitiation(ws, pending.pursePub);
return await processPeerPullCredit(ws, pending.pursePub);
case PendingTaskType.PeerPullDebit:
return await processPeerPullDebit(ws, pending.peerPullPaymentIncomingId);
case PendingTaskType.PeerPushCredit:
return await processPeerPushCredit(ws, pending.peerPushPaymentIncomingId);
default:
return assertUnreachable(pending);
}
@ -1435,7 +1438,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
}
case WalletApiOperation.CheckPeerPushPayment: {
const req = codecForCheckPeerPushPaymentRequest().decode(payload);
return await checkPeerPushPayment(ws, req);
return await preparePeerPushCredit(ws, req);
}
case WalletApiOperation.AcceptPeerPushPayment: {
const req = codecForAcceptPeerPushPaymentRequest().decode(payload);
@ -1451,7 +1454,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
}
case WalletApiOperation.CheckPeerPullPayment: {
const req = codecForCheckPeerPullPaymentRequest().decode(payload);
return await prepareIncomingPeerPullPayment(ws, req);
return await preparePeerPullCredit(ws, req);
}
case WalletApiOperation.AcceptPeerPullPayment: {
const req = codecForAcceptPeerPullPaymentRequest().decode(payload);