The way we use publish() (or multicast with an RxJS Subject) makes the shared Observable not reusable if the shared execution happens to complete or emit an error. In this lesson we will see how to use a simple Subject factory function in order to create a new Subject, one for each shared execution, whenever connect() is called.
Hi, this topic of Subject factories looks very interesting, but I can't wrap my head around one problem. What if I have a Subject (actually a BehaviorSubject) that I want to use both to push values into the stream continuously and as the subject for multicast? The factory would seem to solve the problem of resubscription if the stream errors somehow. Basically, I want to use this construct for state shaped by actions, like:
var actionSubject = new Rx.BehaviorSubject()
var reducer = (state, action) => ( {...state, [action.type]: action.payload} )
var state = Rx.Observable.from({})
.multicast(actionSubject)
.flatMap(action => isObservable(action) ? action : Rx.Observable.from([action]))
.scan(reducer)
.refCount()
var actionStream = subject => action => { subject.next(action) }
var action = actionStream(actionSubject)
The idea is that if I swap the actionSubject with a factory in multicast I would have to find a way to push the actions to that newly created subject from that point on...correct? Or is there a better way to do this altogether?
Bogdan, if you already have a subject then you do not need to apply multicast. You can simply write:
var stateObservable = actionSubject
.flatMap(action => isObservable(action) ? action : Rx.Observable.from([action]))
.scan(reducer)
Multicast is useful for plain observables, not subjects. That's because multicast essentially routes a plain observable through a subject, and you already have a subject.
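To illustrate why a subject needs no extra multicast step, here is a minimal sketch in plain JavaScript. TinySubject and scanInto are hypothetical stand-ins written for this example (they are not the RxJS Subject class or the scan operator), and the flatMap step is left out to keep it short. The reducer is the one from the question.

```javascript
// Simplified stand-in for a Subject (an assumption, NOT the RxJS class):
// one next() call is delivered to every current subscriber.
class TinySubject {
  constructor() { this.observers = []; }
  subscribe(next) { this.observers.push(next); }
  next(value) { this.observers.slice().forEach(observer => observer(value)); }
}

// Reducer from the question: each action sets state[action.type].
const reducer = (state, action) => ({ ...state, [action.type]: action.payload });

// Hypothetical helper playing the role of .scan(reducer, seed).
function scanInto(subject, reduce, seed, listener) {
  let state = seed;
  subject.subscribe(action => {
    state = reduce(state, action);
    listener(state);
  });
}

const actionSubject = new TinySubject();
const seenByA = [];
const seenByB = [];
scanInto(actionSubject, reducer, {}, state => seenByA.push(state));
scanInto(actionSubject, reducer, {}, state => seenByB.push(state));

// Two pushes into the subject reach both subscribers.
actionSubject.next({ type: 'user', payload: 'bogdan' });
actionSubject.next({ type: 'count', payload: 1 });

console.log(seenByA);
console.log(seenByB);
```

Both subscribers receive the same two actions from a single pair of next() calls, which is the sharing that multicast would otherwise have to add on top of a plain observable.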
So the reason that passing a Subject instance (new Rx.Subject()) directly to multicast prevents 're-connecting' is that multicast holds a reference to that one Subject, while a factory generates a new Subject instance each time? Do I understand this correctly?
Yes, that's correct.
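A rough model of that difference, in plain JavaScript. This is a simplified sketch and not the RxJS source: TinySubject, multicast, source, and run are all hypothetical names for this example, and the source here is a synchronous function rather than a real Observable. The one behavior it relies on is that a completed subject stays stopped and only forwards completion to later subscribers.

```javascript
// Simplified stand-in for a Subject (an assumption, NOT the RxJS class).
class TinySubject {
  constructor() { this.observers = []; this.isStopped = false; }
  subscribe(observer) {
    // A stopped subject only tells late subscribers that it is done.
    if (this.isStopped) { observer.complete(); return; }
    this.observers.push(observer);
  }
  next(value) {
    if (!this.isStopped) this.observers.forEach(o => o.next(value));
  }
  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.forEach(o => o.complete());
    this.observers = [];
  }
}

// Hypothetical multicast: accepts either a subject instance or a factory.
function multicast(source, subjectOrFactory) {
  const factory = typeof subjectOrFactory === 'function'
    ? subjectOrFactory
    : () => subjectOrFactory; // an instance behaves like a factory that always returns it
  let subject = null;
  const getSubject = () => {
    if (!subject || subject.isStopped) subject = factory();
    return subject;
  };
  return {
    subscribe: observer => getSubject().subscribe(observer),
    connect: () => source(getSubject()),
  };
}

// Synchronous source: emits 1 and 2, then completes.
const source = observer => { observer.next(1); observer.next(2); observer.complete(); };

function run(subjectOrFactory) {
  const log = [];
  const observer = { next: v => log.push('next ' + v), complete: () => log.push('done') };
  const shared = multicast(source, subjectOrFactory);
  shared.subscribe(observer); shared.connect(); // first shared execution
  shared.subscribe(observer); shared.connect(); // attempt a second execution
  return log;
}

const withInstance = run(new TinySubject());
const withFactory = run(() => new TinySubject());
console.log(withInstance);
console.log(withFactory);
```

With the single instance, the second subscription only gets an immediate 'done' because the same stopped subject is handed back. With the factory, the second execution runs through a fresh subject and emits both values again.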
This took me a while to figure out, so I'll leave it here:
If your source observable does not complete before the refCount returns to 0, then when you re-subscribe to the shared observable, A will receive each of the emitted values of source again, starting from the first.
let shared = Observable.interval(1000).take(6).multicast(new Subject()).refCount();
let subA = shared.subscribe(observerA);
// note, we are unsubscribing before the shared observable could emit
// all 6 values, which would take 6000ms
setTimeout(function() {
subA.unsubscribe();
}, 5000);
setTimeout(function() {
subA = shared.subscribe(observerA);
}, 8000);
// after re-subscribing, this will emit
// A next 0
// A next 1
// ...
// A done
This confused me because, at first glance, it appears to behave as if you had passed a factory to multicast.
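One way to see why the single Subject instance can be reused in this case: the subject was never completed, only disconnected from the source. Below is a small plain-JavaScript sketch of that idea. TinySubject and runSource are hypothetical stand-ins for this example (not RxJS classes), and the timers are replaced by a synchronous source that can be cut short.

```javascript
// Simplified stand-in for a Subject (an assumption, NOT the RxJS class).
class TinySubject {
  constructor() { this.observers = []; this.isStopped = false; }
  subscribe(observer) {
    if (this.isStopped) { observer.complete(); return () => {}; }
    this.observers.push(observer);
    // Return a function that removes this observer again.
    return () => { this.observers = this.observers.filter(o => o !== observer); };
  }
  next(value) {
    if (!this.isStopped) this.observers.forEach(o => o.next(value));
  }
  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.forEach(o => o.complete());
    this.observers = [];
  }
}

// Hypothetical source: emits 0..count-1 into the subject. If `limit` is
// reached first, it returns without completing, which models the source
// being unsubscribed before it could finish.
function runSource(subject, count, limit) {
  for (let i = 0; i < count; i++) {
    if (i === limit) return; // cut short: complete() is never sent
    subject.next(i);
  }
  subject.complete();
}

const log = [];
const observerA = { next: v => log.push('A next ' + v), complete: () => log.push('A done') };
const subject = new TinySubject(); // one instance, reused for every execution

// First execution: A leaves after 4 of the 6 values.
let unsubscribeA = subject.subscribe(observerA);
runSource(subject, 6, 4);
unsubscribeA();
const stoppedAfterEarlyExit = subject.isStopped; // false: the subject is still usable

// Second execution: runs to completion through the same subject.
unsubscribeA = subject.subscribe(observerA);
runSource(subject, 6, 6);

// Third attempt: the subject is now stopped, so A only hears 'A done'.
subject.subscribe(observerA);
runSource(subject, 6, 6);

console.log(stoppedAfterEarlyExit);
console.log(log);
```

In this model the second run replays all six values because the subject was still open, while the third attempt produces nothing but the completion notice. That last case is the one a factory is meant to handle.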