PromiseKit源码分析

源码版本:6.8.4

首先从官方实例开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import PromiseKit

func promise3() -> Promise<Int> {
return after(.seconds(1)).map{ 3 }
}

firstly {
Promise.value(1)
}.map { _ in
2
}.then { _ in
promise3()
}.done {
print($0) // => 3
}.catch { error in
// only happens for errors
}.finally {
PlaygroundPage.current.finishExecution()
}
//使Playground等待异步操作执行完在退出
PlaygroundPage.current.needsIndefiniteExecution = true

firstly

是一个全局函数

1
2
3
4
5
6
7
8
9
public func firstly<U: Thenable>(execute body: () throws -> U) -> Promise<U.T> {
do {
let rp = Promise<U.T>(.pending)
try body().pipe(to: rp.box.seal)
return rp
} catch {
return Promise(error: error)
}
}

接受一个返回Thenable的闭包,返回一个Promise<U.T>对于官方实例返回的就是Promise<Int>

Promise

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class Promise<T>: Thenable, CatchMixin {
let box: Box<Result<T>>

fileprivate init(box: SealedBox<Result<T>>) {
self.box = box
}

public class func value(_ value: T) -> Promise<T> {
return Promise(box: SealedBox(value: .fulfilled(value)))
}

public init<U: Thenable>(_ bridge: U) where U.T == T {
box = EmptyBox()
bridge.pipe(to: box.seal)
}
}

关于Promise先看这个几个方法,首先内部维护了一个Box范型对象,构造器或者类方法都会对box属性赋值,但是传递的参数不同,下面看看Box是什么。

Box

1
2
3
4
5
class Box<T> {
func inspect() -> Sealant<T> { fatalError() }
func inspect(_: (Sealant<T>) -> Void) { fatalError() }
func seal(_: T) {}
}

这个类可以理解为一个抽象类,由于Swift不支持抽象类,所以用这种抛出错误的方式强制用户不可以直接调用,让子类重写。(PS:为啥不用协议呢?~不理解)看看子类实现:

1
2
3
4
5
6
7
8
9
10
11
final class SealedBox<T>: Box<T> {
let value: T

init(value: T) {
self.value = value
}

override func inspect() -> Sealant<T> {
return .resolved(value)
}
}

看类的命名是一个密封了的盒子,就是已经得到确定值的Boxinspect方法返回一个Enum类型携带了这个值

1
2
3
4
enum Sealant<R> {
case pending(Handlers<R>)
case resolved(R) //携带的确定值
}

另一个子类实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
final class Handlers<R> {
var bodies: [(R) -> Void] = []
func append(_ item: @escaping(R) -> Void) { bodies.append(item) }
}

class EmptyBox<T>: Box<T> {
//一个枚举关联值是Handlers,默认是个空数组,数组内部元素是你期待类型操作的闭包
private var sealant = Sealant<T>.pending(.init())
// 一个异步队列
private let barrier = DispatchQueue(label: "org.promisekit.barrier", attributes: .concurrent)

//改变了sealant的类型,并且拿到持有的操作数组,将得到的值传递给数组中的闭包执行
override func seal(_ value: T) {
var handlers: Handlers<T>!
barrier.sync(flags: .barrier) {
guard case .pending(let _handlers) = self.sealant else {
return // already fulfilled!
}
handlers = _handlers
self.sealant = .resolved(value)
}

//FIXME we are resolved so should `pipe(to:)` be called at this instant, “thens are called in order” would be invalid
//NOTE we don’t do this in the above `sync` because that could potentially deadlock
//THOUGH since `then` etc. typically invoke after a run-loop cycle, this issue is somewhat less severe

if let handlers = handlers {
handlers.bodies.forEach{ $0(value) }
}

//TODO solution is an unfortunate third state “sealed” where then's get added
// to a separate handler pool for that state
// any other solution has potential races
}
//同步读取内部的sealant的值返回
override func inspect() -> Sealant<T> {
var rv: Sealant<T>!
barrier.sync {
rv = self.sealant
}
return rv
}
//这个方法有点复杂 会被Promise的pipe方法调用
override func inspect(_ body: (Sealant<T>) -> Void) {
var sealed = false
barrier.sync(flags: .barrier) {
switch sealant {
case .pending:
// body will append to handlers, so we must stay barrier’d
body(sealant)
case .resolved:
sealed = true
}
}
if sealed {
// we do this outside the barrier to prevent potential deadlocks
// it's safe because we never transition away from this state
body(sealant)
}
}
}

