一、关键名词解释
需要先理解下边几个基本名词的概念
- ObservableType: 就是流 stream,是事件的发出者 (Publisher),按照时间序列发出。
- ObserverObject:监听者,包含三个函数:
- onNext
- onError
- onCompleted
- Subscribe 订阅动作,将 ObservableType 和 ObserverObject 关联起来
- Disposable :流的生命周期边界
二、流的基本用法和生命周期
import UIKit import RxSwift import RxCocoa class ViewController: UIViewController { // disposeBag 生命周期和 viewController 关联 let disposeBag = DisposeBag(); override func viewDidLoad() { super.viewDidLoad() } override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?) { // Observable: stream let stream = Observable<Int>.create { observer in observer.onNext(1) observer.onCompleted() return Disposables.create() } // observer obj: next error completed let observer = AnyObserver<Int> { event in switch event { case .next(let val): print("val: \(val)") case .error(_): print("err") case .completed: print("completed") } } // subscribe: action 关联 stream 和 observer 和 stream let disposer = stream.subscribe(observer); // disposer 生命周期边界管理 // 1: 局部生命周期,当即释放 disposer.dispose(); // 放入 bag,bag 释放的时候,生命周期结束 disposer.disposed(by: disposeBag); } }
- subscribe() 会返回一个 disposable 用来管理生命周期
- 上边的例子,使用 disposer.dispose(),流会被立即中断,即使是异步任务,也不会回调 onNext.
- 使用 disposer.disposed(by: disposeBag); 表示将流的生命周期和 disposeBag 的生命周期绑定,由 disposeBag 管理生命周期。当前示例中,disposeBag 被 ViewController 持有,所以当 vc 被释放的时候,流的生命周期结束。
- 如果没有明确使用 dispose() 销毁 stream。stream 也会在以下情况下终止:
- 发出 onError 事件后,流终止
- 发出 onCompleted 事件后,流终止
2.1、流不会随着函数作用域结束而自动释放
流的释放时机总结:
触发时机 | 触发条件 | 是否调用 dispose |
✅ 自动释放 | Observable 调用了 .onCompleted() 或 .onError() | ✅ 是 |
✅ 手动释放 | 手动调用 .dispose() 或 .disposed(by:) 自动触发 | ✅ 是 |
❌ 没释放 | 既没有 .onCompleted() 也没有 .dispose() | ❌ 否 |
响应式编程中,因为是一套独立的编程范式,所以没有流的生命周期管理,会脱离函数作用域,所以需要特别注意流的正确释放,错误案例代码:❌
// ❌ override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?) { let stream = Observable<Int>.create { observer in print("observable created ---") observer.onNext(1) return Disposables.create { print("observable disposed ---") } } let observer = AnyObserver<Int> { event in switch event { case .next(let val): print("val: \(val)") case .error(_): print("err") case .completed: print("completed") } } stream.subscribe(observer) }
这段代码,流只抛出了 onNext,永远没有 onCompleted / onError,也没有手动 dispose()。所以这个流一直存在,因为 Rx 也无法确定,流是否还有新的事件到来。
利用 dispose 取消任务
当 dispose() 执行的时候,如果需要做一些资源回收,或者任务取消,使用
let stream = Observable<Int>.create { observer in observer.onNext(1) observer.onCompleted() return Disposables.create { print("dispose called---") } } // ... let disposer = stream.subscribe(observer); disposer.dispose();
利用这个能力,可以做类似,网络请求取消:
let request = Observable<Data>.create { observer in let task = URLSession.shared.dataTask(with: URL(string: "https://example.com")!) { data, _, error in if let error = error { observer.onError(error) } else if let data = data { observer.onNext(data) observer.onCompleted() } } task.resume() return Disposables.create { task.cancel() // 关键 } } let disposable = request.subscribe( onNext: { data in print("Got data: \(data.count)") }, onError: { error in print("Error: \(error)") }, onCompleted: { print("Done") } ) // 假设用户切换页面或取消操作: disposable.dispose()