test recoup, fix bug in reserve state machine, fix bug in recoup-refresh

This commit is contained in:
Florian Dold 2020-09-04 02:20:20 +05:30
parent 54c0d1c258
commit 9ec6018efe
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
10 changed files with 261 additions and 62 deletions

View File

@ -69,6 +69,9 @@ import {
ApplyRefundRequest, ApplyRefundRequest,
codecForApplyRefundResponse, codecForApplyRefundResponse,
codecForAny, codecForAny,
CoinDumpJson,
ForceExchangeUpdateRequest,
ForceRefreshRequest,
} from "taler-wallet-core"; } from "taler-wallet-core";
import { URL } from "url"; import { URL } from "url";
import axios, { AxiosError } from "axios"; import axios, { AxiosError } from "axios";
@ -1077,6 +1080,23 @@ export class ExchangeService implements ExchangeServiceInterface {
); );
} }
async revokeDenomination(denomPubHash: string) {
if (this.isRunning()) {
throw Error("exchange must be stopped when revoking denominations");
}
await runCommand(
this.globalState,
"exchange-keyup",
"taler-exchange-keyup",
[
"-c", this.configFilename,
...this.timetravelArgArr,
"--revoke",
denomPubHash,
],
);
}
async start(): Promise<void> { async start(): Promise<void> {
if (this.isRunning()) { if (this.isRunning()) {
throw Error("exchange is already running"); throw Error("exchange is already running");
@ -1540,6 +1560,14 @@ export class WalletCli {
throw new OperationFailedError(resp.error); throw new OperationFailedError(resp.error);
} }
async dumpCoins(): Promise<CoinDumpJson> {
const resp = await this.apiRequest("dumpCoins", {});
if (resp.type === "response") {
return codecForAny().decode(resp.result);
}
throw new OperationFailedError(resp.error);
}
async addExchange(req: AddExchangeRequest): Promise<void> { async addExchange(req: AddExchangeRequest): Promise<void> {
const resp = await this.apiRequest("addExchange", req); const resp = await this.apiRequest("addExchange", req);
if (resp.type === "response") { if (resp.type === "response") {
@ -1548,6 +1576,22 @@ export class WalletCli {
throw new OperationFailedError(resp.error); throw new OperationFailedError(resp.error);
} }
async forceUpdateExchange(req: ForceExchangeUpdateRequest): Promise<void> {
const resp = await this.apiRequest("forceUpdateExchange", req);
if (resp.type === "response") {
return;
}
throw new OperationFailedError(resp.error);
}
async forceRefresh(req: ForceRefreshRequest): Promise<void> {
const resp = await this.apiRequest("forceRefresh", req);
if (resp.type === "response") {
return;
}
throw new OperationFailedError(resp.error);
}
async listExchanges(): Promise<ExchangesListRespose> { async listExchanges(): Promise<ExchangesListRespose> {
const resp = await this.apiRequest("listExchanges", {}); const resp = await this.apiRequest("listExchanges", {});
if (resp.type === "response") { if (resp.type === "response") {

View File

@ -36,8 +36,9 @@ import {
MerchantServiceInterface, MerchantServiceInterface,
BankApi, BankApi,
BankAccessApi, BankAccessApi,
MerchantPrivateApi,
} from "./harness"; } from "./harness";
import { AmountString } from "taler-wallet-core"; import { AmountString, Duration, PreparePayResultType, ConfirmPayResultType, ContractTerms } from "taler-wallet-core";
import { FaultInjectedMerchantService } from "./faultInjection"; import { FaultInjectedMerchantService } from "./faultInjection";
export interface SimpleTestEnvironment { export interface SimpleTestEnvironment {
@ -280,3 +281,85 @@ export async function withdrawViaBank(
const balApiResp = await wallet.apiRequest("getBalances", {}); const balApiResp = await wallet.apiRequest("getBalances", {});
t.assertTrue(balApiResp.type === "response"); t.assertTrue(balApiResp.type === "response");
} }
export async function applyTimeTravel(
timetravelDuration: Duration,
s: {
exchange?: ExchangeService;
merchant?: MerchantService;
wallet?: WalletCli;
},
): Promise<void> {
if (s.exchange) {
await s.exchange.stop();
s.exchange.setTimetravel(timetravelDuration);
await s.exchange.start();
await s.exchange.pingUntilAvailable();
}
if (s.merchant) {
await s.merchant.stop();
s.merchant.setTimetravel(timetravelDuration);
await s.merchant.start();
await s.merchant.pingUntilAvailable();
}
if (s.wallet) {
s.wallet.setTimetravel(timetravelDuration);
}
}
/**
* Make a simple payment and check that it succeeded.
*/
export async function makeTestPayment(t: GlobalTestState, args: {
merchant: MerchantServiceInterface,
wallet: WalletCli,
order: Partial<ContractTerms>,
instance?: string
}): Promise<void> {
// Set up order.
const { wallet, merchant } = args;
const instance = args.instance ?? "default";
const orderResp = await MerchantPrivateApi.createOrder(merchant, instance, {
order: {
summary: "Buy me!",
amount: "TESTKUDOS:5",
fulfillment_url: "taler://fulfillment-success/thx",
},
});
let orderStatus = await MerchantPrivateApi.queryPrivateOrderStatus(merchant, {
orderId: orderResp.order_id,
});
t.assertTrue(orderStatus.order_status === "unpaid");
// Make wallet pay for the order
const preparePayResult = await wallet.preparePay({
talerPayUri: orderStatus.taler_pay_uri,
});
t.assertTrue(
preparePayResult.status === PreparePayResultType.PaymentPossible,
);
const r2 = await wallet.confirmPay({
proposalId: preparePayResult.proposalId,
});
t.assertTrue(r2.type === ConfirmPayResultType.Done);
// Check if payment was successful.
orderStatus = await MerchantPrivateApi.queryPrivateOrderStatus(merchant, {
orderId: orderResp.order_id,
instance,
});
t.assertTrue(orderStatus.order_status === "paid");
}

View File

@ -20,14 +20,15 @@
import { import {
runTest, runTest,
GlobalTestState, GlobalTestState,
MerchantPrivateApi,
WalletCli,
} from "./harness"; } from "./harness";
import { createSimpleTestkudosEnvironment, withdrawViaBank } from "./helpers"; import {
import { PreparePayResultType } from "taler-wallet-core"; createSimpleTestkudosEnvironment,
withdrawViaBank,
makeTestPayment,
} from "./helpers";
/** /**
* Run test for basic, bank-integrated withdrawal. * Run test for basic, bank-integrated withdrawal and payment.
*/ */
runTest(async (t: GlobalTestState) => { runTest(async (t: GlobalTestState) => {
// Set up test environment // Set up test environment
@ -43,45 +44,11 @@ runTest(async (t: GlobalTestState) => {
await withdrawViaBank(t, { wallet, bank, exchange, amount: "TESTKUDOS:20" }); await withdrawViaBank(t, { wallet, bank, exchange, amount: "TESTKUDOS:20" });
// Set up order. const order = {
summary: "Buy me!",
amount: "TESTKUDOS:5",
fulfillment_url: "taler://fulfillment-success/thx",
};
const orderResp = await MerchantPrivateApi.createOrder(merchant, "default", { await makeTestPayment(t, { wallet, merchant, order });
order: {
summary: "Buy me!",
amount: "TESTKUDOS:5",
fulfillment_url: "taler://fulfillment-success/thx",
},
});
let orderStatus = await MerchantPrivateApi.queryPrivateOrderStatus(merchant, {
orderId: orderResp.order_id,
});
t.assertTrue(orderStatus.order_status === "unpaid");
// Make wallet pay for the order
const preparePayResult = await wallet.preparePay({
talerPayUri: orderStatus.taler_pay_uri,
});
t.assertTrue(
preparePayResult.status === PreparePayResultType.PaymentPossible,
);
const r2 = await wallet.apiRequest("confirmPay", {
// FIXME: should be validated, don't cast!
proposalId: preparePayResult.proposalId,
});
t.assertTrue(r2.type === "response");
// Check if payment was successful.
orderStatus = await MerchantPrivateApi.queryPrivateOrderStatus(merchant, {
orderId: orderResp.order_id,
});
t.assertTrue(orderStatus.order_status === "paid");
await t.shutdown();
}); });

View File

@ -8,7 +8,7 @@ import { IDBFactory, IDBDatabase } from "idb-bridge";
* with each major change. When incrementing the major version, * with each major change. When incrementing the major version,
* the wallet should import data from the previous version. * the wallet should import data from the previous version.
*/ */
const TALER_DB_NAME = "taler-walletdb-v9"; const TALER_DB_NAME = "taler-walletdb-v10";
/** /**
* Current database minor version, should be incremented * Current database minor version, should be incremented

View File

@ -201,6 +201,7 @@ async function recoupWithdrawCoin(
const currency = updatedCoin.currentAmount.currency; const currency = updatedCoin.currentAmount.currency;
updatedCoin.currentAmount = Amounts.getZero(currency); updatedCoin.currentAmount = Amounts.getZero(currency);
updatedReserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; updatedReserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
updatedReserve.retryInfo = initRetryInfo();
await tx.put(Stores.coins, updatedCoin); await tx.put(Stores.coins, updatedCoin);
await tx.put(Stores.reserves, updatedReserve); await tx.put(Stores.reserves, updatedReserve);
await putGroupAsFinished(ws, tx, recoupGroup, coinIdx); await putGroupAsFinished(ws, tx, recoupGroup, coinIdx);
@ -253,7 +254,13 @@ async function recoupRefreshCoin(
} }
await ws.db.runWithWriteTransaction( await ws.db.runWithWriteTransaction(
[Stores.coins, Stores.reserves, Stores.recoupGroups, Stores.refreshGroups], [
Stores.coins,
Stores.denominations,
Stores.reserves,
Stores.recoupGroups,
Stores.refreshGroups,
],
async (tx) => { async (tx) => {
const recoupGroup = await tx.get(Stores.recoupGroups, recoupGroupId); const recoupGroup = await tx.get(Stores.recoupGroups, recoupGroupId);
if (!recoupGroup) { if (!recoupGroup) {

View File

@ -74,6 +74,7 @@ import {
import { import {
reconcileReserveHistory, reconcileReserveHistory,
summarizeReserveHistory, summarizeReserveHistory,
ReserveHistorySummary,
} from "../util/reserveHistoryUtil"; } from "../util/reserveHistoryUtil";
import { TransactionHandle } from "../util/query"; import { TransactionHandle } from "../util/query";
import { addPaytoQueryParams } from "../util/payto"; import { addPaytoQueryParams } from "../util/payto";
@ -162,6 +163,7 @@ export async function createReserve(
retryInfo: initRetryInfo(), retryInfo: initRetryInfo(),
lastError: undefined, lastError: undefined,
currency: req.amount.currency, currency: req.amount.currency,
requestedQuery: false,
}; };
const reserveHistoryRecord: ReserveHistoryRecord = { const reserveHistoryRecord: ReserveHistoryRecord = {
@ -285,13 +287,12 @@ export async function forceQueryReserve(
// Only force status query where it makes sense // Only force status query where it makes sense
switch (reserve.reserveStatus) { switch (reserve.reserveStatus) {
case ReserveRecordStatus.DORMANT: case ReserveRecordStatus.DORMANT:
case ReserveRecordStatus.WITHDRAWING: reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
case ReserveRecordStatus.QUERYING_STATUS:
break; break;
default: default:
reserve.requestedQuery = true;
return; return;
} }
reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
reserve.retryInfo = initRetryInfo(); reserve.retryInfo = initRetryInfo();
await tx.put(Stores.reserves, reserve); await tx.put(Stores.reserves, reserve);
}); });
@ -551,6 +552,7 @@ async function updateReserve(
const balance = Amounts.parseOrThrow(reserveInfo.balance); const balance = Amounts.parseOrThrow(reserveInfo.balance);
const currency = balance.currency; const currency = balance.currency;
let updateSummary: ReserveHistorySummary | undefined;
await ws.db.runWithWriteTransaction( await ws.db.runWithWriteTransaction(
[Stores.reserves, Stores.reserveUpdatedEvents, Stores.reserveHistory], [Stores.reserves, Stores.reserveUpdatedEvents, Stores.reserveHistory],
async (tx) => { async (tx) => {
@ -578,7 +580,7 @@ async function updateReserve(
reserveInfo.history, reserveInfo.history,
); );
const summary = summarizeReserveHistory( updateSummary = summarizeReserveHistory(
reconciled.updatedLocalHistory, reconciled.updatedLocalHistory,
currency, currency,
); );
@ -591,16 +593,24 @@ async function updateReserve(
reservePub: r.reservePub, reservePub: r.reservePub,
timestamp: getTimestampNow(), timestamp: getTimestampNow(),
amountReserveBalance: Amounts.stringify(balance), amountReserveBalance: Amounts.stringify(balance),
amountExpected: Amounts.stringify(summary.awaitedReserveAmount), amountExpected: Amounts.stringify(updateSummary.awaitedReserveAmount),
newHistoryTransactions, newHistoryTransactions,
reserveUpdateId, reserveUpdateId,
}; };
await tx.put(Stores.reserveUpdatedEvents, reserveUpdate); await tx.put(Stores.reserveUpdatedEvents, reserveUpdate);
logger.trace("setting reserve status to 'withdrawing' after query");
r.reserveStatus = ReserveRecordStatus.WITHDRAWING; r.reserveStatus = ReserveRecordStatus.WITHDRAWING;
r.retryInfo = initRetryInfo(); r.retryInfo = initRetryInfo();
} else { } else {
r.reserveStatus = ReserveRecordStatus.DORMANT; logger.trace("setting reserve status to 'dormant' after query");
r.retryInfo = initRetryInfo(false); if (r.requestedQuery) {
r.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
r.requestedQuery = false;
r.retryInfo = initRetryInfo();
} else {
r.reserveStatus = ReserveRecordStatus.DORMANT;
r.retryInfo = initRetryInfo(false);
}
} }
r.lastSuccessfulStatusQuery = getTimestampNow(); r.lastSuccessfulStatusQuery = getTimestampNow();
hist.reserveTransactions = reconciled.updatedLocalHistory; hist.reserveTransactions = reconciled.updatedLocalHistory;
@ -609,7 +619,11 @@ async function updateReserve(
await tx.put(Stores.reserveHistory, hist); await tx.put(Stores.reserveHistory, hist);
}, },
); );
ws.notify({ type: NotificationType.ReserveUpdated }); ws.notify({ type: NotificationType.ReserveUpdated, updateSummary });
const reserve2 = await ws.db.get(Stores.reserves, reservePub);
if (reserve2) {
logger.trace(`after db transaction, reserve status is ${reserve2.reserveStatus}`);
}
return { ready: true }; return { ready: true };
} }
@ -782,6 +796,7 @@ async function depleteReserve(
}); });
} }
} }
logger.trace("setting reserve status to dormant after depletion");
newReserve.reserveStatus = ReserveRecordStatus.DORMANT; newReserve.reserveStatus = ReserveRecordStatus.DORMANT;
newReserve.retryInfo = initRetryInfo(false); newReserve.retryInfo = initRetryInfo(false);

View File

@ -320,6 +320,12 @@ export interface ReserveRecord {
reserveStatus: ReserveRecordStatus; reserveStatus: ReserveRecordStatus;
/**
* Was a reserve query requested? If so, query again instead
* of going into dormant status.
*/
requestedQuery: boolean;
/** /**
* Time of the last successful status query. * Time of the last successful status query.
*/ */

View File

@ -24,6 +24,7 @@
*/ */
import { TalerErrorDetails } from "./walletTypes"; import { TalerErrorDetails } from "./walletTypes";
import { WithdrawalSource } from "./dbTypes"; import { WithdrawalSource } from "./dbTypes";
import { ReserveHistorySummary } from "../util/reserveHistoryUtil";
export enum NotificationType { export enum NotificationType {
CoinWithdrawn = "coin-withdrawn", CoinWithdrawn = "coin-withdrawn",
@ -126,6 +127,7 @@ export interface RefreshRefusedNotification {
export interface ReserveUpdatedNotification { export interface ReserveUpdatedNotification {
type: NotificationType.ReserveUpdated; type: NotificationType.ReserveUpdated;
updateSummary?: ReserveHistorySummary;
} }
export interface ReserveConfirmedNotification { export interface ReserveConfirmedNotification {

View File

@ -691,6 +691,17 @@ export const codecForAddExchangeRequest = (): Codec<AddExchangeRequest> =>
.property("exchangeBaseUrl", codecForString()) .property("exchangeBaseUrl", codecForString())
.build("AddExchangeRequest"); .build("AddExchangeRequest");
export interface ForceExchangeUpdateRequest {
exchangeBaseUrl: string;
}
export const codecForForceExchangeUpdateRequest = (): Codec<
AddExchangeRequest
> =>
buildCodecForObject<AddExchangeRequest>()
.property("exchangeBaseUrl", codecForString())
.build("AddExchangeRequest");
export interface GetExchangeTosRequest { export interface GetExchangeTosRequest {
exchangeBaseUrl: string; exchangeBaseUrl: string;
} }
@ -870,3 +881,25 @@ export const codecForApplyRefundResponse = (): Codec<ApplyRefundResponse> =>
.property("pendingAtExchange", codecForBoolean()) .property("pendingAtExchange", codecForBoolean())
.property("proposalId", codecForString()) .property("proposalId", codecForString())
.build("ApplyRefundResponse"); .build("ApplyRefundResponse");
export interface SetCoinSuspendedRequest {
coinPub: string;
suspended: boolean;
}
export const codecForSetCoinSuspendedRequest = (): Codec<
SetCoinSuspendedRequest
> =>
buildCodecForObject<SetCoinSuspendedRequest>()
.property("coinPub", codecForString())
.property("suspended", codecForBoolean())
.build("SetCoinSuspendedRequest");
export interface ForceRefreshRequest {
coinPubList: string[];
}
export const codecForForceRefreshRequest = (): Codec<ForceRefreshRequest> =>
buildCodecForObject<ForceRefreshRequest>()
.property("coinPubList", codecForList(codecForString()))
.build("ForceRefreshRequest");

View File

@ -90,6 +90,9 @@ import {
withdrawTestBalanceDefaults, withdrawTestBalanceDefaults,
codecForWithdrawTestBalance, codecForWithdrawTestBalance,
codecForTestPayArgs, codecForTestPayArgs,
codecForSetCoinSuspendedRequest,
codecForForceExchangeUpdateRequest,
codecForForceRefreshRequest,
} from "./types/walletTypes"; } from "./types/walletTypes";
import { Logger } from "./util/logging"; import { Logger } from "./util/logging";
@ -110,7 +113,11 @@ import {
import { InternalWalletState } from "./operations/state"; import { InternalWalletState } from "./operations/state";
import { createReserve } from "./operations/reserves"; import { createReserve } from "./operations/reserves";
import { processRefreshGroup, createRefreshGroup, autoRefresh } from "./operations/refresh"; import {
processRefreshGroup,
createRefreshGroup,
autoRefresh,
} from "./operations/refresh";
import { processWithdrawGroup } from "./operations/withdraw"; import { processWithdrawGroup } from "./operations/withdraw";
import { getPendingOperations } from "./operations/pending"; import { getPendingOperations } from "./operations/pending";
import { getBalances } from "./operations/balance"; import { getBalances } from "./operations/balance";
@ -268,7 +275,7 @@ export class Wallet {
await processRecoupGroup(this.ws, pending.recoupGroupId, forceNow); await processRecoupGroup(this.ws, pending.recoupGroupId, forceNow);
break; break;
case PendingOperationType.ExchangeCheckRefresh: case PendingOperationType.ExchangeCheckRefresh:
await autoRefresh(this.ws, pending.exchangeBaseUrl) await autoRefresh(this.ws, pending.exchangeBaseUrl);
break; break;
default: default:
assertUnreachable(pending); assertUnreachable(pending);
@ -371,7 +378,8 @@ export class Wallet {
} }
private async runRetryLoopImpl(): Promise<void> { private async runRetryLoopImpl(): Promise<void> {
while (!this.stopped) { let iteration = 0;
for (; !this.stopped; iteration++) {
const pending = await this.getPendingOperations({ onlyDue: true }); const pending = await this.getPendingOperations({ onlyDue: true });
let numDueAndLive = 0; let numDueAndLive = 0;
for (const p of pending.pendingOperations) { for (const p of pending.pendingOperations) {
@ -379,7 +387,9 @@ export class Wallet {
numDueAndLive++; numDueAndLive++;
} }
} }
if (numDueAndLive === 0) { // Make sure that we run tasks that don't give lifeness at least
// one time.
if (iteration !== 0 && numDueAndLive === 0) {
const allPending = await this.getPendingOperations({ onlyDue: false }); const allPending = await this.getPendingOperations({ onlyDue: false });
let numPending = 0; let numPending = 0;
let numGivingLiveness = 0; let numGivingLiveness = 0;
@ -406,11 +416,12 @@ export class Wallet {
numPending, numPending,
}); });
await Promise.race([timeout, this.latch.wait()]); await Promise.race([timeout, this.latch.wait()]);
logger.trace("timeout done");
} else { } else {
// FIXME: maybe be a bit smarter about executing these // FIXME: maybe be a bit smarter about executing these
// operations in parallel? // operations in parallel?
logger.trace(`running ${pending.pendingOperations.length} pending operations`); logger.trace(
`running ${pending.pendingOperations.length} pending operations`,
);
for (const p of pending.pendingOperations) { for (const p of pending.pendingOperations) {
try { try {
await this.processOnePendingOperation(p); await this.processOnePendingOperation(p);
@ -985,6 +996,11 @@ export class Wallet {
await this.updateExchangeFromUrl(req.exchangeBaseUrl); await this.updateExchangeFromUrl(req.exchangeBaseUrl);
return {}; return {};
} }
case "forceUpdateExchange": {
const req = codecForForceExchangeUpdateRequest().decode(payload);
await this.updateExchangeFromUrl(req.exchangeBaseUrl, true);
return {};
}
case "listExchanges": { case "listExchanges": {
return await this.getExchanges(); return await this.getExchanges();
} }
@ -1054,6 +1070,32 @@ export class Wallet {
const req = codecForConfirmPayRequest().decode(payload); const req = codecForConfirmPayRequest().decode(payload);
return await this.confirmPay(req.proposalId, req.sessionId); return await this.confirmPay(req.proposalId, req.sessionId);
} }
case "dumpCoins": {
return await this.dumpCoins();
}
case "setCoinSuspended": {
const req = codecForSetCoinSuspendedRequest().decode(payload);
await this.setCoinSuspended(req.coinPub, req.suspended);
return {};
}
case "forceRefresh": {
const req = codecForForceRefreshRequest().decode(payload);
const coinPubs = req.coinPubList.map((x) => ({ coinPub: x }));
const refreshGroupId = await this.db.runWithWriteTransaction(
[Stores.refreshGroups, Stores.denominations, Stores.coins],
async (tx) => {
return await createRefreshGroup(
this.ws,
tx,
coinPubs,
RefreshReason.Manual,
);
},
);
return {
refreshGroupId,
};
}
} }
throw OperationFailedError.fromCode( throw OperationFailedError.fromCode(
TalerErrorCode.WALLET_CORE_API_OPERATION_UNKNOWN, TalerErrorCode.WALLET_CORE_API_OPERATION_UNKNOWN,