一、Observable 概述

1.1 什么是 Observable

Observable 是 UniRx 中的核心概念,代表一个可观察的数据流。它可以发射多个值(异步或同步),订阅者可以监听这些值。

1.2 Observable 的特点

  • 可观察:数据源可以被订阅

  • 可组合:多个 Observable 可以组合成新的数据流

  • 可操作:支持各种操作符进行数据转换

  • 可订阅:允许一个或多个订阅者监听

二、创建 Observable 的常用方法

2.1 基础创建方法

csharp

// 1. 创建一个发射单个值的 Observable
Observable.Return("Hello")
    .Subscribe(msg => Debug.Log(msg));

// 2. 创建一个空的 Observable(立即完成)
Observable.Empty<int>()
    .Subscribe(
        onNext: x => Debug.Log(x),  // 不会执行
        onCompleted: () => Debug.Log("Completed")  // 立即执行
    );

// 3. 创建一个永不结束的 Observable
Observable.Never<int>();

// 4. 创建一个立即抛出错误的 Observable
Observable.Throw<int>(new Exception("Error"));

2.2 从 Unity 事件创建

csharp

// 1. 从 Update 事件创建
Observable.EveryUpdate()
    .Subscribe(_ => {
        // 每帧执行
    });

// 2. 从 FixedUpdate 创建
Observable.EveryFixedUpdate()
    .Subscribe(_ => {
        // 每个固定物理帧执行
    });

// 3. 从协程创建
Observable.FromCoroutine<int>(observer => CountCoroutine(observer, 5))
    .Subscribe(count => Debug.Log($"Count: {count}"));

IEnumerator CountCoroutine(IObserver<int> observer, int count)
{
    for (int i = 0; i < count; i++)
    {
        observer.OnNext(i);
        yield return new WaitForSeconds(1);
    }
    observer.OnCompleted();
}

// 4. 从 UnityEvent 创建
public UnityEvent myEvent;
myEvent.AsObservable()
    .Subscribe(_ => Debug.Log("Event triggered"));

2.3 从异步操作创建

csharp

// 1. 从 Task 创建
Observable.FromAsync(() => SomeAsyncMethod())
    .Subscribe(result => Debug.Log(result));

async Task<string> SomeAsyncMethod()
{
    await Task.Delay(1000);
    return "Result";
}

// 2. 从 WWW/UnityWebRequest 创建
ObservableWWW.Get("http://example.com")
    .Subscribe(
        result => Debug.Log(result),
        error => Debug.LogError(error)
    );

2.4 定时器相关

csharp

// 1. 定时器
Observable.Timer(TimeSpan.FromSeconds(1))
    .Subscribe(_ => Debug.Log("1秒后执行"));

// 2. 间隔执行
Observable.Interval(TimeSpan.FromSeconds(1))
    .Subscribe(tick => Debug.Log($"Tick: {tick}"));

// 3. 延迟执行
Observable.Start(() => Debug.Log("Start"))
    .Delay(TimeSpan.FromSeconds(1))
    .Subscribe();

三、常用操作符

3.1 转换操作符

csharp

// 1. Select - 映射转换
Observable.Range(1, 5)
    .Select(x => x * 2)  // 每个值乘以2
    .Subscribe(x => Debug.Log(x));  // 2, 4, 6, 8, 10

// 2. Where - 过滤
Observable.Range(1, 10)
    .Where(x => x % 2 == 0)  // 只保留偶数
    .Subscribe(x => Debug.Log(x));  // 2, 4, 6, 8, 10

// 3. Cast - 类型转换
List<GameObject> objects = new List<GameObject>();
objects.AsObservable()
    .Cast<GameObject, Component>()
    .Subscribe(component => { /* 处理 */ });

3.2 组合操作符

csharp

// 1. Merge - 合并多个流
var stream1 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => $"A{x}");
var stream2 = Observable.Interval(TimeSpan.FromSeconds(2)).Select(x => $"B{x}");
stream1.Merge(stream2)
    .Subscribe(msg => Debug.Log(msg));

// 2. Zip - 成对组合
Observable.Range(1, 3)
    .Zip(Observable.Range(10, 3), (a, b) => $"{a}-{b}")
    .Subscribe(pair => Debug.Log(pair));  // "1-10", "2-11", "3-12"

