ReplaySubject

对 BehaviorSubject 进行订阅,总能看到最近 一次 的值。如果我们想看到最近 几次 的值,就可以使用 ReplaySubject,例如我们想缓存最近的两次值:

const subject = new Rx.ReplaySubject(2);

const observerA = {
  next: x => console.log(`A next ${x}`),
  error: e => console.error(`A error ${e}`),
  complete: () => console.log('A done')
};

subject.subscribe(observerA);
console.log('observerA subscribed');

const observerB = {
  next: x => console.log(`B next ${x}`),
  error: e => console.error(`B error ${e}`),
  complete: () => console.log('B done')
};

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  subject.subscribe(observerB);
  console.log('observerB subscribed');
}, 2000);

/* 程序输出:
  "observerA subscribed"
  "A next 1"
  "A next 2"
  "A next 3"
  "B next 2"
  "B next 3"
  "observerB subscribed"
 */

查看例子

ReplaySubject 的第一个参数 bufferSize 指定了缓存的大小,默认为 Number.MAX_VALUE,即缓存所有发出的值,是一个 空间限制。我们还可以向其传递第二个参数 windowSize,指定缓存的 时间限制,默认为 Number.MAX_VALUE,即不限制值的时效。当 windowSize == 250 时,意味着只保留 250 ms 内发出的值:

const subject = new Rx.ReplaySubject(Number.MAX_VALUE, 250);

const observerA = {
  next: x => console.log(`A next ${x}`),
  error: e => console.error(`A error ${e}`),
  complete: () => console.log('A done')
};

subject.subscribe(observerA);
console.log('observerA subscribed');

const observerB = {
  next: x => console.log(`B next ${x}`),
  error: e => console.error(`B error ${e}`),
  complete: () => console.log('B done')
};

setTimeout(() =>subject.next(1), 100);
setTimeout(() =>subject.next(2), 200);
setTimeout(() =>subject.next(3), 300);

setTimeout(() => {
  subject.subscribe(observerB);
  console.log('observerB subscribed');
}, 400);

/* 程序输出:
  "observerA subscribed"
  "A next 1"
  "A next 2"
  "A next 3"
  "B next 2"
  "B next 3"
  "observerB subscribed"
 */

查看例子

与 BehaviorSubject 的区别

如果我们指定 ReplaySubject 的 bufferSize = 1,那么,它能够缓存最近一次发出的值,也就带来和 BehaviorSubject 类似的表现:

const subject = new Rx.ReplaySubject(1);

// ....

但是,二者无论是概念,还是行为上,都会有所区别。首先概念上的区别是最本质的,ReplaySubject 只是缓存了最近的值,他仍然反映的时不断有值产生的流(多值),而 BehaviorSubject 反映的则是随时间变化的值(单值),因此,BehaviorSubject 需要一个种子 seed 作为初始值,然后这个 seed 将不断变化,我们只能看见当前的值。

在行为上,由于 ReplaySubject 侧重于缓存,那么当 ReplaySubject 完成时,不影响我们观测缓存到的值:

const subject = new Rx.ReplaySubject(Number.MAX_VALUE, 250);

const observerA = {
  next: x => console.log(`A next ${x}`),
  error: e => console.error(`A error ${e}`),
  complete: () => console.log('A done')
};

subject.subscribe(observerA);
console.log('observerA subscribed');

const observerB = {
  next: x => console.log(`B next ${x}`),
  error: e => console.error(`B error ${e}`),
  complete: () => console.log('B done')
};

setTimeout(() =>subject.next(1), 100);
setTimeout(() =>subject.next(2), 200);
setTimeout(() =>subject.next(3), 300);

setTimeout(() => {
  subject.subscribe(observerB);
  console.log('observerB subscribed');
}, 400);

/* 程序输出:
  "observerA subscribed"
  "A next 1"
  "A next 2"
  "A next 3"
  "A done"
  "B next 3"
  "B done"
  "observerB subscribed"
 */

查看例子

而 BehaviorSubject 完成时,意味着值停止变化,我们不再能够观测到值:

const subject = new Rx.BehaviorSubject(0);

const observerA = {
  next: x => console.log(`A next ${x}`),
  error: e => console.error(`A error ${e}`),
  complete: () => console.log('A done')
};

subject.subscribe(observerA);
console.log('observerA subscribed');

const observerB = {
  next: x => console.log(`B next ${x}`),
  error: e => console.error(`B error ${e}`),
  complete: () => console.log('B done')
};

setTimeout(() =>subject.next(1), 100);
setTimeout(() =>subject.next(2), 200);
setTimeout(() =>subject.next(3), 300);

setTimeout(() => {
  subject.subscribe(observerB);
  console.log('observerB subscribed');
}, 400);

/* 程序输出:
  "A next 1"
  "observerA subscribed"
  "A next 1"
  "A next 2"
  "A next 3"
  "A done"
  "B done"
  "observerB subscribed"
 */

查看例子

results matching ""

    No results matching ""