ruk·si

⬜️ Unity
UniRx

Updated at 2016-11-04 17:36

UniRx (Reactive Extensions for Unity) is a reimplementation of the .NET Reactive Extensions. It vastly simplifies asynchronous operations and provides a good collection of helpers such as pooling and tuples.

With coroutines:

using System.Collections;
using UnityEngine;

// Logs "Tick!" and "Tock!" alternately, one message every half second,
// using a plain Unity coroutine.
public class CoroutineTicker : MonoBehaviour
{
    private void Start()
    {
        StartCoroutine(Ticker());
    }

    // Endless coroutine: wait, log the current message, then flip the flag.
    private IEnumerator Ticker()
    {
        for (var isTick = true; ; isTick = !isTick) {
            yield return new WaitForSeconds(0.5f);
            Debug.Log(isTick ? "Tick!" : "Tock!");
        }
    }
}

The same with streams:

using System.Collections;
using UnityEngine;
using UniRx;

// Same ticker as the plain coroutine version, but the coroutine is wrapped
// in an observable and started by subscribing to it.
public class ObservableTicker : MonoBehaviour
{
    private void Start()
    {
        // The subscription is the only handle that can stop the wrapped
        // coroutine, so bind it to this component: it is disposed when the
        // component's game object is destroyed instead of ticking forever.
        Observable
            .FromCoroutine(Ticker)
            .Subscribe()
            .AddTo(this);
    }

    // Endless coroutine: every half second logs "Tick!" or "Tock!" in turn.
    private IEnumerator Ticker()
    {
        var tick = true;
        while (true) {
            yield return new WaitForSeconds(0.5f);
            Debug.Log(tick ? "Tick!" : "Tock!");
            tick = !tick;
        }
    }
}

Or a more complex stream:

using System;
using System.Collections;
using UnityEngine;
using UniRx;

// Coroutine-backed stream that pushes values to its observer, is filtered
// with Where, and reports errors and completion to the subscriber.
public class FullObservableExample : MonoBehaviour
{
    private void Start()
    {
        // the Observable.FromCoroutine() part is usually constructed in
        // a static function.
        var handler = Observable
            .FromCoroutine<string>((obs, token) => Ticker(0.5f, obs, token))
            .Where(tickOrTock => tickOrTock == "Tock!")
            .Subscribe(
                tock => Debug.Log(tock), // we get "returned" value here
                exception => Debug.LogException(exception),
                () => Debug.Log("Completed!")
            );
            // without the exception handler the default implementation
            // throws inside the coroutine

        // uncomment the following to stop stream midway
        //StartCoroutine(DisposeSoon(handler));
    }

    // Waits two seconds and then disposes the given subscription.
    private IEnumerator DisposeSoon(IDisposable handler)
    {
        yield return new WaitForSeconds(2.0f);
        handler.Dispose(); // stops the coroutine
    }

    // Pushes "Tick!" / "Tock!" alternately to the observer, one item every
    // `time` seconds, and ends the stream with an error after six items.
    //   time:     delay in seconds between two items
    //   observer: receives the items and the terminal notification
    //   token:    checked after every wait; the coroutine exits once
    //             cancellation has been requested
    private IEnumerator Ticker(
        float time,
        IObserver<string> observer,
        CancellationToken token
    ) {
        var tick = true;
        var count = 0;
        while (true) {
            yield return new WaitForSeconds(time);
            if (token.IsCancellationRequested) yield break;
            // allows "returning" a value from coroutine
            observer.OnNext(tick ? "Tick!" : "Tock!");
            tick = !tick;
            count++;
            if (count >= 6) {
                // try out what happens on both cases
                //observer.OnCompleted();
                observer.OnError(new Exception("6 ticks and tocks error"));
                // OnError/OnCompleted end the stream, so stop right here
                // instead of waiting another interval and relying on the
                // cancellation token to end the loop.
                yield break;
            }
        }
    }
}

Most operators run on the main thread, so they are safe to use with Unity objects. For example, Interval, Timer and Buffer(timeSpan) can be used without threading problems. Observable.Start is an exception: it runs its work on another thread, so you must add ObserveOnMainThread to get back to the main thread. The default main thread scheduler is also affected by Unity's Time.timeScale.