// 3. CombineLatest - 任意一个发射时使用最新值组合
var inputX = GetAxisStream("Horizontal");
var inputY = GetAxisStream("Vertical");
inputX.CombineLatest(inputY, (x, y) => new Vector2(x, y))
    .Subscribe(vector => Move(vector));

// 4. Concat - 顺序连接
Observable.Return("First")
    .Concat(Observable.Return("Second"))
    .Subscribe(msg => Debug.Log(msg));  // "First", 然后 "Second"

3.3 时间相关操作符

csharp

// 1. Throttle - 防抖(一段时间内只取最后一个)
inputField.OnValueChangedAsObservable()
    .Throttle(TimeSpan.FromSeconds(0.5))
    .Subscribe(text => Search(text));

// 2. Sample - 采样(定期取最新值)
Observable.EveryUpdate()
    .Sample(TimeSpan.FromSeconds(0.1))
    .Subscribe(_ => UpdatePer100ms());

// 3. Buffer - 缓冲
Observable.Interval(TimeSpan.FromSeconds(0.2))
    .Buffer(TimeSpan.FromSeconds(1))
    .Subscribe(buffer => {
        Debug.Log($"1秒内收到了 {buffer.Count} 个值");
    });

// 4. Timeout - 超时处理
GetDataFromServer()
    .Timeout(TimeSpan.FromSeconds(5))
    .Subscribe(
        data => Process(data),
        error => {
            if (error is TimeoutException)
                Debug.Log("请求超时");
        }
    );

3.4 错误处理操作符

csharp

// 1. Catch - 捕获错误并切换流
Observable.Throw<int>(new Exception("Error"))
    .Catch((Exception ex) => {
        Debug.LogError(ex.Message);
        return Observable.Return(-1);
    })
    .Subscribe(x => Debug.Log(x));  // 输出 -1

// 2. Retry - 重试
GetNetworkData()
    .Retry(3)  // 最多重试3次
    .Subscribe(
        data => Debug.Log(data),
        error => Debug.LogError($"失败: {error}")
    );

// 3. Finally - 最终执行
Observable.Create<int>(observer => {
        // 创建资源
        return Disposable.Create(() => {
            // 清理资源
        });
    })
    .Finally(() => Debug.Log("最终清理"))
    .Subscribe();

四、实际应用示例

4.1 输入处理

csharp

public class InputHandler : MonoBehaviour
{
    void Start()
    {
        // 组合按键检测
        var jumpStream = Observable.EveryUpdate()
            .Where(_ => Input.GetButtonDown("Jump"));
            
        var attackStream = Observable.EveryUpdate()
            .Where(_ => Input.GetMouseButtonDown(0));
            
        jumpStream.Subscribe(_ => Jump());
        attackStream.Subscribe(_ => Attack());
        
        // 鼠标拖拽
        var dragStream = Observable.EveryUpdate()
            .Where(_ => Input.GetMouseButton(0))
            .Select(_ => Input.mousePosition)
            .Pairwise()  // 获取前后两个值
            .Select(pair => pair.Current - pair.Previous);
            
        dragStream.Subscribe(delta => DragObject(delta));
    }
}

4.2 UI 交互

csharp

public class UIHandler : MonoBehaviour
{
    public Button button;
    public InputField inputField;
    public Slider slider;
    public Toggle toggle;
    
    void Start()
    {
        // 按钮点击
        button.OnClickAsObservable()
            .Subscribe(_ => OnButtonClick());
            
        // 输入框实时搜索
        inputField.OnValueChangedAsObservable()
            .Throttle(TimeSpan.FromSeconds(0.3))
            .Where(text => text.Length >= 2)
            .DistinctUntilChanged()
            .Subscribe(Search);
            
        // 滑块值变化
        slider.OnValueChangedAsObservable()
            .Subscribe(value => UpdateValue(value));
            
        // 开关状态
        toggle.OnValueChangedAsObservable()
            .Subscribe(isOn => ToggleFeature(isOn));
    }
}

4.3 游戏逻辑

csharp

public class HealthSystem : MonoBehaviour
{
    public IntReactiveProperty currentHealth = new IntReactiveProperty(100);
    public IObservable<bool> IsDead { get; private set; }
    public IObservable<float> HealthPercentage { get; private set; }
    
