社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

Python 中 Redis 库分布式锁简单分析

Python猫 • 4 年前 • 418 次点击  
 △点击上方Python猫”关注 ,回复“1”领取电子书

作者:ayuliao

来源:懒编程

简介

我们常会遇到某段逻辑在相同时间段内只希望被单个实例执行,而在微服务架构中,一个程序可能会存在多个实例,此时就需要通过分布式锁来实现串行执行。

最简单的分布式锁无非就是找到对于多个程序实例而言单一的存在,比如MySQL数据只有一个或Redis只有一个,此时都可以利用这单一的存在构建一个锁,多个程序实例要执行某段逻辑前必须先获得这个锁,然后才能执行。

因为某些原因,上班的时候我和同事一起研究了一下Python redis库中分布式锁的实现源码,这里简单分享一下。

通过pip可以安装这个库。

pip install redis==2.10.6

这里以这个库的2.10.6版本为例,对它Redis分布式锁源码进行简单的分析。

代码分析

实例化StrictRedis对象后,使用其中的lock方法便可获得一个分布式锁。

首先看一下lock方法对应的源码。

def lock(self, name, timeout=None, sleep=0.1, blocking_timeout=None,
             lock_class=None, thread_local=True)
:

        if lock_class is None:
            if self._use_lua_lock is None:
                # the first time .lock() is called, determine if we can use
                # Lua by attempting to register the necessary scripts
                try:
                    LuaLock.register_scripts(self)
                    self._use_lua_lock = True
                except ResponseError:
                    self._use_lua_lock = False
            lock_class = self._use_lua_lock and LuaLock or Lock
        return lock_class(self, name, timeout=timeout, sleep=sleep,
                          blocking_timeout=blocking_timeout,
                          thread_local=thread_local)

该方法提供了多个参数,其中:

  • name用于指定锁名
  • timeout用于指定锁的超时时间
  • sleep用于指定线程睡眠时间,线程争夺锁的过程本质就是一个循环,每过sleep秒,就会尝试去获取锁对象
  • blocking_timeout用于指定阻塞超时时间,当多个实例争夺锁时,这个时间就是实例等待锁的最长时间
  • lock_class表示使用锁的类对象
  • thread_local表示是否线程安全

方法中最关键的一句代码为lock_class = self._use_lua_lock and LuaLock or Lock,确定了lock_class后,便实例化该lock_class即可。

lock_class可以为LuaLock也可为Lock,经过简单分析,Lock类才是关键,LuaLock类继承自Lock,通过Lua代码实现Redis的一些操作,这里着重看Lock类。

首先看到该类的__init__方法。

class Lock(object):
    def __init__(self, redis, name, timeout=None, sleep=0.1,
                 blocking=True, blocking_timeout=None, thread_local=True)
:

        self.redis = redis
        self.name = name
        self.timeout = timeout
        self.sleep = sleep
        self.blocking = blocking
        self.blocking_timeout = blocking_timeout
        self.thread_local = bool(thread_local)
        self.local = threading.local() if self.thread_local else dummy()
        self.local.token = None
        if self.timeout and self.sleep > self.timeout:
            raise LockError("'sleep' must be less than 'timeout'")

__init__方法初始化不同的属性,其中self.local为线程的本地字段,用于存储该线程特有的数据,不与其他线程进行共享。

此外,在__init__方法中对timeout与sleep进行的判断,如果线程等待锁时的睡眠时间大于锁的超时时间,则直接返回错误。

接着重点看Lock类中的acquire方法,该方法代码如下。

import time as mod_time

class Lock(object):

    def acquire(self, blocking=None, blocking_timeout=None):
        sleep = self.sleep
        token = b(uuid.uuid1().hex)
        if blocking is None:
            blocking = self.blocking
        if blocking_timeout is None:
            blocking_timeout = self.blocking_timeout
        stop_trying_at = None
        if blocking_timeout is not None:
            stop_trying_at = mod_time.time() + blocking_timeout
        while 1:
            if self.do_acquire(token):
                self.local.token = token
                return True
            if not blocking:
                return False
            if stop_trying_at is not None and mod_time.time() > stop_trying_at:
                return False
            mod_time.sleep(sleep)

acquire方法的主逻辑就是一个死循环,在死循环中调用do_acquire方法获取Redis分布式锁,如果成功获得锁,则将token存储到当前线程的local对象中,如果没有获得,则判断blocking,如果blocking为Flase,则不再阻塞,直接返回结果,反之,则判断当前时间是否超过blocking_timeout,超过,同样返回False,反之,通过sleep方法让当前线程睡眠sleep秒。