using System;
using UnityEngine;
using UniRx;

// Starts two blocking jobs with Observable.Start, waits for both and logs
// their results after switching back with ObserveOnMainThread.
public class MultithreadingExample : MonoBehaviour
{
    private void Start()
    {
        var shortJob = Observable.Start(() => SleepThenReturn(1, 100));
        var longJob = Observable.Start(() => SleepThenReturn(3, 300));

        Observable
            .WhenAll(shortJob, longJob)
            .ObserveOnMainThread()
            .Subscribe(results => {
                // we get back to main thread with ObserveOnMainThread
                // so we can access GameObjects etc.
                Debug.Log(results[0]); // => 100
                Debug.Log(results[1]); // => 300
            });
    }

    // Blocks the calling thread for the given number of seconds and then
    // hands back the given result.
    private static int SleepThenReturn(double seconds, int result)
    {
        System.Threading.Thread.Sleep(TimeSpan.FromSeconds(seconds));
        return result;
    }
}

You can change most game object callbacks to streams using triggers. If default triggers are not enough, you can always add your own e.g. for long tap.

using UnityEngine;
using UniRx;
using UniRx.Triggers;

// Turns a cube's Update callback into a stream, logs on every 30th frame
// and logs once more when the stream completes after the cube is destroyed.
public class UpdateAsObservableExample : MonoBehaviour
{
    private void Start()
    {
        var cube = GameObject.CreatePrimitive(PrimitiveType.Cube);

        // UpdateAsObservable adds an ObservableUpdateTrigger component
        // to the game object
        var everyThirtiethFrame = cube
            .UpdateAsObservable()
            .SampleFrame(30);

        everyThirtiethFrame.Subscribe(
            _ => Debug.Log("cube says hello"),
            () => Debug.Log("cube was annihilated"));

        // the cube goes away after 3 seconds
        Destroy(cube, 3f);
    }
}

CompositeDisposable helps handling multiple disposables.

using System;
using System.Collections;
using UnityEngine;
using UniRx;

// Collects several subscriptions in one CompositeDisposable so that they
// can be released together.
public class CompositeExample : MonoBehaviour
{
    private readonly CompositeDisposable handlers = new CompositeDisposable();

    private void Start()
    {
        Observable
            .EveryUpdate()
            .SampleFrame(10)
            .Subscribe(x => Debug.Log("10 frames passed"))
            .AddTo(handlers);
        Observable
            .EveryUpdate()
            .SampleFrame(12)
            .Subscribe(x => Debug.Log("12 frames passed"))
            .AddTo(handlers);
        StartCoroutine(DisposeSoon());
    }

    // Releases every collected subscription after two seconds.
    private IEnumerator DisposeSoon()
    {
        yield return new WaitForSeconds(2.0f);
        // .Clear() => Dispose is called for all, and the list is cleared.
        // .Dispose() => Dispose is called for all, and
        //               Dispose is called immediately after additional Adds.
        handlers.Clear();
    }

    // If this component is destroyed before DisposeSoon has finished, the
    // coroutine never reaches Clear(); release the subscriptions here so
    // they do not keep logging after the component is gone.
    private void OnDestroy()
    {
        handlers.Dispose();
    }
}

You can also dispose an observable when a game object is destroyed.

using UnityEngine;
using UniRx;

// Logs the value of a frame interval stream and ties the subscription to
// the lifetime of a cube with AddTo.
public class Test : MonoBehaviour
{
    private void Start()
    {
        var cube = GameObject.CreatePrimitive(PrimitiveType.Cube);

        var subscription = Observable
            .IntervalFrame(30)
            .Subscribe(tick => Debug.Log(tick));

        // dispose when target object is destroyed
        subscription.AddTo(cube);

        Destroy(cube, 3f);
    }
}
using UnityEngine;
using UniRx;

// Logs the value of a frame interval stream until a cube is destroyed;
// TakeUntilDestroy makes the completion callback available.
public class Test : MonoBehaviour
{
    private void Start()
    {
        var cube = GameObject.CreatePrimitive(PrimitiveType.Cube);

        // dispose when target object is destroyed
        var untilCubeDestroyed = Observable
            .IntervalFrame(30)
            .TakeUntilDestroy(cube);

        // allows using OnCompleted part
        untilCubeDestroyed.Subscribe(
            tick => Debug.Log(tick),
            () => Debug.Log("completed!"));

        Destroy(cube, 3f);
    }
}

