XiaoboTalk

FRP Functional Reactive Programming

一、关键名词解释

需要先理解下边几个基本名词的概念
  1. ObservableType: 就是流 stream,是事件的发出者 (Publisher),按照时间序列发出。
  1. ObserverObject:监听者,包含三个函数:
    1. onNext
    2. onError
    3. onCompleted
  1. Subscribe 订阅动作,将 ObservableType 和 ObserverObject 关联起来
  1. Disposable :流的生命周期边界

二、流的基本用法和生命周期

import UIKit import RxSwift import RxCocoa class ViewController: UIViewController { // disposeBag 生命周期和 viewController 关联 let disposeBag = DisposeBag(); override func viewDidLoad() { super.viewDidLoad() } override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?) { // Observable: stream let stream = Observable<Int>.create { observer in observer.onNext(1) observer.onCompleted() return Disposables.create() } // observer obj: next error completed let observer = AnyObserver<Int> { event in switch event { case .next(let val): print("val: \(val)") case .error(_): print("err") case .completed: print("completed") } } // subscribe: action 关联 stream 和 observer 和 stream let disposer = stream.subscribe(observer); // disposer 生命周期边界管理 // 1: 局部生命周期,当即释放 disposer.dispose(); // 放入 bag,bag 释放的时候,生命周期结束 disposer.disposed(by: disposeBag); } }
  1. subscribe() 会返回一个 disposable 用来管理生命周期
    1. 上边的例子,使用 disposer.dispose(),流会被立即中断,即使是异步任务,也不会回调 onNext.
    2. 使用 disposer.disposed(by: disposeBag); 表示将流的生命周期和 disposeBag 的生命周期绑定,由 disposeBag 管理生命周期。当前示例中,disposeBag 被 ViewController 持有,所以当 vc 被释放的时候,流的生命周期结束。
  1. 如果没有明确使用 dispose() 销毁 stream。stream 也会在以下情况下终止:
    1. 发出 onError 事件后,流终止
    2. 发出 onCompleted 事件后,流终止

2.1、流不会随着函数作用域结束而自动释放

流的释放时机总结:
触发时机
触发条件
是否调用 dispose
✅ 自动释放
Observable 调用了 .onCompleted() 或 .onError()
✅ 是
✅ 手动释放
手动调用 .dispose() 或 .disposed(by:) 自动触发
✅ 是
❌ 没释放
既没有 .onCompleted() 也没有 .dispose()
❌ 否
响应式编程中,因为是一套独立的编程范式,所以没有流的生命周期管理,会脱离函数作用域,所以需要特别注意流的正确释放,错误案例代码:❌
// ❌ override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?) { let stream = Observable<Int>.create { observer in print("observable created ---") observer.onNext(1) return Disposables.create { print("observable disposed ---") } } let observer = AnyObserver<Int> { event in switch event { case .next(let val): print("val: \(val)") case .error(_): print("err") case .completed: print("completed") } } stream.subscribe(observer) }
这段代码,流只抛出了 onNext,永远没有 onCompleted / onError,也没有手动 dispose()。所以这个流一直存在,因为 Rx 也无法确定,流是否还有新的事件到来。

利用 dispose 取消任务

当 dispose() 执行的时候,如果需要做一些资源回收,或者任务取消,使用
let stream = Observable<Int>.create { observer in observer.onNext(1) observer.onCompleted() return Disposables.create { print("dispose called---") } } // ... let disposer = stream.subscribe(observer); disposer.dispose();
利用这个能力,可以做类似,网络请求取消:
let request = Observable<Data>.create { observer in let task = URLSession.shared.dataTask(with: URL(string: "https://example.com")!) { data, _, error in if let error = error { observer.onError(error) } else if let data = data { observer.onNext(data) observer.onCompleted() } } task.resume() return Disposables.create { task.cancel() // 关键 } } let disposable = request.subscribe( onNext: { data in print("Got data: \(data.count)") }, onError: { error in print("Error: \(error)") }, onCompleted: { print("Done") } ) // 假设用户切换页面或取消操作: disposable.dispose()