multicast 及 connect
我们引入了 Subject 来实现对各个 Observer 广播,但因此也引入一些 boilerplate code(样板代码):
// 新建一个 Subject
const subject = new Rx.Subject();
// subejct 订阅某个 observable
observable.subscribe(subject);
// 各个 observer 订阅 subject
subject.subscribe(observerA);
subject.subscribe(observerB);
还好 RxJS 为我们提供了 multicast
operator 来直接操纵 observable,实现广播:
const connectableObservable = Rx.Observable.interval(1000)
.take(5)
.multicast(new Rx.Subject());
const observerA = {
next: x => console.log(`A next ${x}`),
error: e => console.error(`A error ${e}`),
complete: () => console.log('A done')
};
connectableObservable.connect();
connectableObservable.subscribe(observerA);
const observerB = {
next: x => console.log(`B next ${x}`),
error: e => console.error(`B error ${e}`),
complete: () => console.log('B done')
};
setTimeout(() => {
connectableObservable.subscribe(observerB);
}, 2000);
/* 程序输出:
"A next 0"
"A next 1"
"A next 2"
"B next 2"
"A next 3"
"B next 3"
"A next 4"
"B next 4"
"A done"
"B done"
*/
经 multicast
操作的 observable 称为 Connectable Observable,Connectable Observerble 具有一个 connect
方法,当我们执行 connectableObservable.connect()
后,我们就可以注册不同的 Observer 对象,并且不会引起 observable 的重新执行。
我们还可以向 multicast
传递不同类型的 Subject:
const connectableObservable = Rx.Observable.interval(1000)
.take(5)
.multicast(new Rx.ReplaySubject(100));
const observerA = {
next: x => console.log(`A next ${x}`),
error: e => console.error(`A error ${e}`),
complete: () => console.log('A done')
};
connectableObservable.connect();
connectableObservable.subscribe(observerA);
const observerB = {
next: x => console.log(`B next ${x}`),
error: e => console.error(`B error ${e}`),
complete: () => console.log('B done')
};
setTimeout(() => {
connectableObservable.subscribe(observerB);
}, 2000);
/* 程序输出:
"A next 0"
"A next 1"
"B next 0"
"B next 1"
"A next 2"
"B next 2"
"A next 3"
"B next 3"
"A next 4"
"B next 4"
"A done"
"B done"
*/