ruk·si

UniRx
Subjects

Updated at 2017-07-28 20:53

Subjects implement both IObservable and and IObserver interfaces. And as you might imagine, they are pretty useful in testing.

var s = new Subject<int>();
s.Subscribe(CreateShoutObserver<int>());
s.OnNext(1);
s.OnNext(3);
s.OnNext(2);
s.OnCompleted();
// OnNext: 1
// OnNext: 3
// OnNext: 2
// OnCompleted

Subjects are used to broadcast data to multiple subscribers.

// Subscribe a subject to a stream.
var source = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Take(2);
var subject = new Subject<long>();
source.Subscribe(subject);

// Add multiple subscribers to the subject.
subject.Subscribe(CreateShoutObserver<long>());
subject.Subscribe(CreateShoutObserver<long>());
// OnNext: 0
// OnNext: 0
// OnNext: 1
// OnNext: 1
// OnCompleted
// OnCompleted

ReplaySubject Store all emitted items so subscribers get the full history.

var s = new Subject<int>();
s.Subscribe(CreateShoutObserver<int>());
s.OnNext(1);
s.OnNext(2);
s.OnCompleted();
s.Subscribe(CreateShoutObserver<int>());
// OnNext: 1
// OnNext: 2
// OnCompleted
// OnCompleted  -  the second observer doesn't receive the values

var rs = new ReplaySubject<int>();
rs.Subscribe(CreateShoutObserver<int>());
rs.OnNext(1);
rs.OnNext(2);
rs.OnCompleted();
rs.Subscribe(CreateShoutObserver<int>());
// OnNext: 1
// OnNext: 2
// OnCompleted
// OnNext: 1
// OnNext: 2
// OnCompleted

BehaviourSubject Store only the latest emitted item, and requires a default item to emit if nothing has been emitted yet. But receives no items the stream has completed already.

var bs = new BehaviorSubject<int>(-100);
bs.Subscribe(CreateShoutObserver<int>());
bs.OnNext(1);
bs.OnNext(2);
bs.Subscribe(CreateShoutObserver<int>());
bs.OnCompleted();
bs.Subscribe(CreateShoutObserver<int>());
// OnNext: -100
// OnNext: 1
// OnNext: 2
// OnNext: 2
// OnCompleted
// OnCompleted
// OnCompleted

AsyncSubject Stores the last value but only emits it after completion. Useful for hot streams that might complete before any subscriptions.

var bs = new AsyncSubject<int>();
bs.Subscribe(CreateShoutObserver<int>());
bs.OnNext(1);
bs.OnNext(2);
bs.Subscribe(CreateShoutObserver<int>());
bs.OnCompleted();
bs.Subscribe(CreateShoutObserver<int>());
// OnNext: 2
// OnNext: 2
// OnCompleted
// OnCompleted
// OnNext: 2
// OnCompleted

Sources