ruk·si

UniRx
Custom Observables

Updated at 2017-08-13 13:22

Observable.Create creates a new stream template that defines how this kind of stream behaves. It is meant for more advanced users, and the function you pass to it must follow these rules:

  1. May call observer.OnNext any number of times to emit new values.
  2. May call either observer.OnCompleted or observer.OnError, at most once.
  3. Does not call any of the observer methods after OnCompleted or OnError.
  4. Returns an IDisposable that cleans up when the subscription is disposed.
Func<UniRx.IObservable<int>> createStream = () =>
{
    return Observable.Create<int>(
        (observer) =>
        {
            observer.OnNext(5);                         // emit 5
            var timerCancel = Observable
                .Timer(TimeSpan.FromSeconds(0.5f))      // wait for 500ms
                .Subscribe((_) =>
                {
                    observer.OnNext(10);                // emit 10
                    observer.OnCompleted();             // notify complete
                });
            // Returning timerCancel directly would also work, since it is already an IDisposable.
            return Disposable.Create(() => timerCancel.Dispose());
        }
    );
};

var source = createStream();
var handler = source.Subscribe(CreateShoutObserver<int>());
handler.Dispose();
// OnNext: 5 - only this is printed, because the subscription is disposed before the timer fires

// You can also subscribe with the observer functions alone:
source.Subscribe(
    (value) => Debug.Log("OnNext: " + value),
    (error) => Debug.Log("OnError: " + error),  // optional
    () => Debug.Log("OnCompleted")              // optional
);
// OnNext: 5
// OnNext: 10
// OnCompleted
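
The CreateShoutObserver<int>() helper used above is not defined in this section. A minimal sketch of what such a helper could look like follows, assuming it only logs each notification in the same "OnNext: ..." format as the lambdas above. The names ShoutObserver and ShoutDemo and the log parameter are hypothetical, and the sketch assumes a UniRx version that uses System.IObserver<T>; older releases declare their own UniRx.IObserver<T>, in which case the interface name would need to be qualified accordingly.

```csharp
using System;

// Hypothetical stand-in for the CreateShoutObserver<T>() helper used above;
// the original helper is not shown in this section.
public sealed class ShoutObserver<T> : IObserver<T>
{
    private readonly Action<string> log;

    // The log delegate is an assumption of this sketch:
    // pass msg => Debug.Log(msg) in Unity, or Console.WriteLine elsewhere.
    public ShoutObserver(Action<string> log)
    {
        this.log = log;
    }

    // Called once for every emitted value.
    public void OnNext(T value)
    {
        log("OnNext: " + value);
    }

    // Called at most once, when the stream fails.
    public void OnError(Exception error)
    {
        log("OnError: " + error.Message);
    }

    // Called at most once, when the stream finishes normally.
    public void OnCompleted()
    {
        log("OnCompleted");
    }
}

public static class ShoutDemo
{
    // Factory shaped like the helper above, except that it takes the logger as a parameter.
    public static IObserver<T> CreateShoutObserver<T>(Action<string> log)
    {
        return new ShoutObserver<T>(log);
    }
}
```

With this sketch, the first subscription above could be written as source.Subscribe(ShoutDemo.CreateShoutObserver<int>(msg => Debug.Log(msg))). Taking the logger as a parameter keeps the class free of Unity dependencies, so it can also be exercised outside the editor.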
