Skynet专题之:锁

JavenLaw

原子操作 和 锁 都用于处理多线程环境下的并发访问问题,因此下面将统一解释它们


原子操作&锁的区别

在并发编程中

原子性是指一个操作要么完全执行,要么完全不执行,没有中间状态

它是在底层硬件或操作系统级别执行的操作,是不可中断的单个操作,具有原子性和互斥性

原子性操作的目的是:保证在多线程环境下对共享数据的操作不会出现竞态条件

锁是一种同步机制,用于实现临界区的互斥访问

它保证同一时间只有一个线程可以获得锁并执行临界区代码,避免多个线程同时修改共享数据而导致的数据竞争问题

锁的作用是提供互斥性,但锁并不保证临界区内的操作是原子的

锁的原子性体现在:

​ 当一个线程获得了锁并进入临界区时,它可以确保在持有锁的期间不会被其他线程打断

​ 这确实具有一种原子性,即在给定的上下文中,临界区的执行是不可中断的

锁的目的是:保护共享资源,以确保在任何给定时间只有一个线程可以访问临界区

第一,各自的作用

原子:

  • 原子操作用于对共享数据进行原子性的读取和修改,确保多个线程对同一数据进行操作时不会引发竞态条件和数据不一致的问题
  • 原子操作可以保证某个特定操作在执行期间不会被其他线程中断,从而确保操作的完整性和一致性

锁:

  • 锁用于实现临界区的互斥访问,确保同一时间只有一个线程可以进入临界区执行操作,从而避免多个线程同时修改共享数据而导致的数据竞争问题
  • 锁可以保证在一个线程执行临界区代码时,其他线程会被阻塞,等待当前线程释放锁后才能继续执行

第二,区别和好处

粒度:

  • 原子操作通常用于对单个变量或对象的操作,提供了更细粒度的并发控制。它可以在不阻塞其他线程的情况下,对共享数据进行原子性的读取和修改
  • 锁通常用于对一段代码(临界区)的访问控制,提供了更粗粒度的并发控制。它可以确保同一时间只有一个线程可以执行临界区代码

开销:

  • 原子操作通常比锁的开销更小,因为它不需要上下文切换和线程阻塞等额外开销。原子操作使用硬件级别的原子指令来实现,执行速度较快
  • 锁的实现可能涉及线程调度和上下文切换,需要更多的开销。当临界区的代码较长或复杂时,锁的开销可能会更高

场景:

  • 原子操作适用于对共享数据进行简单的读取和修改操作,如计数器、标志位等。它们可以在不阻塞其他线程的情况下,保证数据的一致性
  • 锁适用于需要对一段代码进行互斥访问的情况,如修改共享数据的复杂算法、数据结构的更新等。它们可以确保在同一时间只有一个线程执行临界区代码,避免数据竞争和不一致性


内存顺序

在开始锁的实现之前,有些必要的知识需要了解,可以先了解《Skynet专题之:原子操作》

即原子主要分为3类:

​ C/C++标准:atomic 头文件 + stdatomic.h 头文件

​ GCC实现:GCC 编译器提供了一些内建函数

​ POSIX标准:pthread.h 头文件

我们的自旋锁也是3种实现方式

内存顺序

​ 假设有两个线程 A 和 B,它们共享一个整型变量 flag 用于表示某个条件是否满足

​ 线程 A 负责设置 flag,表示某个事件已经发生

​ 线程 B 负责检查 flag,线程 B 需要等待 flag 被设置为 1 后才能继续执行

​ 我们希望当 flag 被设置时,线程 B 能够立即看到这个变化

​ 但线程 A 设置 flag 的操作可能被编译器或硬件重排序到之后

​ 这样线程 B 可能会一直处于等待状态

​ 即:

​ 如果没有内存序的保证,那么编译器或处理器可能会对指令进行重排序,导致以下情况发生:

​ 1,线程 A 先设置 flag 为 1,然后再做其他操作

​ 2,线程 B 在检查 flag 时,可能会在 flag 被设置为 1 之前就开始检查,导致它不会等待 flag 真正被设置为 1

