重用 multicast

我们不妨尝试下,在使用 引用计数 时,重新订阅 一个不久前取消了订阅的观察者:

const subject = new Rx.Subject();

const shared = Rx.Observable.interval(1000)
  .take(6)
  .do(x => console.log(`source ${x}`))
  .multicast(subject)
  .refCount();

const observerA = {
  next: x => console.log(`A next ${x}`),
  error: e => console.error(`A error ${e}`),
  complete: () => console.log('A done')
};

// refCount: 0 --> 1 开始
const subA = shared.subscribe(observerA);
console.log('subscribed A');

const observerB = {
  next: x => console.log(`B next ${x}`),
  error: e => console.error(`B error ${e}`),
  complete: () => console.log('B done')
};

let subB;
setTimeout(() => {
  // refCount: 1 --> 2
  subB = shared.subscribe(observerB);
  console.log('subscribed B');
}, 2000);

setTimeout(() => {
  // refCount: 2 --> 1
  subA.unsubscribe();
  console.log('unsubscribe A');
}, 5000);

setTimeout(() => {
  // refCount: 1 --> 0 停止
  subB.unsubscribe();
  console.log('unsubscribe B');
}, 7000);

setTimeout(() => {
  // refCount: 0 --> 1 重新开始?
  shared.subscribe(observerA);
  console.log('subscribed A');
}, 8000);

/* 程序输出:
  "subscribed A"
  "source 0"
  "A next 0"
  "source 1"
  "A next 1"
  "subscribed B"
  ...
  "A next 4"
  "B next 4"
  "unsubscribe A"
  "source 5"
  "B next 5"
  "B done"
  "unsubscribe B"
  "A done"
  "subscribed A"
*/

查看例子

我们发现,没有成功,再次执行 shared.subscribe(observerA) 没能让我们重新开始 shared 的执行。这是因为我们的流已经结束了:

subject:  --0--1--2--3--4--5|
observerA:                     A

multicast operator 除了能够接受一个 Subject 对象作为参数外,还能接受一个 SubjectFactory 作为参数,这个工厂函数声明了当一个 Subject 完成时,如何创建新的 Subject:

function subjectFactory() {
  console.log('create new subject');
  return new Rx.Subject();
}

const shared = Rx.Observable.interval(1000)
  .take(6)
  .do(x => console.log(`source ${x}`))
  .multicast(subjectFactory)
  .refCount();

const observerA = {
  next: x => console.log(`A next ${x}`),
  error: e => console.error(`A error ${e}`),
  complete: () => console.log('A done')
};

// refCount: 0 --> 1 开始
const subA = shared.subscribe(observerA);
console.log('subscribed A');

const observerB = {
  next: x => console.log(`B next ${x}`),
  error: e => console.error(`B error ${e}`),
  complete: () => console.log('B done')
};

let subB;
setTimeout(() => {
  // refCount: 1 --> 2
  subB = shared.subscribe(observerB);
  console.log('subscribed B');
}, 2000);

setTimeout(() => {
  // refCount: 2 --> 1
  subA.unsubscribe();
  console.log('unsubscribe A');
}, 5000);

setTimeout(() => {
  // refCount: 1 --> 0 停止
  subB.unsubscribe();
  console.log('unsubscribe B');
}, 7000);

setTimeout(() => {
  // refCount: 0 --> 1 重新开始
  shared.subscribe(observerA);
  console.log('subscribed A');
}, 8000);

/* 程序输出:
"create new subject"
  "subscribed A"
  "source 0"
  "A next 0"
  "source 1"
  "A next 1"
  "subscribed B"
  "source 2"
  "A next 2"
  // ...
  "A next 4"
  "B next 4"
  "unsubscribe A"
  "source 5"
  "B next 5"
  "B done"
  "unsubscribe B"
  "create new subject"
  "subscribed A"
  "source 0"
  ...
  "source 5"
  "A next 5"
  "A done"
*/

查看例子

通过向 multicast 传入工厂函数,我们实现了 multicast 的重用,现在,每当有新的观察者到来时,即 refCount 大于 0 时,都能重新启动我们的 Observable 对象。

results matching ""

    No results matching ""