社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

Python 操作 Redis 必备神器:redis-py 源码阅读

Python猫 • 3 年前 • 522 次点击  

 △点击上方“Python猫”关注 ,回复“2”加入交流群

作者:肖恩

来源:游戏不存在


  • redis协议规范

  • redis-py概述

  • redis-py基础使用

    • RedisCommand

    • Redis连接

    • 连接池

  • pipeline

  • LuaScript

  • lock

redis协议规范

RESP(Redis Serialization Protocol)是Redis客户端和服务端的通讯协议。数据示例如下:

  1. +OK\r\n

  2. -Error message\r\n

  3. :1000\r\n

  4. $6\r\nfoobar\r\n

  5. *2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n

  6. *3\r\n:1\r\n:2 \r\n:3\r\n

协议定义了5种类型:

  1. +前缀表示字符串,后接字符串文本,以 \r\n结尾,通常用于命令结果

  2. -前缀表示异常信息,后接以空格连接的两个字符串,以 \r\n结尾

  3. :前缀表示整数,后接整数,以 \r\n结尾

  4. $前缀表示定长的字符串,后接字符串长度, \r\n和字符串文本,以 \r\n结尾

  5. *前缀表示数组,后接数组的长度和 \r\n,数组的每个元素可以由上面4种类型构成

协议还约定了Null等的实现,详情请看参考链接部分。下面示例了 LLEN mylist 的请求和响应

  1. C: *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n


  2. S: :48293\r\n

  • 客户端发送了 LLEN mylist指令,指令序列化成RESP长度为2的数组,2个定长字符串分别是llen和mylist。

  • 服务端响应整数48293,即mylist数据的长度。

Request-Response model是redis服务的请求响应模型,可以对比http协议的模式。redis服务端响应客户端的指令,处理后响应回复客户端,可以简单理解为一问一答。当然pipeline,pub/sub和monitor除外。

Redis-py 源码概述

本文使用的redis-py版本是 3.5.3, 文件及包信息是:

名称描述
clientredis的api
connection连接,连接池等
exceptions异常和错误
lock锁的实现
sentinel扩展的哨兵连接
utils工具
_compat都版本适配包

redis-py未依赖其它的包,代码量虽然不多,6000行左右,但是100%理解还是需要一定的时间和基础。本文从redis-py日常使用出发,也是redis-py的README中内容,介绍这些基础功能在源码中的实现。

redis-py基础使用

RedisCommand

redis-py的简单使用:

  1. >>> import redis

  2. >>> r = redis.Redis(host='localhost', port=6379, db=0)

  3. >>> r.set('foo', 'bar')

  4. True

  5. >>> r.get('foo')

  6. b'bar'

追踪redis-py的实现:

  1. # client.py


  2. class Redis(object )


  3. def __init__(self, host='localhost', port=6379,

  4. db=0, ..):

  5. ...

  6. connection_pool = ConnectionPool(**kwargs)

  7. self.connection_pool = connection_pool

  8. ...


  9. def set(self, name, value, ex=None, px=None, nx=False, xx=False, keepttl=False)

  10. ...

  11. return self.execute_command('SET', *pieces)


  12. def get(self, name):

  13. return self.execute_command( 'GET', name)


  14. # COMMAND EXECUTION AND PROTOCOL PARSING

  15. def execute_command(self, *args, **options):

  16. "Execute a command and return a parsed response"

  17. conn = self.connection or pool.get_connection(command_name, **options)

  18. conn.send_command(*args)

  19. return self.parse_response(conn, command_name, **options)

注意:为了便于理解,示例代码和实际的代码有出入,省去了复杂的逻辑和异常等

  • redis首先创造了一个到redis服务的连接,

  • redis包装了redis的所有指令,使用命令模式执行指令。

  • 执行命令就是使用创建的连接发送指令,然后解析和获取响应。这和redis协议上的Request-Response model行为一致。

Redis连接

