introduce map for query streams

This commit is contained in:
Florian Dold 2016-10-19 23:55:58 +02:00
parent 9ccc6626ac
commit 9ee0823b7e
2 changed files with 49 additions and 17 deletions

View File

@ -66,7 +66,8 @@ export interface QueryStream<T> {
keyFn: (obj: T) => I): QueryStream<JoinResult<T,S>>; keyFn: (obj: T) => I): QueryStream<JoinResult<T,S>>;
filter(f: (T: any) => boolean): QueryStream<T>; filter(f: (T: any) => boolean): QueryStream<T>;
reduce<S>(f: (v: T, acc: S) => S, start?: S): Promise<S>; reduce<S>(f: (v: T, acc: S) => S, start?: S): Promise<S>;
flatMap(f: (x: T) => T[]): QueryStream<T>; map<S>(f: (x:T) => S): QueryStream<S>;
flatMap<S>(f: (x: T) => S[]): QueryStream<S>;
toArray(): Promise<T[]>; toArray(): Promise<T[]>;
} }
@ -102,10 +103,14 @@ abstract class QueryStreamBase<T> implements QueryStream<T> {
this.root = root; this.root = root;
} }
flatMap(f: (x: T) => T[]): QueryStream<T> { flatMap<S>(f: (x: T) => T[]): QueryStream<S> {
return new QueryStreamFlatMap(this, f); return new QueryStreamFlatMap(this, f);
} }
map<S>(f: (x: T) => S): QueryStream<T> {
return new QueryStreamMap(this, f);
}
indexJoin<S,I extends IDBValidKey>(index: Index<I,S>, indexJoin<S,I extends IDBValidKey>(index: Index<I,S>,
keyFn: (obj: T) => I): QueryStream<JoinResult<T, S>> { keyFn: (obj: T) => I): QueryStream<JoinResult<T, S>> {
this.root.addStoreAccess(index.storeName, false); this.root.addStoreAccess(index.storeName, false);
@ -213,6 +218,29 @@ class QueryStreamFlatMap<T> extends QueryStreamBase<T> {
} }
class QueryStreamMap<T> extends QueryStreamBase<T> {
s: QueryStreamBase<T>;
mapFn: (v: T) => T[];
constructor(s: QueryStreamBase<T>, mapFn: (v: T) => T[]) {
super(s.root);
this.s = s;
this.mapFn = mapFn;
}
subscribe(f: SubscribeFn) {
this.s.subscribe((isDone, value, tx) => {
if (isDone) {
f(true, undefined, tx);
return;
}
let mappedValue = this.mapFn(value);
f(false, mappedValue, tx);
});
}
}
class QueryStreamIndexJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> { class QueryStreamIndexJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
s: QueryStreamBase<T>; s: QueryStreamBase<T>;
storeName: string; storeName: string;

View File

@ -406,14 +406,14 @@ export class Wallet {
updateExchanges(): void { updateExchanges(): void {
console.log("updating exchanges"); console.log("updating exchanges");
this.q() let exchangesUrls = this.q().iter(Stores.exchanges).map((e) => e.baseUrl);
.iter(Stores.exchanges)
.reduce((exchange: IExchangeInfo) => { for (let url of exchangesUrls) {
this.updateExchangeFromUrl(exchange.baseUrl) this.updateExchangeFromUrl(url)
.catch((e) => { .catch((e) => {
console.error("updating exchange failed", e); console.error("updating exchange failed", e);
}); });
}); }
} }
/** /**
@ -1291,9 +1291,6 @@ export class Wallet {
async createRefreshSession(oldCoinPub: string): Promise<RefreshSession|undefined> { async createRefreshSession(oldCoinPub: string): Promise<RefreshSession|undefined> {
// FIXME: this is not running in a transaction.
let coin = await this.q().get<Coin>(Stores.coins, oldCoinPub); let coin = await this.q().get<Coin>(Stores.coins, oldCoinPub);
if (!coin) { if (!coin) {
@ -1335,13 +1332,20 @@ export class Wallet {
newCoinDenoms, newCoinDenoms,
oldDenom.fee_refresh)); oldDenom.fee_refresh));
coin.currentAmount = Amounts.sub(coin.currentAmount, function mutateCoin(c: Coin): Coin {
refreshSession.valueWithFee).amount; let r = Amounts.sub(coin.currentAmount,
refreshSession.valueWithFee);
if (r.saturated) {
// Something else must have written the coin value
throw AbortTransaction;
}
c.currentAmount = r.amount;
return c;
}
// FIXME: we should check whether the amount still matches!
await this.q() await this.q()
.put(Stores.refresh, refreshSession) .put(Stores.refresh, refreshSession)
.put(Stores.coins, coin) .mutate(Stores.coins, coin.coinPub, mutateCoin)
.finish(); .finish();
return refreshSession; return refreshSession;