​ 使用适当的内存顺序可以解决这个问题

​ 例如

​ A在设置 flag 时使用 std::memory_order_release

​ B在检查 flag 时使用 std::memory_order_acquire

​ 这样可以确保写操作先于读操作,并且能够保证线程 B 能够正确地看到线程 A 设置的 flag 值

atomic_test_and_set 和 __sync_lock_test_and_set 都是用于原子操作的函数,但是有一些区别

1,atomic_test_and_set 通常用于在 C 语言中实现原子操作 它的原型通常是 int atomic_test_and_set(int *ptr, int value),表示将 *ptr 的值设置为 value,并返回 *ptr 原来的值 这个函数在一些系统中可能不是原子的,因此在使用时需要谨慎

2,__ sync_lock_test_and_set 是 GCC 内置的函数,用于实现原子操作 它的原型通常是 int __sync_lock_test_and_set(int *ptr, int value),表示将 *ptr 的值设置为 value,并返回 *ptr 原来的值 这个函数在 GCC 中是原子的,不过它是 GCC 特有的,不具有跨平台性。

总的来说,这两个函数都可以用于实现原子操作,但是 __sync_lock_test_and_set 更多的是在 GCC 中使用

可以看到:sync_lock_test_and_set 函数就是用的GCC编译器内部的函数 被定义为: #define atomic_flag_test_and_set_(ptr) __ sync_lock_test_and_set(ptr, 1)

可以看到:atomic_exchange_explicit()函数是C++标准中定义的原子操作函数 被定义为:#define atomic_test_and_set_(ptr) STD _ atomic_exchange_explicit()

其并没有使用:atomic_test_and_set函数来定义atomic_test_and_set_(ptr)

而是使用atomic_exchange_explicit(),为什么呢?

这就是因为内存顺序的问题

请查看后面自旋锁的实现(spinlock.h)

atomic_test_and_set_ 本身并没有内存顺序的概念。它只是一个简单的原子操作,用于设置一个原子标志并返回之前的值

如果直接使用 atomic_test_and_set_ 宏来实现自旋锁,而没有指定内存序,那么就不能保证内存序

这可能导致编译器或硬件对内存访问进行优化,可能会影响锁的正确性

因此,在实现类似的原子操作时,确保正确地指定适当的内存序是非常重要的

而使用了 atomic_exchange_explicit 来实现 atomic_test_and_set_,并指定了 std::memory_order_acquire 内存顺序

这是为了确保在设置标志后对后续加载操作的可见性

memory_order_acquire:这个内存序保证当前操作以及之前的读操作不会被后续的加载或存储操作重排序到这个操作之后。它确保当前操作对后续的加载操作可见

memory_order_release:这个内存序保证当前操作以及之后的写操作不会被前面的加载或存储操作重排序到这个操作之前。它确保当前操作不会影响之前的加载操作

memory_order_relaxed:这个内存序没有顺序约束,允许当前操作和其他操作重排序,只要不会改变数据的依赖关系。这个内存序通常用于不需要同步的场景,可以获得最高的性能


自旋锁(spinlock)

自旋锁在尝试获取锁时不会立即进入阻塞状态,而是通过不断地循环检测锁的状态(自旋),直到获取到锁为止

优点:

​ 避免上下文切换: 在尝试获取锁时不会进入阻塞状态,因此避免了线程在用户态和内核态之间切换的开销,对于短期占用锁的情况,可以提高性能

​ 等待时间短: 适用于临界区代码执行时间非常短的情况,因为自旋等待的时间一般比较短

​ 实时性: 可以保证在获取锁之前不会被其他线程抢占,适用于对实时性要求较高的场景

缺点:

​ 占用 CPU 资源: 在尝试获取锁时会一直占用 CPU 进行自旋,如果锁被占用的时间较长,会造成 CPU 资源的浪费

​ 优先级反转: 如果一个高优先级的线程在自旋等待锁时被一个低优先级的线程抢占,可能会导致优先级反转问题

