ReplaySubject
对 BehaviorSubject 进行订阅,总能看到最近 一次 的值。如果我们想看到最近 几次 的值,就可以使用 ReplaySubject,例如我们想缓存最近的两次值:
const subject = new Rx.ReplaySubject(2);
const observerA = {
next: x => console.log(`A next ${x}`),
error: e => console.error(`A error ${e}`),
complete: () => console.log('A done')
};
subject.subscribe(observerA);
console.log('observerA subscribed');
const observerB = {
next: x => console.log(`B next ${x}`),
error: e => console.error(`B error ${e}`),
complete: () => console.log('B done')
};
subject.next(1);
subject.next(2);
subject.next(3);
setTimeout(() => {
subject.subscribe(observerB);
console.log('observerB subscribed');
}, 2000);
/* 程序输出:
"observerA subscribed"
"A next 1"
"A next 2"
"A next 3"
"B next 2"
"B next 3"
"observerB subscribed"
*/
ReplaySubject 的第一个参数 bufferSize
指定了缓存的大小,默认为 Number.MAX_VALUE
,即缓存所有发出的值,是一个 空间限制。我们还可以向其传递第二个参数 windowSize
,指定缓存的 时间限制,默认为 Number.MAX_VALUE
,即不限制值的时效。当 windowSize == 250
时,意味着只保留 250 ms 内发出的值:
const subject = new Rx.ReplaySubject(Number.MAX_VALUE, 250);
const observerA = {
next: x => console.log(`A next ${x}`),
error: e => console.error(`A error ${e}`),
complete: () => console.log('A done')
};
subject.subscribe(observerA);
console.log('observerA subscribed');
const observerB = {
next: x => console.log(`B next ${x}`),
error: e => console.error(`B error ${e}`),
complete: () => console.log('B done')
};
setTimeout(() =>subject.next(1), 100);
setTimeout(() =>subject.next(2), 200);
setTimeout(() =>subject.next(3), 300);
setTimeout(() => {
subject.subscribe(observerB);
console.log('observerB subscribed');
}, 400);
/* 程序输出:
"observerA subscribed"
"A next 1"
"A next 2"
"A next 3"
"B next 2"
"B next 3"
"observerB subscribed"
*/
与 BehaviorSubject 的区别
如果我们指定 ReplaySubject 的 bufferSize = 1
,那么,它能够缓存最近一次发出的值,也就带来和 BehaviorSubject 类似的表现:
const subject = new Rx.ReplaySubject(1);
// ....
但是,二者无论是概念,还是行为上,都会有所区别。首先概念上的区别是最本质的,ReplaySubject 只是缓存了最近的值,他仍然反映的时不断有值产生的流(多值),而 BehaviorSubject 反映的则是随时间变化的值(单值),因此,BehaviorSubject 需要一个种子 seed 作为初始值,然后这个 seed 将不断变化,我们只能看见当前的值。
在行为上,由于 ReplaySubject 侧重于缓存,那么当 ReplaySubject 完成时,不影响我们观测缓存到的值:
const subject = new Rx.ReplaySubject(Number.MAX_VALUE, 250);
const observerA = {
next: x => console.log(`A next ${x}`),
error: e => console.error(`A error ${e}`),
complete: () => console.log('A done')
};
subject.subscribe(observerA);
console.log('observerA subscribed');
const observerB = {
next: x => console.log(`B next ${x}`),
error: e => console.error(`B error ${e}`),
complete: () => console.log('B done')
};
setTimeout(() =>subject.next(1), 100);
setTimeout(() =>subject.next(2), 200);
setTimeout(() =>subject.next(3), 300);
setTimeout(() => {
subject.subscribe(observerB);
console.log('observerB subscribed');
}, 400);
/* 程序输出:
"observerA subscribed"
"A next 1"
"A next 2"
"A next 3"
"A done"
"B next 3"
"B done"
"observerB subscribed"
*/
而 BehaviorSubject 完成时,意味着值停止变化,我们不再能够观测到值:
const subject = new Rx.BehaviorSubject(0);
const observerA = {
next: x => console.log(`A next ${x}`),
error: e => console.error(`A error ${e}`),
complete: () => console.log('A done')
};
subject.subscribe(observerA);
console.log('observerA subscribed');
const observerB = {
next: x => console.log(`B next ${x}`),
error: e => console.error(`B error ${e}`),
complete: () => console.log('B done')
};
setTimeout(() =>subject.next(1), 100);
setTimeout(() =>subject.next(2), 200);
setTimeout(() =>subject.next(3), 300);
setTimeout(() => {
subject.subscribe(observerB);
console.log('observerB subscribed');
}, 400);
/* 程序输出:
"A next 1"
"observerA subscribed"
"A next 1"
"A next 2"
"A next 3"
"A done"
"B done"
"observerB subscribed"
*/