结合pipe方法来分析一下inspect到底做了什么,pipe方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// - See: `Thenable.pipe`
public func pipe(to: @escaping(Result<T>) -> Void) {
switch box.inspect() {
case .pending:
box.inspect {
switch $0 {
case .pending(let handlers):
handlers.append(to)
case .resolved(let value):
to(value)
}
}
case .resolved(let value):
to(value)
}
}

pipe调用inspect传递了一个闭包,我们一层一层展开它就变成这个样子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
override func inspect(_ body: (Sealant<T>) -> Void) {
var sealed = false
barrier.sync(flags: .barrier) {
switch sealant {
case .pending:
// body will append to handlers, so we must stay barrier’d
//body(sealant)
switch sealant {
case .pending(let handlers):
// to是下一个操作的闭包 添加到了box关联的数组中
handlers.append(to)
case .resolved(let value):
to(value)
}
case .resolved:
sealed = true
}
}
if sealed {
// we do this outside the barrier to prevent potential deadlocks
// it's safe because we never transition away from this state
body(sealant)
}
}

firstly执行流程总结

首先fristly全局方法会将我们传递进去的Promise调用pipe连接到一个pendingPromise,由于pipe接收一个闭包参数,正好Box类内部的func seal(_: T) {}就符合这个闭包,所以传递pipe就接受了seal作为参数,所以到目前为止

1
2
3
firstly {
Promise.value(1)
}

可以转换成下面这样

1
Promise.value(1).pipe(to:EmptyBox().seal)

Promise.value(1)创建的Promise内部会维护一个resolved类型的Box,接着EmptyBox()创建了一个pending类型的Boxpipe方法内部会调用pending类型Boxseal方法将Promise.value(1)内部的值封装进EmptyBox()中,并且改变Box的类型为resolved,然后返回了这个新的盒子。

短短一句firstly包含了一个盒子的替换和状态改变。

map操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func map<U>(on: DispatchQueue? = conf.Q.map, flags: DispatchWorkItemFlags? = nil, _ transform: @escaping(T) throws -> U) -> Promise<U> {
let rp = Promise<U>(.pending)
pipe {
switch $0 {
case .fulfilled(let value):
on.async(flags: flags) {
do {
rp.box.seal(.fulfilled(try transform(value)))
} catch {
rp.box.seal(.rejected(error))
}
}
case .rejected(let error):
rp.box.seal(.rejected(error))
}
}
return rp
}

首先上一步的Promise.value(1)就会创建一个Promise(box: SealedBox(value: .fulfilled(value))),内部Box中的值就是fulfilled类型的,所以这里的pipe会执行fulfilled分支,在制定的线程上异步执行内部新盒子的seal方法替换值并改变状态,这里内部还会捕捉transform执行的异常自动的进行rejected调用。如果没有错误,就又会返回一个新的Promise包含了map之后的新值。

then操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func then<U: Thenable>(on: DispatchQueue? = conf.Q.map, flags: DispatchWorkItemFlags? = nil, _ body: @escaping(T) throws -> U) -> Promise<U.T> {
let rp = Promise<U.T>(.pending)
pipe {
switch $0 {
case .fulfilled(let value):
on.async(flags: flags) {
do {
let rv = try body(value)
guard rv !== rp else { throw PMKError.returnedSelf }
rv.pipe(to: rp.box.seal)
} catch {
rp.box.seal(.rejected(error))
}
}
case .rejected(let error):
rp.box.seal(.rejected(error))
}
}
return rp
}

看这个样子有点像map操作,但是仔细看返回值不一样,map操作返回Promise<U>then操作返回Promise<U.T>,这是因为我们在mapthen中做的事情不一样,比如我们的Promise<Int>map操作,transform闭包返回值是Int,而thenbody闭包返回值是Promise<T>,所以最后还要将内部的类型取出来作为一个新得Promise返回,所以就要返回Promise<U.T>