​ 死锁: 自旋锁可能会导致死锁问题,例如两个线程互相持有对方需要的资源并在自旋等待对方释放资源

总的来说,自旋锁适用于对锁的占用时间较短、对实时性要求较高的情况,但需要注意避免长时间占用 CPU 资源和可能引发的优先级反转和死锁问题

在实际使用时,需要根据具体情况综合考虑选择合适的同步机制

先看下最简单的用C语言写的自旋锁的例子

// 特别说明:网页显示问题,以下代码的 预处理语句 都省略了 #
include <stdio.h>
include <stdlib.h>
include <stdatomic.h>
include <pthread.h>

// 定义自旋锁结构
typedef struct {
    atomic_flag lock;
} spinlock_t;
// atomic_flag 是 C11 标准库中的一种最简单的原子类型,专门用于实现低级的同步原语
// atomic_flag 只提供了两个操作:测试并设置(atomic_flag_test_and_set)和清除(atomic_flag_clear)

// 初始化自旋锁
void spinlock_init(spinlock_t *s) {
    atomic_flag_clear(&s->lock); //将 s->lock 标志初始化为未设置状态,这表示锁是空闲的
}
// atomic_flag_clear 是 C11 标准库中的一个函数,用于清除(即重置)一个 atomic_flag 对象的状态,使其未被设置
// 它是原子操作的一部分,确保对标志的操作是线程安全的

// 自旋锁上锁
void spinlock_lock(spinlock_t *s) {
    // 尝试设置标志。如果标志已经被设置,则进入自旋等待状态,直到锁被释放
    while (atomic_flag_test_and_set(&s->lock)) { 
        // 自旋等待,什么都不做
    }
}
// atomic_flag_test_and_set 的主要作用是将 atomic_flag 设置为已设置状态,并返回标志在设置之前的状态
// 这一操作是原子的,即它在硬件层面上保证了操作的不可分割性
// 通常用于实现自旋锁(spin lock)等低级同步机制
// 通过这个函数,可以确保只有一个线程成功地将标志从未设置状态变为已设置状态,从而获得锁
// 其他线程在尝试获取锁时会发现标志已被设置,因此会进入自旋等待

// 自旋锁解锁
void spinlock_unlock(spinlock_t *s) {
    atomic_flag_clear(&s->lock); // 将 s->lock 标志重置为未设置状态,表示释放锁
}

// 共享资源
int shared_resource = 0;
spinlock_t spinlock;

// 线程函数
void* thread_func(void* arg) {
    for (int i = 0; i < 100000; ++i) {
        spinlock_lock(&spinlock);
        ++shared_resource;
        spinlock_unlock(&spinlock);
    }
    return NULL;
}

int main() {
    // 创建10个线程,执行thread_func
    pthread_t threads[10];
	
    // 对自旋锁进行初始化
	spinlock_init(&spinlock);

	// 创建线程,并执行thread_func
    for (int i = 0; i < 10; ++i) {
        pthread_create(&threads[i], NULL, thread_func, NULL);
    }
	
    // 等待所有线程完成
    for (int i = 0; i < 10; ++i) {
        pthread_join(threads[i], NULL);
    }

	printf("Final value of shared resource: %d\n", shared_resource);

	return 0;
}

再来看看Skynet中的自旋锁实现

