Py学习  »  Python

使用 python 实现简单的共享锁和排他锁

小米运维 • 5 年前 • 420 次点击  
阅读 62

使用 python 实现简单的共享锁和排他锁

本文通过代码实操讲解了如何使用 python 实现简单的共享锁和排他锁。
上篇文章回顾:记一次容量提升5倍的HttpDns业务Cache调优

共享锁和排它锁

1、什么是共享锁

共享锁又称为读锁。

从多线程的角度来讲,共享锁允许多个线程同时访问资源,但是对写资源只能又一个线程进行。

从事务的角度来讲,若事务 T 对数据 A 加上共享锁,则事务 T 只能读 A; 其他事务也只能对数据 A 加共享锁,而不能加排他锁,直到事务 T 释放 A 上的 S 锁。这就保证了其他事务可以读 A,但是在事务 T 释放 A 上的共享锁之前,不能对 A 做任何修改。

2、什么是排它锁

排他锁又成为写锁。

从多线程的角度来讲,在访问共享资源之前对进行加锁操作,在访问完成之后进行解锁操作。 加锁后,任何其他试图再次加锁的线程会被阻塞,直到当前进程解锁。如果解锁时有一个以上的线程阻塞,那么所有该锁上的线程都被编程就绪状态, 第一个变为就绪状态的线程又执行加锁操作,那么其他的线程又会进入等待。 在这种方式下,只有一个线程能够访问被互斥锁保护的资源。

从事务的角度来讲,若事务T对数据对象A加上排它锁,则只允许T读取和修改数据A,其他任何事务都不能再对A加任何类型的锁,直到事务T释放X锁。它可以防止其他事务获取资源上的锁,直到事务末尾释放锁。

InnoDB 中的行锁

InnoDB实现了以下两种类型的行锁:

共享锁(S):允许一个事务去读一行,阻止其他事务获得相同数据集的排他锁。

排他锁(X):允许获得排他锁的事务更新数据,阻止其他事务取得相同数据集的共享读锁和排他写锁。

另外,为了允许行锁和表锁共存,实现多粒度锁机制,InnoDB 还有两种内部使用的意向锁(Intention Locks),这两种意向锁都是表锁。

意向共享锁(IS):事务打算给数据行加行共享锁,事务在给一个数据行加共享锁前必须先取得该表的 IS 锁。

意向排他锁(IX)事务打算给数据行加行排他锁,事务在给一个数据行加排他锁前必须先取得该表的 IX 锁。

如果一个事务请求的锁模式与当前的锁兼容,InnoDB 就将请求的锁授予该事务;反之,如果两者不兼容,该事务就要等待锁释放。

意向锁是 InnoDB 自动加的,不需用户干预。对于 UPDATE、DELETE 和 INSERT 语句,InnoDB 会自动给涉及数据集加排他锁(X);对于普通SELECT语句,InnoDB 不会加任何锁;事务可以通过以下语句显示给记录集加共享锁或排他锁。

共享锁(S):

SELECT * FROM table_name WHERE ... LOCK IN SHARE MODE复制代码

排他锁(X):

SELECT * FROM table_name WHERE ... FOR UPDATE复制代码

用 SELECT ... IN SHARE MODE获得共享锁,主要用在需要数据依存关系时来确认某行记录是否存在,并确保没有人对这个记录进行UPDATE或者DELETE操作。但是如果当前事务也需要对该记录进行更新操作,则很有可能造成死锁,对于锁定行记录后需要进行更新操作的应用,应该使用 SELECT... FOR UPDATE 方式获得排他锁。

使用Python实现

1、代码实现

不多说,直接上代码:

