iOS 多线程:锁

本文主要介绍iOS中常用锁的相关知识及用法,大部分概念来自于线程同步及线程锁

原子操作

问:在OC中对属性变量添加atomic修饰符,能使属性线程安全吗?

原子操作,即不可分割开的操作;该操作一定是在同一个cpu时间片中完成,这样即使线程被切换,多个线程也不会看到同一块内存中不完整的数据。

原子表示不可分割的最小单元,具体来说是指在所处尺度空间或者层(layer)中不能观测到更为具体的内部实现与结构。对于计算机程序执行的最小单位是单条指令。我们可以通过参考各种cpu的指令操作手册,用其汇编指令编写原子操作。而这种方式太过于低效。

某些简单的表达式可以算作现代编程语言的最小执行单元,某些简单的表达式,其实编译之后得到的汇编指令不止一条,所以他们并不是真正意义原子的。以加法指令操作实现 x += n为例 ,gcc编译出来的汇编形式上如下:

1
2
3
4
5
...
movl 0xc(%ebp), %eax
addl $n, %eax
movl %eax, 0xc(%ebp)
...

而将它放在所线程环境之中,显然也是不安全的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
dispatch_group_t group = dispatch_group_create();
__block int i = 1;
for (int k = 0; k < 300; k++) {
dispatch_group_enter(group);
dispatch_async(dispatch_get_global_queue(0, 0), ^{
++i;
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(dispatch_get_global_queue(0, 0), ^{
--i;
dispatch_group_leave(group);
});
}
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"----result=%d i=%d",self.pro1,i);
});

上述例子中,全局变量i理论上应该最后得到1,而实际上却几率性得到0,-1,2,-2,1。

为了避免错误,很多操作系统或编译器都提供了一些常用原子化操作的内建函数或API,包括一些实际是多条指令的常用表达式。上述操作中,将i++/i–,替换为 OSAtomicIncrement32(&i) / OSAtomicDecrement32(&i) ,将得到预期的结果1。
OC 中的原子操作API

OSAtomicAdd32 原子交换两个值
OSAtomicDecrement32 原子减少一个值
OSAtomicIncrement32 原子增加一个值
OSAtomicXor32 原子进行异或

综上,OC中使用原子属性,并不能保证线程安全,而iOS中同步锁开销很大,所以在iOS开发中通常给核心业务代码加锁,使其整体变为原子的,而不针对具体的属性读写方法。

iOS中的锁

互斥锁

1. 基本概念

互斥锁是在很多平台上都比较常用的一种锁。它属于sleep-waiting类型的锁。即当锁处于占用状态时,其他线程会挂起,当锁被释放时,所有等待的线程都将被唤醒,再次对锁进行竞争。在挂起与释放过程中,涉及用户态与内核态之间的context切换,而这种切换是比较消耗性能的。

2. pthread_mutex

pthread_mutex 是pthread中的互斥锁,具有跨平台性质。pthread是POSIX线程(POSIX threads)的简称,是线程的POSIX标准(可移植操作系统接口 Portable Operation System Interface)。POSIX是unix的api设计标准,兼容各大主流平台。所以pthread_mutex是比较低层的,可以跨平台的互斥锁实现。

初始化方法:

1
int pthread_mutex_init(pthread_mutex_t * __restrict, const pthread_mutexattr_t * __restrict);

pthread_mutex_t * __restrict 代表互斥锁的类型,有以下四种:

1.PTHREAD_MUTEX_NORMAL 缺省类型,也就是普通锁。当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后先进先出原则获得锁。
2.PTHREAD_MUTEX_ERRORCHECK 检错锁,如果同一个线程请求同一个锁,则返回 EDEADLK,否则与普通锁类型动作相同。这样就保证当不允许多次加锁时不会出现嵌套情况下的死锁。
3.PTHREAD_MUTEX_RECURSIVE 递归锁,允许同一个线程对同一个锁成功获得多次,并通过多次 unlock 解锁。线程首次成功获取互斥锁时,锁定计数会设置为 1。线程每重新锁定该互斥锁一次,锁定计数就增加 1。线程每解除锁定该互斥锁一次,锁定计数就减小 1。 锁定计数达到 0 时,该互斥锁即可供其他线程获取。如果某个线程尝试解除锁定的互斥锁不是由该线程锁定或者未锁定,则将返回错误。
4.PTHREAD_MUTEX_DEFAULT 适应锁,动作最简单的锁类型,仅等待解锁后重新竞争,没有等待队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
pthread_mutex_t mutex;
void MyInitFunction()
{
pthread_mutex_init(&mutex, NULL);
}