// 特别说明:网页显示问题,以下代码的 预处理语句 都省略了 #
// spinlock.h
ifndef SKYNET_SPINLOCK_H
	define SKYNET_SPINLOCK_H

	// 这上面是进行统一定义
	define SPIN_INIT(q) spinlock_init(&(q)->lock);
	define SPIN_LOCK(q) spinlock_lock(&(q)->lock);
	define SPIN_UNLOCK(q) spinlock_unlock(&(q)->lock);
	define SPIN_DESTROY(q) spinlock_destroy(&(q)->lock);

	// 查看Skynet项目文件中的Makefile,第9行有个注释
	// # CFLAGS += -DUSE_PTHREAD_LOCK
	// 即你可以把注释去掉,则USE_PTHREAD_LOCK会被定义
	// 
	// 现在USE_PTHREAD_LOCK是没有定义的
	// 因此直接进入下面的代码
	ifndef USE_PTHREAD_LOCK

		// __STDC_NO_ATOMICS__ 是一个预定义宏,用于指示编译器是否支持 C11 中的 <stdatomic.h> 头文件中定义的原子操作
		// 如果__STDC_NO_ATOMICS__ 没有被定义,编译器支持 C11 的原子操作,
		// 如果__STDC_NO_ATOMICS__ 被定义,则表示编译器不支持 C11 的原子操作
		ifdef __STDC_NO_ATOMICS__
			// 这里表示编译器不支持 C11 的原子操作,因此需要下面的方式来实现原子操作
			// 即:使用GCC编译器内置的函数来代替POSIX标准中定义的原子操作函数
			
            define atomic_flag_ int
            define ATOMIC_FLAG_INIT_ 0
            define atomic_flag_test_and_set_(ptr) __sync_lock_test_and_set(ptr, 1)
            define atomic_flag_clear_(ptr) __sync_lock_release(ptr)
			
            // 自旋锁结构
            struct spinlock {
                atomic_flag_ lock;
            };

            // 自旋锁初始化,并使用内联函数
            static inline void
            spinlock_init(struct spinlock *lock) {
                atomic_flag_ v = ATOMIC_FLAG_INIT_;
                lock->lock = v;
            }

            // 自旋锁上锁,并使用内联函数
            static inline void
            spinlock_lock(struct spinlock *lock) {
                while (atomic_flag_test_and_set_(&lock->lock)) {}
            }

            // 自旋锁尝试上锁
            static inline int
            spinlock_trylock(struct spinlock *lock) {
                return atomic_flag_test_and_set_(&lock->lock) == 0;
            }

            // 自旋锁解锁,并使用内联函数
            static inline void
            spinlock_unlock(struct spinlock *lock) {
                atomic_flag_clear_(&lock->lock);
            }

            // 销毁自旋锁
            static inline void
            spinlock_destroy(struct spinlock *lock) {
                (void) lock;
            }

		else  // __STDC_NO_ATOMICS__
			// 编译器支持 C11 中的原子操作,不需要用GCC编译器内置的函数来代替POSIX标准中定义的原子操作函数
			// 即:使用 atomic.h文件 即可
			
            include "atomic.h"
			
			// 实际使用 atomic_exchange_explicit 代替 atomic_test_and_set 来实现
            define atomic_test_and_set_(ptr) STD_ atomic_exchange_explicit(ptr, 1, STD_ memory_order_acquire)
            define atomic_clear_(ptr) STD_ atomic_store_explicit(ptr, 0, STD_ memory_order_release);
            define atomic_load_relaxed_(ptr) STD_ atomic_load_explicit(ptr, STD_ memory_order_relaxed)

            if defined(__x86_64__)
            	#include <immintrin.h> // For _mm_pause
            	#define atomic_pause_() _mm_pause()
            else
            	#define atomic_pause_() ((void)0)
            endif

			// 自旋锁结构
            struct spinlock {
                STD_ atomic_int lock;
            };

			// 自旋锁初始化,并使用内联函数
            static inline void
            spinlock_init(struct spinlock *lock) {
                STD_ atomic_init(&lock->lock, 0);
            }

			// 自旋锁上锁,并使用内联函数
            static inline void
            spinlock_lock(struct spinlock *lock) {
                for (;;) {
                    if (!atomic_test_and_set_(&lock->lock))
                        return;
                    while (atomic_load_relaxed_(&lock->lock))
                        atomic_pause_();
                }
            }

			// 自旋锁尝试上锁
            static inline int
            spinlock_trylock(struct spinlock *lock) {
                return !atomic_load_relaxed_(&lock->lock) &&
                    !atomic_test_and_set_(&lock->lock);
            }
			
			// 自旋锁解锁,并使用内联函数
            static inline void
            spinlock_unlock(struct spinlock *lock) {
                atomic_clear_(&lock->lock);
            }

			// 销毁自旋锁
            static inline void
            spinlock_destroy(struct spinlock *lock) {
                (void) lock;
            }

		endif  // __STDC_NO_ATOMICS__

	else
		// 查看Skynet项目文件中的Makefile,第9行有个注释
		// # CFLAGS += -DUSE_PTHREAD_LOCK
		// 即你可以把注释去掉,则USE_PTHREAD_LOCK会被定义
		// 
		// 这里的情况分支是:USE_PTHREAD_LOCK被定义了
		// 因此直接将使用pthread.h的来实现自旋锁

		include <pthread.h>
		
		// 某些原因,这里使用互斥锁来代替实现自旋锁
		// 可能是因为互斥锁的优点
		// 1,阻塞等待:当互斥锁不可用时,线程会被阻塞并让出CPU,这样其他线程可以使用CPU资源,这对于锁持有时间较长的情况尤其有用
		// 2,可重入:多数互斥锁实现是可重入的,即同一个线程可以多次获取同一个互斥锁而不会死锁
		// 3,调度友好:因为阻塞等待,互斥锁更适合于多任务系统,避免了忙等待对系统性能的影响
		
		// we use mutex instead of spinlock for some reason
		// you can also replace to pthread_spinlock

		// 自旋锁结构
        struct spinlock {
            pthread_mutex_t lock;
        };

		// 自旋锁初始化,并使用内联函数
        static inline void
        spinlock_init(struct spinlock *lock) {
            pthread_mutex_init(&lock->lock, NULL);
        }

		// 自旋锁上锁,并使用内联函数
        static inline void
        spinlock_lock(struct spinlock *lock) {
            pthread_mutex_lock(&lock->lock);
        }

		// 自旋锁尝试上锁
        static inline int
        spinlock_trylock(struct spinlock *lock) {
            return pthread_mutex_trylock(&lock->lock) == 0;
        }

		// 自旋锁解锁,并使用内联函数
        static inline void
        spinlock_unlock(struct spinlock *lock) {
            pthread_mutex_unlock(&lock->lock);
        }

		// 销毁自旋锁
        static inline void
        spinlock_destroy(struct spinlock *lock) {
            pthread_mutex_destroy(&lock->lock);
        }

	endif