UniRx provides micro coroutines. A micro coroutine is automatically used by the UniRx frame count based operators and by ObserveEveryValueChanged. They have vastly superior performance, but you can only use yield return null in them, so they execute every frame.

using System.Collections;
using UnityEngine;
using UniRx;

// Starts 10000 endless workers as micro coroutines so that their cost can
// be compared in the profiler with regular coroutines.
public class MicroCoroutineExample : MonoBehaviour
{
    int counter;

    void Start()
    {
        var remaining = 10000;
        while (remaining-- > 0) {
            // you can try out the difference yourself in profiler
            // (the author reports micro coroutines to be a quarter or less
            // as intensive)
            MainThreadDispatcher.StartUpdateMicroCoroutine(Worker());
            //StartCoroutine(Worker());
        }
    }

    // Endless worker: bumps the shared counter once per resume.
    private IEnumerator Worker()
    {
        for (;;) {
            counter++;
            yield return null;
        }
    }
}

Frame count based operators in UniRx. You can access these through Observable static class.

  • EveryUpdate(): on every Update().
  • EveryFixedUpdate(): on every FixedUpdate().
  • EveryLateUpdate(): on every LateUpdate().
  • EveryEndOfFrame(): on end of every frame.
  • EveryGameObjectUpdate(): on every update on MainThreadDispatcher singleton.
  • NextFrame(FrameCountType): only on first next frame of the frame type.
  • IntervalFrame(int, FrameCountType): execute after every count of frame type.
  • TimerFrame(int, FrameCountType): execute once after count of frame type.
  • DelayFrame(Observable, int, FrameCountType): listen to another stream and delay execution on this stream by the frame count.
  • SampleFrame(Observable, int, FrameCountType): emits the latest item of the source stream once every given frame count; with EveryUpdate() as the source that item is the running count of updates.
  • ThrottleFrame(Observable, int, FrameCountType): listen to another stream and delay execution on this stream until the given amount of frames has passed without new items in stream.
  • ThrottleFirstFrame(Observable, int, FrameCountType): same as ThrottleFrame but makes throttling work as a "cooldown" so that stream is triggered once but doesn't trigger again until the given amount of frame has passed without an item in the stream.
  • TimeoutFrame(Observable, int, FrameCountType): throws an exception inside the stream if no event comes in the given number of frames.
  • DelayFrameSubscription(Observable, int, FrameCountType): similar to DelayFrame but only delays the subscription, but after that triggers when the original stream triggers.
  • FrameInterval(Observable): tells the interval in frames between the two latest events, e.g. 2@101 (the third event arrived 101 frames after the second event).
  • FrameTimeInterval(Observable, bool): tells the interval in time between the two latest events, e.g. 2@00:00:01.7000000 (the third event arrived 1.7 seconds after the second event).
  • BatchFrame(Observable, int, FrameCountType): groups source stream events to lists by the given frame count.
  • ObserveOnMainThread(): hand off execution to MainThreadDispatcher.

UnityEvent can be turned into a stream. OnValueChangedAsObservable can also be called on inputs and other widgets.

using UnityEngine;
using UnityEngine.UI;
using UnityEngine.EventSystems;
using UniRx;

// Builds a minimal UI with one button and logs every click by turning the
// button's onClick UnityEvent into a stream.
public class UnityEventExample : MonoBehaviour
{
    void Start()
    {
        var button = CreateDemoButton();
        var clicks = button.onClick.AsObservable();
        clicks.Subscribe(_ => Debug.Log("clicked"));
    }

    // Boilerplate stuff for UI, don't mind this: creates an event system,
    // an overlay canvas and a button with an image as its target graphic.
    private static Button CreateDemoButton()
    {
        var eventSystemObject = new GameObject("EventSystem");
        eventSystemObject.AddComponent<EventSystem>();
        eventSystemObject.AddComponent<StandaloneInputModule>();

        var canvasObject = new GameObject("Canvas");
        var canvas = canvasObject.AddComponent<Canvas>();
        canvasObject.AddComponent<GraphicRaycaster>();
        canvas.renderMode = RenderMode.ScreenSpaceOverlay;

        var buttonObject = new GameObject("Button");
        buttonObject.transform.SetParent(canvasObject.transform);
        var button = buttonObject.AddComponent<Button>();
        var image = buttonObject.AddComponent<Image>();
        button.targetGraphic = image;
        return button;
    }
}