void MyLockingFunction()
{
pthread_mutex_lock(&mutex);
// Do work.
pthread_mutex_unlock(&mutex);
}
//释放锁
pthread_mutex_destroy(&mutex);

pthread_mutex还有一种简便的调用方式,使用的是全局唯一互斥锁。实验表明,该锁是所有属性都是默认的,进程内可见,类型是普通锁

1
2
3
4
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&mutex);
block();
pthread_mutex_unlock(&mutex);

同时它还提供了一种非阻塞版本pthread_mutex_trylock。若尝试获取锁时发现互斥锁已经被锁定,或则超出了递归锁定的最大次数,则立即返回,不会挂起。只有在锁未被占用时才能成功加锁。

1
2
3
4
5
6
7
8
9
10
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int res = pthread_mutex_trylock(&mutex);
if(res == 0){
block();
pthread_mutex_unlock(&mutex);
}else if(res == EBUSY){
printf("由于 mutex 所指向的互斥锁已锁定,因此无法获取该互斥锁。");
}else if (res == EAGAIN){
printf("由于已超出了 mutex 的递归锁定最大次数,因此无法获取该互斥锁。");
}

3. NSLock、NSRecursiveLock

官方文档:

Warning
The NSLock class uses POSIX threads to implement its locking behavior. When sending an unlock message to an NSLock object, you must be sure that message is sent from the same thread that sent the initial lock message. Unlocking a lock from a different thread can result in undefined behavior.
You should not use this class to implement a recursive lock. Calling the lock method twice on the same thread will lock up your thread permanently. Use the NSRecursiveLock class to implement recursive locks instead.
Unlocking a lock that is not locked is considered a programmer error and should be fixed in your code. The NSLock class reports such errors by printing an error message to the console when they occur.

  • 其实现是基于pthread的。
  • 谁持有谁释放,试图释放由其他线程持有的锁是不合法的。
  • 如果用在需要递归嵌套加锁的场景时,需要使用其子类NSRecursiveLock。不是所有情况下都会引发递归调用,而NSLock在性能上要优于NSRecursiveLock。而当我们使用NSLock不小心造成死锁时,可以尝试将其替换为NSRecursiveLock。
  • lock与unlock是一一对应的,如果试图释放一个没有加锁的锁,会发生异常崩溃。而lock始终等不到对应的unlock会进入饥饿状态,让当前线程一直挂起。

使用方式:

1
2
3
4
5
6
7
8
9
10
11
BOOL moreToDo = YES;
NSLock *theLock = [[NSLock alloc] init];
//...
while (moreToDo) {
/* Do another increment of calculation */
/* until there’s no more to do. */
if ([theLock tryLock]) {
/* Update display used by all threads. */
[theLock unlock];
}
}
1
2
3
4
5
6
7
8
9
10
11
12
NSRecursiveLock *theLock = [[NSRecursiveLock alloc] init];
void MyRecursiveFunction(int value)
{
[theLock lock];
if (value != 0)
{
--value;
MyRecursiveFunction(value);
}
[theLock unlock];
}
MyRecursiveFunction(5);

4. @synchronized

1
2
3
4
5
6
7
- (void)myMethod:(id)anObj
{
@synchronized(anObj)
{
// Everything between the braces is protected by the @synchronized directive.
}
}

anObj 是一个唯一标识符,如果在两个不同线程中执行上述方法,并为anObj参数传递了不同的对象,则每个线程都会获得一个锁继续处理而不会被另一个阻塞,但如果传递相同对象,则其中一个线程会被阻塞,直到第一个线程完成。

@synchronized 具体源码实现 @synchronized 实现

@synchronized块会在受保护的代码中隐式添加一个异常处理程序,如果抛出异常,将自动释放互斥量。这意味着为了使用该指令,还须在代码中启用OC异常处理。
隐式添加的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@try {
objc_sync_enter(obj);
// do work
} @finally {
objc_sync_exit(obj);
}