剩下的操作差不多就是在指定的线程队列上异步指定body取得新的Promise ,然后判断我们传递进去的Promise和内部新创建的Promise是不是相同,如果相同就抛出错误,如果不同就我们传入的Promise值传递给内部Promise并改变状态,最后返回新Promise

done操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func done(on: DispatchQueue? = conf.Q.return, flags: DispatchWorkItemFlags? = nil, _ body: @escaping(T) throws -> Void) -> Promise<Void> {
let rp = Promise<Void>(.pending)
pipe {
switch $0 {
case .fulfilled(let value):
on.async(flags: flags) {
do {
try body(value)
rp.box.seal(.fulfilled(()))
} catch {
rp.box.seal(.rejected(error))
}
}
case .rejected(let error):
rp.box.seal(.rejected(error))
}
}
return rp
}

看到这个指定逻辑就知道了,已经到最后的执行步骤了,不会在有下一步的执行,所以传递进去的闭包没有返回值,内部也直接返回了一个Promise<Void>

catch操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@discardableResult
func `catch`(on: DispatchQueue? = conf.Q.return, flags: DispatchWorkItemFlags? = nil, policy: CatchPolicy = conf.catchPolicy, _ body: @escaping(Error) -> Void) -> PMKFinalizer {
let finalizer = PMKFinalizer()
pipe {
switch $0 {
case .rejected(let error):
guard policy == .allErrors || !error.isCancelled else {
fallthrough
}
on.async(flags: flags) {
body(error)
finalizer.pending.resolve(())
}
case .fulfilled:
finalizer.pending.resolve(())
}
}
return finalizer
}

PMKFinalizer类型

1
2
3
4
5
6
7
8
9
10
public class PMKFinalizer {
let pending = Guarantee<Void>.pending()

/// `finally` is the same as `ensure`, but it is not chainable
public func finally(on: DispatchQueue? = conf.Q.return, flags: DispatchWorkItemFlags? = nil, _ body: @escaping () -> Void) {
pending.guarantee.done(on: on, flags: flags) {
body()
}
}
}

这个第一句有点意思 它的方法像下面这样

1
2
3
4
//方法内部又是一个方法,内部方法返回的是一个元组 所以padding的类型就是一个元组
public class func pending() -> (guarantee: Guarantee<T>, resolve: (T) -> Void) {
return { ($0, $0.box.seal) }(Guarantee<T>(.pending))
}

最后padding的类型是:

1
let pending = (Guarantee<T>(.pending), Guarantee<T>(.pending).box.seal)

所以再回过头看catch的操作就是,检查前面的步骤中有rejected,就执行rejected操作,进行一个错误的筛选,然后执行我们的闭包操作在执行finalizer.pending.resolve(()),其实这个finalizer.pending.resolve(())具体的执行仅仅改变了Guarantee内部box的状态剩下的啥也没干,筛选错误时还允许穿透到fulfilled分支,其实也是什么也没做,仅仅改变状态,最后返回finalizer对象。

finally操作

因为catch操作返回的finalizer对象有一个finally方法,所以最后可以调用,内部调用了元组中Guarantee<T>(.pending)done方法

1
2
3
4
5
6
7
8
9
10
11
@discardableResult
func done(on: DispatchQueue? = conf.Q.return, flags: DispatchWorkItemFlags? = nil, _ body: @escaping(T) -> Void) -> Guarantee<Void> {
let rg = Guarantee<Void>(.pending)
pipe { (value: T) in
on.async(flags: flags) {
body(value)
rg.box.seal(())
}
}
return rg
}

Guarantee中的done方法没有rejected分支判断,这个类的语义就是保证都成功,其他的和Promisedone操作类似。

最后的总结

首先我们构造Promise

1
2
3
4
5
6
7
8
9
10
firstly {
URLSession.shared.dataTask(.promise, with: try makeUrlRequest()).validate()
// ^^ we provide `.validate()` so that eg. 404s get converted to errors
}.map {
try JSONDecoder().decode(Foo.self, with: $0.data)
}.done { foo in
//…
}.catch { error in
//…
}

firstly内部帮我们启动了一个异步队列跑我们的耗时任务,然后在接下来的mapthen中又切换回主线程调用我们的操作,这样就实现了异步的链式调用。