GCD源码分析(三)——dispatch_once(上)

一、前言

1
2
3
4
5
6
7
8
+ (instancetype) sharedInstance {
static dispatch_once_t onceToken;
static TestObject sharedInstance;
dispatch_once(&onceToken, ^{
sharedInstance = [TestObject new];
});
return sharedInstance;
}

相信大家对上面的代码很熟悉,objective-c中单例的实现离不开dispatch_once。从Swift3.0开始Apple就废弃了dispatch_once:DISPATCH_SWIFT3_UNAVAILABLE("Use lazily initialized globals instead"),Swift中的单例写法变成了下面这种写法,由静态不可变变量代替。但是了解dispatch_once的工作原理对我们编写高质量高性能的代码依然有着重要的参考作用。

1
2
3
4
static let singleton = TestObject()
private override init() {
...
}

dispatch_once被广泛使用在OC单例中,它可以保证在多线程程序中,指定的代码只被执行一次。dispatch_once没有使用任何“锁”,但是依然能够保证多线程情况下的线程安全,并且有着优异的性能。

二、_dispatch_once_f

1
2
3
4
5
6
7
8
9
10
void _dispatch_once_f(dispatch_once_t *predicate, void *_Nullable context,
dispatch_function_t function)
{
if (DISPATCH_EXPECT(*predicate, ~0l) != ~0l) {
dispatch_once_f(predicate, context, function);
} else {
dispatch_compiler_barrier();
}
DISPATCH_COMPILER_CAN_ASSUME(*predicate == ~0l);
}

predicate是dispatch_once_t类型的指针,用来指示block代码是否执行完毕。其实dispatch_once_t就是long类型:

1
typedef long dispatch_once_t;

这个函数的核心就是if-else语句,DISPATCH_EXPECT告诉编译器predict的期待值是~0l,由编译器对代码进行优化,如果predicate != ~0l,说明block语句尚未执行过,进入dispatch_once_f_slow函数,进行一些操作,如:执行block,线程等待、block执行完成后唤醒等待线程等。否则,执行dispatch_compiler_barrier。这个if-else语句涉到了CPU的分支预测和指令预执行,放到下篇讲。

进入dispatch_once_f_slow函数之前,先看下dispatch_once的执行过程中可能遇到哪些情形:

  1. 第一次执行,执行block,执行完成后置predicate标记
  2. 非第一次执行,而步骤1尚未执行完毕,此时线程需要等待步骤1完成,步骤1完成后依次唤醒等待的线程
  3. 非第一次执行,且步骤1已经执行完成,线程跳过block继续执行后续任务

上述的1、2就是在dispatch_once_f_slow中进行处理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static void
dispatch_once_f_slow(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
...
_dispatch_once_waiter_t volatile *vval = (_dispatch_once_waiter_t*)val;
struct _dispatch_once_waiter_s dow = { };
_dispatch_once_waiter_t tail = &dow, next, tmp;
dispatch_thread_event_t event;

if (atomic_compare_exchange_strong_explicit((typeof(*(vval)) _Atomic *)(vval),
NULL, tail, memory_order_acquire, memory_order_relaxed)) {
dow.dow_thread = _dispatch_tid_self();
_dispatch_client_callout(ctxt, func);

next = (_dispatch_once_waiter_t)atomic_exchange_explicit((typeof(*(val)) _Atomic *)(val), DLOCK_ONCE_DONE, memory_order_release);
while (next != tail) {
tmp = (_dispatch_once_waiter_t)_dispatch_wait_until(next->dow_next);
event = &next->dow_event;
next = tmp;
_dispatch_thread_event_signal(event);
}
} else {
_dispatch_thread_event_init(&dow.dow_event);
next = *vval;
for (;;) {
if (next == DISPATCH_ONCE_DONE) {
break;
}
if (os_atomic_cmpxchgv(vval, next, tail, &next, release)) {
dow.dow_thread = next->dow_thread;
dow.dow_next = next;
if (dow.dow_thread) {
pthread_priority_t pp = _dispatch_get_priority();
_dispatch_thread_override_start(dow.dow_thread, pp, val);
}
_dispatch_thread_event_wait(&dow.dow_event);
if (dow.dow_thread) {
_dispatch_thread_override_end(dow.dow_thread, val);
}
break;
}
}
_dispatch_thread_event_destroy(&dow.dow_event);
}
#endif
}

先看函数的入参:

  • dispatch_once_t *val,即外部传入的predicate标识,指示上述步骤1是否完成
  • void *ctxt,需要执行的block的指针
  • dispatch_function_t func,由libdispatch封装的block内部的执行函数

接下来是声明变量:

  • vval:由volatile修饰的val,

volatile关键字的作用是告诉编译器:这个值随时可能改变,每次用的时候都要从内存中取,使得编译器不对该值进行优化

  • tail, next, tmp都是_dispatch_once_waiter_t类型的变量,其中tail被初始化为dow,这些值与线程等待链有关
  • dispatch_thread_event_t event,用于线程的同步,dispatch_thread_event_t存储了线程的信号量信息
