RxSwift简介
RxSwift
是 ReactiveX
家族的重要一员, ReactiveX
是 Reactive Extensions
的缩写,一般简写为 Rx
。ReactiveX
官方给Rx
的定义是:Rx是一个使用可观察数据流进行异步编程的编程接口。
1 | ReactiveX` 不仅仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言。它拓展了`观察者模式`,使你能够`自由组合多个异步事件`,而`不需要去关心线程`,`同步,线程安全`,`并发数据以及I/O阻塞 |
RxSwift
是 Rx
为 Swift
语言开发的一门函数响应式编程语言, 它可以代替iOS系统的 Target Action
/ 代理
/ 闭包
/ 通知
/ KVO
,同时还提供网络
、数据绑定
、UI事件处理
、UI的展示和更新
、多线程
……
Swift
为值类型,在传值与方法回调上有影响,RxSwift
一定程度上弥补Swift
的灵活性
RxSwift
使得代码复用性较强,减少代码量RxSwift
因为声明都是不可变更,增加代码可读性RxSwift
使得更易于理解业务代码,抽象异步编程,统一代码风格RxSwift
使得代码更易于编写集成单元测试,增加代码稳定性
RxSwift核心流程
RxSwift的api设计非常精简,流程就是:
1、创建序列()
2、订阅序列
3、发送信号
4、销毁
1 | // 1: 创建序列 |
创建序列
我们先看Create.swift文件的代码:
1 | extension ObservableType { |
我们可以看到,可观察序列的创建是利用扩展ObservableType
协议的create
方法实现的,里面创建了AnonymousObservable
(匿名可观察序列) ,这个命名体现了作者的思维,这是一个内部类,具备一些通用特性(具有自己功能的类才会命名)可以总结一下:
create
方法的时候创建了一个内部对象AnonymousObservable
AnonymousObservable
保存了外界传入的闭包AnonymousObservable
继承了Producer
接下来我们看一下Producer
类
1 | class Producer<Element>: Observable<Element> { |
可以看到Producer有一个很重要的方法subscribe
(订阅),subscribe
方法最后返回一个Disposable
对象。
订阅序列
我们看一下ObservableType
拓展(ObservableType+Extensions.swift)的功能,订阅的方法subscribe
(注意这个方法和Producer
的subscribe
不是同一个)
1 | /** |
代码说明:
E
是Swift
的关联类型,这个如果仔细看过可观察序列的继承链源码应该不难得出:这个E
就是我们的 序列类型,我们这里就是String
1
2
3public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element创建
AnonymousObserver
对象,可以类比前面create
的AnonymousObservable
对象,初始化参数为闭包,保存了外界传入的onNext
,onError
,onComplete
,onDisposed
的处理回调闭包。self.asObservable()
是我们的RxSwift
为了保持一致性的写法。self.asObservable().subscribe(observer)
其实本质就是self.
subscribe(observer),通过可观察序列的继承关系,我们可以快速定位到Producer
订阅代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}销毁代码后面在分析
self.run
这个代码最终由我们生产者Producer
(抽象方法找子类)延伸到具体事务代码AnonymousObservable.run
1
2
3
4
5override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}这里调用了
sink.run(self)
方法,将业务处理下沉,分工更加明确。1
2
3func run(_ parent: Parent) -> Disposable {
parent.subscribeHandler(AnyObserver(self))
}parent
是上面传入进来的AnonymousObservable
对象这个地方我们可以看到调用了
AnonymousObservable
对象的subscribeHandler
方法,这里我们清楚了,为什么序列订阅
的时候流程会执行我们的序列闭包
,然后去执行发送响应
发送响应的代码后面再分析,下面还有个点是
AnyObserver(self)
1
2
3
4
5
6/// Construct an instance whose `on(event)` calls `observer.on(event)`
///
/// - parameter observer: Observer that receives sequence events.
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}这个构造方法里面,我们创建了一个结构体
AnyObserver
保存了一个信息AnonymousObservableSink.on
函数。注意不是AnonymousObservableSink
发送响应
通过上面的分析,我们清楚了observer.onNext("")
本质是AnyObserver.onNext("")
,我们发现AnyObserver没有这个方法,顺着思路找父类,找ObserverType
1 | /// Convenience API extensions to provide alternate next, error, completed events |
外界调用的
observer.onNext("")
再次变形:AnyObserver.on(.next(""))
,AnyObserver
调用了on
里面的.next
函数,.next
函数带有我们最终的参数1
2
3
4
5
6
7
8
9
10/// Construct an instance whose `on(event)` calls `observer.on(event)`
/// - parameter observer: Observer that receives sequence events.
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
/// Send `event` to this observer.
/// - parameter event: Event instance.
public func on(_ event: Event<Element>) {
self.observer(event)
}self.observer
构造初始化就是:AnonymousObservableSink .on 函数
self.observer(event)
->AnonymousObservableSink .on(event)
其中event = .next("")
最终我们的核心逻辑又回到了sink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
func on(_ event: Event<Element>) {
#if DEBUG
self.synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self.synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self.isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self.isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
}self.forwardOn(event)
这也是执行的核心代码,因为AnonymousObservableSink
继承Sink
这里还有封装,请看下面的代码1
2
3
4
5
6
7
8
9
10
11
12class Sink<Observer: ObserverType>: Disposable {
final func forwardOn(_ event: Event<Observer.Element>) {
#if DEBUG
self.synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self.synchronizationTracker.unregister() }
#endif
if isFlagSet(self.disposed, 1) {
return
}
self.observer.on(event)
}
}其中
self.observer
就是我们初始化保存的观察者:AnonymousObserver
到这里我们得出了发送序列的本质就是
AnonymousObserver.on(.next(""))
,这个逻辑又回到了我们订阅序列
时候创建的AnonymousObserver
参数闭包的调用。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17let observer = AnonymousObserver<E> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}判断
event
进而调用onNext?(value)
,因为枚举的关联值value=""
,接下来外界onNext
的调用参数。
销毁
我们先看一下创建序列到销毁的执行代码
1 | // 创建序列 |
这段代码里面关于销毁相关的代码就是
Disposables.create {print("销毁释放了")}
,所以我们直接定位到Disposables
类(AnonymousDisposable.swift文件)。1
2
3
4
5
6
7
8
9extension Disposables {
/// Constructs a new disposable with the given action used for disposal.
///
/// - parameter dispose: Disposal action which will be run upon calling `dispose`.
public static func create(with dispose: @escaping () -> Void) -> Cancelable {
AnonymousDisposable(disposeAction: dispose)
}
}可以看出,这里创建了一个匿名销毁序列
AnonymousDisposable
,和订阅一样的手法。继续看AnonymousDisposable
代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/// When dispose method is called, disposal action will be dereferenced.
private final class AnonymousDisposable : DisposeBase, Cancelable {
// Non-deprecated version of the constructor, used by `Disposables.create(with:)`
fileprivate init(disposeAction: @escaping DisposeAction) {
self.disposeAction = disposeAction
super.init()
}
/// Calls the disposal action if and only if the current instance hasn't been disposed yet.
///
/// After invoking disposal action, disposal action will be dereferenced.
///销毁核心的逻辑
fileprivate func dispose() {
if fetchOr(self.disposed, 1) == 0 {
if let action = self.disposeAction {
self.disposeAction = nil
action()
}
}
}
}上我们看到,初始化方法里保存了销毁响应闭包,什么时候调用,我们看下面的
dispose()
方法。fetchOr(self.disposed, 1)是一个单项标记手段,这里利用了牛逼的算法标记可以降低依赖和更加快速。
主要就是保证只会销毁一次
销毁会首先
self.disposeAction = nil
,将回调闭包置空最后调用闭包调用
action()
,这里是一个局部变量不需要再置空
下面我们看一下dispose()
是什么时候调用的
上面的流程,我们再序列的回调闭包:subscriberHandle
里面,这个流程之前有一个重要的流程就是订阅subscriber
1 | if let disposed = onDisposed { |
这里保存外界传入的销毁闭包
1
2
3
4
5
6
7
8
9
10switch event {
case .next(let value):
onNext?(value)
case .error(let error):
// 响应外界调回闭包
disposable.dispose()
case .completed:
// 响应外界调回闭包
disposable.dispose()
}观察者回调里面调用, 响应外界调回闭包
return Disposables.create(self.asObservable().subscribe(observer),disposable)
综合来看,我们的重点必然在这句代码,沟通下面流程的subscribe
, 外界订阅返回的销毁者(可以随时随地进行dispose.dispose()
)- 上面代码跟进去看到
BinaryDisposable(disposable1, disposable2)
原来创建的二元销毁者!
1 | func dispose() { |
二元销毁者的
dispose
方法也在预料之中,分别销毁那么我们的重点就应该探索,在
subscribe
这里面创建的关键销毁者是什么?下面我们进入非常熟悉的:
Producer
1
2
3
4
5
6let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink,
subscription: sinkAndSubscription.subscription)
// 返回销毁者
return disposer看到
SinkDisposer
就熟悉了,普通销毁者:AnonymousDisposable
, 关键销毁者:SinkDisposer
先看什么东西进入了
SinkDisposer
self.run(observer, cancel: disposer)
证明里面需要用到SinkDisposer
disposer.setSinkAndSubscription
常规操作1
2
3
4
5
6
7
8
9
10
11
12
13
14func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
self._sink = sink
self._subscription = subscription
// 获取状态
let previousState = fetchOr(self._state,
DisposeState.sinkAndSubscriptionSet.rawValue)
// 如果状态满足就销毁
if (previousState & DisposeState.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
self._sink = nil
self._subscription = nil
}
}保存了两个属性 :
sink
和subscription
(就是外界创建序列的闭包的返回销毁者)取了某一个状态:
previousState
,判断状态的条件,然后执行 这两个保存属性的销毁和置空释放销毁 :.dispose() + = nil
其实是可以理解,就是我们在加入的东西其实需要销毁的,不应该保留的,那么没必要给它继续保留生命迹象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// 创建 sink 保存了销毁者
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
// 省略不相管代码。。。。
func on(_ event: Event<Element>) {
switch event {
case .next:
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
// 关键点:完成和错误信号的响应式必然会直接开启销毁的
self.dispose()
}
}
}完成和错误信号的响应式必然会直接开启销毁的 :
self.dispose()
! 这里也解释了:一旦我们的序列发出完成或者错误就无法再次响应了!剩下一个问题: 到底我们的销毁的是什么
1
2
3
4
5
6
7
8
9func dispose() {
let previousState = fetchOr(self._state, DisposeState.disposed.rawValue)
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
sink.dispose()
subscription.dispose()
self._sink = nil
self._subscription = nil
}
}无论我们直接销毁还是系统帮助我们销毁必然会调用:
dispose()
我们查看
dispose()
得出: 就是在初始化初期我们保留的两个属性的操作sink.dispose() + self._sink = nil
&subscription.dispose() + self._subscription = nil
执行相关释放和销毁
总结一下销毁
第一:内部创建的临时序列和观察者都会随着对外的观察者和序列的生命周期而销毁释放。
第二:外界观察者和序列会随着他们的作用域空间而释放
第三:释放不了只是对象的释放有问题,常规内存管理问题
第四:最为一个再牛逼的框架也不能对程序员写的代码直接管理控制
第五:RxSwift
的观察和序列以及销毁者就是普通对象。
到这里RxSwift从创建序列->订阅序列->发送信号->销毁的源码解析就结束了,里面的精妙还是需要细细品味的。