publish
为了方便，RxJS 提供了一系列 publishXXX operator，作为 multicast 的简写：
publish: multicast + Subject
publishReplay: multicast + ReplaySubject
publishBehavior: multicast + BehaviorSubject
publishLast: multicast + AsyncSubject
看个例子:
// Example: share one interval source between two observers using
// publishReplay + refCount, and show when the source starts and stops.
const shared = Rx.Observable.interval(1000)
// Log every value the source produces before it is delivered to observers.
.do(x => console.log(`source ${x}`))
// multicast + ReplaySubject; 100 is the replay buffer size argument.
.publishReplay(100)
// Connect on the first subscription and disconnect when the last one leaves.
.refCount();
// Observer A: logs every notification with an "A" prefix.
const observerA = {
next: x => console.log(`A next ${x}`),
error: e => console.error(`A error ${e}`),
complete: () => console.log('A done')
};
// refCount: 0 --> 1, the source starts
const subA = shared.subscribe(observerA);
// Observer B: logs every notification with a "B" prefix.
const observerB = {
next: x => console.log(`B next ${x}`),
error: e => console.error(`B error ${e}`),
complete: () => console.log('B done')
};
// Declared outside the timer callback so the 7-second timer below can
// unsubscribe the subscription created by the 2-second timer.
let subB;
// After 2 seconds: B joins the already-running shared stream.
setTimeout(() => {
// refCount: 1 --> 2
subB = shared.subscribe(observerB);
}, 2000);
// After 5 seconds: A leaves; B is still subscribed, so the source keeps running.
setTimeout(() => {
// refCount: 2 --> 1
subA.unsubscribe();
console.log('unsubscribe A');
}, 5000);
// After 7 seconds: the last subscriber leaves.
setTimeout(() => {
// refCount: 1 --> 0, the source stops
subB.unsubscribe();
console.log('unsubscribe B');
}, 7000);
/* Program output:
"source 0"
"A next 0"
"source 1"
...
"source 4"
"A next 4"
"B next 4"
"unsubscribe A"
"source 5"
"B next 5"
"source 6"
"B next 6"
"unsubscribe B"
*/