ruk·si

UniRx
Transform Operators

Updated at 2017-07-28 16:21

IObservable.Select Transforms each emitted item; the equivalent of "map" in some libraries.

Observable
    .Range(2, 3)
    .Select(x => x * 2)
    .Subscribe(CreateShoutObserver<int>());
// OnNext: 4
// OnNext: 6
// OnNext: 8
// OnCompleted
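
The snippets in these notes subscribe with CreateShoutObserver<T>(), which is not defined on this page. A minimal sketch of what such a helper might look like follows; the ShoutObserver class, its constructor and the log delegate are assumptions made for illustration, and it is written against the standard System.IObserver<T> interface rather than any UniRx-specific type.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the helper used in these notes:
// it reports every notification it receives through a logging delegate.
public sealed class ShoutObserver<T> : IObserver<T>
{
    private readonly Action<string> log;

    public ShoutObserver(Action<string> log)
    {
        this.log = log;
    }

    public void OnNext(T value) { log("OnNext: " + value); }
    public void OnError(Exception error) { log("OnError: " + error.Message); }
    public void OnCompleted() { log("OnCompleted"); }
}

public static class ShoutObserverSketch
{
    // Assumption: logs to the console; inside Unity, Debug.Log could be passed instead.
    public static IObserver<T> CreateShoutObserver<T>()
    {
        return new ShoutObserver<T>(Console.WriteLine);
    }
}
```

Passing the logger as a delegate keeps the sketch independent of Unity, so the same observer can write to a list in a test or to the Unity console in a scene.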

IObservable.SelectMany Projects each item to an inner stream and automatically flattens the resulting streams into one. It works like "Select().Merge()" and is the same as "flat map" in some libraries.

Observable
    .Return(3)
    .SelectMany(i => Observable.Range(5, i))
    .Subscribe(CreateShoutObserver<int>());
// OnNext: 5
// OnNext: 6
// OnNext: 7
// OnCompleted

Observable.Range(1, 3)
    .SelectMany(i => Observable.Range(5, i))
    .Subscribe(CreateShoutObserver<int>());
// OnNext: 5
// OnNext: 5
// OnNext: 6
// OnNext: 5
// OnNext: 6
// OnNext: 7
// OnCompleted
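
For comparison, the same flattening can be sketched with plain LINQ on IEnumerable, which runs without UniRx. This is only a pull-based analogue, and the SelectManySketch class and Flatten method are names invented for this example. Because the inner sequences here are synchronous, the items come out in source order; with asynchronous inner observables, merged items may interleave.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SelectManySketch
{
    // For each i in 1..3, produce the inner sequence of i numbers starting at 5,
    // then flatten all inner sequences into a single list.
    public static List<int> Flatten()
    {
        return Enumerable.Range(1, 3)
            .SelectMany(i => Enumerable.Range(5, i))
            .ToList();
    }
}
// Flatten() returns 5, 5, 6, 5, 6, 7
```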

IObservable.Scan Takes items one by one, keeps an accumulated value and emits the updated accumulation after each received item. Useful for:

  • Keeping a running total as values arrive.
  • Working with infinite sequences, where a final aggregate would never be emitted.

Observable
    .Repeat(1, 5)
    .Scan(0, (acc, current) => acc + current)
    .Subscribe(CreateShoutObserver<int>());
// OnNext: 1   -  0 + 1
// OnNext: 2   -  1 + 1
// OnNext: 3   -  2 + 1
// OnNext: 4   -  3 + 1
// OnNext: 5   -  4 + 1
// OnCompleted
S.1: ---1----1--1----1------1-->
     vvvvvvvvv scan() vvvvvvvvvv
S.2: ---1----2--3----4------5-->
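
To make the accumulate-and-emit behaviour concrete, here is a minimal pull-based sketch of the same idea over IEnumerable. It is not how UniRx implements Scan; the ScanSketch class and its methods are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ScanSketch
{
    // Yields the accumulator after every item, starting from the seed.
    public static IEnumerable<TAcc> Scan<T, TAcc>(
        IEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> step)
    {
        var acc = seed;
        foreach (var item in source)
        {
            acc = step(acc, item);
            yield return acc;
        }
    }

    // Running total over five ones, mirroring the Repeat(1, 5) example.
    public static List<int> RunningTotal()
    {
        return Scan(Enumerable.Repeat(1, 5), 0, (acc, current) => acc + current).ToList();
    }
}
// RunningTotal() returns 1, 2, 3, 4, 5
```

Because each value is yielded as soon as it is computed, a consumer can stop early, which is why this style also works on sequences that never end.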

IObservable.Aggregate The same as Scan, but emits only the final accumulated value once the source completes, so it works like "Scan().TakeLast(1)".

Observable
    .Repeat(1, 5)
    .Aggregate(0, (acc, current) => acc + current)
    .Subscribe(CreateShoutObserver<int>());
// OnNext: 5
// OnCompleted
S.1: ---1----1--1----1----1---|--->
     vvvvvvvv aggregate() vvvvvvvvv
S.2: -------------------------5-|->
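
The relationship between the two operators can be sketched with plain LINQ, again as a pull-based analogue rather than UniRx code. The AggregateSketch class and both method names are assumptions made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class AggregateSketch
{
    // Folds the whole sequence into one value; intermediate totals are never exposed.
    public static int Total(IEnumerable<int> source)
    {
        return source.Aggregate(0, (acc, current) => acc + current);
    }

    // The same fold written as a running total of which only the last value is kept.
    public static int LastRunningTotal(IEnumerable<int> source)
    {
        var acc = 0;
        foreach (var current in source)
        {
            acc += current;
        }
        return acc;
    }
}
// Total(Enumerable.Repeat(1, 5)) returns 5, and so does LastRunningTotal
```

Both methods only produce a result after the source has been fully consumed, which is the reason an aggregate over a never-ending sequence yields nothing.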

IObservable.Buffer Collects items and re-emits them as a list once the buffering condition is met. You can buffer by count, time, selector or custom logic.

Observable
    .Range(5, 5)
    .Buffer(2)
    .Subscribe(buffer =>
    {
        var asStrings = buffer.Select(x => x.ToString()).ToArray();
        var str = String.Join(", ", asStrings);
        Debug.Log(str);
    });
// 5, 6
// 7, 8
// 9
S.1: -a--b--c--d--e--f--g-|----------->
     vvvvvvvv buffer(count=2) vvvvvvvvv
S.2: ---ab----cd----ef----g-|--------->
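
Count-based buffering, including the partial last buffer seen in the output above, can be sketched over IEnumerable as follows. This is an illustrative analogue and not the UniRx implementation; BufferSketch, BufferByCount and Demo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BufferSketch
{
    // Groups consecutive items into lists of `count`; a shorter final list is
    // emitted when the source ends with items still buffered.
    public static IEnumerable<IList<T>> BufferByCount<T>(IEnumerable<T> source, int count)
    {
        if (count <= 0) throw new ArgumentOutOfRangeException("count");

        var buffer = new List<T>(count);
        foreach (var item in source)
        {
            buffer.Add(item);
            if (buffer.Count == count)
            {
                yield return buffer;
                buffer = new List<T>(count);
            }
        }
        if (buffer.Count > 0) yield return buffer;
    }

    // Mirrors the Range(5, 5).Buffer(2) example, one joined string per buffer.
    public static List<string> Demo()
    {
        return BufferByCount(Enumerable.Range(5, 5), 2)
            .Select(b => string.Join(", ", b.Select(x => x.ToString()).ToArray()))
            .ToList();
    }
}
// Demo() returns "5, 6", "7, 8", "9"
```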

IObservable.GroupBy Divides a stream into a set of streams, one per key, each emitting the subset of items from the original stream that share that key.

Observable
    .Range(1, 4)
    .GroupBy(x => x % 2)
    .Subscribe(
        grp => grp.Subscribe((x) => Debug.Log(grp.Key + ":" + x))
    );
// 1:1  --  group 1 (odd),  number 1
// 0:2  --  group 0 (even), number 2
// 1:3  --  group 1 (odd),  number 3
// 0:4  --  group 0 (even), number 4
S.1: -a-b-c----b-c--b------>
     vvv groupBy(value) vvvv
S.2: -A-B-C---------------->
S.A: -a-------------------->
S.B: ---b------b----b------>
S.C: -----c------c--------->
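
As a point of comparison, LINQ's GroupBy on IEnumerable partitions by key in the same way, but it collects each group completely before handing it over, whereas the observable version above pushes items into their group streams as they arrive (hence the interleaved 1:1, 0:2, 1:3, 0:4 output). The sketch below assumes nothing beyond System.Linq; GroupBySketch and Describe are names made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GroupBySketch
{
    // Partitions 1..4 by parity and renders each group as "key: items".
    public static List<string> Describe()
    {
        return Enumerable.Range(1, 4)
            .GroupBy(x => x % 2)
            .Select(grp => grp.Key + ": " +
                string.Join(", ", grp.Select(x => x.ToString()).ToArray()))
            .ToList();
    }
}
// Describe() returns "1: 1, 3" and "0: 2, 4"
```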
