0%

Swift - RxSwift源码分析

RxSwift简介

RxSwiftReactiveX 家族的重要一员, ReactiveXReactive Extensions 的缩写,一般简写为 RxReactiveX 官方给Rx的定义是:Rx是一个使用可观察数据流进行异步编程的编程接口。

1
ReactiveX` 不仅仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言。它拓展了`观察者模式`,使你能够`自由组合多个异步事件`,而`不需要去关心线程`,`同步,线程安全`,`并发数据以及I/O阻塞

RxSwiftRxSwift 语言开发的一门函数响应式编程语言, 它可以代替iOS系统的 Target Action / 代理 / 闭包 / 通知 / KVO,同时还提供网络数据绑定UI事件处理UI的展示和更新多线程……

Swift为值类型,在传值与方法回调上有影响,RxSwift一定程度上弥补Swift的灵活性

  • RxSwift使得代码复用性较强,减少代码量
  • RxSwift因为声明都是不可变更,增加代码可读性
  • RxSwift使得更易于理解业务代码,抽象异步编程,统一代码风格
  • RxSwift使得代码更易于编写集成单元测试,增加代码稳定性

RxSwift核心流程

RxSwift的api设计非常精简,流程就是:

1、创建序列()

2、订阅序列

3、发送信号

4、销毁

1
2
3
4
5
6
7
8
9
10
11
// 1: 创建序列
_ = Observable<String>.create { (obserber) -> Disposable in
// 3:发送信号
obserber.onNext("RxSwift 研究")
return Disposables.create() // 4、销毁
// 2: 订阅序列
}.subscribe(onNext: { (text) in
print("订阅到:\(text)")
})

// 控制台打印:“订阅到:RxSwift 研究”

创建序列