# -*- coding: utf-8 -*-import threadingclass Source:    # 队列成员标识    __N = None    # 排他锁    __X = 0    # 意向排他锁    __IX = 1    # 共享锁标识    __S = 2    # 意向共享标识    __IS = 3    # 同步排他锁    __lockX = threading.Lock()       # 事件通知    __events = [        threading.Event(),        threading.Event(),        threading.Event(),        threading.Event()   ]        # 事件通知队列    __eventsQueue = [       [],       [],       [],       []   ]        # 事件变更锁    __eventsLock = [        threading.Lock(),        threading.Lock(),        threading.Lock(),        threading.Lock()   ]         # 相互互斥的锁    __mutexFlag = {}         # 锁类    class __ChildLock:        # 锁标识        __flag = 0        # 锁定的资源        __source = None        def __init__(self, source, flag):            self.__flag = flag            self.__source = source                # 加锁        def lock(self):            self.__source.lock(self.__flag)                        # 解锁        def unlock(self):            self.__source.unlock(self.__flag)     def __init__(self):        self.__initMutexFlag()        self.__initEvents()        # 不建议直接在外面使用,以免死锁    def lock(self, flag):        # 如果是排他锁,先进进行枷锁        if flag == self.__X: self.__lockX.acquire()        self.__events[flag].wait()        self.__lockEvents(flag)            # 不建议直接在外面使用,以免死锁    def unlock(self, flag):        self.__unlockEvents(flag)                if flag == self.__X: self.__lockX.release()            # 获取相互互斥    def __getMutexFlag(self, flag):        return self.__mutexFlag[flag]        def __initMutexFlag(self):        self.__mutexFlag[self.__X] = [self.__X, self.__IX, self.__S, self.__IS]        self.__mutexFlag[self.__IX] = [self.__X, self.__S]        self.__mutexFlag[self.__S] = [self.__X, self.__IX]        self.__mutexFlag[self.__IS] = [self.__X]            def __initEvents(self):        for event in self.__events:            event.set()            # 给事件加锁, 调用 wait 时阻塞    def __lockEvents(self, flag):        mutexFlags = self.__getMutexFlag(flag)                for i in mutexFlags:                        # 为了保证原子操作,加锁            self.__eventsLock[i].acquire()            self.__eventsQueue[i].append(self.__N)            self.__events[i].clear()            self.__eventsLock[i].release()        # 给事件解锁, 调用 wait 不阻塞    def __unlockEvents(self, flag):        mutexFlags = self.__getMutexFlag(flag)                for i in mutexFlags:                        # 为了保证原子操作,加锁            self.__eventsLock[i].acquire()            self.__eventsQueue[i].pop(0)                        if len(self.__eventsQueue[i]) == 0: self.__events[i].set()            self.__eventsLock[i].release()        # 获取锁    def __getLock(self, flag):        lock = self.__ChildLock(self, flag)        lock.lock()                return lock       # 获取 X 锁    def lockX(self):        return self.__getLock(self.__X)        # 获取 IX 锁    def lockIX(self):        return self.__getLock(self.__IX)            # 获取 S 锁    def lockS(self):        return self.__getLock(self.__S)            # 获取 IS 锁    def lockIS(self):        return self.__getLock(self.__IS)复制代码

使用方式:

from lock import Source# 初始化一个锁资源source = Source()# 获取资源的X锁,获取不到则线程被阻塞,获取到了继续往下执行lock = source.lockX() lock.unlock()lock = source.lockIX() lock.unlock()lock = source.lockS() lock.unlock()lock = source.lockIS() lock.unlock()复制代码

2、实现思路

以 S 锁为例,获取锁的步骤如下:

  • 检测 S 锁是否可以取到,取到了话继续执行,没有取到则阻塞,等待其他线程解锁唤醒。

  • 获取与 S 锁相互冲突的锁(IX,X),并将 IX 锁和 X 锁 锁住,后续想获得 IX 锁或者 X 锁的线程就会被阻塞。

  • 向 IX 锁和 X 锁的标识队列插入标识,如果此时另外一个线程拿到了 IS 锁,则会继续想 IX 锁队列标识插入标识。

  • 完成加锁,返回 S 锁。