    void Start()
    {
        // 计算是否死亡
        IsDead = currentHealth
            .Select(health => health <= 0)
            .DistinctUntilChanged();
            
        // 计算血量百分比
        HealthPercentage = currentHealth
            .Select(health => health / 100f);
            
        // 监听死亡事件
        IsDead.Where(isDead => isDead)
            .Subscribe(_ => OnDeath());
            
        // 低血量警告
        currentHealth
            .Where(health => health < 30 && health > 0)
            .Subscribe(_ => ShowLowHealthWarning());
    }
    
    public void TakeDamage(int damage)
    {
        currentHealth.Value -= damage;
    }
}

4.4 网络请求

csharp

public class NetworkManager : MonoBehaviour
{
    public IObservable<string> DownloadData(string url)
    {
        return ObservableWWW.Get(url)
            .Timeout(TimeSpan.FromSeconds(10))
            .Retry(3)
            .ObserveOnMainThread();  // 在主线程处理结果
    }
    
    void LoadData()
    {
        DownloadData("http://api.example.com/data")
            .Subscribe(
                data => {
                    // 处理数据
                    ProcessData(JsonUtility.FromJson<Data>(data));
                },
                error => {
                    // 错误处理
                    if (error is TimeoutException)
                        ShowMessage("请求超时");
                    else
                        ShowMessage("网络错误");
                }
            );
    }
}

五、高级技巧

5.1 自定义 Observable

csharp

public static class CustomObservables
{
    // 自定义 Observable 扩展方法
    public static IObservable<Vector3> MouseDragAsObservable(this Component component)
    {
        return Observable.Create<Vector3>(observer => {
            bool isDragging = false;
            Vector3 startPos = Vector3.zero;
            
            var mouseDown = Observable.EveryUpdate()
                .Where(_ => Input.GetMouseButtonDown(0))
                .Subscribe(_ => {
                    isDragging = true;
                    startPos = Input.mousePosition;
                });
                
            var mouseUp = Observable.EveryUpdate()
                .Where(_ => Input.GetMouseButtonUp(0))
                .Subscribe(_ => {
                    isDragging = false;
                });
                
            var mouseDrag = Observable.EveryUpdate()
                .Where(_ => isDragging)
                .Select(_ => Input.mousePosition - startPos)
                .Subscribe(observer);
                
            return Disposable.Create(() => {
                mouseDown.Dispose();
                mouseUp.Dispose();
                mouseDrag.Dispose();
            });
        });
    }
}

5.2 内存管理

csharp

public class DisposableExample : MonoBehaviour
{
    private CompositeDisposable disposables = new CompositeDisposable();
    
    void Start()
    {
        // 添加到 CompositeDisposable 统一管理
        Observable.Interval(TimeSpan.FromSeconds(1))
            .Subscribe(_ => Debug.Log("Tick"))
            .AddTo(disposables);
            
        Observable.EveryUpdate()
            .Subscribe(_ => { /* 更新逻辑 */ })
            .AddTo(disposables);
    }
    
    void OnDestroy()
    {
        // 统一释放所有订阅
        disposables.Dispose();
    }
}

六、性能考虑

  1. 及时释放订阅:避免内存泄漏

  2. 合理使用 Throttle/Sample:减少不必要的处理

  3. 避免频繁创建 Observable:考虑重用

  4. 使用合适的调度器:注意线程切换开销

总结

UniRx 的 Observable 提供了强大的响应式编程能力,特别适合处理 Unity 中的异步操作和事件流。通过合理使用各种操作符,可以写出更简洁、可维护的代码。关键是要理解数据流的概念,并掌握如何组合和转换这些流来满足不同的需求。

Logo

这里是一个专注于游戏开发的社区,我们致力于为广大游戏爱好者提供一个良好的学习和交流平台。我们的专区包含了各大流行引擎的技术博文,涵盖了从入门到进阶的各个阶段,无论你是初学者还是资深开发者,都能在这里找到适合自己的内容。除此之外,我们还会不定期举办游戏开发相关的活动,让大家更好地交流互动。加入我们,一起探索游戏开发的奥秘吧!

更多推荐