// 以上两个方法的注释
/**
* Begin synchronizing on 'obj'.
* Allocates recursive pthread_mutex associated with 'obj' if needed.
* 为传入的对象分配了一个递归锁,递归锁在同一线程不会引发死锁
* @param obj The object to begin synchronizing on.
*
* @return OBJC_SYNC_SUCCESS once lock is acquired.
*/
OBJC_EXPORT int
objc_sync_enter(id _Nonnull obj)
OBJC_AVAILABLE(10.3, 2.0, 9.0, 1.0, 2.0);

/**
* End synchronizing on 'obj'.
*
* @param obj The object to end synchronizing on.
*
* @return OBJC_SYNC_SUCCESS or OBJC_SYNC_NOT_OWNING_THREAD_ERROR
*/
OBJC_EXPORT int
objc_sync_exit(id _Nonnull obj)
OBJC_AVAILABLE(10.3, 2.0, 9.0, 1.0, 2.0);

enum {
OBJC_SYNC_SUCCESS = 0,
OBJC_SYNC_NOT_OWNING_THREAD_ERROR = -1
};

加锁代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
int objc_sync_enter(id obj)
{
int result = OBJC_SYNC_SUCCESS;

if (obj) {
// 可以看做是链表中的一个节点 关联了object与锁
SyncData* data = id2data(obj, ACQUIRE);
assert(data);
data->mutex.lock();
} else {
// @synchronized(nil) does nothing
if (DebugNilSync) {
_objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
}
objc_sync_nil();
}

return result;
}

// End synchronizing on 'obj'.
// Returns OBJC_SYNC_SUCCESS or OBJC_SYNC_NOT_OWNING_THREAD_ERROR
int objc_sync_exit(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, RELEASE);
if (!data) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
} else {
bool okay = data->mutex.tryUnlock();
if (!okay) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
}
}
} else {
// @synchronized(nil) does nothing
}
return result;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//链表中的一个节点,关联object与lock,并且有一个nextdata指向下一个节点
typedef struct SyncData {
id object;
recursive_mutex_t mutex;
struct SyncData* nextData;
int threadCount; //此时使用这个锁的线程数量,因为 SyncData 结构体会被缓存,如果threadCount==0 说明这个SyncData实例可以被复用了
} SyncData;

typedef struct SyncList {
SyncData *data;
spinlock_t lock;
} SyncList;

// Use multiple parallel lists to decrease contention among unrelated objects.
#define COUNT 16
#define HASH(obj) ((((uintptr_t)(obj)) >> 5) & (COUNT - 1)) //哈希算法将对象所在的内存地址转化为无符号整型并右移五位,再跟 0xF 做按位与运算,这样结果不会超出数组大小。
#define LOCK_FOR_OBJ(obj) sDataLists[HASH(obj)].lock
#define LIST_FOR_OBJ(obj) sDataLists[HASH(obj)].data
static SyncList sDataLists[COUNT]; //声明一个SyncList 结构体数组大小为16

objc_sync_enter里没有持有传入的对象,假如对象在 “synchronized block” 中被设成 nil时 其他线程使用这个对象会一直阻塞吗?

1
2
3
4
5
6
7
8
9
10
11
12
NSNumber *number = @(1);
NSNumber *thisPtrWillGoToNil = number;
@synchronized (thisPtrWillGoToNil) {
thisPtrWillGoToNil = nil;
}
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^ {
NSCAssert(![NSThread isMainThread], @"Must be run on background thread");
@synchronized (number) {
NSLog(@"This line does indeed get printed to stdout");
}

});

这行代码还是会打印。OC处理了这种情形,可能是编译器做了如下处理

1
2
3
4
5
6
7
8
NSString *test = @"test";
id synchronizeTarget = (id)test;
@try {
objc_sync_enter(synchronizeTarget);
test = nil; //空操作
} @finally {
objc_sync_exit(synchronizeTarget);
}

自旋锁