Reactive properties allow streaming changes to listeners. There is also ReadOnlyReactiveProperty. You can use these in non-MonoBehaviour classes without problems.

using UnityEngine;
using UniRx;

// Derives an IsDead property from the current hit points and logs once the
// derived value becomes true.
public class ReactivePropertyExample : MonoBehaviour
{
    // these *ReactiveProperty variants are serializable and show in Editor
    // note that this cannot have get/set
    public FloatReactiveProperty currentHp = new FloatReactiveProperty(10f);
    public ReactiveProperty<bool> IsDead { get; private set; }

    void Start()
    {
        var hpDepleted = currentHp.Select(hp => hp <= 0);
        IsDead = hpDepleted.ToReactiveProperty();

        IsDead
            .Where(dead => dead)
            .Subscribe(_ => Debug.Log("DEAD!"));
    }
}

Reactive collections and dictionaries.

using UnityEngine;
using UniRx;

// Observes additions to a reactive collection and a reactive dictionary and
// logs only the add events that pass a filter.
public class ReactiveCollectionExample : MonoBehaviour
{
    public ReactiveCollection<int> numbers = new ReactiveCollection<int>();
    public ReactiveDictionary<string, int> stringToNumber
        = new ReactiveDictionary<string, int>();

    void Start()
    {
        // log add events whose value is even
        var evenAdds = numbers
            .ObserveAdd()
            .Where(added => added.Value % 2 == 0);
        evenAdds.Subscribe(added => Debug.Log(added));

        for (var n = 1; n <= 4; n++) {
            numbers.Add(n);
        }

        // log add events whose key starts with "t"
        var tKeyAdds = stringToNumber
            .ObserveAdd()
            .Where(added => added.Key.StartsWith("t"));
        tKeyAdds.Subscribe(added => Debug.Log(added));

        var names = new[] { "one", "two", "three", "four" };
        for (var i = 0; i < names.Length; i++) {
            stringToNumber[names[i]] = i + 1;
        }
    }
}

UniRx also provides good tuples. Really simple but ever so powerful.

using UnityEngine;
using UniRx;

// Creates a two-item tuple and a nested four-item tuple and logs them.
public class TupleExample : MonoBehaviour
{
    void Start()
    {
        var pair = Tuple.Create(1, "dwqqwdd");
        Debug.Log(pair);

        // the last item is itself a tuple
        var gameObject3 = new GameObject();
        var floats = Tuple.Create(1f, 2f);
        var quad = Tuple.Create("something", 12314, gameObject3, floats);
        Debug.Log(quad);
        Debug.Log(quad.Item3);
    }
}

Reactive commands allow binding values to button state. There is also AsyncReactiveCommand.

using UnityEngine;
using UnityEngine.UI;
using UnityEngine.EventSystems;
using UniRx;

// Binds a button to a reactive command that is derived from the hp value;
// executing the command sets hp back to 10.
public class ReactiveCommandExample : MonoBehaviour
{
    public IntReactiveProperty hp = new IntReactiveProperty(10);
    public ReactiveCommand resurrect;

    void Start()
    {
        var button = CreateDemoButton();

        // you can change HP in Editor and only resurrect if hp <= 0
        var hpDepleted = hp.Select(current => current <= 0);
        resurrect = hpDepleted.ToReactiveCommand();
        resurrect.Subscribe(_ => { hp.Value = 10; });
        resurrect.BindTo(button);
    }

    // Boilerplate stuff for UI, don't mind this: creates an event system,
    // an overlay canvas and a button with an image as its target graphic.
    private static Button CreateDemoButton()
    {
        var eventSystemObject = new GameObject("EventSystem");
        eventSystemObject.AddComponent<EventSystem>();
        eventSystemObject.AddComponent<StandaloneInputModule>();

        var canvasObject = new GameObject("Canvas");
        var canvas = canvasObject.AddComponent<Canvas>();
        canvasObject.AddComponent<GraphicRaycaster>();
        canvas.renderMode = RenderMode.ScreenSpaceOverlay;

        var buttonObject = new GameObject("Button");
        buttonObject.transform.SetParent(canvasObject.transform);
        var button = buttonObject.AddComponent<Button>();
        var image = buttonObject.AddComponent<Image>();
        button.targetGraphic = image;
        return button;
    }
}