继续查看连接的创建和执行:

  1. # connection.py

  2. class Connection(object)


  3. def __init__(...):

  4. self.host = host

  5. self. port = int(port)

  6. self._sock = connect()


  7. def connect():

  8. for res in socket.getaddrinfo(self.host, self.port, self.socket_type,

  9. socket.SOCK_STREAM):

  10. family, socktype, proto, canonname, socket_address = res

  11. sock = socket.socket(family, socktype, proto)

  12. ...

  13. # TCP_NODELAY

  14. sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

  15. # connect

  16. sock.connect(socket_address)

  17. ...

  18. return sock


  19. def pack_command(self, *args):

  20. command = []

  21. args = tuple(args[0].encode().split()) + args[1:]

  22. ...

  23. buff = SYM_EMPTY.join((SYM_STAR, str(len (args)).encode(), SYM_CRLF))

  24. for arg in imap(self.encoder.encode, args):

  25. buff = SYM_EMPTY.join(

  26. (buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF))

  27. output.append(buff)

  28. output.append(arg)

  29. ...

  30. return command


  31. def send_command(self, *args, **kwargs):


  32. command = self.pack_command(args)


  33. if isinstance(command, str):

  34. command = [command]

  35. for item in command:

  36. self._sock.sendall(*args, **kwargs)

  • connection维持了一个socket连接

  • 收到redis的命令使用pack_command进行RESP的序列化打包

  • 数据包使用socket发送

  1. # connection.py


  2. class Connection(object)


  3. def __init__(...,parser_class=PythonParser,...);

  4. self._parser = parser_class (socket_read_size=socket_read_size)


  5. def read_response(self):

  6. response = self._parser.read_response()

  7. return response


  8. class PythonParser(BaseParser):

  9. "Plain Python parsing class"

  10. def __init__(self, socket_read_size):

  11. self. socket_read_size = socket_read_size

  12. ...

  13. self._sock = connection._sock

  14. self._buffer = SocketBuffer(self._sock,

  15. self.socket_read_size,

  16. connection.socket_timeout)

  17. self.encoder = connection.encoder


  18. def read_response(self):

  19. raw = self._buffer.readline()


  20. byte, response = raw[:1], raw[1:]


  21. # server returned an error

  22. if byte == b'-':

  23. response = nativestr(response)

  24. ...

  25. # single value

  26. elif byte == b'+':

  27. pass

  28. # int value

  29. elif byte == b':':

  30. response = long(response)

  31. # bulk response

  32. elif byte == b'$':

  33. length = int(response)

  34. response = self._buffer.read(length)

  35. # multi-bulk response

  36. elif byte == b'*':

  37. length = int(response)

  38. response = [self.read_response() for i in xrange(length)]

  39. if isinstance(response, bytes):

  40. response = self.encoder.decode(response)

  41. return response

  • connection创建了一个parser用于读取和解析服务响应

  • 默认的PythonParser使用SocketBuffer读取socket数据

  • read_response实现了RESP协议的解析过程。对于每行数据 \r\n,第一个字符是响应类型,剩下的数据内容,如果是multi-bulk还需要循环读取多行。建议对比协议和发送请求进行详细阅读理解。

PythonParser是pure-python的实现,如果希望更高效,可以额外安装hiredis,会提供一个基于c的解析器 HiredisParser

连接池

redis-py使用连接池来提高执行效率,主要的使用方法3个步骤,创建连接池,从连接池中获取有效连接执行命令,完成后释放连接,语句如下:

  1. # redis.py

  2. connection_pool = ConnectionPool(**kwargs)

  3. pool.get_connection(command_name, **options)


  4. try:

  5. conn.send_command(*args)

  6. ...

  7. finally:

  8. ...

  9. pool.release(conn)

连接池一定要注意释放,可以用try/finally,也可以使用上下文装饰器,这里使用了前者