以 S 锁为例,解锁的步骤如下:

  • 获取与 S 锁相互冲突的锁(IX,X),向 IX 锁和 X 锁的标识队列移除一个标识。

  • 判断 IX 锁和 X 锁队列标识是否为空,如果不为空,则继续锁定,为空则解锁并唤醒被 IX 锁和 X 锁阻塞的线程。

  • 完成 S 锁解锁。

3、锁兼容测试

测试代码




    
# -*- coding: utf-8 -*-import threadingimport timefrom lock import Source# 初始化资源source= Source()maplockname=['X','IX','S','IS']class MyThread(threading,Thread):    flag = None    def __init__(self, flag):        super().__init__()        self.flag = flag    def run(self):        lock = self.lock()        time1 = time.time()        strtime1 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time1))        print('我拿到 %s 锁,开始执行了喔,现在时间是 %s' % (maplockname[self.flag], strtime1))        time.sleep(1)        time2 = time.time()        strtime2 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time2))        print('我释放 %s 锁,结束执行了,现在时间是 %s' % (maplockname[self.flag], strtime2))        lock.unlock()       def lock(self):        if self.flag == 0:                  return source.lockX()                elif self.flag == 1:               return source.lockIX()               elif self.flag == 2:                         return source.lockS()                else:                         return source.lockIS()             def unlock(self, lock):        lock.unlock()def test_lock():    for x in range(0, 4):                  for y in range(0, 4):                time1 = time.time()                thread1 = MyThread(x)                thread2 = MyThread(y)                thread1.start()                thread2.start()                thread1.join()                thread2.join()                time2 = time.time()                difftime = time2 - time1                            if difftime > 2:                     print('%s 锁和 %s 锁 冲突了!' % (maplockname[x], maplockname[y]))                            elif difftime > 1:                     print('%s 锁和 %s 锁 没有冲突!' % (maplockname[x], maplockname[y]))            print('')if __name__ == '__main__':    test_lock()复制代码

运行结果:

我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:09我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:10我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:10我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:11X 锁和 X 锁 冲突了!我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:11我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:12我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:12我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:13X 锁和 IX 锁 冲突了!我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:13我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:14我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:14我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:15X 锁和 S 锁 冲突了!我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:15我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:16我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:16我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:17X 锁和 IS 锁 冲突了!我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:17我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:18我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:18我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:19IX 锁和 X 锁 冲突了!我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:19我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:19我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:20我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:20IX 锁和 IX 锁 没有冲突!我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:20我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:21我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:21我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:22IX 锁和 S 锁 冲突了!我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:22我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:22我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:23我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:23IX 锁和 IS 锁 没有冲突!我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:23我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:24我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:24我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:25S 锁和 X 锁 冲突了!我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:25我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:26我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:26我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:27S 锁和 IX 锁 冲突了!我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:27我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:27我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:28我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:28S 锁和 S 锁 没有冲突!我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:28我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:28我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:29我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:29S 锁和 IS 锁 没有冲突!我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:29我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:30我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:30我释放 X 锁了,结束执行了,现在时间是 2019-02-17 18:38:31IS 锁和 X 锁 冲突了!我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:31我拿到 IX 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:31我释放 IX 锁了,结束执行了,现在时间是 2019-02-17 18:38:32我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:32IS 锁和 IX 锁 没有冲突!我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:32我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:32我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:33我释放 S 锁了,结束执行了,现在时间是 2019-02-17 18:38:33IS 锁和 S 锁 没有冲突!我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:33我拿到 IS 锁了,开始执行了喔,现在时间是 2019-02-17 18:38:33我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:34我释放 IS 锁了,结束执行了,现在时间是 2019-02-17 18:38:34IS 锁和 IS 锁 没有冲突!复制代码

4、公平锁与非公平锁

(1)问题分析

