-get p2p pull integration test to run through

This commit is contained in:
Florian Dold 2022-08-24 11:11:02 +02:00
parent bc434ebb83
commit d32d2895ce
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
11 changed files with 341 additions and 111 deletions

View File

@ -1945,3 +1945,8 @@ export interface ExchangeReservePurseRequest {
// if it has not been paid.
purse_expiration: TalerProtocolTimestamp;
}
export interface ExchangePurseDeposits {
// Array of coins to deposit into the purse.
deposits: PurseDeposit[];
}

View File

@ -1940,7 +1940,7 @@ export class WalletCli {
`wallet-${self.name}`,
`taler-wallet-cli ${
self.timetravelArg ?? ""
} --no-throttle -LTRACE --wallet-db '${
} --no-throttle -LTRACE --skip-defaults --wallet-db '${
self.dbfile
}' api '${op}' ${shellWrap(JSON.stringify(payload))}`,
);
@ -1990,6 +1990,7 @@ export class WalletCli {
"--no-throttle",
...this.timetravelArgArr,
"-LTRACE",
"--skip-defaults",
"--wallet-db",
this.dbfile,
"run-until-done",
@ -2005,6 +2006,7 @@ export class WalletCli {
"taler-wallet-cli",
[
"--no-throttle",
"--skip-defaults",
"-LTRACE",
...this.timetravelArgArr,
"--wallet-db",

View File

@ -197,6 +197,9 @@ export const walletCli = clk
})
.flag("verbose", ["-V", "--verbose"], {
help: "Enable verbose output.",
})
.flag("skipDefaults", ["--skip-defaults"], {
help: "Skip configuring default exchanges.",
});
type WalletCliArgsType = clk.GetArgType<typeof walletCli>;
@ -233,7 +236,9 @@ async function withWallet<T>(
ws: wallet,
client: wallet.client,
};
await wallet.handleCoreApiRequest("initWallet", "native-init", {});
await wallet.handleCoreApiRequest("initWallet", "native-init", {
skipDefaults: walletCliArgs.wallet.skipDefaults,
});
const ret = await f(w);
return ret;
} catch (e) {
@ -1159,10 +1164,7 @@ testCli
const salt = getRandomBytes(32);
tDerive.start();
const deriv = await AgeRestriction.commitmentDerive(
commitProof,
salt,
);
const deriv = await AgeRestriction.commitmentDerive(commitProof, salt);
tDerive.stop();
tCompare.start();

View File

@ -17,6 +17,7 @@
/**
* Imports.
*/
import { j2s } from "@gnu-taler/taler-util";
import { WalletApiOperation } from "@gnu-taler/taler-wallet-core";
import { GlobalTestState } from "../harness/harness.js";
import {
@ -57,6 +58,8 @@ export async function runPeerToPeerPullTest(t: GlobalTestState) {
},
);
console.log(`checkResp: ${j2s(checkResp)}`);
const acceptResp = await wallet.client.call(
WalletApiOperation.AcceptPeerPullPayment,
{
@ -64,6 +67,10 @@ export async function runPeerToPeerPullTest(t: GlobalTestState) {
},
);
const txs = await wallet.client.call(WalletApiOperation.GetTransactions, {});
console.log(`transactions: ${j2s(txs)}`);
await wallet.runUntilDone();
}

View File

@ -1669,13 +1669,27 @@ export interface PeerPushPaymentIncomingRecord {
contractPriv: string;
timestampAccepted: TalerProtocolTimestamp;
timestamp: TalerProtocolTimestamp;
contractTerms: PeerContractTerms;
// FIXME: add status etc.
}
export interface PeerPullPaymentIncomingRecord {
peerPullPaymentIncomingId: string;
pursePub: string;
exchangeBaseUrl: string;
contractTerms: PeerContractTerms;
timestamp: TalerProtocolTimestamp;
contractPriv: string;
}
export const WalletStoresV1 = {
coins: describeStore(
describeContents<CoinRecord>("coins", {
@ -1853,6 +1867,17 @@ export const WalletStoresV1 = {
]),
},
),
peerPullPaymentIncoming: describeStore(
describeContents<PeerPullPaymentIncomingRecord>("peerPullPaymentIncoming", {
keyPath: "peerPullPaymentIncomingId",
}),
{
byExchangeAndPurse: describeIndex("byExchangeAndPurse", [
"exchangeBaseUrl",
"pursePub",
]),
},
),
peerPullPaymentInitiation: describeStore(
describeContents<PeerPullPaymentInitiationRecord>(
"peerPushPaymentInitiation",

View File

@ -132,6 +132,7 @@ export async function getDefaultNodeWallet2(
});
// Atomically move the temporary file onto the DB path.
fs.renameSync(tmpPath, args.persistentStoragePath);
logger.trace("committing database done");
};
}
@ -178,7 +179,7 @@ export async function getDefaultNodeWallet2(
}
}
const timer = new SetTimeoutTimerAPI()
const timer = new SetTimeoutTimerAPI();
const w = await Wallet.create(myDb, myHttpLib, timer, workerFactory);

