implement flatMap operation for query streams
This commit is contained in:
parent
a96d9aa386
commit
afc87f3c59
@ -38,6 +38,7 @@ export interface QueryStream<T> {
|
||||
keyFn: (obj: any) => any): QueryStream<[T,S]>;
|
||||
filter(f: (any) => boolean): QueryStream<T>;
|
||||
reduce<S>(f: (v: T, acc: S) => S, start?: S): Promise<S>;
|
||||
flatMap(f: (T) => T[]): QueryStream<T>;
|
||||
}
|
||||
|
||||
|
||||
@ -68,6 +69,10 @@ abstract class QueryStreamBase<T> implements QueryStream<T> {
|
||||
this.root = root;
|
||||
}
|
||||
|
||||
flatMap(f: (T) => T[]): QueryStream<T> {
|
||||
return new QueryStreamFlatMap(this, f);
|
||||
}
|
||||
|
||||
indexJoin<S>(storeName: string,
|
||||
indexName: string,
|
||||
key: any): QueryStream<[T,S]> {
|
||||
@ -117,6 +122,31 @@ class QueryStreamFilter<T> extends QueryStreamBase<T> {
|
||||
return;
|
||||
}
|
||||
if (this.filterFn(value)) {
|
||||
f(false, value, tx);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class QueryStreamFlatMap<T> extends QueryStreamBase<T> {
|
||||
s: QueryStreamBase<T>;
|
||||
flatMapFn;
|
||||
|
||||
constructor(s: QueryStreamBase<T>, flatMapFn) {
|
||||
super(s.root);
|
||||
this.s = s;
|
||||
this.flatMap = flatMapFn;
|
||||
}
|
||||
|
||||
subscribe(f) {
|
||||
this.s.subscribe((isDone, value, tx) => {
|
||||
if (isDone) {
|
||||
f(true, undefined, tx);
|
||||
return;
|
||||
}
|
||||
let values = this.flatMapFn(value);
|
||||
for (let v in values) {
|
||||
f(false, value, tx)
|
||||
}
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user