我们先看Create.swift文件的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
extension ObservableType {
// MARK: create
/**
Creates an observable sequence from a specified subscribe method implementation.

- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)

- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
AnonymousObservable(subscribe)
}
}

final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

let subscribeHandler: SubscribeHandler

init(_ subscribeHandler: @escaping SubscribeHandler) {
self.subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}

我们可以看到,可观察序列的创建是利用扩展ObservableType协议的create方法实现的,里面创建了AnonymousObservable(匿名可观察序列) ,这个命名体现了作者的思维,这是一个内部类,具备一些通用特性(具有自己功能的类才会命名)可以总结一下:

  • create方法的时候创建了一个内部对象AnonymousObservable
  • AnonymousObservable保存了外界传入的闭包
  • AnonymousObservable继承了Producer

接下来我们看一下Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Producer<Element>: Observable<Element> {
override init() {
super.init()
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
}

可以看到Producer有一个很重要的方法subscribe(订阅),subscribe方法最后返回一个Disposable对象。

订阅序列

我们看一下ObservableType拓展(ObservableType+Extensions.swift)的功能,订阅的方法subscribe(注意这个方法和Producersubscribe不是同一个)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
Subscribes an element handler, an error handler, a completion handler and disposed handler to an observable sequence.
- parameter onNext: Action to invoke for each element in the observable sequence.
- parameter onError: Action to invoke upon errored termination of the observable sequence.
- parameter onCompleted: Action to invoke upon graceful termination of the observable sequence.
- parameter onDisposed: Action to invoke upon any type of termination of sequence (if the sequence has
gracefully completed, errored, or if the generation is canceled by disposing subscription).
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func subscribe(
onNext: ((Element) -> Void)? = nil,
onError: ((Swift.Error) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onDisposed: (() -> Void)? = nil
) -> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
///以下重点关注的代码 创建匿名观察者
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}

代码说明:

  • ESwift的关联类型,这个如果仔细看过可观察序列的继承链源码应该不难得出:这个E 就是我们的 序列类型,我们这里就是String

    1
    2
    3
    public class Observable<Element> : ObservableType {
    /// Type of elements in sequence.
    public typealias E = Element
  • 创建AnonymousObserver对象,可以类比前面createAnonymousObservable对象,初始化参数为闭包,保存了外界传入的onNext,onError,onComplete,onDisposed的处理回调闭包。

  • self.asObservable()是我们的RxSwift为了保持一致性的写法。

  • self.asObservable().subscribe(observer)其实本质就是self.subscribe(observer),通过可观察序列的继承关系,我们可以快速定位到Producer订阅代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    if !CurrentThreadScheduler.isScheduleRequired {
    // The returned disposable needs to release all references once it was disposed.
    let disposer = SinkDisposer()
    let sinkAndSubscription = self.run(observer, cancel: disposer)
    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    return disposer
    }
    else {
    return CurrentThreadScheduler.instance.schedule(()) { _ in
    let disposer = SinkDisposer()
    let sinkAndSubscription = self.run(observer, cancel: disposer)
    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    return disposer
    }
    }
    }
  • 销毁代码后面在分析

  • self.run这个代码最终由我们生产者Producer(抽象方法找子类)延伸到具体事务代码AnonymousObservable.run

    1
    2
    3
    4
    5
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
    let subscription = sink.run(self)
    return (sink: sink, subscription: subscription)
    }
  • 这里调用了sink.run(self)方法,将业务处理下沉,分工更加明确。

    1
    2
    3
    func run(_ parent: Parent) -> Disposable {
    parent.subscribeHandler(AnyObserver(self))
    }
  • parent是上面传入进来的AnonymousObservable对象

  • 这个地方我们可以看到调用了AnonymousObservable对象的subscribeHandler方法,这里我们清楚了,为什么序列订阅的时候流程会执行我们的序列闭包,然后去执行发送响应

  • 发送响应的代码后面再分析,下面还有个点是AnyObserver(self)

    1
    2
    3
    4
    5
    6
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
    self.observer = observer.on
    }
  • 这个构造方法里面,我们创建了一个结构体AnyObserver保存了一个信息AnonymousObservableSink.on函数。注意不是AnonymousObservableSink

发送响应

通过上面的分析,我们清楚了observer.onNext("")本质是AnyObserver.onNext(""),我们发现AnyObserver没有这个方法,顺着思路找父类,找ObserverType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: Element))`
///
/// - parameter element: Next element to send to observer(s)
///这里是我们关注的点 AnyObserver.onNext("")实际调用这里
public func onNext(_ element: Element) {
self.on(.next(element))
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
self.on(.completed)
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
  • 外界调用的observer.onNext("")再次变形:AnyObserver.on(.next("")),AnyObserver调用了on里面的.next函数,.next函数带有我们最终的参数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    /// - parameter observer: Observer that receives sequence events.
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
    self.observer = observer.on
    }
    /// Send `event` to this observer.
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
    self.observer(event)
    }
  • self.observer 构造初始化就是:AnonymousObservableSink .on 函数

  • self.observer(event) -> AnonymousObservableSink .on(event) 其中 event = .next("") 最终我们的核心逻辑又回到了 sink

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    func on(_ event: Event<Element>) {
    #if DEBUG
    self.synchronizationTracker.register(synchronizationErrorMessage: .default)
    defer { self.synchronizationTracker.unregister() }
    #endif
    switch event {
    case .next:
    if load(self.isStopped) == 1 {
    return
    }
    self.forwardOn(event)
    case .error, .completed:
    if fetchOr(self.isStopped, 1) == 0 {
    self.forwardOn(event)
    self.dispose()
    }
    }
    }
    }
  • self.forwardOn(event) 这也是执行的核心代码,因为 AnonymousObservableSink 继承 Sink 这里还有封装,请看下面的代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    class Sink<Observer: ObserverType>: Disposable {
    final func forwardOn(_ event: Event<Observer.Element>) {
    #if DEBUG
    self.synchronizationTracker.register(synchronizationErrorMessage: .default)
    defer { self.synchronizationTracker.unregister() }
    #endif
    if isFlagSet(self.disposed, 1) {
    return
    }
    self.observer.on(event)
    }
    }
  • 其中 self.observer 就是我们初始化保存的 观察者:AnonymousObserver

  • 到这里我们得出了发送序列的本质就是AnonymousObserver.on(.next("")),这个逻辑又回到了我们订阅序列时候创建的AnonymousObserver参数闭包的调用。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    let observer = AnonymousObserver<E> { event in
    switch event {
    case .next(let value):
    onNext?(value)
    case .error(let error):
    if let onError = onError {
    onError(error)
    }
    else {
    Hooks.defaultErrorHandler(callStack, error)
    }
    disposable.dispose()
    case .completed:
    onCompleted?()
    disposable.dispose()
    }
    }
  • 判断event进而调用onNext?(value),因为枚举的关联值value="",接下来外界onNext的调用参数。

销毁

我们先看一下创建序列到销毁的执行代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
observer.onNext("Jason")
return Disposables.create {
print("销毁释放了")
}
}
// 序列订阅
let dispose = ob.subscribe(onNext: { (anything) in
print("订阅到了:\(anything)")
}, onError: { (error) in
print("订阅到了:\(error)")
}, onCompleted: {
print("完成了")
}) {
print("销毁回调")
}
  • 这段代码里面关于销毁相关的代码就是Disposables.create {print("销毁释放了")},所以我们直接定位到Disposables类(AnonymousDisposable.swift文件)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    extension Disposables {
    /// Constructs a new disposable with the given action used for disposal.
    ///
    /// - parameter dispose: Disposal action which will be run upon calling `dispose`.
    public static func create(with dispose: @escaping () -> Void) -> Cancelable {
    AnonymousDisposable(disposeAction: dispose)
    }

    }
  • 可以看出,这里创建了一个匿名销毁序列AnonymousDisposable,和订阅一样的手法。继续看AnonymousDisposable代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /// When dispose method is called, disposal action will be dereferenced.
    private final class AnonymousDisposable : DisposeBase, Cancelable {
    // Non-deprecated version of the constructor, used by `Disposables.create(with:)`
    fileprivate init(disposeAction: @escaping DisposeAction) {
    self.disposeAction = disposeAction
    super.init()
    }
    /// Calls the disposal action if and only if the current instance hasn't been disposed yet.
    ///
    /// After invoking disposal action, disposal action will be dereferenced.
    ///销毁核心的逻辑
    fileprivate func dispose() {
    if fetchOr(self.disposed, 1) == 0 {
    if let action = self.disposeAction {
    self.disposeAction = nil
    action()
    }
    }
    }
    }
  • 上我们看到,初始化方法里保存了销毁响应闭包,什么时候调用,我们看下面的dispose()方法。

  • fetchOr(self.disposed, 1)是一个单项标记手段,这里利用了牛逼的算法标记可以降低依赖和更加快速。

  • 主要就是保证只会销毁一次

  • 销毁会首先self.disposeAction = nil,将回调闭包置空

  • 最后调用闭包调用action(),这里是一个局部变量不需要再置空

下面我们看一下dispose()是什么时候调用的

上面的流程,我们再序列的回调闭包:subscriberHandle里面,这个流程之前有一个重要的流程就是订阅subscriber

1
2
3
4
5
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}else {
disposable = Disposables.create()
}
  • 这里保存外界传入的销毁闭包

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    switch event {
    case .next(let value):
    onNext?(value)
    case .error(let error):
    // 响应外界调回闭包
    disposable.dispose()
    case .completed:
    // 响应外界调回闭包
    disposable.dispose()
    }
  • 观察者回调里面调用, 响应外界调回闭包

  • return Disposables.create(self.asObservable().subscribe(observer),disposable) 综合来看,我们的重点必然在这句代码,沟通下面流程的 subscribe, 外界订阅返回的销毁者(可以随时随地进行 dispose.dispose()
  • 上面代码跟进去看到BinaryDisposable(disposable1, disposable2) 原来创建的二元销毁者!
1
2
3
4
5
6
7
8
func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
self._disposable1?.dispose()
self._disposable2?.dispose()
self._disposable1 = nil
self._disposable2 = nil
}
}
  • 二元销毁者的 dispose 方法也在预料之中,分别销毁

  • 那么我们的重点就应该探索,在 subscribe 这里面创建的关键销毁者是什么?

  • 下面我们进入非常熟悉的:Producer

    1
    2
    3
    4
    5
    6
    let disposer = SinkDisposer()
    let sinkAndSubscription = self.run(observer, cancel: disposer)
    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink,
    subscription: sinkAndSubscription.subscription)
    // 返回销毁者
    return disposer
  • 看到 SinkDisposer 就熟悉了,普通销毁者:AnonymousDisposable , 关键销毁者: SinkDisposer

  • 先看什么东西进入了 SinkDisposer

  • self.run(observer, cancel: disposer) 证明里面需要用到 SinkDisposer

  • disposer.setSinkAndSubscription 常规操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
    self._sink = sink
    self._subscription = subscription
    // 获取状态
    let previousState = fetchOr(self._state,
    DisposeState.sinkAndSubscriptionSet.rawValue)
    // 如果状态满足就销毁
    if (previousState & DisposeState.disposed.rawValue) != 0 {
    sink.dispose()
    subscription.dispose()
    self._sink = nil
    self._subscription = nil
    }
    }
  • 保存了两个属性 : sinksubscription(就是外界创建序列的闭包的返回销毁者)

  • 取了某一个状态:previousState,判断状态的条件,然后执行 这两个保存属性的销毁和置空释放销毁 : .dispose() + = nil

  • 其实是可以理解,就是我们在加入的东西其实需要销毁的,不应该保留的,那么没必要给它继续保留生命迹象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // 创建 sink 保存了销毁者
    let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
    // 省略不相管代码。。。。
    func on(_ event: Event<Element>) {
    switch event {
    case .next:
    self.forwardOn(event)
    case .error, .completed:
    if fetchOr(self._isStopped, 1) == 0 {
    self.forwardOn(event)
    // 关键点:完成和错误信号的响应式必然会直接开启销毁的
    self.dispose()
    }
    }
    }
  • 完成和错误信号的响应式必然会直接开启销毁的 : self.dispose()! 这里也解释了:一旦我们的序列发出完成或者错误就无法再次响应了!

  • 剩下一个问题: 到底我们的销毁的是什么

    1
    2
    3
    4
    5
    6
    7
    8
    9
    func dispose() {
    let previousState = fetchOr(self._state, DisposeState.disposed.rawValue)
    if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
    sink.dispose()
    subscription.dispose()
    self._sink = nil
    self._subscription = nil
    }
    }
  • 无论我们直接销毁还是系统帮助我们销毁必然会调用:dispose()

  • 我们查看 dispose() 得出: 就是在初始化初期我们保留的两个属性的操作

  • sink.dispose() + self._sink = nil & subscription.dispose() + self._subscription = nil 执行相关释放和销毁

总结一下销毁

第一:内部创建的临时序列和观察者都会随着对外的观察者和序列的生命周期而销毁释放。

第二:外界观察者和序列会随着他们的作用域空间而释放

第三:释放不了只是对象的释放有问题,常规内存管理问题

第四:最为一个再牛逼的框架也不能对程序员写的代码直接管理控制

第五:RxSwift 的观察和序列以及销毁者就是普通对象。

到这里RxSwift从创建序列->订阅序列->发送信号->销毁的源码解析就结束了,里面的精妙还是需要细细品味的。