自旋锁 与互斥锁有点类似,只是自旋锁被某线程占用时,其他线程不会进入睡眠(挂起)状态,而是一直运行(自旋/空转)直到锁被释放。由于不涉及用户态与内核态之间的切换,它的效率远远高于互斥锁。
虽然它的效率比互斥锁高,但是它也有些不足之处:

  • 自旋锁一直占用CPU,会降低CPU效率。在高并发执行的时候,或代码片段比较耗时,容易引发CPU占用率暴涨的风险
  • 使用自旋锁可能造成死锁,如递归调用时可能会造成死锁
  • 自旋锁可能引起优先级反转的问题。如果一个低优先级的线程获得锁并访问共享资源,这时一个高优先级的线程也尝试获得这个锁,自旋锁会处于忙等状态从而占用大量 CPU。此时低优先级线程无法与高优先级线程争夺 CPU 时间,从而导致任务迟迟完不成、无法释放 lock。再iOS10中建议替换OSSPinLock为os_unfair_lock。
    解决优先级反转有两种方法:优先级天花板和优先级继承

    1. 优先级天花板是当任务申请锁时,把该任务优先级提升到可访问这个资源的所有任务中的最高优先级。
    2. 优先级继承是当任务A申请共享资源S时,如果S正在被任务C使用,通过比较任务C与自身的优先级,如发现任务C优先级小于自身优先级,则将任务C的优先级提升到自身优先级。C释放资源后,在恢复C的优先级。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // iOS 10以后使用
    os_unfair_lock_t unfairLock = &(OS_UNFAIR_LOCK_INIT);
    NSLog(@"线程1 准备上锁");
    os_unfair_lock_lock(unfairLock);
    sleep(4);
    NSLog(@"线程1");
    os_unfair_lock_unlock(unfairLock);
    NSLog(@"线程1 解锁成功");
    NSLog(@"---------------------------------------");

不在安全的OSSpinLock

信号量

dispatch_semaphore是GCD用于控制多线程并发的信号量,通过wait/signal的信号事件控制并发执行的最大线程数,信号量不支持递归.
当信号量为0时,dispatch_wait 会阻塞线程,可以利用这点特性实现控制代码块最大并发数,或将异步线程转为同步。

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
//对信号量执行+1操作
long value = os_atomic_inc2o(dsema, dsema_value, release);
// 如果值大于0 直接返回
if (likely(value > 0)) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
DISPATCH_NOINLINE
long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
_dispatch_sema4_signal(&dsema->dsema_sema, 1);
return 1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
DISPATCH_NOINLINE
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;

_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}

long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
// 信号量-1
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
// 如果值大于等于0 直接返回
if (likely(value >= 0)) {
return 0;
}
// 否则开始阻塞当前线程
return _dispatch_semaphore_wait_slow(dsema, timeout);
}

条件锁

1. NSCondition 条件锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
NSCondition *cLock = [NSCondition new];
//线程1
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[cLock lock];
NSLog(@"线程1加锁成功");
[cLock wait];
NSLog(@"线程1");
[cLock unlock];
});

//线程2
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[cLock lock];
NSLog(@"线程2加锁成功");
[cLock wait];
NSLog(@"线程2");
[cLock unlock];
});

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
sleep(2);
NSLog(@"唤醒一个等待的线程");
[cLock signal];
//[cLock broadcast] 唤醒所有等待的线程
});

输出
线程1加锁成功
线程2加锁成功
唤醒一个等待的线程
线程1

2. NSConditionLock

条件锁,可以用于实现任务间的依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 NSConditionLock *cLock = [[NSConditionLock alloc] initWithCondition:0];

//线程1
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
if([cLock tryLockWhenCondition:0]){
NSLog(@"线程1");
[cLock unlockWithCondition:1];
}else{
NSLog(@"失败");
}
});
//线程2
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[cLock lockWhenCondition:3];
NSLog(@"线程2");
[cLock unlockWithCondition:2];
});
//线程3
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[cLock lockWhenCondition:1];
NSLog(@"线程3");
[cLock unlockWithCondition:3];
});
输出:
线程1
线程3
线程2

读写锁

读写锁 从广义的逻辑上讲,也可以认为是一种共享版的互斥锁。如果对一个临界区大部分是读操作而只有少量的写操作,读写锁在一定程度上能够降低线程互斥产生的代价。

对于同一个锁,读写锁有两种获取锁的方式:共享(share)方式,独占(Exclusive)方式。写操作独占,读操作共享
读写锁状态 | 以共享方式获取(读操作) | 以独占方式获取(写操作)
———|———|———
自由 | 成功 | 成功
共享 | 成功 | 等待
独占 | 等待 | 等待

//读
    pthread_rwlock_rdlock(&rwLock);
    pthread_rwlock_unlock(&rwLock);
//写
    pthread_rwlock_wrlock(&rwLock);
    pthread_rwlock_unlock(&rwLock);

参考

线程同步及线程锁
关于 @synchronized,这儿比你想知道的还要多