Message brokers allow listening for events based on type. MessageBroker.Default is the global scope message broker but you can always instantiate your own.

using System.Collections;
using UnityEngine;
using UniRx;

// Listens for BrokerItem1 messages on the global message broker and
// publishes two rounds of messages of both item types.
public class GlobalBrokerExample : MonoBehaviour
{
    public class BrokerItem1 { public int Value { get; set; } }
    public class BrokerItem2 { public int Value { get; set; } }

    private void Start()
    {
        // MessageBroker.Default is global, so bind the subscription to this
        // component; otherwise it stays registered after the component's
        // game object has been destroyed.
        MessageBroker.Default
            .Receive<BrokerItem1>()
            .Subscribe(x => Debug.Log(x))
            .AddTo(this);
        StartCoroutine(PublishSoon());
    }

    // Publishes one BrokerItem1 and one BrokerItem2 after one second and
    // again after another second. Only BrokerItem1 is subscribed to in
    // Start, so the BrokerItem2 messages are not logged by this class.
    private IEnumerator PublishSoon()
    {
        yield return new WaitForSeconds(1.0f);
        MessageBroker.Default.Publish(new BrokerItem1 { Value = 1000 });
        MessageBroker.Default.Publish(new BrokerItem2 { Value = 1000 });
        yield return new WaitForSeconds(1.0f);
        MessageBroker.Default.Publish(new BrokerItem1 { Value = 1000 });
        MessageBroker.Default.Publish(new BrokerItem2 { Value = 1000 });
    }
}
using System.Collections;
using UnityEngine;
using UniRx;

// Same as the global broker example, but with a message broker instance
// owned by this component.
public class CustomBrokerExample : MonoBehaviour
{
    public class BrokerItem1 { public int Value { get; set; } }
    public class BrokerItem2 { public int Value { get; set; } }

    private MessageBroker broker;

    private void Start()
    {
        broker = new MessageBroker();

        var item1Messages = broker.Receive<BrokerItem1>();
        item1Messages.Subscribe(item => Debug.Log(item));

        StartCoroutine(PublishSoon());
    }

    // Two rounds, one second apart; each round publishes one item of each
    // type on the private broker.
    private IEnumerator PublishSoon()
    {
        for (var round = 0; round < 2; round++) {
            yield return new WaitForSeconds(1.0f);
            broker.Publish(new BrokerItem1 { Value = 1000 });
            broker.Publish(new BrokerItem2 { Value = 1000 });
        }
    }
}

Async message brokers are also useful from time to time. Publisher is notified after all subscribers have completed.

using System;
using System.Collections;
using UnityEngine;
using UniRx;

// Registers two asynchronous subscribers on the global async message broker
// and publishes two messages; the publisher's callback logs once
// PublishAsync reports back.
public class AsyncBrokerExample : MonoBehaviour
{
    public class BrokerItem { public int Value { get; set; } }

    private void Start()
    {
        // AsyncMessageBroker.Default is global, so both subscriptions are
        // bound to this component and released together with it.
        AsyncMessageBroker.Default
            .Subscribe<BrokerItem>(x => {
                // this subscriber logs the item after one second
                return Observable
                    .Timer(TimeSpan.FromSeconds(1))
                    .ForEachAsync(_ => { Debug.Log(x); });
            })
            .AddTo(this);
        AsyncMessageBroker.Default
            .Subscribe<BrokerItem>(x => {
                // this subscriber logs the item after two seconds
                return Observable
                    .Timer(TimeSpan.FromSeconds(2))
                    .ForEachAsync(_ => { Debug.Log(x); });
            })
            .AddTo(this);
        StartCoroutine(PublishSoon());
    }