连接池的具体实现:

  1. # connection.py

  2. class ConnectionPool(object):


  3. def __init__(...):

  4. self._available_connections = []

  5. self._in_use_connections = set()


  6. def make_connection(self):

  7. "Create a new connection"

  8. return self.connection_class(**self.connection_kwargs)


  9. def get_connection(self, command_name, *keys, **options)

  10. try:

  11. connection = self._available_connections.pop()

  12. except IndexError:

  13. connection = self .make_connection()

  14. self._in_use_connections.add(connection)

  15. ...

  16. connection.connect()

  17. return connection


  18. def release(self, connection):

  19. "Releases the connection back to the pool"


  20. try:

  21. self ._in_use_connections.remove(connection)

  22. except KeyError:

  23. # Gracefully fail when a connection is returned to this pool

  24. # that the pool doesn't actually own

  25. pass


  26. self._available_connections.append(connection)

  • 连接池内部使用可用连接数组和正在使用连接集合管理所有连接

  • 获取连接时候,优先从可用连接数组获取;没有可用连接会创建新的连接

  • 所有获取到的连接会加入正在使用连接, 如果当前连接未连接会先建立连接

  • 连接释放时会从正在使用连接集合中移除,然后加入可用连接数组数组,等待复用

到这里,我们基本理顺了一个redis指令执行的流程:

  1. r = redis.Redis( host='localhost', port=6379, db=0)

  2. r.set('foo', 'bar')

pipeline

redis还支持pipeline管线模式,可以批量发送一些命令,然后获取所有的结果:

  1. >>> r = redis.Redis(...)

  2. >>> pipe = r.pipeline()

  3. >>> pipe.set('foo', 'bar').sadd('faz', 'baz' ).incr('auto_number').execute()

  4. [True, True, 6]

pipeline的继承自redis,做了一些扩展

  1. class Pipeline(Redis)

  2. def __init__(...):

  3. self.command_stack = []


  4. def execute_command(self, *args, **kwargs):

  5. self.command_stack.append((args, options))

  6. return self


  7. def execute(self, raise_on_error=True):

  8. "Execute all the commands in the current pipeline"

  9. stack = self.command_stack


  10. execute = self._execute_pipeline


  11. execute (conn, stack, raise_on_error)


  12. def _execute_pipeline(self, connection, commands, raise_on_error):

  13. # build up all commands into a single request to increase network perf

  14. all_cmds = connection.pack_commands([args for args, _ in commands])

  15. connection.send_packed_command(all_cmds)


  16. response = []

  17. for args, options in commands:

  18. response.append(

  19. self.parse_response(connection, args[0], **options))


  20. return response

  • pipeline使用一个stack来临时存储批量发送的命令,同时返回自身,这样可以支持链式语法

  • execute时候才正式发送指令

  • 发送指令后再依次获取服务响应,打包称一个数组统一返回

LuaScript

redis使用lua脚本来处理事务,使用方法如下:

  1. >>> r = redis.Redis()

  2. >>> lua = """

  3. ... local value = redis.call('GET', KEYS[1])

  4. ... value = tonumber(value)

  5. ... return value * ARGV[1]"""

  6. >>> multiply = r.register_script(lua)

  7. >>> r.set('foo', 2)

  8. >>> multiply(keys=['foo'], args=[5])

  9. 10

  • lua脚本中定义了KEYS和ARGV两个数组用于接受参数,KEY的第一个值(lua数组从1开始)是key的名称,ARGV的第一个值是倍数

  • 脚本需要进行注册

  • redis-py中把参数传递给脚本并执行得到结果