仔细想了想,如果有一种场景,就是用户一直再读,写获取不到锁,那么不就造成脏读吗?这不就是由于资源的抢占不就是非公平锁造成的。如何避免这个问题呢?这就涉及到了公平锁与非公平锁。

对产生的结果来说,如果一个线程组里,能保证每个线程都能拿到锁,那么这个锁就是公平锁。相反,如果保证不了每个线程都能拿到锁,也就是存在有线程饿死,那么这个锁就是非公平锁。

(2)非公平锁测试

上述代码锁实现的是非公平锁,测试代码如下:

def test_fair_lock():    threads = []        for i in range(0, 10):                  if i == 2:                       # 0 代表排他锁(X)            threads.append(MyThread(0))                  else:                        # 2 代表共享锁(S)            threads.append(MyThread(2))    for thread in threads: thread.start()复制代码

运行结果:

我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:33我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:06:34我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 19:06:34我释放 X 锁了,结束执行了,现在时间是 2019-02-17 19:06:35复制代码

可以看到由于资源抢占问题,排他锁被最后才被获取到了。

(3)公平锁的实现

实现公平锁,只需要在原有的代码进行小小得修改就行了。

class Source:    # ...... 省略    def __init__(self, isFair=False):          self.__isFair = isFair          self.__initMutexFlag()          self.__initEvents()        # ...... 省略        def lock(self, flag):        # 如果是排他锁,先进进行枷锁        if flag == self.__X: self.__lockX.acquire()                if self.__isFair:                   # 如果是公平锁则,先将互斥的锁给阻塞,防止其他线程进入            self.__lockEventsWait(flag)            self.__events[flag].wait()            self.__lockEventsQueue(flag)                else:                    # 如果是非公平锁,如果锁拿不到,则先等待            self.__events[flag].wait()            self.__lockEvents(flag)       def __lockEventsWait(self, flag):        mutexFlags = self.__getMutexFlag(flag)                for i in mutexFlags:                        # 为了保证原子操作,加锁            self.__eventsLock[i].acquire()            self.__events[i].clear()            self.__eventsLock[i].release()        def __lockEventsQueue(self, flag):          mutexFlags = self.__getMutexFlag(flag)               for i in mutexFlags:                            # 为了保证原子操作,加锁                self.__eventsLock[i].acquire()                self.__eventsQueue[i].append(self.__N)                self.__eventsLock[i].release()复制代码

测试代码:

source = Source(True)def test_fair_lock():    threads = []        for i in range(0, 10):                  if i == 2:                            # 0 代表排他锁(X)                threads.append(MyThread(0))                  else:                           # 2 代表共享锁(S)                threads.append(MyThread(2))            for thread in threads: thread.start()复制代码

运行结果:

我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:16我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:16我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:17我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:17我拿到 X 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:17我释放 X 锁了,结束执行了,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我拿到 S 锁了,开始执行了喔,现在时间是 2019-02-17 19:35:18我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19我释放 S 锁了,结束执行了,现在时间是 2019-02-17 19:35:19复制代码

可以看到排他锁在第二次的时候就被获取到了。

(4)优缺点

非公平锁性能高于公平锁性能。首先,在恢复一个被挂起的线程与该线程真正运行之间存在着严重的延迟。而且,非公平锁能更充分的利用cpu的时间片,尽量的减少cpu空闲的状态时间。

参考文献:

共享锁(S锁)和排它锁(X锁):https://www.jianshu.com/p/bd3b3ccedda9

Java多线程--互斥锁/共享锁/读写锁 快速入门:https://www.jianshu.com/p/87ac733fda80

Java多线程 -- 公平锁和非公平锁的一些思考:https://www.jianshu.com/p/eaea337c5e5b

MySQL- InnoDB锁机制:https://www.cnblogs.com/aipiaoborensheng/p/5767459.html


文章首发于共公众号“小米运维”,点击查看原文


Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/30386
 
420 次点击