refcount
在 Connectable Observable 中,我们需要 手动 控制 observable 的开始和结束:
const sub = connectableObserverble.connect(); // 开始
// ...
sub.unsubscribe(); // 结束
更理想的一种情况是,当 Observable 被 Observer 观察时,他能够 自动 开始执行,当 Observable 不再被任何 Observer 观察时,他能够 自动 停止执行。RxJS 为我们提供了 refCount
operator 来实现 引用计数 机制,每次有新的 Observer 观察某 Observerble 时,引用计数就 +1,取消订阅时,引用计数就 -1。当引用计数变为 0 时,Observable 将停止执行,引用计数大于 0 时,Observable 又开始执行:
const connectableObservable = Rx.Observable.interval(1000)
.do(x => console.log(`source ${x}`))
.multicast(new Rx.Subject());
const autoConnectableObservable = connectableObservable.refCount();
const observerA = {
next: x => console.log(`A next ${x}`),
error: e => console.error(`A error ${e}`),
complete: () => console.log('A done')
};
// refCount: 0 --> 1 开始
const subA = autoConnectableObservable.subscribe(observerA);
const observerB = {
next: x => console.log(`B next ${x}`),
error: e => console.error(`B error ${e}`),
complete: () => console.log('B done')
};
let subB;
setTimeout(() => {
// refCount: 1 --> 2
subB = autoConnectableObservable.subscribe(observerB);
}, 2000);
setTimeout(() => {
// refCount: 2 --> 1
subA.unsubscribe();
console.log('unsubscribe A');
}, 5000);
setTimeout(() => {
// refCount: 1 --> 0 停止
subB.unsubscribe();
console.log('unsubscribe B');
}, 7000);
/* 程序输出:
Console Run Clear
"source 0"
"A next 0"
...
"source 4"
"A next 4"
"B next 4"
"unsubscribe A"
"source 5"
"B next 5"
"source 6"
"B next 6"
"unsubscribe B"
*/