199 lines
5.3 KiB
JavaScript
199 lines
5.3 KiB
JavaScript
'use strict';
|
|
// https://github.com/zenparsing/es-observable
|
|
var $export = require('./_export')
|
|
, global = require('./_global')
|
|
, core = require('./_core')
|
|
, microtask = require('./_microtask')()
|
|
, OBSERVABLE = require('./_wks')('observable')
|
|
, aFunction = require('./_a-function')
|
|
, anObject = require('./_an-object')
|
|
, anInstance = require('./_an-instance')
|
|
, redefineAll = require('./_redefine-all')
|
|
, hide = require('./_hide')
|
|
, forOf = require('./_for-of')
|
|
, RETURN = forOf.RETURN;
|
|
|
|
var getMethod = function(fn){
|
|
return fn == null ? undefined : aFunction(fn);
|
|
};
|
|
|
|
var cleanupSubscription = function(subscription){
|
|
var cleanup = subscription._c;
|
|
if(cleanup){
|
|
subscription._c = undefined;
|
|
cleanup();
|
|
}
|
|
};
|
|
|
|
var subscriptionClosed = function(subscription){
|
|
return subscription._o === undefined;
|
|
};
|
|
|
|
var closeSubscription = function(subscription){
|
|
if(!subscriptionClosed(subscription)){
|
|
subscription._o = undefined;
|
|
cleanupSubscription(subscription);
|
|
}
|
|
};
|
|
|
|
var Subscription = function(observer, subscriber){
|
|
anObject(observer);
|
|
this._c = undefined;
|
|
this._o = observer;
|
|
observer = new SubscriptionObserver(this);
|
|
try {
|
|
var cleanup = subscriber(observer)
|
|
, subscription = cleanup;
|
|
if(cleanup != null){
|
|
if(typeof cleanup.unsubscribe === 'function')cleanup = function(){ subscription.unsubscribe(); };
|
|
else aFunction(cleanup);
|
|
this._c = cleanup;
|
|
}
|
|
} catch(e){
|
|
observer.error(e);
|
|
return;
|
|
} if(subscriptionClosed(this))cleanupSubscription(this);
|
|
};
|
|
|
|
Subscription.prototype = redefineAll({}, {
|
|
unsubscribe: function unsubscribe(){ closeSubscription(this); }
|
|
});
|
|
|
|
var SubscriptionObserver = function(subscription){
|
|
this._s = subscription;
|
|
};
|
|
|
|
SubscriptionObserver.prototype = redefineAll({}, {
|
|
next: function next(value){
|
|
var subscription = this._s;
|
|
if(!subscriptionClosed(subscription)){
|
|
var observer = subscription._o;
|
|
try {
|
|
var m = getMethod(observer.next);
|
|
if(m)return m.call(observer, value);
|
|
} catch(e){
|
|
try {
|
|
closeSubscription(subscription);
|
|
} finally {
|
|
throw e;
|
|
}
|
|
}
|
|
}
|
|
},
|
|
error: function error(value){
|
|
var subscription = this._s;
|
|
if(subscriptionClosed(subscription))throw value;
|
|
var observer = subscription._o;
|
|
subscription._o = undefined;
|
|
try {
|
|
var m = getMethod(observer.error);
|
|
if(!m)throw value;
|
|
value = m.call(observer, value);
|
|
} catch(e){
|
|
try {
|
|
cleanupSubscription(subscription);
|
|
} finally {
|
|
throw e;
|
|
}
|
|
} cleanupSubscription(subscription);
|
|
return value;
|
|
},
|
|
complete: function complete(value){
|
|
var subscription = this._s;
|
|
if(!subscriptionClosed(subscription)){
|
|
var observer = subscription._o;
|
|
subscription._o = undefined;
|
|
try {
|
|
var m = getMethod(observer.complete);
|
|
value = m ? m.call(observer, value) : undefined;
|
|
} catch(e){
|
|
try {
|
|
cleanupSubscription(subscription);
|
|
} finally {
|
|
throw e;
|
|
}
|
|
} cleanupSubscription(subscription);
|
|
return value;
|
|
}
|
|
}
|
|
});
|
|
|
|
var $Observable = function Observable(subscriber){
|
|
anInstance(this, $Observable, 'Observable', '_f')._f = aFunction(subscriber);
|
|
};
|
|
|
|
redefineAll($Observable.prototype, {
|
|
subscribe: function subscribe(observer){
|
|
return new Subscription(observer, this._f);
|
|
},
|
|
forEach: function forEach(fn){
|
|
var that = this;
|
|
return new (core.Promise || global.Promise)(function(resolve, reject){
|
|
aFunction(fn);
|
|
var subscription = that.subscribe({
|
|
next : function(value){
|
|
try {
|
|
return fn(value);
|
|
} catch(e){
|
|
reject(e);
|
|
subscription.unsubscribe();
|
|
}
|
|
},
|
|
error: reject,
|
|
complete: resolve
|
|
});
|
|
});
|
|
}
|
|
});
|
|
|
|
redefineAll($Observable, {
|
|
from: function from(x){
|
|
var C = typeof this === 'function' ? this : $Observable;
|
|
var method = getMethod(anObject(x)[OBSERVABLE]);
|
|
if(method){
|
|
var observable = anObject(method.call(x));
|
|
return observable.constructor === C ? observable : new C(function(observer){
|
|
return observable.subscribe(observer);
|
|
});
|
|
}
|
|
return new C(function(observer){
|
|
var done = false;
|
|
microtask(function(){
|
|
if(!done){
|
|
try {
|
|
if(forOf(x, false, function(it){
|
|
observer.next(it);
|
|
if(done)return RETURN;
|
|
}) === RETURN)return;
|
|
} catch(e){
|
|
if(done)throw e;
|
|
observer.error(e);
|
|
return;
|
|
} observer.complete();
|
|
}
|
|
});
|
|
return function(){ done = true; };
|
|
});
|
|
},
|
|
of: function of(){
|
|
for(var i = 0, l = arguments.length, items = Array(l); i < l;)items[i] = arguments[i++];
|
|
return new (typeof this === 'function' ? this : $Observable)(function(observer){
|
|
var done = false;
|
|
microtask(function(){
|
|
if(!done){
|
|
for(var i = 0; i < items.length; ++i){
|
|
observer.next(items[i]);
|
|
if(done)return;
|
|
} observer.complete();
|
|
}
|
|
});
|
|
return function(){ done = true; };
|
|
});
|
|
}
|
|
});
|
|
|
|
hide($Observable.prototype, OBSERVABLE, function(){ return this; });
|
|
|
|
$export($export.G, {Observable: $Observable});
|
|
|
|
require('./_set-species')('Observable'); |