停止共享 Observable 的运行
我们稍微修改上一节的例子,让 observable 不停派发值,并且手动取消订阅:
const connectableObservable = Rx.Observable.interval(1000)
.do(x => console.log(`source ${x}`))
.multicast(new Rx.Subject());
const observerA = {
next: x => console.log(`A next ${x}`),
error: e => console.error(`A error ${e}`),
complete: () => console.log('A done')
};
connectableObservable.connect();
const subA = connectableObservable.subscribe(observerA);
const observerB = {
next: x => console.log(`B next ${x}`),
error: e => console.error(`B error ${e}`),
complete: () => console.log('B done')
};
let subB;
setTimeout(() => {
subB = connectableObservable.subscribe(observerB);
}, 2000);
setTimeout(() => {
subA.unsubscribe();
subB.unsubscribe();
console.log('unsubscribe both');
}, 5000);
/* 程序输出:
"source 0"
"A next 0"
"source 1"
"A next 1"
"source 2"
"A next 2"
"B next 2"
"source 3"
"A next 3"
"B next 3"
"source 4"
"A next 4"
"B next 4"
"unsubscribe both"
"source 5"
"source 6"
*/
observerA
及 observerB
都成功取消了对 connectableObservable
的订阅,但是 connectableObservable
还在运行,如果还想要停止 connectableObservable
的执行:
const connectableObservable = Rx.Observable.interval(1000)
.do(x => console.log(`source ${x}`))
.multicast(new Rx.Subject());
const observerA = {
next: x => console.log(`A next ${x}`),
error: e => console.error(`A error ${e}`),
complete: () => console.log('A done')
};
const sub = connectableObservable.connect();
const subA = connectableObservable.subscribe(observerA);
const observerB = {
next: x => console.log(`B next ${x}`),
error: e => console.error(`B error ${e}`),
complete: () => console.log('B done')
};
let subB;
setTimeout(() => {
subB = connectableObservable.subscribe(observerB);
}, 2000);
setTimeout(() => {
sub.unsubscribe();
console.log('unsubscribe both');
}, 5000);
/* 程序输出:
"source 0"
"A next 0"
"source 1"
"A next 1"
"source 2"
"A next 2"
"B next 2"
"source 3"
"A next 3"
"B next 3"
"source 4"
"A next 4"
"B next 4"
"unsubscribe both"
*/