endif


读写锁(reader-writer lock)

读写锁允许多个线程同时读取共享资源,但只允许一个线程写入共享资源

优点:

​ 并发性提高: 允许多个线程同时读取共享资源,因此可以提高并发性和系统的吞吐量

​ 读操作优先: 读允许读操作的并发执行,因此适用于读操作频繁、写操作相对较少的场景

​ 读写锁在读操作远远多于写操作时,能够避免写操作的饥饿

​ 降低锁竞争: 读写锁将共享资源的读取和写入操作分开,可以降低锁的竞争,提高系统的性能

​ 粒度更细: 与互斥锁相比,读写锁的粒度更细,允许多个线程同时读取共享资源,因此能够更好地利用多核处理器的性能

缺点:

​ 写者优先问题: 如果写操作频繁,读写锁可能导致读操作的线程长时间等待,造成写者优先(Writer-Preferencing)问题,影响系统的响应性能

​ 锁开销: 读写锁的实现通常会引入额外的开销,包括锁的管理和维护开销,可能会影响系统的性能

​ 复杂性增加:可能会增加代码的复杂性,需要更加细致地考虑共享资源的读取和写入操作,以及读写锁的获取和释放策略

总的来说,读写锁适用于读操作频繁、写操作相对较少的场景,可以提高系统的并发性和性能

但在某些情况下,由于写者优先问题或额外的锁开销,可能需要谨慎使用读写锁

再来看看Skynet中的自旋锁实现

说明:读写锁的实现和自旋锁的实现方式,是极其类似的,详细的可以看看自旋锁的代码解释

注意:无论是自旋锁还是读写锁,本质都是使用了原子操作,一定得看看《Skynet专题之:原子操作》