脚本的实现原理:

  1. # client.py

  2. class Redis(object):


  3. def register_script(self, script):

  4. return Script(self, script


  5. def script_load(self, script):

  6. "Load a Lua ``script`` into the script cache. Returns the SHA."

  7. return self.execute_command('SCRIPT LOAD', script)


  8. def evalsha(self, sha, numkeys, *keys_and_args):

  9. return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)


  10. class Script(object):

  11. "An executable Lua script object returned by ``register_script``"


  12. def __init__(self, registered_client, script):

  13. self.registered_client = registered_client

  14. self.script = script

  15. # Precalculate and store the SHA1 hex digest of the script.

  16. ...

  17. self.sha = hashlib.sha1(script).hexdigest()


  18. def __call__(self, keys=[], args =[], client=None):

  19. "Execute the script, passing any required ``args``"

  20. args = tuple(keys) + tuple(args)

  21. # make sure the Redis server knows about the script

  22. ...

  23. try:

  24. return client.evalsha(self.sha, len(keys), *args)

  25. except NoScriptError:

  26. # Maybe the client is pointed to a differnet server than the client

  27. # that created this instance?

  28. # Overwrite the sha just in case there was a discrepancy.

  29. self.sha = client.script_load(self.script)

  30. return client.evalsha(self.sha, len(keys), *args

  • lua脚本通过 script load 加载到redis服务,并获得一个sha值,sha值可以重用,避免多次加载同一脚本

  • 通过  evalsha 执行脚本

lock

redis-py还提供了一个全局锁的实现, 可以跨进程同步:

  1. try:

  2. with r.lock('my-lock-key', blocking_timeout=5) as lock:

  3. # code you want executed only after the lock has been acquired

  4. except LockError:

  5. # the lock wasn't acquired

下面是其实实现:

  1. # lock.py

  2. class Lock(object):


  3. LUA_RELEASE_SCRIPT = """

  4. local token = redis.call('get', KEYS[1])

  5. if not token or token ~= ARGV[1] then

  6. return 0

  7. end

  8. redis.call('del', KEYS[1])

  9. return 1

  10. ""


  11. def __init__(...):

  12. ...

  13. self.redis = redis

  14. self.name = name

  15. self.local = threading.local() if self.thread_local else dummy()

  16. self.local.token = None

  17. cls = self.__class__

  18. cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT)


  19. def __enter__(self):

  20. # force blocking, as otherwise the user would have to check whether

  21. # the lock was actually acquired or not.

  22. if self.acquire(blocking=True):

  23. return self

  24. raise LockError("Unable to acquire lock within the time specified")


  25. def __exit__(self, exc_type, exc_value, traceback):

  26. self.release()


  27. def acquire(self, blocking=None, blocking_timeout=None, token=None):

  28. ...

  29. token = uuid.uuid1().hex.encode()

  30. self.redis.set(self.name, token, nx=True, px=timeout)

  31. ...

  32. self.local.token = token

  33. ...


  34. def release(self):

  35. expected_token = self.local.token

  36. self.local.token = None

  37. self.lua_release(keys=[self.name],

  38. args=[expected_token],

  39. client=self.redis)

  • LUA_RELEASE_SCRIPT使用lua脚本来处理删除token的事务

  • lock使用线程变量来存储token值,保证多线程并发可以正常

  • _enter和_exit是装饰器语法,保证可以合法的获取和释放

  • 申请锁的时候获取一个临时的token,然后设置到redis服务中,这个token是有生命周期的,可以超时自动释放。

  • 释放的时候清理线程本地变量和redis服务中的变量

TODO

源码中的 publish/subscibeMonitor , Sentinel 和事务等内容,个人认为并不在主线任务上,留待后续再行介绍。

参考链接

  • https://redis.io/topics/protocol

  • https://github.com/andymccurdy/redis-py

  • https://pypi.org/project/hiredis/#description

近两年里,我原创和翻译了130+技术文章,主要关注Python进阶、小技巧、编程设计、PEP翻译、Python哲学等话题。现已集结出了一本电子书《优雅的Python》,请回复数字『1』,获取下载地址。

近期热门文章推荐:

Python优化机制:常量折叠
如何用 Python 制作地球仪?
Python 源码混淆与加密
为什么 Python 多线程无法利用多核?

分享在看是对我最大的支持!

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/108141
 
522 次点击