    // Publishes one item after one second and a second item one second
    // later, logging a message from each PublishAsync callback.
    private IEnumerator PublishSoon()
    {
        yield return new WaitForSeconds(1.0f);
        AsyncMessageBroker.Default
            .PublishAsync(new BrokerItem { Value = 3000 })
            .Subscribe(_ => {
                Debug.Log("call all subscriber completed");
            });
        yield return new WaitForSeconds(1.0f);
        AsyncMessageBroker.Default
            .PublishAsync(new BrokerItem { Value = 3000 })
            .Subscribe(_ => {
                Debug.Log("call all subscriber completed");
            });
    }
}

Pooling.

using System;
using System.Collections;
using UnityEngine;
using UniRx;
using UniRx.Toolkit;

// Rents eight Foobar instances from a pool, one after another, and returns
// each one as soon as its asynchronous action has produced a value.
public class PoolUserExample : MonoBehaviour
{
    FoobarPool pool = null;
    public Foobar prefab;

    private void Start()
    {
        pool = new FoobarPool(prefab, this.transform);
        StartCoroutine(MassCreate());
    }

    // The first five rentals are 2 seconds apart, the remaining three are
    // 5 seconds apart.
    private IEnumerator MassCreate()
    {
        for (var rental = 1; rental <= 8; rental++) {
            var delay = (rental <= 5) ? 2.0f : 5.0f;
            yield return new WaitForSeconds(delay);
            Debug.LogFormat("Rental #{0}", rental);
            var rented = pool.Rent();
            rented.ActionAsync().Subscribe(done => pool.Return(done));
        }
        Debug.Log("stopping creation");
    }
}
using UnityEngine;
using UniRx;
using UniRx.Toolkit;

// Object pool for Foobar components: instances are created from a prefab,
// parented under a given transform, activated when rented and deactivated
// when returned.
public class FoobarPool : ObjectPool<Foobar>
{
    readonly Foobar prefab;
    readonly Transform hierarchyParent;

    // prefab:          template every new pooled instance is cloned from
    // hierarchyParent: transform new instances are parented to
    public FoobarPool(Foobar prefab, Transform hierarchyParent)
    {
        this.prefab = prefab;
        this.hierarchyParent = hierarchyParent;
    }

    // Clones the prefab and places the clone under the configured parent.
    protected override Foobar CreateInstance()
    {
        var foobar = GameObject.Instantiate<Foobar>(prefab);
        foobar.transform.SetParent(hierarchyParent);
        return foobar;
    }

    protected override void OnBeforeRent(Foobar instance)
    {
        instance.gameObject.SetActive(true);
    }

    protected override void OnBeforeReturn(Foobar instance)
    {
        instance.gameObject.SetActive(false);
    }

    protected override void OnClear(Foobar instance)
    {
        Debug.Log("FoobarPool.OnClear");
        // Keep the base behaviour. NOTE(review): the base OnClear is
        // expected to destroy the instance's game object; without this call
        // cleared instances would stay in the scene - confirm against the
        // UniRx version in use.
        base.OnClear(instance);
    }
}
using System;
using UnityEngine;
using UniRx;

// Pooled component that logs its Unity lifecycle callbacks and offers an
// asynchronous action that yields the component itself after 3 seconds.
public class Foobar : MonoBehaviour
{
    // Returns a stream that emits this instance once the 3 second timer
    // has fired.
    public IObservable<Foobar> ActionAsync()
    {
        var threeSeconds = TimeSpan.FromSeconds(3);
        var timer = Observable.Timer(threeSeconds);
        return timer.Select(_ => this);
    }

    private void Awake()
    {
        Debug.Log("Foobar.Awake");
    }

    private void Start()
    {
        Debug.Log("Foobar.Start");
    }

    private void OnEnable()
    {
        Debug.Log("Foobar.OnEnable");
    }

    private void OnDisable()
    {
        Debug.Log("Foobar.OnDisable");
    }

    private void OnDestroy()
    {
        Debug.Log("Foobar.OnDestroy");
    }
}

Extra goodies:

// Global coroutine execution; `enumerator` stands for any IEnumerator
// coroutine you want to run.
MainThreadDispatcher.StartCoroutine(enumerator);

// Application related streams.
Observable.EveryApplicationPause();
Observable.EveryApplicationFocus();
Observable.OnceApplicationQuit();

Source