// 特别说明:网页显示问题,以下代码的 预处理语句 都省略了 #
ifndef SKYNET_RWLOCK_H
	define SKYNET_RWLOCK_H

	// 查看Skynet项目文件中的Makefile,第9行有个注释
	// # CFLAGS += -DUSE_PTHREAD_LOCK
	// 即你可以把注释去掉,则USE_PTHREAD_LOCK会被定义
	// 
	// 现在USE_PTHREAD_LOCK是没有定义的
	// 因此直接进入下面的代码
	ifndef USE_PTHREAD_LOCK

		include "atomic.h"

        struct rwlock {
            ATOM_INT write;
            ATOM_INT read;
        };

        static inline void
        rwlock_init(struct rwlock *lock) {
            ATOM_INIT(&lock->write, 0);
            ATOM_INIT(&lock->read, 0);
        }

        static inline void
        rwlock_rlock(struct rwlock *lock) {
            for (;;) {
                while(ATOM_LOAD(&lock->write)) {}
                ATOM_FINC(&lock->read);
                if (ATOM_LOAD(&lock->write)) {
                    ATOM_FDEC(&lock->read);
                } else {
                    break;
                }
            }
        }

        static inline void
        rwlock_wlock(struct rwlock *lock) {
            while (!ATOM_CAS(&lock->write,0,1)) {}
            while(ATOM_LOAD(&lock->read)) {}
        }

        static inline void
        rwlock_wunlock(struct rwlock *lock) {
            ATOM_STORE(&lock->write, 0);
        }

        static inline void
        rwlock_runlock(struct rwlock *lock) {
            ATOM_FDEC(&lock->read);
        }

	else
		// 读写锁也有通过 pthread.h 的方式
		// 跟自旋锁通过 pthread.h 来实现的方式差不多
        include <pthread.h>

        // only for some platform doesn't have __sync_*
        // todo: check the result of pthread api

        struct rwlock {
            pthread_rwlock_t lock;
        };

        static inline void
        rwlock_init(struct rwlock *lock) {
            pthread_rwlock_init(&lock->lock, NULL);
        }

        static inline void
        rwlock_rlock(struct rwlock *lock) {
             pthread_rwlock_rdlock(&lock->lock);
        }

        static inline void
        rwlock_wlock(struct rwlock *lock) {
             pthread_rwlock_wrlock(&lock->lock);
        }

        static inline void
        rwlock_wunlock(struct rwlock *lock) {
            pthread_rwlock_unlock(&lock->lock);
        }

        static inline void
        rwlock_runlock(struct rwlock *lock) {
            pthread_rwlock_unlock(&lock->lock);
        }

	endif

endif


互斥锁(mutex lock)

互斥锁常用于保护共享资源,确保在任何时刻只有一个线程可以访问共享资源

优点:

​ 确保数据一致性:在同一时刻只有一个线程访问共享资源,从而避免数据竞争和数据不一致性

​ 避免死锁: 合理使用互斥锁可以避免死锁情况的发生,确保程序的正常运行

​ 易于使用: 互斥锁的接口简单,易于使用,能够帮助开发人员管理共享资源的访问

缺点:

​ 性能开销: 由于互斥锁需要在锁的获取和释放过程中进行上下文切换和内核态操作,因此会引入一定的性能开销,特别是在高并发情况下

​ 可能引起饥饿: 如果某个线程持有互斥锁的时间过长,可能会导致其他线程长时间等待,引起饥饿问题

​ 可能引起优先级反转: 当一个低优先级的线程持有锁时,高优先级的线程无法访问共享资源,可能导致优先级反转问题

互斥锁是保护共享资源的重要工具,能够确保数据一致性和避免竞态条件

但在使用时需要注意性能开销和可能引起的饥饿和优先级反转问题

因为在自旋锁的 pthread.h 的实现方式种,使用了 pthread_mutex 来实现

因此在这里不再重复记录

在Skynet框架中,还在另一个地方使用了pthread_mutex

就是在监视线程的部分,使用了pthread_mutex

因此我把这部分的知识移动了,详见《Skynet专题之:线程》