iOS Multithreading - GCD (Part 2)

In the previous article we looked at how the main queue, serial queues, and concurrent queues are distinguished in the GCD source, and at when a synchronous function invokes its task. In this article we explore the differences between synchronous and asynchronous functions:

  • Deadlock analysis of the synchronous function
  • Whether task callbacks happen synchronously or asynchronously
  • The underlying implementation of dispatch_once

Let's start with the synchronous function.

Deadlock analysis of the synchronous function

Searching the libdispatch source globally for dispatch_sync, we find the implementation of the function:

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t dc_flags = DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
    _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

Following _dispatch_sync_f, it actually calls _dispatch_sync_f_inline:

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    if (likely(dq->dq_width == 1)) { // serial queue (width == 1)
        return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
    }

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
    }

    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
            _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

A serial queue goes through _dispatch_barrier_sync_f, which eventually calls _dispatch_barrier_sync_f_inline:

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    dispatch_tid tid = _dispatch_tid_self();

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
        // in the deadlock case, _dispatch_sync_f_slow is where the crash is raised
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                DC_FLAG_BARRIER | dc_flags);
    }

    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func,
                DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}

There are several possible calls here, so let's construct a deadlock case ourselves and look at the call stack:

dispatch_queue_t queue = dispatch_queue_create("jason", DISPATCH_QUEUE_SERIAL);
NSLog(@"1");
dispatch_async(queue, ^{
    NSLog(@"2");
    dispatch_sync(queue, ^{
        NSLog(@"3");
    });
    NSLog(@"4");
});
NSLog(@"5");

Run the code and look at the call stack:

(Figure: call stack at the deadlock crash)

We can see that _dispatch_sync_f_slow is called, which in turn calls __DISPATCH_WAIT_FOR_QUEUE__:

DISPATCH_NOINLINE
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
    uint64_t dq_state = _dispatch_wait_prepare(dq);
    if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
        DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                "dispatch_sync called on queue "
                "already owned by current thread");
    }
    // ... remainder omitted
}

The crash message here matches the error our example finally produces:

(Figure: crash output, "dispatch_sync called on queue already owned by current thread")

This tells us that the condition checked by _dq_state_drain_locked_by is what detects the deadlock. That function calls _dispatch_lock_is_locked_by:

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
    // equivalent to _dispatch_lock_owner(lock_value) == tid
    return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}

#define DLOCK_OWNER_MASK ((dispatch_lock)0xfffffffc)

DLOCK_OWNER_MASK is 0xfffffffc, which covers every bit except the two low flag bits. The expression is therefore 0 exactly when lock_value and tid agree on all owner bits, i.e. tid is the owner recorded in lock_value. That is what the comment means by "equivalent to _dispatch_lock_owner(lock_value) == tid": the thread waiting for the queue is the very thread that currently owns it, which is the deadlock condition.
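To make the bit trick concrete, here is a tiny standalone check (a sketch with made-up values and locally defined stand-in types, not libdispatch code): masking with the owner mask ignores the low two flag bits, so the result is zero exactly when the waiter's tid matches the lock's recorded owner.

#include <assert.h>
#include <stdint.h>

typedef uint32_t my_lock_t;   // stand-in for dispatch_lock
typedef uint32_t my_tid_t;    // stand-in for dispatch_tid
#define MY_OWNER_MASK ((my_lock_t)0xfffffffc)

static int is_locked_by(my_lock_t lock_value, my_tid_t tid) {
    // zero exactly when lock_value and tid agree on every owner bit
    return ((lock_value ^ tid) & MY_OWNER_MASK) == 0;
}

int main(void) {
    my_tid_t tid = 0x12345678;               // hypothetical thread id
    assert(is_locked_by(tid, tid));          // same thread owns the queue: deadlock is reported
    assert(is_locked_by(tid | 0x3, tid));    // the two low flag bits are ignored by the mask
    assert(!is_locked_by(0x87654321, tid));  // a different owner: no deadlock
    return 0;
}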

The synchronous function's callback

Let's explore using a global concurrent queue:

dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);
dispatch_sync(concurrentQueue, ^{
    NSLog(@"the block was invoked....");
});

Going back to _dispatch_sync_f_inline, we set symbolic breakpoints to see which of the following is executed:

  • _dispatch_sync_f_slow
  • _dispatch_sync_recurse
  • _dispatch_introspection_sync_begin
  • _dispatch_sync_invoke_and_complete

(Figure: symbolic breakpoint hit in _dispatch_sync_f_slow)

The breakpoint shows that _dispatch_sync_f_slow is called:

static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
        dispatch_function_t func, uintptr_t top_dc_flags,
        dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
    dispatch_queue_t top_dq = top_dqu._dq;
    dispatch_queue_t dq = dqu._dq;
    if (unlikely(!dq->do_targetq)) {
        return _dispatch_sync_function_invoke(dq, ctxt, func);
    }

    pthread_priority_t pp = _dispatch_get_priority();
    struct dispatch_sync_context_s dsc = {
        .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
        .dc_func     = _dispatch_async_and_wait_invoke,
        .dc_ctxt     = &dsc,
        .dc_other    = top_dq,
        .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
        .dc_voucher  = _voucher_get(),
        .dsc_func    = func,
        .dsc_ctxt    = ctxt,
        .dsc_waiter  = _dispatch_tid_self(),
    };

    _dispatch_trace_item_push(top_dq, &dsc);
    __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

    if (dsc.dsc_func == NULL) {
        // dsc_func being cleared means that the block ran on another thread ie.
        // case (2) as listed in _dispatch_async_and_wait_f_slow.
        dispatch_queue_t stop_dq = dsc.dc_other;
        return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
    }
    _dispatch_introspection_sync_begin(top_dq);
    _dispatch_trace_item_pop(top_dq, &dsc);
    _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func, top_dc_flags
            DISPATCH_TRACE_ARG(&dsc));
}

We continue by adding symbolic breakpoints on _dispatch_trace_item_push, _dispatch_sync_complete_recurse, _dispatch_trace_item_pop, _dispatch_sync_invoke_and_complete_recurse and _dispatch_sync_function_invoke.

(Figure: symbolic breakpoint hit in _dispatch_sync_function_invoke)

This shows that _dispatch_sync_function_invoke is executed. Note that dq->do_targetq is NULL here because we are using the global concurrent queue, which is why execution takes this branch.

Let's continue into _dispatch_sync_function_invoke:

DISPATCH_NOINLINE
static void
_dispatch_sync_function_invoke(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
}

static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq);
    _dispatch_client_callout(ctxt, func);
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}

We can see that _dispatch_client_callout is executed here, and that is what invokes the callback block.
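To connect the dots: dispatch_sync packaged the block so that ctxt is the block object and func is _dispatch_Block_invoke(work), so at this point the callout boils down to calling the function pointer with the block as its argument. A minimal conceptual sketch (not the real _dispatch_client_callout, which also performs some exception/TSD bookkeeping):

#include <dispatch/dispatch.h>

// Conceptual sketch only: invoke the client's function with its context.
// For a block submitted through dispatch_sync, ctxt is the block itself and
// func is _dispatch_Block_invoke(block), so this call is what runs the block.
static void my_client_callout(void *ctxt, dispatch_function_t func)
{
    func(ctxt);
}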

The asynchronous function's callback

Again, we investigate with symbolic breakpoints:

dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);
dispatch_async(concurrentQueue, ^{
    NSLog(@"the block was invoked....");
});

A global search for dispatch_async shows that it calls _dispatch_continuation_async, which then calls dx_push:

#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

So what ultimately runs is dq_push, and which concrete dq_push is called depends on the queue type. Let's look at the concurrent queue's vtable:

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
    .do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
    .do_dispose = _dispatch_lane_dispose,
    .do_debug = _dispatch_queue_debug,
    .do_invoke = _dispatch_lane_invoke,

    .dq_activate = _dispatch_lane_activate,
    .dq_wakeup = _dispatch_lane_wakeup,
    .dq_push = _dispatch_lane_concurrent_push,
);
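The dx_push macro resolves through this table, so a rough mental model (hypothetical names, not the actual libdispatch types) is a per-class struct of function pointers, where the queue type decides which push implementation runs:

typedef struct my_queue_s my_queue_t;

typedef struct {
    void (*dq_push)(my_queue_t *q, void *item);   // one slot per operation, like dq_push above
} my_vtable_t;

struct my_queue_s {
    const my_vtable_t *vtable;                    // set when the queue is created
};

static void my_serial_push(my_queue_t *q, void *item)     { /* serial strategy */ }
static void my_concurrent_push(my_queue_t *q, void *item) { /* concurrent strategy */ }

static const my_vtable_t my_serial_vtable     = { .dq_push = my_serial_push };
static const my_vtable_t my_concurrent_vtable = { .dq_push = my_concurrent_push };

// the analogue of dx_push(x, y, z): dispatch through the queue's vtable
#define my_dx_push(q, item) ((q)->vtable->dq_push((q), (item)))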

Searching for _dispatch_lane_concurrent_push, its implementation ends up calling _dispatch_lane_push:

void
_dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    dispatch_wakeup_flags_t flags = 0;
    struct dispatch_object_s *prev;

    if (unlikely(_dispatch_object_is_waiter(dou))) {
        return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
    }
    dispatch_assert(!_dispatch_object_is_global(dq));
    qos = _dispatch_queue_push_qos(dq, qos);

    prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
    if (unlikely(os_mpsc_push_was_empty(prev))) {
        _dispatch_retain_2_unsafe(dq);
        flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
    } else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
        _dispatch_retain_2_unsafe(dq);
        flags = DISPATCH_WAKEUP_CONSUME_2;
    }
    os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
    if (flags) {
        return dx_wakeup(dq, qos, flags);
    }
}

We add symbolic breakpoints on _dispatch_lane_push_waiter, _dispatch_queue_push_qos, os_mpsc_push_update_prev and dx_wakeup. What ends up being called is dx_wakeup:

#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)

This is again a wrapper around the vtable's dq_wakeup; as before, we look at the concurrent queue's entry:

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
    .do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
    .do_dispose = _dispatch_lane_dispose,
    .do_debug = _dispatch_queue_debug,
    .do_invoke = _dispatch_lane_invoke,

    .dq_activate = _dispatch_lane_activate,
    .dq_wakeup = _dispatch_lane_wakeup,
    .dq_push = _dispatch_lane_concurrent_push,
);

_dispatch_lane_wakeup calls _dispatch_queue_wakeup. Setting symbolic breakpoints in the same way on the functions that _dispatch_queue_wakeup can return into, the call flows through _dispatch_queue_wakeup -> _dispatch_lane_push and eventually reaches _dispatch_root_queue_drain, where the asynchronously submitted task is executed.

The underlying implementation of dispatch_once

When we define a singleton we normally use dispatch_once:

static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
    NSLog(@"once");
});

Searching libdispatch for dispatch_once shows its underlying implementation:

void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
    dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}

val is of type dispatch_once_t.
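For reference (taken from the public dispatch/once.h header rather than the excerpt above), dispatch_once_t is just an integer-sized token:

typedef intptr_t dispatch_once_t;

We continue into dispatch_once_f: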

DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
    // treat the once token as a gate
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;

#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
    if (likely(v == DLOCK_ONCE_DONE)) { // already marked done: return immediately
        return;
    }
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    if (likely(DISPATCH_ONCE_IS_GEN(v))) {
        return _dispatch_once_mark_done_if_quiesced(l, v);
    }
#endif
#endif
    // first call
    if (_dispatch_once_gate_tryenter(l)) { // acquires the lock, which makes the singleton thread-safe
        return _dispatch_once_callout(l, ctxt, func); // execute the task
    }
    return _dispatch_once_wait(l);
}

static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
    return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
            (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}

DISPATCH_NOINLINE
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
        dispatch_function_t func)
{
    // execute the task
    _dispatch_client_callout(ctxt, func);
    // broadcast that the gate is done and wake any waiters
    _dispatch_once_gate_broadcast(l);
}

static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
    dispatch_lock value_self = _dispatch_lock_value_for_self();
    uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    v = _dispatch_once_mark_quiescing(l);
#else
    v = _dispatch_once_mark_done(l);
#endif
    if (likely((dispatch_lock)v == value_self)) return;
    _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
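The gate in _dispatch_once_gate_tryenter is an atomic compare-and-swap: only the thread that successfully swaps the token from "unlocked" to its own lock value runs the block; everyone else either returns immediately (already done) or waits. A simplified model using C11 atomics (hypothetical names, not libdispatch source; the real code parks waiters instead of spinning):

#include <stdatomic.h>
#include <stdint.h>

#define MY_ONCE_UNLOCKED ((uintptr_t)0)
#define MY_ONCE_DONE     (~(uintptr_t)0)

static void my_once(_Atomic uintptr_t *gate, void *ctxt, void (*func)(void *))
{
    // fast path: the work already completed, nothing to do
    if (atomic_load_explicit(gate, memory_order_acquire) == MY_ONCE_DONE) {
        return;
    }
    uintptr_t expected = MY_ONCE_UNLOCKED;
    // "try to enter the gate": only one thread wins this compare-and-swap
    if (atomic_compare_exchange_strong_explicit(gate, &expected,
            (uintptr_t)1 /* "locked by me" */,
            memory_order_relaxed, memory_order_relaxed)) {
        func(ctxt);                                 // run the task exactly once
        atomic_store_explicit(gate, MY_ONCE_DONE,
                memory_order_release);              // publish completion
        return;
    }
    // lost the race: wait until the winner marks the gate as done
    while (atomic_load_explicit(gate, memory_order_acquire) != MY_ONCE_DONE) {
        /* spin; libdispatch blocks the thread in _dispatch_once_wait instead */
    }
}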

Summary

  • Why it runs only once: onceToken is a static variable and is therefore unique. At the bottom layer it is reinterpreted as a variable l of type dispatch_once_gate_t. l gives access to the atomically stored state v, which records whether the task has run; if v equals DLOCK_ONCE_DONE the task has already been executed once, so the function simply returns.

  • When the block is invoked: if the task has not been executed yet, the gate is locked first, i.e. the state is atomically swapped from DLOCK_ONCE_UNLOCKED to the current thread's lock value, which guarantees that only one caller can run the task even if dispatch_once is reached from several places at once. The block callback is then executed; once it completes, the gate is unlocked and the state is set to DLOCK_ONCE_DONE, so the next time control enters it returns immediately without executing again.

  • If another caller enters while the current task is still executing, it cannot acquire the gate (the first caller already holds the lock) and it waits in _dispatch_once_wait; it is released only when the first caller finishes and broadcasts that the gate is done.
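This is also why the usual singleton pattern built on dispatch_once is thread-safe. A minimal sketch (the class name is made up for illustration):

#import <Foundation/Foundation.h>

@interface JSManager : NSObject   // hypothetical class, for illustration only
+ (instancetype)sharedManager;
@end

@implementation JSManager

+ (instancetype)sharedManager {
    static JSManager *instance = nil;
    static dispatch_once_t onceToken;      // one static gate per call site
    dispatch_once(&onceToken, ^{
        // executed exactly once, even if many threads call +sharedManager concurrently
        instance = [[JSManager alloc] init];
    });
    return instance;
}

@end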