wallet-core: fix default auditor/exchange loading logic

This commit is contained in:
Florian Dold 2022-10-05 18:31:56 +02:00
parent 99ace8b7d2
commit 957f9a5efb
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
8 changed files with 74 additions and 68 deletions

View File

@ -255,9 +255,9 @@ async function withWallet<T>(
console.error("Error details:", JSON.stringify(ed, undefined, 2)); console.error("Error details:", JSON.stringify(ed, undefined, 2));
process.exit(1); process.exit(1);
} finally { } finally {
logger.info("operation with wallet finished, stopping"); logger.trace("operation with wallet finished, stopping");
wallet.stop(); wallet.stop();
logger.info("stopped wallet"); logger.trace("stopped wallet");
} }
} }
@ -495,6 +495,7 @@ walletCli
talerWithdrawUri: uri, talerWithdrawUri: uri,
}, },
); );
console.log("accept withdrawal response", res);
} }
break; break;
default: default:
@ -731,7 +732,7 @@ const advancedCli = walletCli.subcommand("advancedArgs", "advanced", {
advancedCli advancedCli
.subcommand("init", "init", { .subcommand("init", "init", {
help: "Initialize the wallet (with DB) and exit." help: "Initialize the wallet (with DB) and exit.",
}) })
.action(async (args) => { .action(async (args) => {
await withWallet(args, async () => {}); await withWallet(args, async () => {});

View File

@ -37,7 +37,6 @@ export async function processRequestWithImpl(
reqMsg: CryptoWorkerRequestMessage, reqMsg: CryptoWorkerRequestMessage,
impl: TalerCryptoInterfaceR, impl: TalerCryptoInterfaceR,
): Promise<CryptoWorkerResponseMessage> { ): Promise<CryptoWorkerResponseMessage> {
logger.info(`processing crypto request ${j2s(reqMsg)}`);
if (typeof reqMsg !== "object") { if (typeof reqMsg !== "object") {
logger.error("request must be an object"); logger.error("request must be an object");
return { return {

View File

@ -510,7 +510,7 @@ export interface ExchangeRecord {
permanent: boolean; permanent: boolean;
/** /**
* Last time when the exchange was updated. * Last time when the exchange was updated (both /keys and /wire).
*/ */
lastUpdate: TalerProtocolTimestamp | undefined; lastUpdate: TalerProtocolTimestamp | undefined;
@ -521,6 +521,10 @@ export interface ExchangeRecord {
*/ */
nextUpdate: TalerProtocolTimestamp; nextUpdate: TalerProtocolTimestamp;
lastKeysEtag: string | undefined;
lastWireEtag: string | undefined;
/** /**
* Next time that we should check if coins need to be refreshed. * Next time that we should check if coins need to be refreshed.
* *

View File

@ -362,6 +362,8 @@ export async function importBackup(
lastUpdate: undefined, lastUpdate: undefined,
nextUpdate: TalerProtocolTimestamp.now(), nextUpdate: TalerProtocolTimestamp.now(),
nextRefreshCheck: TalerProtocolTimestamp.now(), nextRefreshCheck: TalerProtocolTimestamp.now(),
lastKeysEtag: undefined,
lastWireEtag: undefined,
}); });
} }

View File

@ -63,7 +63,11 @@ import {
readSuccessResponseJsonOrThrow, readSuccessResponseJsonOrThrow,
readSuccessResponseTextOrThrow, readSuccessResponseTextOrThrow,
} from "../util/http.js"; } from "../util/http.js";
import { DbAccess, GetReadOnlyAccess } from "../util/query.js"; import {
DbAccess,
GetReadOnlyAccess,
GetReadWriteAccess,
} from "../util/query.js";
import { import {
OperationAttemptResult, OperationAttemptResult,
OperationAttemptResultType, OperationAttemptResultType,
@ -316,33 +320,35 @@ async function downloadExchangeWireInfo(
return wireInfo; return wireInfo;
} }
async function provideExchangeRecord( export async function provideExchangeRecordInTx(
ws: InternalWalletState, ws: InternalWalletState,
tx: GetReadWriteAccess<{
exchanges: typeof WalletStoresV1.exchanges;
exchangeDetails: typeof WalletStoresV1.exchangeDetails;
}>,
baseUrl: string, baseUrl: string,
now: AbsoluteTime, now: AbsoluteTime,
): Promise<{ ): Promise<{
exchange: ExchangeRecord; exchange: ExchangeRecord;
exchangeDetails: ExchangeDetailsRecord | undefined; exchangeDetails: ExchangeDetailsRecord | undefined;
}> { }> {
return await ws.db let exchange = await tx.exchanges.get(baseUrl);
.mktx((x) => [x.exchanges, x.exchangeDetails]) if (!exchange) {
.runReadWrite(async (tx) => { const r: ExchangeRecord = {
let exchange = await tx.exchanges.get(baseUrl); permanent: true,
if (!exchange) { baseUrl: baseUrl,
const r: ExchangeRecord = { detailsPointer: undefined,
permanent: true, lastUpdate: undefined,
baseUrl: baseUrl, nextUpdate: AbsoluteTime.toTimestamp(now),
detailsPointer: undefined, nextRefreshCheck: AbsoluteTime.toTimestamp(now),
lastUpdate: undefined, lastKeysEtag: undefined,
nextUpdate: AbsoluteTime.toTimestamp(now), lastWireEtag: undefined,
nextRefreshCheck: AbsoluteTime.toTimestamp(now), };
}; await tx.exchanges.put(r);
await tx.exchanges.put(r); exchange = r;
exchange = r; }
} const exchangeDetails = await getExchangeDetails(tx, baseUrl);
const exchangeDetails = await getExchangeDetails(tx, baseUrl); return { exchange, exchangeDetails };
return { exchange, exchangeDetails };
});
} }
interface ExchangeKeysDownloadResult { interface ExchangeKeysDownloadResult {
@ -499,15 +505,16 @@ export async function updateExchangeFromUrlHandler(
> { > {
const forceNow = options.forceNow ?? false; const forceNow = options.forceNow ?? false;
logger.info(`updating exchange info for ${baseUrl}, forced: ${forceNow}`); logger.info(`updating exchange info for ${baseUrl}, forced: ${forceNow}`);
console.trace("here");
const now = AbsoluteTime.now(); const now = AbsoluteTime.now();
baseUrl = canonicalizeBaseUrl(baseUrl); baseUrl = canonicalizeBaseUrl(baseUrl);
const { exchange, exchangeDetails } = await provideExchangeRecord( const { exchange, exchangeDetails } = await ws.db
ws, .mktx((x) => [x.exchanges, x.exchangeDetails])
baseUrl, .runReadWrite(async (tx) => {
now, return provideExchangeRecordInTx(ws, tx, baseUrl, now);
); });
if ( if (
!forceNow && !forceNow &&

View File

@ -50,6 +50,7 @@ async function gatherExchangePending(
now: AbsoluteTime, now: AbsoluteTime,
resp: PendingOperationsResponse, resp: PendingOperationsResponse,
): Promise<void> { ): Promise<void> {
// FIXME: We should do a range query here based on the update time.
await tx.exchanges.iter().forEachAsync(async (exch) => { await tx.exchanges.iter().forEachAsync(async (exch) => {
const opTag = RetryTags.forExchangeUpdate(exch); const opTag = RetryTags.forExchangeUpdate(exch);
let opr = await tx.operationRetries.get(opTag); let opr = await tx.operationRetries.get(opTag);

View File

@ -1071,7 +1071,7 @@ export async function processWithdrawalGroup(
case WithdrawalGroupStatus.QueryingStatus: { case WithdrawalGroupStatus.QueryingStatus: {
const doQueryAsync = async () => { const doQueryAsync = async () => {
if (ws.stopped) { if (ws.stopped) {
logger.info("not long-polling reserve, wallet already stopped"); logger.trace("not long-polling reserve, wallet already stopped");
await storeOperationPending(ws, retryTag); await storeOperationPending(ws, retryTag);
return; return;
} }
@ -1080,7 +1080,7 @@ export async function processWithdrawalGroup(
try { try {
ws.activeLongpoll[retryTag] = { ws.activeLongpoll[retryTag] = {
cancel: () => { cancel: () => {
logger.info("cancel of reserve longpoll requested"); logger.trace("cancel of reserve longpoll requested");
cts.cancel(); cts.cancel();
}, },
}; };
@ -1094,16 +1094,13 @@ export async function processWithdrawalGroup(
return; return;
} }
delete ws.activeLongpoll[retryTag]; delete ws.activeLongpoll[retryTag];
logger.info(
`active longpoll keys (2) ${Object.keys(ws.activeLongpoll)}`,
);
if (!res.ready) { if (!res.ready) {
await storeOperationPending(ws, retryTag); await storeOperationPending(ws, retryTag);
} }
ws.latch.trigger(); ws.latch.trigger();
}; };
doQueryAsync(); doQueryAsync();
logger.info( logger.trace(
"returning early from withdrawal for long-polling in background", "returning early from withdrawal for long-polling in background",
); );
return { return {
@ -1918,12 +1915,12 @@ export async function acceptWithdrawalFromUri(
); );
} }
// Start withdrawal in the background. // Start withdrawal in the background
await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true }).catch( processWithdrawalGroup(ws, withdrawalGroupId, {
(err) => { forceNow: true,
logger.error("Processing withdrawal (after creation) failed:", err); }).catch((err) => {
}, logger.error("Processing withdrawal (after creation) failed:", err);
); });
return { return {
reservePub: withdrawalGroup.reservePub, reservePub: withdrawalGroup.reservePub,

View File

@ -155,6 +155,7 @@ import {
getExchangeDetails, getExchangeDetails,
getExchangeRequestTimeout, getExchangeRequestTimeout,
getExchangeTrust, getExchangeTrust,
provideExchangeRecordInTx,
updateExchangeFromUrl, updateExchangeFromUrl,
updateExchangeFromUrlHandler, updateExchangeFromUrlHandler,
updateExchangeTermsOfService, updateExchangeTermsOfService,
@ -583,32 +584,26 @@ async function runTaskLoop(
*/ */
async function fillDefaults(ws: InternalWalletState): Promise<void> { async function fillDefaults(ws: InternalWalletState): Promise<void> {
await ws.db await ws.db
.mktx((x) => [x.config, x.auditorTrust]) .mktx((x) => [x.config, x.auditorTrust, x.exchanges, x.exchangeDetails])
.runReadWrite(async (tx) => { .runReadWrite(async (tx) => {
let applied = false; const appliedRec = await tx.config.get("currencyDefaultsApplied");
await tx.config.iter().forEach((x) => { let alreadyApplied = appliedRec ? !!appliedRec.value : false;
if (x.key == "currencyDefaultsApplied" && x.value == true) { if (alreadyApplied) {
applied = true; logger.info("defaults already applied");
} return;
});
if (!applied) {
for (const c of builtinAuditors) {
await tx.auditorTrust.put(c);
}
} }
// FIXME: make sure exchanges are added transactionally to for (const c of builtinAuditors) {
// DB in first-time default application await tx.auditorTrust.put(c);
}
for (const baseUrl of builtinExchanges) {
const now = AbsoluteTime.now();
provideExchangeRecordInTx(ws, tx, baseUrl, now);
}
await tx.config.put({
key: "currencyDefaultsApplied",
value: true,
});
}); });
for (const url of builtinExchanges) {
try {
await updateExchangeFromUrl(ws, url, { forceNow: true });
} catch (e) {
logger.warn(
`could not update builtin exchange ${url} during wallet initialization`,
);
}
}
} }
async function getExchangeTos( async function getExchangeTos(
@ -1719,12 +1714,12 @@ class InternalWalletStateImpl implements InternalWalletState {
* Stop ongoing processing. * Stop ongoing processing.
*/ */
stop(): void { stop(): void {
logger.info("stopping (at internal wallet state)"); logger.trace("stopping (at internal wallet state)");
this.stopped = true; this.stopped = true;
this.timerGroup.stopCurrentAndFutureTimers(); this.timerGroup.stopCurrentAndFutureTimers();
this.cryptoDispatcher.stop(); this.cryptoDispatcher.stop();
for (const key of Object.keys(this.activeLongpoll)) { for (const key of Object.keys(this.activeLongpoll)) {
logger.info(`cancelling active longpoll ${key}`); logger.trace(`cancelling active longpoll ${key}`);
this.activeLongpoll[key].cancel(); this.activeLongpoll[key].cancel();
} }
} }