1
2
3
4
5
6
7
8
9
10
typedef struct dispatch_thread_event_s {
#if HAVE_UL_COMPARE_AND_WAIT || HAVE_FUTEX
// 1 means signalled but not waited on yet
// UINT32_MAX means waited on, but not signalled yet
// 0 is the initial and final state
uint32_t dte_value;
#else
_dispatch_sema4_t dte_sema;
#endif
} dispatch_thread_event_s, *dispatch_thread_event_t;

接下来是if-else语句,通过atomic_compare_exchange_strong_explicit原子操作判断传入的val(即:predicate)是否为0,如果为0,说明是第一次执行,进入if分支,并将vval赋值为&dow 执行block。

if 分支

dispatch_once第一次执行时,进入if分支,将vval赋值为&dow,指向_dispatch_once_waiter_s类型的变量,即进入等待,等待DISPATCH_ONCE_DONE

1
2
3
4
5
typedef struct _dispatch_once_waiter_s {
volatile struct _dispatch_once_waiter_s *volatile dow_next;
dispatch_thread_event_s dow_event;
mach_port_t dow_thread;
} *_dispatch_once_waiter_t;

waiter结构体中有next指针,说明这些等待的线程构成了一个等待链。

然后dow.dow_thread = _dispatch_tid_self();将等待链的头指针指向当前线程。

_dispatch_client_callout函数调用func执行block。

block执行完毕后,使用next = (_dispatch_once_waiter_t)atomic_exchange_explicit((typeof(*(val)) _Atomic *)(val), DLOCK_ONCE_DONE, memory_order_release);,这行代码的作用是:1、先将val的值赋值给next;2、把DLOCK_ONCE_DONE赋值给val,说明dispatch_once执行完毕。

由于dispatch_once面临的是多线程,在block执行的过程中可能有多个线程也调用了dispatch_once:

  1. 如果在block执行完毕之前,没有其他线程调用dispatch_once,此时next应该是等于tail的,二者都为0,最后的while循环条件不成立;
  2. 如果在block执行完毕之前,有其他线程调用dispatch_once,此时next在else分支中被改变为downext = *vval,进入while循环。

最后的while循环中,遍历等待链,调用_dispatch_thread_event_signal逐个signal信号量,唤醒等待的线程执行后续的操作。

else 分支

else分支是线程等待分支,在第一次调用dispatch_once开始执行block且未结束的时候,所有后来调用dispatch_once的线程都会进入等待分支。

首先是更改next的值为*vval,通过上文分析,此时的vval指针已经被赋值为&dow

然后进入一个无限for循环,如果发现vval的值为DISPATCH_ONCE_DONE,直接break,并调用_dispatch_thread_event_destroy函数销毁线程信号量,说明第一次的dispatch_once已经完成,此时进入的线程已经无需进入等待链了,直接执行后续操作即可。

如果vval的值不为DISPATCH_ONCE_DONE,说明第一次的dispatch_once还未完成,进行一个原子比较并交换的操作:if (os_atomic_cmpxchgv(vval, next, tail, &next, release)),我们把宏展开

1
2
3
4
5
6
7
if({
_os_atomic_basetypeof(vval) _r = next;
_Bool _b = atomic_compare_exchange_strong_explicit(_os_atomic_c11_atomic(vval),
&_r, tail, memory_order_release, memory_order_relaxed);
next = _r;
_b;
})

atomic_compare_exchange_strong_explicit就是进行比较并交换的原子操作函数,比较当前vvaltail的值。tail为初始化的dispatch_waiter对象,用于存储当前进入dispatch_once的等待线程的信息。

  1. 如果二者相等,说明第一次的dispatch_once还未执行完毕(如果执行完毕,vval的值会被变更为DLOCK_ONCE_DONE),将tail赋值给vval,然后进入if流程,将当前进入等待线程的dispatch_once_waiter的next指针指向当前next。如果在dispatch_once尚未执行完毕的时候,不断地有线程进入等待,结合前面的next = *vval语句,可以看到这样一个线程进入等待链的过程:

    线程进入等待(创建dispatch_once_waiter:dow,并将其作为tail);
    next = vval;
    vval = tail;
    tail->next = next;

    进入等待链的线程调用_dispatch_thread_event_wait来wait信号量,直到dispatch_once完成后signal信号量唤醒线程。

  2. 如果二者不相等,那只有一个可能,dispatch_once完成。将vval(此时的值为DISPATCH_ONCE_DONE)赋值给next,然后在for循环中触发next == DISPATCH_ONCE_DONE条件,break掉for循环,然后调用_dispatch_thread_event_destroy销毁信号量。

三、总结

如同libdispatch中其他代码一样,dispatch_once_f_slow中也使用了诸多的原子操作来实现无锁情形下的线程安全。实际上,原子操作是利用CPU指令执行的原子性实现的CPU级别的“锁”,比传统的软件锁(pthread_mutex_lock等)性能高很多,当我们使用软件锁所带来的性能消耗过大时,可以考虑使用原子锁。但是使用原子操作代替传统锁也有缺陷:编程实现较为复杂,一不小心可能会出问题而且bug难以查找。
下篇文章中,我们分析一下dispatch_once的性能。