View File

@ -65,10 +65,7 @@ import {
} from "../util/http.js";
import { DbAccess, GetReadOnlyAccess } from "../util/query.js";
import { RetryInfo } from "../util/retries.js";
import {
WALLET_CACHE_BREAKER_CLIENT_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
} from "../versions.js";
import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "../versions.js";
import { guardOperationException } from "./common.js";
const logger = new Logger("exchanges.ts");
@ -169,7 +166,6 @@ export async function downloadExchangeWithTermsOfService(
contentType: string,
): Promise<ExchangeTosDownloadResult> {
const reqUrl = new URL("terms", exchangeBaseUrl);
reqUrl.searchParams.set("cacheBreaker", WALLET_CACHE_BREAKER_CLIENT_VERSION);
const headers = {
Accept: contentType,
};
@ -352,7 +348,6 @@ async function downloadExchangeWireInfo(
timeout: Duration,
): Promise<ExchangeWireJson> {
const reqUrl = new URL("wire", exchangeBaseUrl);
reqUrl.searchParams.set("cacheBreaker", WALLET_CACHE_BREAKER_CLIENT_VERSION);
const resp = await http.get(reqUrl.href, {
timeout,
@ -439,7 +434,6 @@ async function downloadExchangeKeysInfo(
timeout: Duration,
): Promise<ExchangeKeysDownloadResult> {
const keysUrl = new URL("keys", baseUrl);
keysUrl.searchParams.set("cacheBreaker", WALLET_CACHE_BREAKER_CLIENT_VERSION);
const resp = await http.get(keysUrl.href, {
timeout,
@ -449,9 +443,6 @@ async function downloadExchangeKeysInfo(
codecForExchangeKeysJson(),
);
logger.trace("received /keys response");
logger.trace(`${j2s(exchangeKeysJsonUnchecked)}`);
if (exchangeKeysJsonUnchecked.denoms.length === 0) {
throw TalerError.fromDetail(
TalerErrorCode.WALLET_EXCHANGE_DENOMINATIONS_INSUFFICIENT,

View File

@ -19,23 +19,30 @@
*/
import {
AbsoluteTime,
AcceptPeerPullPaymentRequest,
AcceptPeerPushPaymentRequest,
AmountJson,
AmountLike,
Amounts,
AmountString,
buildCodecForObject,
CheckPeerPullPaymentRequest,
CheckPeerPullPaymentResponse,
CheckPeerPushPaymentRequest,
CheckPeerPushPaymentResponse,
Codec,
codecForAmountString,
codecForAny,
codecForExchangeGetContractResponse,
CoinPublicKey,
constructPayPullUri,
constructPayPushUri,
ContractTermsUtil,
decodeCrock,
Duration,
eddsaGetPublic,
encodeCrock,
ExchangePurseDeposits,
ExchangePurseMergeRequest,
ExchangeReservePurseRequest,
getRandomBytes,
@ -45,7 +52,9 @@ import {
InitiatePeerPushPaymentResponse,
j2s,
Logger,
parsePayPullUri,
parsePayPushUri,
RefreshReason,
strcmp,
TalerProtocolTimestamp,
UnblindedSignature,
@ -54,14 +63,15 @@ import {
import {
CoinStatus,
MergeReserveInfo,
OperationStatus,
ReserveRecordStatus,
WithdrawalGroupRecord,
WalletStoresV1,
} from "../db.js";
import { readSuccessResponseJsonOrThrow } from "../util/http.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { checkDbInvariant } from "../util/invariants.js";
import { internalCreateWithdrawalGroup } from "./withdraw.js";
import { GetReadOnlyAccess } from "../util/query.js";
import { createRefreshGroup } from "./refresh.js";
const logger = new Logger("operations/peer-to-peer.ts");
@ -105,93 +115,125 @@ interface CoinInfo {
denomSig: UnblindedSignature;
}
export async function selectPeerCoins(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
exchanges: typeof WalletStoresV1.exchanges;
denominations: typeof WalletStoresV1.denominations;
coins: typeof WalletStoresV1.coins;
}>,
instructedAmount: AmountJson,
): Promise<PeerCoinSelection | undefined> {
const exchanges = await tx.exchanges.iter().toArray();
for (const exch of exchanges) {
if (exch.detailsPointer?.currency !== instructedAmount.currency) {
continue;
}
const coins = (
await tx.coins.indexes.byBaseUrl.getAll(exch.baseUrl)
).filter((x) => x.status === CoinStatus.Fresh);
const coinInfos: CoinInfo[] = [];
for (const coin of coins) {
const denom = await ws.getDenomInfo(
ws,
tx,
coin.exchangeBaseUrl,
coin.denomPubHash,
);
if (!denom) {
throw Error("denom not found");
}
coinInfos.push({
coinPub: coin.coinPub,
feeDeposit: denom.feeDeposit,
value: denom.value,
denomPubHash: denom.denomPubHash,
coinPriv: coin.coinPriv,
denomSig: coin.denomSig,
});
}
if (coinInfos.length === 0) {
continue;
}
coinInfos.sort(
(o1, o2) =>
-Amounts.cmp(o1.value, o2.value) ||
strcmp(o1.denomPubHash, o2.denomPubHash),
);
let amountAcc = Amounts.getZero(instructedAmount.currency);
let depositFeesAcc = Amounts.getZero(instructedAmount.currency);
const resCoins: {
coinPub: string;
coinPriv: string;
contribution: AmountString;
denomPubHash: string;
denomSig: UnblindedSignature;
}[] = [];
for (const coin of coinInfos) {
if (Amounts.cmp(amountAcc, instructedAmount) >= 0) {
const res: PeerCoinSelection = {
exchangeBaseUrl: exch.baseUrl,
coins: resCoins,
depositFees: depositFeesAcc,
};
return res;
}
const gap = Amounts.add(
coin.feeDeposit,
Amounts.sub(instructedAmount, amountAcc).amount,
).amount;
const contrib = Amounts.min(gap, coin.value);
amountAcc = Amounts.add(
amountAcc,
Amounts.sub(contrib, coin.feeDeposit).amount,
).amount;
depositFeesAcc = Amounts.add(depositFeesAcc, coin.feeDeposit).amount;
resCoins.push({
coinPriv: coin.coinPriv,
coinPub: coin.coinPub,
contribution: Amounts.stringify(contrib),
denomPubHash: coin.denomPubHash,
denomSig: coin.denomSig,
});
}
continue;
}
return undefined;
}
export async function initiatePeerToPeerPush(
ws: InternalWalletState,
req: InitiatePeerPushPaymentRequest,
): Promise<InitiatePeerPushPaymentResponse> {
// FIXME: actually create a record for retries here!
const instructedAmount = Amounts.parseOrThrow(req.amount);
const coinSelRes: PeerCoinSelection | undefined = await ws.db
.mktx((x) => ({
exchanges: x.exchanges,
coins: x.coins,
denominations: x.denominations,
refreshGroups: x.refreshGroups,
}))
.runReadOnly(async (tx) => {
const exchanges = await tx.exchanges.iter().toArray();
for (const exch of exchanges) {
if (exch.detailsPointer?.currency !== instructedAmount.currency) {
continue;
}
const coins = (
await tx.coins.indexes.byBaseUrl.getAll(exch.baseUrl)
).filter((x) => x.status === CoinStatus.Fresh);
const coinInfos: CoinInfo[] = [];
for (const coin of coins) {
const denom = await ws.getDenomInfo(
ws,
tx,
coin.exchangeBaseUrl,
coin.denomPubHash,
);
if (!denom) {
throw Error("denom not found");
}
coinInfos.push({
coinPub: coin.coinPub,
feeDeposit: denom.feeDeposit,
value: denom.value,
denomPubHash: denom.denomPubHash,
coinPriv: coin.coinPriv,
denomSig: coin.denomSig,
});
}
if (coinInfos.length === 0) {
continue;
}
coinInfos.sort(
(o1, o2) =>
-Amounts.cmp(o1.value, o2.value) ||
strcmp(o1.denomPubHash, o2.denomPubHash),
);
let amountAcc = Amounts.getZero(instructedAmount.currency);
let depositFeesAcc = Amounts.getZero(instructedAmount.currency);
const resCoins: {
coinPub: string;
coinPriv: string;
contribution: AmountString;
denomPubHash: string;
denomSig: UnblindedSignature;
}[] = [];
for (const coin of coinInfos) {
if (Amounts.cmp(amountAcc, instructedAmount) >= 0) {
const res: PeerCoinSelection = {
exchangeBaseUrl: exch.baseUrl,
coins: resCoins,
depositFees: depositFeesAcc,
};
return res;
}
const gap = Amounts.add(
coin.feeDeposit,
Amounts.sub(instructedAmount, amountAcc).amount,
).amount;
const contrib = Amounts.min(gap, coin.value);
amountAcc = Amounts.add(
amountAcc,
Amounts.sub(contrib, coin.feeDeposit).amount,
).amount;
depositFeesAcc = Amounts.add(depositFeesAcc, coin.feeDeposit).amount;
resCoins.push({
coinPriv: coin.coinPriv,
coinPub: coin.coinPub,
contribution: Amounts.stringify(contrib),
denomPubHash: coin.denomPubHash,
denomSig: coin.denomSig,
});
}
continue;
.runReadWrite(async (tx) => {
const sel = await selectPeerCoins(ws, tx, instructedAmount);
if (!sel) {
return undefined;
}
return undefined;
const pubs: CoinPublicKey[] = [];
for (const c of sel.coins) {
const coin = await tx.coins.get(c.coinPub);
checkDbInvariant(!!coin);
coin.currentAmount = Amounts.sub(
coin.currentAmount,
Amounts.parseOrThrow(c.contribution),
).amount;
await tx.coins.put(coin);
}
await createRefreshGroup(ws, tx, pubs, RefreshReason.Pay);
return sel;
});
logger.info(`selected p2p coins: ${j2s(coinSelRes)}`);
@ -339,7 +381,7 @@ export async function checkPeerPushPayment(
exchangeBaseUrl: exchangeBaseUrl,
mergePriv: dec.mergePriv,
pursePub: pursePub,
timestampAccepted: TalerProtocolTimestamp.now(),
timestamp: TalerProtocolTimestamp.now(),
contractTerms: dec.contractTerms,
});
});
@ -478,6 +520,148 @@ export async function acceptPeerPushPayment(
});
}
/**
* FIXME: Bad name!
*/
export async function acceptPeerPullPayment(
ws: InternalWalletState,
req: AcceptPeerPullPaymentRequest,
) {
const peerPullInc = await ws.db
.mktx((x) => ({ peerPullPaymentIncoming: x.peerPullPaymentIncoming }))
.runReadOnly(async (tx) => {
return tx.peerPullPaymentIncoming.get(req.peerPullPaymentIncomingId);
});
if (!peerPullInc) {
throw Error(
`can't accept unknown incoming p2p pull payment (${req.peerPullPaymentIncomingId})`,
);
}
const instructedAmount = Amounts.parseOrThrow(
peerPullInc.contractTerms.amount,
);
const coinSelRes: PeerCoinSelection | undefined = await ws.db
.mktx((x) => ({
exchanges: x.exchanges,
coins: x.coins,
denominations: x.denominations,
refreshGroups: x.refreshGroups,
}))
.runReadWrite(async (tx) => {
const sel = await selectPeerCoins(ws, tx, instructedAmount);
if (!sel) {
return undefined;
}
const pubs: CoinPublicKey[] = [];
for (const c of sel.coins) {
const coin = await tx.coins.get(c.coinPub);
checkDbInvariant(!!coin);
coin.currentAmount = Amounts.sub(
coin.currentAmount,
Amounts.parseOrThrow(c.contribution),
).amount;
await tx.coins.put(coin);
}
await createRefreshGroup(ws, tx, pubs, RefreshReason.Pay);
return sel;
});
logger.info(`selected p2p coins: ${j2s(coinSelRes)}`);
if (!coinSelRes) {
throw Error("insufficient balance");
}
const pursePub = peerPullInc.pursePub;
const depositSigsResp = await ws.cryptoApi.signPurseDeposits({
exchangeBaseUrl: coinSelRes.exchangeBaseUrl,
pursePub,
coins: coinSelRes.coins,
});
const purseDepositUrl = new URL(
`purses/${pursePub}/deposit`,
coinSelRes.exchangeBaseUrl,
);
const depositPayload: ExchangePurseDeposits = {
deposits: depositSigsResp.deposits,
};
const httpResp = await ws.http.postJson(purseDepositUrl.href, depositPayload);
const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny());
logger.trace(`purse deposit response: ${j2s(resp)}`);
}
export async function checkPeerPullPayment(
ws: InternalWalletState,
req: CheckPeerPullPaymentRequest,
): Promise<CheckPeerPullPaymentResponse> {
const uri = parsePayPullUri(req.talerUri);
if (!uri) {
throw Error("got invalid taler://pay-push URI");
}
const exchangeBaseUrl = uri.exchangeBaseUrl;
const contractPriv = uri.contractPriv;
const contractPub = encodeCrock(eddsaGetPublic(decodeCrock(contractPriv)));
const getContractUrl = new URL(`contracts/${contractPub}`, exchangeBaseUrl);
const contractHttpResp = await ws.http.get(getContractUrl.href);
const contractResp = await readSuccessResponseJsonOrThrow(
contractHttpResp,
codecForExchangeGetContractResponse(),
);
const pursePub = contractResp.purse_pub;
const dec = await ws.cryptoApi.decryptContractForDeposit({
ciphertext: contractResp.econtract,
contractPriv: contractPriv,
pursePub: pursePub,
});
const getPurseUrl = new URL(`purses/${pursePub}/merge`, exchangeBaseUrl);
const purseHttpResp = await ws.http.get(getPurseUrl.href);
const purseStatus = await readSuccessResponseJsonOrThrow(
purseHttpResp,
codecForExchangePurseStatus(),
);
const peerPullPaymentIncomingId = encodeCrock(getRandomBytes(32));
await ws.db
.mktx((x) => ({
peerPullPaymentIncoming: x.peerPullPaymentIncoming,
}))
.runReadWrite(async (tx) => {
await tx.peerPullPaymentIncoming.add({
peerPullPaymentIncomingId,
contractPriv: contractPriv,
exchangeBaseUrl: exchangeBaseUrl,
pursePub: pursePub,
timestamp: TalerProtocolTimestamp.now(),
contractTerms: dec.contractTerms,
});
});
return {
amount: purseStatus.balance,
contractTerms: dec.contractTerms,
peerPullPaymentIncomingId,
};
}
export async function initiatePeerRequestForPay(
ws: InternalWalletState,
req: InitiatePeerPullPaymentRequest,
@ -580,10 +764,18 @@ export async function initiatePeerRequestForPay(
logger.info(`reserve merge response: ${j2s(resp)}`);
// FIXME: Now create a withdrawal operation!
await internalCreateWithdrawalGroup(ws, {
amount: Amounts.parseOrThrow(req.amount),
exchangeBaseUrl: req.exchangeBaseUrl,
reserveStatus: ReserveRecordStatus.QueryingStatus,
reserveKeyPair: {
priv: mergeReserveInfo.reservePriv,
pub: mergeReserveInfo.reservePub,
},
});
return {
talerUri: constructPayPushUri({
talerUri: constructPayPullUri({
exchangeBaseUrl: req.exchangeBaseUrl,
contractPriv: econtractResp.contractPriv,
}),

View File

@ -34,12 +34,3 @@ export const WALLET_MERCHANT_PROTOCOL_VERSION = "2:0:1";
* Uses libtool's current:revision:age versioning.
*/
export const WALLET_BANK_INTEGRATION_PROTOCOL_VERSION = "0:0:0";
/**
* Cache breaker that is appended to queries such as /keys and /wire
* to break through caching, if it has been accidentally/badly configured
* by the exchange.
*
* This is only a temporary measure.
*/
export const WALLET_CACHE_BREAKER_CLIENT_VERSION = "5";

View File

@ -39,6 +39,7 @@ import {
codecForAny,
codecForApplyRefundFromPurchaseIdRequest,
codecForApplyRefundRequest,
codecForCheckPeerPullPaymentRequest,
codecForCheckPeerPushPaymentRequest,
codecForConfirmPayRequest,
codecForCreateDepositGroupRequest,
@ -150,7 +151,9 @@ import {
processPurchasePay,
} from "./operations/pay.js";
import {
acceptPeerPullPayment,
acceptPeerPushPayment,
checkPeerPullPayment,
checkPeerPushPayment,
initiatePeerRequestForPay,
initiatePeerToPeerPush,
@ -728,7 +731,12 @@ async function dispatchRequestInternal(
switch (operation) {
case "initWallet": {
ws.initCalled = true;
await fillDefaults(ws);
if (typeof payload === "object" && (payload as any).skipDefaults) {
logger.info("skipping defaults");
} else {
logger.info("filling defaults");
await fillDefaults(ws);
}
return {};
}
case "withdrawTestkudos": {
@ -1047,6 +1055,15 @@ async function dispatchRequestInternal(
const req = codecForInitiatePeerPullPaymentRequest().decode(payload);
return await initiatePeerRequestForPay(ws, req);
}
case "checkPeerPullPayment": {
const req = codecForCheckPeerPullPaymentRequest().decode(payload);
return await checkPeerPullPayment(ws, req);
}
case "acceptPeerPullPayment": {
const req = codecForAcceptPeerPullPaymentRequest().decode(payload);
await acceptPeerPullPayment(ws, req);
return {};
}
}
throw TalerError.fromDetail(
TalerErrorCode.WALLET_CORE_API_OPERATION_UNKNOWN,
@ -1239,10 +1256,8 @@ class InternalWalletStateImpl implements InternalWalletState {
const key = `${exchangeBaseUrl}:${denomPubHash}`;
const cached = this.denomCache[key];
if (cached) {
logger.trace("using cached denom");
return cached;
}
logger.trace("looking up denom denom");
const d = await tx.denominations.get([exchangeBaseUrl, denomPubHash]);
if (d) {
this.denomCache[key] = d;

View File

@ -68,7 +68,6 @@ function timeout<T>(ms: number, promise: Promise<T>): Promise<T> {
export async function queryToSlashKeys<T>(url: string): Promise<T> {
const endpoint = new URL("keys", url);
endpoint.searchParams.set("cacheBreaker", new Date().getTime() + "");
const query = fetch(endpoint.href)
.catch(() => {