GCD Source Code Analysis (Part 2): Dispatch Queue and Thread Pool

1. Introduction

Most GCD operations revolve around queues: dispatch_get_main_queue fetches the main queue, dispatch_queue_create creates a custom queue, dispatch_get_global_queue fetches a global concurrent queue, and so on. So how does GCD turn these queues into multithreaded execution? How does it manage them? How do dispatch_async and dispatch_sync actually work? Let's look for the answers in the source code.

2. Queues and Threads

First, here is a diagram (borrowed from the web) that illustrates the relationship between GCD queues and threads.

(Figure: GCD queues and threads)

With GCD we can implement multithreading without worrying much about creating or managing threads: GCD maintains an internal thread pool, and the system dynamically creates and assigns threads based on the number and priority of tasks.

Tasks we submit are managed by GCD's internal manager queue and dispatched level by level through target queues, finally converging on the root queues, where threads managed by the thread pool execute them.

Threads and queues are not in one-to-one correspondence: a single thread may service multiple serial or concurrent queues, and those queues may be used synchronously or asynchronously.

Note: for readability, all code in this article is shown after macro expansion.

Queue types in GCD

The libdispatch source shows that GCD has the following kinds of queues:

  • Main queue: obtained via dispatch_get_main_queue(), bound to the main thread
  • Global queues: obtained via dispatch_get_global_queue(); concurrent queues created and managed by GCD, which are also the root queues used internally by libdispatch
  • Custom queues: created via dispatch_queue_create(), either serial or concurrent
  • Manager queue: used internally by libdispatch and not exposed to developers; it acts as the scheduler and manager of the other queues
  • Runloop queues: dispatch queues bound to a thread; for example, tasks submitted to the main queue are managed by a runloop queue and ultimately scheduled onto the main thread's runloop.
Main queue

The declaration of dispatch_get_main_queue() carries the following comment:

/*!
* @function dispatch_get_main_queue
*
* @abstract
* Returns the default queue that is bound to the main thread.
*
* @discussion
* In order to invoke blocks submitted to the main queue, the application must
* call dispatch_main(), NSApplicationMain(), or use a CFRunLoop on the main
* thread.
*
* @result
* Returns the main queue. This queue is created automatically on behalf of
* the main thread before main() is called.
*/

This tells us the main queue is created before main() is called, which also means libdispatch's initialization completes before main().

Calling dispatch_get_main_queue() returns _dispatch_main_q, a structure of type dispatch_queue_t.

dispatch_queue_t dispatch_get_main_queue(void)
{
    return (__bridge dispatch_queue_t)&(_dispatch_main_q);
}

The structure of _dispatch_main_q:

struct dispatch_queue_s _dispatch_main_q = {
    .do_vtable = OS_dispatch_queue_main_class,
    ._objc_isa = OS_dispatch_queue_main_class,
    .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
#if !DISPATCH_USE_RESOLVERS
    .do_targetq = &_dispatch_root_queues[
            DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
#endif
    .dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
            DISPATCH_QUEUE_ROLE_BASE_ANON,
    .dq_label = "com.apple.main-thread",
    .dq_atomic_flags = DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC | DQF_WIDTH(1),
    .dq_serialnum = 1,
};

From this code we can see that _dispatch_main_q has the following fields:

1. .do_vtable, ._objc_isa
  • do_vtable contains the queue's type plus its dispose, invoke, push, wakeup and debug entries; this information drives queue and task scheduling
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_main, queue,
    .do_type = DISPATCH_QUEUE_SERIAL_TYPE,
    .do_kind = "main-queue",
    .do_dispose = _dispatch_queue_dispose,
    .do_push = _dispatch_queue_push,
    .do_invoke = _dispatch_queue_invoke,
    .do_wakeup = _dispatch_main_queue_wakeup,
    .do_debug = dispatch_queue_debug,
);
  • _objc_isa: the main queue's isa pointer points to OS_dispatch_queue_main_class
2. .do_ref_cnt, .do_xref_cnt

Both the internal and external reference counts are set to DISPATCH_OBJECT_GLOBAL_REFCNT, which is defined as INT_MAX. This means the main queue lives as long as the app itself: developers never need to retain/release it, since GCD manages its lifetime.

#define DISPATCH_OBJECT_GLOBAL_REFCNT		_OS_OBJECT_GLOBAL_REFCNT
#define _OS_OBJECT_GLOBAL_REFCNT INT_MAX
3. .do_targetq

The main queue's target queue is &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT], i.e. the global queue "com.apple.root.default-qos.overcommit" with serialnum = 11. Note the conditional compilation directive #if !DISPATCH_USE_RESOLVERS here; however, libdispatch_init() sets the main queue's do_targetq again:

#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
    _dispatch_main_q.do_targetq = &_dispatch_root_queues[
            DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
#endif

So regardless of which branch of the conditional compilation is taken, the main queue's target queue ends up being the "com.apple.root.default-qos.overcommit" root queue.

(Figure: the main queue's target queue)

In GCD, tasks on every non-global queue (custom queues and the internal manager queue) are ultimately submitted to a global queue (i.e. a root queue) for processing. The main queue is the exception: it is bound to the main thread, and tasks submitted to it are managed by a runloop queue and executed on the main thread's runloop.

Global queues

GCD maintains 12 global queues internally: one normal and one overcommit variant for each of the six QoS levels (Maintenance, Background, Utility, Default, User-initiated, User-interactive).

struct dispatch_queue_s _dispatch_root_queues[] = {
#define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \
((flags & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \
DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \
DISPATCH_ROOT_QUEUE_IDX_##n##_QOS)
#define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \
[_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \
DISPATCH_GLOBAL_OBJECT_HEADER(queue_root), \
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
.do_ctxt = &_dispatch_root_queue_contexts[ \
_DISPATCH_ROOT_QUEUE_IDX(n, flags)], \
.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
.dq_priority = _dispatch_priority_make(DISPATCH_QOS_##n, 0) | flags | \
DISPATCH_PRIORITY_FLAG_ROOTQUEUE | \
((flags & DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE) ? 0 : \
DISPATCH_QOS_##n << DISPATCH_PRIORITY_OVERRIDE_SHIFT), \
__VA_ARGS__ \
}

_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
.dq_label = "com.apple.root.maintenance-qos",
.dq_serialnum = 4,
),
_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.maintenance-qos.overcommit",
.dq_serialnum = 5,
),
_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
.dq_label = "com.apple.root.background-qos",
.dq_serialnum = 6,
),
_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.background-qos.overcommit",
.dq_serialnum = 7,
),
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
.dq_label = "com.apple.root.utility-qos",
.dq_serialnum = 8,
),
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.utility-qos.overcommit",
.dq_serialnum = 9,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE,
.dq_label = "com.apple.root.default-qos",
.dq_serialnum = 10,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.default-qos.overcommit",
.dq_serialnum = 11,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
.dq_label = "com.apple.root.user-initiated-qos",
.dq_serialnum = 12,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.user-initiated-qos.overcommit",
.dq_serialnum = 13,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
.dq_label = "com.apple.root.user-interactive-qos",
.dq_serialnum = 14,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.user-interactive-qos.overcommit",
.dq_serialnum = 15,
),
};

dq_serialnum is the queue's serial number. Global queues start at 4; the first three are:

  • main queue, dq_serialnum = 1
  • manager queue (_dispatch_mgr_q), dq_serialnum = 2
  • dispatch_mgr_root_queue (the target queue of _dispatch_mgr_q), dq_serialnum = 3
Manager queue
struct dispatch_queue_s _dispatch_mgr_q = {
.do_vtable = DISPATCH_VTABLE(queue_mgr),
._objc_isa = DISPATCH_OBJC_CLASS(queue_mgr),
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
DISPATCH_QUEUE_ROLE_BASE_ANON,
.do_targetq = &_dispatch_mgr_root_queue,
.dq_label = "com.apple.libdispatch-manager",
.dq_atomic_flags = DQF_WIDTH(1),
.dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
DISPATCH_PRIORITY_SATURATED_OVERRIDE,
.dq_serialnum = 2,
};

static struct dispatch_queue_s _dispatch_mgr_root_queue = {
DISPATCH_GLOBAL_OBJECT_HEADER(queue_root),
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
.do_ctxt = &_dispatch_mgr_root_queue_context,
.dq_label = "com.apple.root.libdispatch-manager",
.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
.dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
DISPATCH_PRIORITY_SATURATED_OVERRIDE,
.dq_serialnum = 3,
};

#if DISPATCH_USE_PTHREAD_POOL
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
....

#if DISPATCH_USE_INTERNAL_WORKQUEUE
bool overcommit = (qc->dgq_wq_options & WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
bool manager = (dq == &_dispatch_mgr_root_queue);
bool monitored = !(overcommit || manager);
if (monitored) {
_dispatch_workq_worker_register(dq, qc->dgq_qos);
}
#endif

const int64_t timeout = 5ull * NSEC_PER_SEC;
pthread_priority_t old_pri = _dispatch_get_priority();
do {
_dispatch_root_queue_drain(dq, old_pri);
_dispatch_reset_priority_and_voucher(old_pri, NULL);
} while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
dispatch_time(0, timeout)) == 0);

#if DISPATCH_USE_INTERNAL_WORKQUEUE
if (monitored) {
_dispatch_workq_worker_unregister(dq, qc->dgq_qos);
}
#endif
(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);
_dispatch_global_queue_poke(dq, 1, 0);
_dispatch_release(dq); // retained in _dispatch_global_queue_poke_slow
return NULL;
}
#endif // DISPATCH_USE_PTHREAD_POOL

As the code shows, the manager queue acts as the scheduler and manager sitting between the root queues and the thread pool; for example, GCD timers are implemented on top of the manager queue.

Custom queues

Custom queues are created with dispatch_queue_create. For readability, only the main flow is kept below.

static dispatch_queue_t
_dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
        dispatch_queue_t tq, bool legacy)
{
    if (!slowpath(dqa)) {
        // no dispatch_queue_attr_t passed in: fall back to the default attributes
        dqa = _dispatch_get_default_queue_attr();
    } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
        DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
    }

    //
    // Step 1: Normalize arguments (qos, overcommit, tq)
    //
    // extract the QoS the new queue should get
    dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);

    ...

    if (!tq) {
        // no target queue specified: use a dispatch root queue as tq
        tq = _dispatch_get_root_queue(
                qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
                overcommit == _dispatch_queue_attr_overcommit_enabled);
        if (slowpath(!tq)) {
            DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
        }
    }

    //
    // Step 2: Initialize the queue
    //

    if (legacy) {
        // if any of these attributes is specified, use non legacy classes
        if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
            legacy = false;
        }
    }

    // pick the vtable for the new queue
    const void *vtable;
    dispatch_queue_flags_t dqf = 0;
    if (legacy) {
        vtable = DISPATCH_VTABLE(queue);
    } else if (dqa->dqa_concurrent) {
        vtable = DISPATCH_VTABLE(queue_concurrent);
    } else {
        vtable = DISPATCH_VTABLE(queue_serial);
    }

    ...

    // allocate storage
    dispatch_queue_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
    // initialize the queue with the parameters gathered above
    _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

    // set the label
    dq->dq_label = label;
    // set the priority
    dq->dq_priority = dqa->dqa_qos_and_relpri;
    if (!dq->dq_priority) {
        // legacy way of inheriting the QoS from the target
        _dispatch_queue_priority_inherit_from_target(dq, tq);
    } else if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
        dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    }
    if (!dqa->dqa_inactive) {
        _dispatch_queue_inherit_wlh_from_target(dq, tq);
    }
    // retain the target queue so it cannot be released prematurely
    _dispatch_retain(tq);
    // set the target queue
    dq->do_targetq = tq;
    _dispatch_object_debug(dq, "%s", __func__);
    // introspection hook
    return _dispatch_introspection_queue_create(dq);
}

Based on the incoming attributes, _dispatch_queue_init creates the corresponding serial or concurrent queue; the label and queue priority are then set, and the target queue is set to a dispatch root queue:

tq = _dispatch_get_root_queue(
        qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
        overcommit == _dispatch_queue_attr_overcommit_enabled);
dq->do_targetq = tq;

The target queue's job is to push whatever lands on a queue (a task, or another queue) upward, level by level, until it reaches a global queue, which then has the thread pool execute the task (or pop the queue).
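This climb up the target-queue chain can be sketched with a tiny model. The struct and function names here are illustrative only, not the real dispatch_queue_s layout: each queue holds a do_targetq pointer, and following it repeatedly ends at a root queue.

```c
#include <stddef.h>

/* Toy model of the target-queue chain; names are invented for illustration. */
typedef struct toy_queue_s {
    const char *label;
    struct toy_queue_s *do_targetq; /* NULL marks a root queue */
} toy_queue_t;

/* Walk do_targetq links until a root queue is reached. */
static toy_queue_t *toy_find_root(toy_queue_t *q) {
    while (q->do_targetq != NULL) {
        q = q->do_targetq;
    }
    return q;
}
```

However deep the chain of custom queues, the walk always terminates at a queue with no target, which is exactly the role the root queues play.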

While custom queues are a powerful abstraction, all blocks you schedule on them will ultimately trickle down to one of the system’s global queues and its thread pool(s).


Runloop queues

Runloop queues handle task scheduling for thread-bound queues such as the main queue. Look at the main queue's wakeup function:

void
_dispatch_main_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags)
{
#if DISPATCH_COCOA_COMPAT
    if (_dispatch_queue_is_thread_bound(dq)) {
        return _dispatch_runloop_queue_wakeup(dq, qos, flags);
    }
#endif
    return _dispatch_queue_wakeup(dq, qos, flags);
}

_dispatch_queue_is_thread_bound checks whether the queue is bound to a thread. Since the main queue is bound to the main thread, this branch calls _dispatch_runloop_queue_wakeup: the runloop queue schedules the task onto the main thread's runloop, which executes it at an appropriate time.

GCD dispatch flow analysis

As noted in the previous article, dispatch_queue_s contains a do_vtable member holding the queue's push/wakeup/invoke/dispose entries, i.e. everything related to dispatching:

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_main, queue,
    .do_type = DISPATCH_QUEUE_SERIAL_TYPE,
    .do_kind = "main-queue",
    .do_dispose = _dispatch_queue_dispose,
    .do_push = _dispatch_queue_push,
    .do_invoke = _dispatch_queue_invoke,
    .do_wakeup = _dispatch_main_queue_wakeup,
    .do_debug = dispatch_queue_debug,
);
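To make this vtable indirection concrete, here is a minimal function-pointer sketch. All names are invented; the real dx_push/dx_wakeup macros and dispatch_queue_s layout are considerably more involved. Pushing onto a queue indirects through its class's vtable, and push in turn triggers wakeup.

```c
/* Illustrative vtable dispatch; not the real libdispatch types. */
typedef struct vq_s vq_t;

typedef struct {
    const char *do_kind;
    void (*do_push)(vq_t *q);
    void (*do_wakeup)(vq_t *q);
} vq_vtable_t;

struct vq_s {
    const vq_vtable_t *do_vtable;
    int pushed;  /* how many times push ran */
    int woken;   /* how many times wakeup ran */
};

/* dx_push-style helper: indirect through the queue's vtable. */
static void vq_dx_push(vq_t *q) { q->do_vtable->do_push(q); }

static void vq_push(vq_t *q) {
    q->pushed++;
    q->do_vtable->do_wakeup(q); /* pushing wakes the queue up */
}

static void vq_wakeup(vq_t *q) { q->woken++; }

static const vq_vtable_t vq_main_vtable = {
    .do_kind = "main-queue",
    .do_push = vq_push,
    .do_wakeup = vq_wakeup,
};
```

Each queue class supplies its own entries (the main queue's wakeup differs from a root queue's), which is why pushing to different queue kinds takes different paths.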

Other queues, such as the manager queue, runloop queues and root queues, have similar vtables:

/// manager queue vtable
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_mgr, queue,
.do_type = DISPATCH_QUEUE_MGR_TYPE,
.do_kind = "mgr-queue",
.do_push = _dispatch_mgr_queue_push,
.do_invoke = _dispatch_mgr_thread,
.do_wakeup = _dispatch_mgr_queue_wakeup,
.do_debug = dispatch_queue_debug,
);

/// runloop queue vtable
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_runloop, queue,
.do_type = DISPATCH_QUEUE_RUNLOOP_TYPE,
.do_kind = "runloop-queue",
.do_dispose = _dispatch_runloop_queue_dispose,
.do_push = _dispatch_queue_push,
.do_invoke = _dispatch_queue_invoke,
.do_wakeup = _dispatch_runloop_queue_wakeup,
.do_debug = dispatch_queue_debug,
);

/// root queue vtable
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_root, queue,
.do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
.do_kind = "global-queue",
.do_dispose = _dispatch_pthread_root_queue_dispose,
.do_push = _dispatch_root_queue_push,
.do_invoke = NULL,
.do_wakeup = _dispatch_root_queue_wakeup,
.do_debug = dispatch_queue_debug,
);
  • .do_push: pushes a task onto the queue
static void _dispatch_continuation_push(dispatch_queue_t dq, dispatch_continuation_t dc)
{
    dx_vtable(dq)->do_push(dq, dc, _dispatch_continuation_override_qos(dq, dc));
}
  • .do_wakeup: when a task is pushed onto a queue, do_wakeup is called to wake the queue up
static inline void
_dispatch_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _tail,
        dispatch_qos_t qos)
{
    struct dispatch_object_s *tail = _tail._do;
    dispatch_wakeup_flags_t flags = 0;
    ...
    return dx_vtable(dq)->do_wakeup(dq, qos, flags);
}
  • .do_invoke: in libdispatch, do_invoke runs in only three situations: 1. a task is dequeued; 2. a runloop queue is dequeued; 3. if the queue overrides invoke, then when a queue element is dequeued, _dispatch_queue_override_invoke is called, which in turn calls do_invoke.
/// 1.
static inline void _dispatch_continuation_pop_inline(dispatch_object_t dou,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
        dispatch_queue_t dq)
{
    dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
            _dispatch_get_pthread_root_queue_observer_hooks();
    if (observer_hooks) observer_hooks->queue_will_execute(dq);
    _dispatch_trace_continuation_pop(dq, dou);
    flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
    if (_dispatch_object_has_vtable(dou)) {
        /// call invoke and run the task
        dx_invoke(dou._do, dic, flags);
    } else {
        _dispatch_continuation_invoke_inline(dou, DISPATCH_NO_VOUCHER, flags);
    }
    if (observer_hooks) observer_hooks->queue_did_execute(dq);
}

/// 2.
void _dispatch_root_queue_drain_deferred_wlh(dispatch_deferred_items_t ddi
        DISPATCH_PERF_MON_ARGS_PROTO)
{
    ...
retry:
    dispatch_assert(ddi->ddi_wlh_needs_delete);
    // dequeue one work item
    _dispatch_trace_continuation_pop(rq, dq);

    if (_dispatch_queue_drain_try_lock_wlh(dq, &dq_state)) {
        /// the runloop queue dequeues an element and calls its invoke function
        dx_invoke(dq, &dic, flags);
        ...
    }
    ...
}

/// 3.
void _dispatch_queue_override_invoke(dispatch_continuation_t dc,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags)
{
    dispatch_queue_t old_rq = _dispatch_queue_get_current();
    dispatch_queue_t assumed_rq = dc->dc_other;
    dispatch_priority_t old_dp;
    voucher_t ov = DISPATCH_NO_VOUCHER;
    dispatch_object_t dou;

    ...
    _dispatch_continuation_pop_forwarded(dc, ov, DISPATCH_OBJ_CONSUME_BIT, {
        if (_dispatch_object_has_vtable(dou._do)) {
            dx_invoke(dou._do, dic, flags);
        } else {
            _dispatch_continuation_invoke_inline(dou, ov, flags);
        }
    });
    _dispatch_reset_basepri(old_dp);
    _dispatch_queue_set_current(old_rq);
}

Except for the main queue, every task submitted to any other queue eventually travels through its chain of target queues into a root queue (the queue as a whole is pushed onto its target queue), and a thread is taken from, or added to, the thread pool to execute it:

void
_dispatch_queue_class_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
    dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);

    if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) {
        _dispatch_retain_2(dq);
        flags |= DISPATCH_WAKEUP_CONSUME_2;
    }

    ...

    if (target) {
        ...
        if (likely((old_state ^ new_state) & enqueue)) {
            dispatch_queue_t tq;
            if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
                os_atomic_thread_fence(dependency);
                tq = os_atomic_load_with_dependency_on2o(dq, do_targetq,
                        (long)new_state);
            } else {
                tq = target;
            }
            dispatch_assert(_dq_state_is_enqueued(new_state));
            // push the queue onto its target queue
            return _dispatch_queue_push_queue(tq, dq, new_state);
        }
        ...
    }
    ...
done:
    if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) {
        return _dispatch_release_2_tailcall(dq);
    }
}

So the general dispatch flow of a queue looks like this:

Submit a task to a queue -> push enqueues the task -> wakeup wakes the queue -> if there is a target queue, push upward until a root queue is reached -> the root queue takes a thread from the pool (or creates a new one) to execute the task.

The main queue's dispatch flow differs slightly: tasks submitted to the main queue are managed by GCD's internal runloop queue and finally executed by the main thread's runloop.

dispatch_async analysis

Let's first look at asynchronous tasks on the main queue.

dispatch_async on the main queue

To run an asynchronous task on the main thread, the usual pattern is:

dispatch_async(dispatch_get_main_queue(), ^{
    NSLog(@"dispatch async in main queue");
});

Inside libdispatch this code goes through the following steps:

1. The block is wrapped into a dispatch_continuation_t structure, and _dispatch_continuation_async then pushes that continuation onto the main queue.

void dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    // allocate storage
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
    // wrap the block into a dispatch_continuation
    _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
    _dispatch_continuation_async(dq, dc);
}

static inline void _dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
        bool barrier)
{
    if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
        // push the task onto the main queue
        return _dispatch_continuation_push(dq, dc);
    }
    return _dispatch_async_f2(dq, dc);
}

Note: the libdispatch source is full of the fastpath, slowpath, likely and unlikely macros. They exist to give the compiler branch-prediction hints; roughly:

  • fastpath/likely mark a condition as probably true
  • slowpath/unlikely mark a condition as probably false
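As a rough sketch, these macros boil down to GCC/Clang's __builtin_expect; the exact libdispatch definitions differ slightly (they wrap the expression in casts), so the versions below are simplified:

```c
/* Simplified branch-prediction hint macros (the real libdispatch
 * definitions add casts around __builtin_expect). */
#define my_fastpath(x) __builtin_expect(!!(x), 1)
#define my_slowpath(x) __builtin_expect(!!(x), 0)

/* Example: the division-by-zero guard is marked as the cold branch,
 * so the compiler lays the common path out straight-line. */
static int checked_div(int a, int b) {
    if (my_slowpath(b == 0)) {
        return 0; /* cold path */
    }
    return a / b; /* hot path */
}
```

The hints change only code layout and scheduling, not semantics; the function behaves identically without them.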

2. Wake up the main queue

static void _dispatch_continuation_push(dispatch_queue_t dq, dispatch_continuation_t dc)
{
    dx_vtable(dq)->do_push(dq, dc, _dispatch_continuation_override_qos(dq, dc));
}
// as noted above, the main queue's vtable stores its push/wakeup/invoke entries,
// so the call above resolves to _dispatch_queue_push
static inline void
_dispatch_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _tail,
        dispatch_qos_t qos)
{
    ...
    // call the queue's wakeup function
    return dx_wakeup(dq, qos, flags);
}

3. Wake up the runloop queue

void _dispatch_main_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags)
{
#if DISPATCH_COCOA_COMPAT
    if (_dispatch_queue_is_thread_bound(dq)) {
        // check whether the queue is thread-bound; the main queue is,
        // so its wakeup takes this branch and wakes the runloop queue
        return _dispatch_runloop_queue_wakeup(dq, qos, flags);
    }
#endif
    return _dispatch_queue_wakeup(dq, qos, flags);
}

void _dispatch_runloop_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags)
{
#if DISPATCH_COCOA_COMPAT
    ...
    if (_dispatch_queue_class_probe(dq)) {
        // if the queue has pending work, poke the runloop queue
        return _dispatch_runloop_queue_poke(dq, qos, flags);
    }

    ...
    return _dispatch_queue_wakeup(dq, qos, flags);
#endif
}

4. Wake the main thread and register a callback, so that the mach kernel triggers the _dispatch_main_queue_drain operation at an appropriate time

static inline void _dispatch_runloop_queue_class_poke(dispatch_queue_t dq)
{
dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
if (!_dispatch_runloop_handle_is_valid(handle)) {
return;
}

#if HAVE_MACH
mach_port_t mp = handle;
kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
switch (kr) {
case MACH_SEND_TIMEOUT:
case MACH_SEND_TIMED_OUT:
case MACH_SEND_INVALID_DEST:
break;
default:
(void)dispatch_assume_zero(kr);
break;
}
...
#else
#error "runloop support not implemented on this platform"
#endif
}

void _dispatch_main_queue_callback_4CF(
void *ignored DISPATCH_UNUSED)
{
if (main_q_is_draining) {
return;
}
_dispatch_queue_set_mainq_drain_state(true);
_dispatch_main_queue_drain();
_dispatch_queue_set_mainq_drain_state(false);
}

5. _dispatch_continuation_pop_inline runs: if the main queue has outstanding tasks, they are dequeued and executed.

static void _dispatch_main_queue_drain(void)
{
dispatch_queue_t dq = &_dispatch_main_q;
dispatch_thread_frame_s dtf;

if (!dq->dq_items_tail) {
return;
}

...
do {
next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
_dispatch_continuation_pop_inline(dc, &dic, DISPATCH_INVOKE_NONE, dq);
} while ((dc = next_dc));
...
}

static inline void _dispatch_continuation_pop_inline(dispatch_object_t dou,
dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
dispatch_queue_t dq)
{
...
if (_dispatch_object_has_vtable(dou)) {
dx_invoke(dou._do, dic, flags);
} else {
_dispatch_continuation_invoke_inline(dou, DISPATCH_NO_VOUCHER, flags);
}
if (observer_hooks) observer_hooks->queue_did_execute(dq);
}

That completes the dispatch_async flow on the main queue. The stack trace at an Xcode breakpoint gives a glimpse of it as well.

(Figure: call stack of dispatch_async on the main queue)

dispatch_async on custom and global queues

Sample code:

// 1. custom serial queue
dispatch_queue_t customSerialQueue = dispatch_queue_create("com.gcd.queue.custom", DISPATCH_QUEUE_SERIAL);
dispatch_async(customSerialQueue, ^{
    NSLog(@"customSerialQueue task");
});

// 2. custom concurrent queue
dispatch_queue_t customConcurrentQueue = dispatch_queue_create("com.gcd.queue.custom", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(customConcurrentQueue, ^{
    NSLog(@"customConcurrentQueue task");
});

dispatch_queue_t globalQueue = dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0);
// 3. global queue
dispatch_async(globalQueue, ^{
    NSLog(@"customGlobalQueue task");
});

A custom concurrent queue goes through one extra step, redirection, compared with the other queues (serial and global):

static inline void _dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
        bool barrier)
{
    // check whether redirection is needed; if not, enqueue the task
    if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
        return _dispatch_continuation_push(dq, dc);
    }
    // go through the redirection path
    return _dispatch_async_f2(dq, dc);
}

The DISPATCH_QUEUE_USES_REDIRECTION macro decides whether a queue needs redirection: it does if its dq_width satisfies width > 1 && width < 0xfff. Serial queues (dq_width = 1) and global queues (dq_width = 0xfff) both fail this test, so they need no redirection.

queue width
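Restated as code (a paraphrase of the macro described above, using the widths the text gives for each queue kind):

```c
#include <stdbool.h>
#include <stdint.h>

/* Paraphrase of DISPATCH_QUEUE_USES_REDIRECTION: only queues whose
 * width is strictly between 1 and 0xfff take the redirection path. */
static bool uses_redirection(uint16_t dq_width) {
    return dq_width > 1 && dq_width < 0xfff;
}
```

A custom concurrent queue is initialized with DISPATCH_QUEUE_WIDTH_MAX, which falls inside that range, so it alone takes the redirection path.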

The redirection path:

static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
dispatch_object_t dou, dispatch_qos_t qos)
{
if (!slowpath(_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dq, dou);
}
dq = dq->do_targetq;

// Find the queue to redirect to
while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
break;
}
...
dq = dq->do_targetq;
}

dx_push(dq, dou, qos);
}

The point of redirection is to push the task all the way into a root queue.

For queues that don't need redirection, their push function is called to enqueue the task, with two cases:

  • Ordinary custom queues: if the queue has a target queue, _dispatch_queue_push_queue pushes the queue upward through its target queues, level by level, until it lands in a root queue.
  • Global queues: a global queue (root queue) has no target queue, so _dispatch_root_queue_push puts the task directly onto the root queue.

Ultimately, every task submitted to a non-main queue ends up in a root queue, which drives the thread pool and assigns a thread to execute it.

/// root queue thread pool management
static void _dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n, int floor)
{
dispatch_root_queue_context_t qc = dq->do_ctxt;
int remaining = n;
int r = ENOSYS;

_dispatch_root_queues_init();
_dispatch_debug_root_queue(dq, __func__);
...
#if DISPATCH_USE_PTHREAD_POOL
dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
_dispatch_root_queue_debug("signaled sleeping worker for "
"global queue: %p", dq);
if (!--remaining) {
return;
}
}
}

bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
if (overcommit) {
os_atomic_add2o(qc, dgq_pending, remaining, relaxed);
} else {
if (!os_atomic_cmpxchg2o(qc, dgq_pending, 0, remaining, relaxed)) {
_dispatch_root_queue_debug("worker thread request still pending for "
"global queue: %p", dq);
return;
}
}

int32_t can_request, t_count;
// seq_cst with atomic store to tail <rdar://problem/16932833>
t_count = os_atomic_load2o(qc, dgq_thread_pool_size, ordered);
do {
can_request = t_count < floor ? 0 : t_count - floor;
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(qc, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
if (remaining == 0) {
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
} while (!os_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));

pthread_attr_t *attr = &pqc->dpq_thread_attr;
pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
if (slowpath(dq == &_dispatch_mgr_root_queue)) {
pthr = _dispatch_mgr_root_queue_init();
}
#endif
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
#endif // DISPATCH_USE_PTHREAD_POOL
}

So how does the root queue get drained? The do-while loop above calls pthread_create to spawn a new thread whose entry point is _dispatch_worker_thread, so that function runs once the thread starts:

static void *_dispatch_worker_thread(void *context)
{

...

do {
_dispatch_root_queue_drain(dq, old_pri);
_dispatch_reset_priority_and_voucher(old_pri, NULL);
} while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
dispatch_time(0, timeout)) == 0);

...

_dispatch_global_queue_poke(dq, 1, 0);

...

return NULL;
}

static void _dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pp)
{
...
while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) {
if (reset) _dispatch_wqthread_override_reset();
_dispatch_continuation_pop_inline(item, &dic, flags, dq);
reset = _dispatch_reset_basepri_override();
if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
break;
}
}

...

#if DISPATCH_COCOA_COMPAT
_dispatch_last_resort_autorelease_pool_pop(&dic);
#endif // DISPATCH_COCOA_COMPAT
...
}

_dispatch_worker_thread drains the root queue: elements are dequeued one by one, and each dequeue calls _dispatch_continuation_pop_inline, which triggers the element's .do_invoke and runs the task.
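The worker loop, a pthread created by the pool that drains items until none remain, can be approximated with the toy below. The real worker also parks on a semaphore with a 5-second timeout before retiring, which is omitted here, and all names are invented:

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

/* Toy root queue: counters stand in for the real item list. */
typedef struct {
    atomic_int pending;  /* queued work items */
    atomic_int executed; /* items run so far */
} toy_root_t;

/* Worker entry point, loosely analogous to _dispatch_worker_thread:
 * drain items one by one until the queue is empty, then retire. */
static void *toy_worker(void *context) {
    toy_root_t *q = context;
    while (atomic_fetch_sub(&q->pending, 1) > 0) {
        atomic_fetch_add(&q->executed, 1); /* "invoke" one item */
    }
    return NULL;
}

/* Spawn one worker for n items and wait for it to retire. */
static int toy_drain(toy_root_t *q, int n) {
    atomic_store(&q->pending, n);
    atomic_store(&q->executed, 0);
    pthread_t tid;
    if (pthread_create(&tid, NULL, toy_worker, q) != 0) return -1;
    pthread_join(tid, NULL);
    return atomic_load(&q->executed);
}
```

The semaphore-with-timeout in the real code is what lets an idle worker linger briefly for more work instead of being torn down and recreated for every burst of tasks.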

Summary

To recap the dispatch_async flow:

(Figure: dispatch_async flow)

dispatch_sync analysis

The dispatch_sync flow is broadly similar to the above. In general, a synchronous task runs on the current thread and blocks that thread until the task finishes.

  • When the queue is a serial queue, the current thread tries to acquire the queue's lock and runs the task if it succeeds; if the queue turns out to be already owned by the current thread, a crash is triggered, for example:

(Figure: dispatch_sync deadlock)

  • When the queue is a concurrent queue, the task is executed directly.

We won't walk through every step of dispatch_sync; instead, let's focus on the lock mechanism and the conditions that produce a deadlock.

The dispatch_sync_f function:

void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    if (likely(dq->dq_width == 1)) {
        // serial queue
        return dispatch_barrier_sync_f(dq, ctxt, func);
    }

    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
        // global queue
        return _dispatch_sync_f_slow(dq, ctxt, func, 0);
    }

    _dispatch_introspection_sync_begin(dq);
    if (unlikely(dq->do_targetq->do_targetq)) {
        // nested target queues: recurse toward the final targetq,
        // eventually coming back to this function
        return _dispatch_sync_recurse(dq, ctxt, func, 0);
    }
    // run the task on the queue
    _dispatch_sync_invoke_and_complete(dq, ctxt, func);
}
Acquiring the queue's lock

In dispatch_sync_f, a serial queue goes through dispatch_barrier_sync_f; stepping down the call chain you reach the lock-related function _dispatch_queue_try_acquire_barrier_sync_and_suspend.

After macro expansion the code looks like this:

static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_queue_t dq,
        uint32_t tid, uint64_t suspend_count)
{
    // compute the initial state from the queue's width
    uint64_t init = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
    // _dispatch_lock_value_from_tid keeps bits 2..31 of tid (mask 0xfffffffc)
    uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
            _dispatch_lock_value_from_tid(tid) |
            (suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
    uint64_t old_state, new_state;
    bool _result = false;
    typeof(&(dq)->dq_state) _p = &(dq)->dq_state;
    // atomically load the queue's dq_state into old_state
    old_state = os_atomic_load(_p, relaxed);

    do {
        uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
        if (old_state != (init | role)) {
            // dq_state differs from (init | role): it has been modified,
            // i.e. some thread currently holds the queue;
            // stop looping and return false
            os_atomic_rmw_loop_give_up(break);
        }

        new_state = value | role;

        _os_atomic_basetypeof(_p) _r = old_state;

        // compare-and-swap dq_state from old_state to new_state:
        // on success the lock is acquired, dq_state becomes new_state
        // and we return true; on contention the loop re-checks the
        // freshly observed state
        _Bool _b = atomic_compare_exchange_weak_explicit(_os_atomic_c11_atomic(_p),
                &_r, new_state, memory_order_acquire, memory_order_relaxed);
        old_state = _r;
        _result = _b;

    } while (os_unlikely(!_result));

    return _result;
}

As shown above, libdispatch implements the lock with atomic operations comparing queue.dq_state:

1. If dq_state equals the initial value (init | role), no thread currently holds the queue, so the lock succeeds and dq_state is set to (value | role);
2. Otherwise the lock attempt fails and false is returned.

Once a thread holds the queue's lock, queue.dq_state also records the tid of the owning thread.
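The acquire/check pair can be sketched with C11 atomics. The constants and bit layout here are invented; libdispatch packs many more flags into dq_state:

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define TOY_STATE_INIT 0u  /* "nobody owns the queue" */
#define TOY_LOCK_BIT   1u

/* Try to take the barrier lock: CAS the state from INIT to a value
 * encoding the owner's tid. Fails if another thread won the race. */
static bool toy_try_acquire(_Atomic uint64_t *dq_state, uint32_t tid) {
    uint64_t expected = TOY_STATE_INIT;
    uint64_t locked = ((uint64_t)tid << 2) | TOY_LOCK_BIT;
    return atomic_compare_exchange_strong(dq_state, &expected, locked);
}

/* The dispatch_sync deadlock check in miniature: does the recorded
 * owner tid match the calling thread's tid? */
static bool toy_locked_by(uint64_t state, uint32_t tid) {
    return (state & TOY_LOCK_BIT) && ((state >> 2) == (uint64_t)tid);
}
```

Because the owner's tid is encoded in the state word, a later dispatch_sync on the same queue can cheaply detect that the caller already owns it, which is exactly the check that produces the deadlock crash discussed below.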

Back up one level to dispatch_barrier_sync_f:

void dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_tid tid = _dispatch_tid_self();

// The more correct thing to do would be to merge the qos of the thread
// that just acquired the barrier lock into the queue state.
//
// However this is too expensive for the fastpath, so skip doing it.
// The chosen tradeoff is that if an enqueue on a lower priority thread
// contends with this fastpath, this thread may receive a useless override.
//
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dq, tid))) {
return _dispatch_sync_f_slow(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT);
}

_dispatch_introspection_sync_begin(dq);
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT);
}
_dispatch_queue_barrier_sync_invoke_and_complete(dq, ctxt, func);
}

When the lock attempt fails, execution enters _dispatch_sync_f_slow to wait for the previous task to finish:

static void
_dispatch_sync_wait(dispatch_queue_t top_dq, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_t dq, uintptr_t dc_flags)
{
pthread_priority_t pp = _dispatch_get_priority();
dispatch_tid tid = _dispatch_tid_self();
dispatch_qos_t qos;
uint64_t dq_state;

// 死锁检测部分
// 获取当前queue的dq_state
dq_state = _dispatch_sync_wait_prepare(dq);
if (unlikely(_dq_state_drain_locked_by(dq_state, tid))) {
// 如果当前queue已经被当前thread持有,引起死锁,触发crash
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}

struct dispatch_sync_context_s dsc = {
.dc_flags = dc_flags | DISPATCH_OBJ_SYNC_WAITER_BIT,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = DISPATCH_NO_VOUCHER,
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = tid,
};

...

// 将任务入队
_dispatch_queue_push_sync_waiter(dq, &dsc, qos);

// 等待之前的任务执行完毕
if (dsc.dc_data == DISPATCH_WLH_ANON) {
// 信号量等待,semaphore_wait(dsc.dsc_event->dte_sema)
_dispatch_thread_event_wait(&dsc.dsc_event); // acquire
// 销毁信号量,结束等待,semaphore_destroy(mach_task_self(), dsc.dsc_event->dte_sema)
_dispatch_thread_event_destroy(&dsc.dsc_event);
// If _dispatch_sync_waiter_wake() gave this thread an override,
// ensure that the root queue sees it.
if (dsc.dsc_override_qos > dsc.dsc_override_qos_floor) {
_dispatch_set_basepri_override_qos(dsc.dsc_override_qos);
}
} else {
// 等待owner释放
_dispatch_event_loop_wait_for_ownership(&dsc);
}
_dispatch_introspection_sync_begin(top_dq);

...

// 最终调用_dispatch_sync_function_invoke_inline,执行任务
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func, top_dc_flags);
}



static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_thread_frame_s dtf;
_dispatch_thread_frame_push(&dtf, dq);
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_frame_pop(&dtf);
}
死锁分析

死锁的产生必须满足四个必要条件:

  • 资源互斥访问
  • 请求与保持
  • 不剥夺
  • 循环等待

libdispatch中创建并分配线程来执行任务块的过程中,线程/队列对资源的操作天然满足上述前三个条件,因此一旦再满足第四个条件(循环等待),必然发生死锁。

在GCD中满足两个条件即会形成循环等待的情形:

  • 串行队列正在执行任务Task 1(无论是sync还是async方式提交的)
  • Task 1未执行完成,又向队列中同步提交Task 2(dispatch_sync方式提交)

死锁的示例代码:

dispatch_queue_t serialQ = dispatch_queue_create("com.testqueue.serial", DISPATCH_QUEUE_SERIAL_WITH_AUTORELEASE_POOL);

// 1.
dispatch_sync(serialQ, ^{
NSLog(@"Task 1 begin.");
dispatch_sync(serialQ, ^{
NSLog(@"Task 2.");
});
NSLog(@"Task 1 complete.");
});

// 2.
dispatch_async(serialQ, ^{
NSLog(@"Task 1 begin.");
dispatch_sync(serialQ, ^{
NSLog(@"Task 2.");
});
NSLog(@"Task 1 complete.");
});

// 两种情况输出一样:
Task 1 begin.
// 并且发生crash, xcode提示:Thread 1: EXC_BAD_INSTRUCTION (code=EXC_I386_INVOP, subcode=0x0)

新版libdispatch中引入了死锁检测机制,发生死锁时,主动触发程序crash,并定位到引起死锁的代码,降低了调试难度。

死锁检测相关的代码:

dq_state = _dispatch_sync_wait_prepare(dq);
if (unlikely(_dispatch_lock_is_locked_by(dq_state, tid))) {
// 如果当前queue已经被当前thread持有,引起死锁,触发crash
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}

static inline bool _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
// equivalent to _dispatch_lock_owner(lock_value) == tid
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}

从中可以看到,libdispatch通过(dq_state ^ tid) & DLOCK_OWNER_MASK判断dq是否被某个线程持有:dq.dq_state中存有持有者线程的tid信息,若当前线程的tid与之匹配,则返回true,说明当前线程已持有dq,循环等待条件成立,产生死锁;否则返回false。

那么问题来了,如果在thread_A中提交Task1,在Task1还在执行时,在thread_B中同步提交Task2会发生什么情况呢,GCD能否检测出死锁呢?

测试代码中为了避开主线程死锁对测试的干扰,采用dispatch_async方式提交Task1。

- (void)viewDidLoad {
[super viewDidLoad];
// Do any additional setup after loading the view, typically from a nib.

dispatch_queue_t serialQueue = dispatch_queue_create("com.testqueue.serial", DISPATCH_QUEUE_SERIAL_WITH_AUTORELEASE_POOL);
// 为了避免主线程的死锁,干扰测试结果,这里使用dispatch_async方式提交Task1.
dispatch_async(serialQueue, ^{
NSLog(@"Task 1 begin.");

dispatch_sync(dispatch_get_main_queue(), ^{
dispatch_sync(serialQueue, ^{
NSLog(@"Task 2 begin.");
});
});

NSLog(@"Task 1 complete.");
});

NSLog(@"main queue task complete.");
}

输出如下:

Test[73008:2443797] main queue task complete.
Test[73008:2443854] Task 1 begin.

此时程序卡住并没有crash,xcode也并未提示任何crash信息。

这说明了libdispatch死锁检测机制的问题:它只针对在同一个线程中向串行队列同步提交任务的情况。如果Task 2是在其它线程中同步提交的,它就无法检测出来了。

三、总结

本文只是粗略地分析了GCD的queue、dispatch过程以及queue和线程之间的调度关系,libdispatch源码的繁杂远不止于此,细节之处还有:mach_port通信、线程TSD、runloop callback、与系统内核(libkern)的交互等等,同时还有一些提升程序性能的编程技巧(libdispatch为了最大限度地提升性能,大量使用原子操作,而非pthread_mutex_lock、OSSpinLock/os_unfair_lock等锁来实现同步)。感兴趣的同学可以把源码下载下来仔细阅读一下。