进一步分析do_acquire方法,代码如下:

    def do_acquire(self, token):
        if self.redis.setnx(self.name, token):
            if self.timeout:
                # convert to milliseconds
                timeout = int(self.timeout * 1000# 转成毫秒
                self.redis.pexpire(self.name, timeout)
            return True
        return False

do_acquire方法中,一开始通过redis的setnx方法将name对着作为key,token作为value,setnx方法只有在key不存的情况下,才能正常的将value存入Redis中,若key依存,该方法不做任何操作,此时就相当于没有获取到锁。

将token成功插入后,则判断有无超时时间,如果设置了timeout,则通过pexpire方法将redis中name这个key的超时设置一下,因为pexpire方法是以毫秒为单位的,所以需要先将timeout转换成毫秒单位。

如果没有设置timeout,那么name这个key只能通过do_release方法中的逻辑清除。

至此,我们清楚的知道了,Redis分布式锁的本质其实就是Redis中的一个key-value,非常简单...

理清锁的获取逻辑后,来看一下相应的释放逻辑,主要关注release方法,该方法代码如下。

    def release(self):
        "Releases the already acquired lock"
        expected_token = self.local.token
        if expected_token is None:
            raise LockError("Cannot release an unlocked lock")
        self.local.token = None
        self.do_release(expected_token)

release方法中先将线程中的token取出,并将其置为None,然后调用do_release方法实现锁的释放,do_release方法代码如下。

    def do_release(self, expected_token):
        name = self.name

        def execute_release(pipe):
            lock_value = pipe.get(name)
            if lock_value != expected_token:
                raise LockError("Cannot release a lock that's no longer owned")
            pipe.delete(name)

        self.redis.transaction(execute_release, name)

do_release方法的逻辑非常简单,其主要逻辑在execute_release方法,通过Redis的transaction方法开启一个事务来执行execute_release方法中逻辑。

在execute_release中,首先通过get方法获取name这个key对应的value,获得后,通过delete方法将其删除,实现Redis分布式锁的释放。

blocking属性

观察到acquire方法的这段代码。

        while 1:
            if self.do_acquire(token):
                self.local.token = token
                return True
            if not blocking:
                return False
            if stop_trying_at is not None and mod_time.time() > stop_trying_at:
                return False
            mod_time.sleep(sleep)

如果blocking为True,获取不到锁,则执行后面的逻辑,让线程睡眠,阻塞等待其他线程将锁释放;如果blocking为False,获取不到锁,则直接返回获取锁失败。

这就会引出几种情况,假设现在有线程A与线程B都需要执行相同的逻辑,执行前需要获取锁。

如果线程A在执行的过程中,线程B也要执行了,如果blocking为True,此时线程B会被阻塞,等待线程A是否Redis锁;如果blocking为False,线程B此时获取不到锁,不执行相同的逻辑。

如果线程A执行完了,此时线程B到来,如果blocking为True或False,此时线程B都不会被阻塞并成功拿到锁,执行相同的逻辑。

一个简单的结论是,blocking无法保证逻辑是否被单次执行,如果希望通过Redis分布式锁让逻辑只执行一次,依旧需要从业务层面做控制,比如MySQL中的业务数据是否被修改或Redis中是否记录这业务数据等。

结尾

现在很多业务都离不开Redis,它已经成为互联网中的基础设施了,Redis有很多有趣的内容可以跟大家分享。

前段时间看见Redis之父退居二线,说已经为Redis工作了10年了,每天都要revice、merge他人的代码,这种工作让他没有创造东西的快乐,所以决定退居二线,将Redis交由社区运营,这让我有些感慨,软件工程是创造性的工作,适当的放空、阅读与行业无关的书籍其实有助于激发创造力。

最后感谢你的阅读,我们下篇文章见。

Python猫技术交流群开放啦!群里既有国内一二线大厂在职员工,也有国内外高校在读学生,既有十多年码龄的编程老鸟,也有中小学刚刚入门的新人,学习氛围良好!想入群的同学,请在公号内回复『交流群』,获取猫哥的微信(谢绝广告党,非诚勿扰!)~

近期热门文章推荐:

耗时两年,我终于出了一本电子书!

架构篇:什么才是真正的架构设计?

为什么 Python 多线程无法利用多核?

Python 任务自动化工具 tox 教程

感谢创作者的好文
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/74942
 
418 次点击