Building a MySQL Enhanced Semi-Sync BinlogServer in Python (T2: Communication)

老叶茶馆 • 4 years ago
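Introduction
Author: 曾永伟, a 10th-cohort student at 知数堂. He has years of Java development and management experience in the logistics industry and PHP/Python development and management experience in cross-border e-commerce. He has a particular fondness for database systems and likes to simplify business logic with SQL. He became a full-time MySQL DBA last year, focusing on automated operations for database systems and on running MySQL in the cloud.
This is the second article in the python-mysql-binlogserver series (T2: Communication). The remaining articles in the series will follow; bookmark this one and work through the exercises yourself.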


I. Overview

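The previous article covered:

  • How binary data is used in computing

  • How character sets relate to binary data, and how Python handles them

  • Converting binary data with struct

  • The basics of socket programming

This article builds on those topics to show how Python can talk to MySQL.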


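II. MySQL Protocol Basics

In Python, socket.recv receives and socket.send sends raw binary data as bytes objects, so we need the MySQL protocol to interpret that data byte by byte.

The basic unit of MySQL communication is the packet, which consists of a header and a payload. The header holds a 3-byte payload length (so one payload is at most 2^24 - 1 bytes, just under 16 MB) and a 1-byte sequence id. To read a packet, we normally read the 4 header bytes first, decode the payload length and the sequence id, and then read that many bytes of payload.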

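    import socket
    import struct

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("127.0.0.1", 3306))

    # Read the 4-byte header first
    header = s.recv(4)
    # Pad the 3-byte little-endian length to 4 bytes so '<I' can unpack it
    length = struct.unpack('<I', header[:3] + b'\x00')[0]
    # The sequence id is the last byte of the header
    sequenceId = struct.unpack('<B', header[3:])[0]

    # Then read the payload to complete the packet
    body = s.recv(length)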
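One practical detail: recv(n) may return fewer than n bytes, so a more defensive reader loops until the whole header and payload have arrived. The following is a minimal sketch of that idea, not code from the project; recv_exact and read_packet are hypothetical helper names, and the demo uses a local socket pair in place of a real MySQL server.

```python
import socket
import struct


def recv_exact(sock, n):
    """Read exactly n bytes, looping because recv() may return fewer."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("socket closed before %d bytes arrived" % n)
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_packet(sock):
    """Return (sequence_id, payload) for one MySQL packet."""
    header = recv_exact(sock, 4)
    # 3-byte little-endian length, padded to 4 bytes for '<I'
    length = struct.unpack("<I", header[:3] + b"\x00")[0]
    sequence_id = header[3]
    return sequence_id, recv_exact(sock, length)


if __name__ == "__main__":
    # Local demo with a socket pair instead of a real MySQL server.
    a, b = socket.socketpair()
    payload = b"\x03select 1"
    a.sendall(struct.pack("<I", len(payload))[:3] + b"\x00" + payload)
    print(read_packet(b))
    a.close()
    b.close()
```

The examples in this article call recv directly to stay short; on a real network connection a loop like this is the safer choice.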

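Parsing the Greeting packet

Once a client connects, the MySQL server sends a Greeting packet on its own initiative, carrying its status and a random challenge, and then waits for the client to respond with the account name and the scrambled password. If authentication fails, the server sends an ERR packet and closes the connection. If it succeeds, the server sends an OK packet, keeps the connection open, and waits for further commands from the client.

Here is the official description of the Greeting packet payload: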

    https://dev.mysql.com/doc/internals/en/connection-phase-packets.html

    1             [0a] protocol version
    string[NUL]   server version
    4             connection id
    string[8]     auth-plugin-data-part-1
    1             [00] filler
    2             capability flags (lower 2 bytes)
    1             character set
    2             status flags
    2             capability flags (upper 2 bytes)
    1             length of auth-plugin-data
    string[10]    reserved (all [00])
    string[$len]  auth-plugin-data-part-2 ($len=MAX(13, length of auth-plugin-data - 8))
    string[NUL]   auth-plugin name

To keep the layout easy to read, I have left out the compatibility fields for older protocol versions.

Let's parse our first Greeting packet in Python:

    import socket
    import struct
    from pprint import pprint

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("192.168.56.101", 3306))

    greeting = {}

    # Header: 3-byte length + 1-byte sequence id
    greeting["length"] = struct.unpack('<I', s.recv(3) + b'\x00')[0]
    greeting["sequenceId"] = struct.unpack('<B', s.recv(1))[0]

    # Payload starts here
    greeting["protocolVersion"] = s.recv(1)

    # serverVersion is a string[NUL], so read byte by byte until \x00
    greeting["serverVersion"] = ""
    while True:
        _byte = s.recv(1)
        if _byte == b'\x00':
            break
        greeting["serverVersion"] += chr(_byte[0])

    # Compare the remaining fields with the layout above one by one
    greeting["connectionId"] = struct.unpack('<I', s.recv(4))[0]
    greeting["challenge1"] = s.recv(8).decode("utf8")
    _filler = s.recv(1)
    greeting["capabilityFlags"] = s.recv(2)  # lower 2 bytes
    greeting["characterSet"] = s.recv(1)
    greeting["statusFlags"] = s.recv(2)
    greeting["capabilityFlag"] = s.recv(2)  # upper 2 bytes
    greeting["authPluginDataLength"] = struct.unpack('<B', s.recv(1))[0]
    _reserved = s.recv(10)
    greeting["challenge2"] = s.recv(max(13, greeting["authPluginDataLength"] - 8)).decode("utf8")
    greeting["authPluginName"] = ""
    while True:
        _byte = s.recv(1)
        if _byte == b'\x00':
            break
        greeting["authPluginName"] += chr(_byte[0])
    pprint(greeting)

Output:

    {'authPluginDataLength': 21,
     'authPluginName': 'mysql_native_password',
     'capabilityFlag': b'\xff\x81',
     'capabilityFlags': b'\xff\xf7',
     'challenge1': 'X:u8\x11N4\x1b',
     'challenge2': 'dF\x04f~\x1f!%\x14\x1acV\x00',
     'characterSet': b'\xe0',
     'connectionId': 73,
     'length': 78,
     'protocolVersion': b'\n',
     'sequenceId': 0,
     'serverVersion': '5.7.20-log',
     'statusFlags': b'\x02\x00'}
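Several fields in this output are still raw bytes. As a small sketch of how they could be decoded, the snippet below joins the two capability halves into one 32-bit word and tests a few bits. The flag values are taken from the MySQL protocol documentation, only a subset is listed, and the helper names decode_capabilities and flag_names are made up for this illustration.

```python
import struct

# Subset of capability flags from the MySQL protocol documentation.
CAPABILITIES = {
    "CLIENT_PROTOCOL_41": 0x00000200,
    "CLIENT_SSL": 0x00000800,
    "CLIENT_SECURE_CONNECTION": 0x00008000,
    "CLIENT_PLUGIN_AUTH": 0x00080000,
    "CLIENT_DEPRECATE_EOF": 0x01000000,
}
SERVER_STATUS_AUTOCOMMIT = 0x0002


def decode_capabilities(lower, upper):
    """Combine the two 2-byte halves into one 32-bit flag word."""
    low = struct.unpack("<H", lower)[0]
    high = struct.unpack("<H", upper)[0]
    return low | (high << 16)


def flag_names(flags):
    """List the known capability names whose bit is set."""
    return [name for name, bit in CAPABILITIES.items() if flags & bit]


# Byte values copied from the Greeting output above.
caps = decode_capabilities(b"\xff\xf7", b"\xff\x81")
status = struct.unpack("<H", b"\x02\x00")[0]
charset_id = b"\xe0"[0]

print(hex(caps))
print(flag_names(caps))
print("autocommit:", bool(status & SERVER_STATUS_AUTOCOMMIT))
print("character set id:", charset_id)
```

For this server the SSL bit is not set, while the 4.1 protocol, secure connection, plugin authentication, and deprecate-EOF bits are. Character set id 224 is utf8mb4_unicode_ci.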

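As you can see, parsing the Greeting packet in Python is not as hard as it might seem. With the official documentation at hand, Python's struct makes it easy to turn the byte stream back into readable information.

To make the code reusable, we wrap it in a simple function that later examples can call: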

    # learn_packet1_greeting.py

    import socket
    import struct
    from pprint import pprint


    def get_greeting(s):
        greeting = {}
        # Header: 3-byte length + 1-byte sequence id
        greeting["length"] = struct.unpack('<I', s.recv(3) + b'\x00')[0]
        greeting["sequenceId"] = struct.unpack('<B', s.recv(1))[0]
        greeting["protocolVersion"] = s.recv(1)
        # string[NUL]: read until the terminating \x00
        greeting["serverVersion"] = ""
        while True:
            _byte = s.recv(1)
            if _byte == b'\x00':
                break
            greeting["serverVersion"] += chr(_byte[0])
        greeting["connectionId"] = struct.unpack('<I', s.recv(4))[0]
        greeting["challenge1"] = s.recv(8).decode("utf8")
        _filler = s.recv(1)
        greeting["capabilityFlags"] = s.recv(2)  # lower 2 bytes
        greeting["characterSet"] = s.recv(1)
        greeting["statusFlags"] = s.recv(2)
        greeting["capabilityFlag"] = s.recv(2)  # upper 2 bytes
        greeting["authPluginDataLength"] = struct.unpack('<B', s.recv(1))[0]
        _reserved = s.recv(10)
        greeting["challenge2"] = s.recv(max(13, greeting["authPluginDataLength"] - 8)).decode("utf8")
        greeting["authPluginName"] = ""
        while True:
            _byte = s.recv(1)
            if _byte == b'\x00':
                break
            greeting["authPluginName"] += chr(_byte[0])
        return greeting


    if __name__ == "__main__":
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect(("192.168.1.100", 3306))
        greeting = get_greeting(s)
        pprint(greeting)
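An alternative design, sketched here as an assumption rather than as the project's approach, is to read the whole payload first and then parse it from memory with a small cursor. This avoids many one-byte recv calls and makes the parser testable without a server. Reader and parse_greeting are hypothetical names, and the sample payload is rebuilt from the field values printed earlier.

```python
import struct


class Reader:
    """Tiny cursor over a bytes payload (hypothetical helper)."""

    def __init__(self, data):
        self.data = data
        self.pos = 0

    def fixed(self, n):
        chunk = self.data[self.pos:self.pos + n]
        if len(chunk) != n:
            raise ValueError("payload too short")
        self.pos += n
        return chunk

    def nul_string(self):
        end = self.data.index(b"\x00", self.pos)
        chunk = self.data[self.pos:end]
        self.pos = end + 1  # skip the NUL terminator
        return chunk


def parse_greeting(payload):
    r = Reader(payload)
    g = {}
    g["protocolVersion"] = r.fixed(1)[0]
    g["serverVersion"] = r.nul_string().decode("ascii")
    g["connectionId"] = struct.unpack("<I", r.fixed(4))[0]
    g["challenge1"] = r.fixed(8)
    r.fixed(1)  # filler
    cap_low = struct.unpack("<H", r.fixed(2))[0]
    g["characterSet"] = r.fixed(1)[0]
    g["statusFlags"] = struct.unpack("<H", r.fixed(2))[0]
    cap_high = struct.unpack("<H", r.fixed(2))[0]
    g["capabilityFlags"] = cap_low | (cap_high << 16)
    g["authPluginDataLength"] = r.fixed(1)[0]
    r.fixed(10)  # reserved
    g["challenge2"] = r.fixed(max(13, g["authPluginDataLength"] - 8))
    g["authPluginName"] = r.nul_string().decode("ascii")
    return g


# Payload rebuilt from the field values printed earlier (78 bytes).
sample = (
    b"\x0a" + b"5.7.20-log\x00" + struct.pack("<I", 73)
    + b"X:u8\x11N4\x1b" + b"\x00" + b"\xff\xf7" + b"\xe0" + b"\x02\x00"
    + b"\xff\x81" + bytes([21]) + bytes(10)
    + b"dF\x04f~\x1f!%\x14\x1acV\x00" + b"mysql_native_password\x00"
)
greeting = parse_greeting(sample)
print(len(sample), greeting["serverVersion"], greeting["authPluginName"])
```

Unlike get_greeting, this sketch keeps the challenge parts as bytes and returns the capability flags as a single integer.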

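Building the Response packet and authenticating

Once the client has the Greeting packet, it combines the two challenge parts to build the authentication response and complete the MySQL login: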

    # learn_packet2_auth.py

    import socket
    import struct
    import hashlib
    from functools import partial
    from py_mysql_binlogserver._tutorial.learn_packet1_greeting import get_greeting


    def dump_packet(packet, title=None):
        pass  # body omitted to save space


    def scramble_native_password(password, message):
        '''
        Scramble used for mysql_native_password
        https://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
        '''
        SCRAMBLE_LENGTH = 20
        sha1_new = partial(hashlib.new, 'sha1')

        if not password:
            return b''

        password = password.encode("utf-8")
        message = message.encode("utf-8")

        # token = SHA1(password) XOR SHA1(challenge + SHA1(SHA1(password)))
        stage1 = sha1_new(password).digest()
        stage2 = sha1_new(stage1).digest()
        s = sha1_new()
        s.update(message[:SCRAMBLE_LENGTH])
        s.update(stage2)
        result = s.digest()
        return _my_crypt(result, stage1)


    def _my_crypt(message1, message2):
        # Byte-wise XOR of two byte strings of equal length
        result = bytearray(message1)
        for i in range(len(result)):
            result[i] ^= message2[i]

        return bytes(result)


    def get_response(s, username, password, challenge1, challenge2):
        '''
        https://dev.mysql.com/doc/internals/en/connection-phase-packets.html
        Simplified layout
        4            capability flags, CLIENT_PROTOCOL_41 always set
        4            max-packet size
        1            character set
        string[23]   reserved (all [0])
        string[NUL]  username
        lenenc-int   length of auth-response
        string[n]    auth-response
        string[NUL]  auth plugin name
        '''
        scramble_password = scramble_native_password(password, challenge1 + challenge2)

        response = b''
        response += struct.pack('<I', 32482821)   # capability flags
        response += struct.pack('<I', 16777216)   # max-packet size
        response += struct.pack('<B', 33)         # character set (33 = utf8_general_ci)
        response += b'\x00' * 23                  # reserved
        response += username.encode() + b'\x00'
        response += struct.pack('<B', len(scramble_password))
        response += scramble_password
        response += "mysql_native_password".encode() + b'\x00'
        # Prepend the packet header: 3-byte length + sequence id 1
        response = struct.pack('<I', len(response))[:-1] + struct.pack('<B', 1) + response
        return response


    if __name__ == "__main__":

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect(("192.168.1.100", 3306))

        greeting = get_greeting(s)

        username = 'repl'
        password = 'repl1234'
        response = get_response(s, username, password, greeting["challenge1"], greeting["challenge2"])
        s.send(response)
        dump_packet(response, "Response packet:")

        result = s.recv(1024)
        dump_packet(result, "Result packet:")
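To see why this scramble lets the server check a password without ever receiving it, here is a small self-contained sketch of both sides. It assumes the server keeps SHA1(SHA1(password)) for the account, as mysql_native_password does; client_scramble and server_verify are illustrative names, not functions from MySQL or from this project.

```python
import hashlib


def sha1(data):
    return hashlib.sha1(data).digest()


def xor_bytes(a, b):
    return bytes(x ^ y for x, y in zip(a, b))


def client_scramble(password, challenge):
    """SHA1(password) XOR SHA1(challenge + SHA1(SHA1(password)))."""
    stage1 = sha1(password)
    stage2 = sha1(stage1)
    return xor_bytes(sha1(challenge + stage2), stage1)


def server_verify(token, challenge, stored_stage2):
    """Check a token using only the stored double hash."""
    # Undo the XOR to recover the candidate SHA1(password) ...
    candidate_stage1 = xor_bytes(token, sha1(challenge + stored_stage2))
    # ... and hash it once more to compare with what is stored.
    return sha1(candidate_stage1) == stored_stage2


# 20-byte challenge taken from the Greeting output (trailing NUL dropped).
challenge = b"X:u8\x11N4\x1bdF\x04f~\x1f!%\x14\x1acV"
stored = sha1(sha1(b"repl1234"))  # what the server keeps for the account
good = client_scramble(b"repl1234", challenge)
bad = client_scramble(b"wrong", challenge)
print(server_verify(good, challenge, stored), server_verify(bad, challenge, stored))
```

Because the challenge changes with every connection, a captured token cannot simply be replayed on a later connection.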

After the account name and the scrambled password are sent to the server, a successful login returns an OK packet:

    Response packet:
    00000000  50 00 00 01 05 A6 EF 01  00 00 00 01 21 00 00 00  P....¦ï. ....!...
    00000010  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  ........ ........
    00000020  00 00 00 00 72 65 70 6C  00 14 F6 4A C1 E7 4F 2A  ....repl ..öJÁçO*
    00000030  BB A3 CB 29 3C 5B 50 F9  3C AF E3 6C 1C A9 6D 79  »£Ë)<[Pù <¯ãl.©my
    00000040  73 71 6C 5F 6E 61 74 69  76 65 5F 70 61 73 73 77  sql_nati ve_passw
    00000050  6F 72 64 00                                       ord.

    Result packet:
    00000000  07 00 00 02 00 00 00 02  00 00 00                 ........ ...
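The body of dump_packet is omitted in the listing above. If you want to follow along, a stand-in along these lines produces a similar hex view. This is a hypothetical sketch, not the project's implementation: it is called hexdump to avoid confusion, returns a string instead of printing, and shows every non-ASCII byte as a dot.

```python
def hexdump(packet, title=None):
    """Return a hex + ASCII dump of packet, 16 bytes per row."""
    lines = [title] if title else []
    for offset in range(0, len(packet), 16):
        row = packet[offset:offset + 16]
        hex_part = " ".join("%02X" % b for b in row)
        # Show printable ASCII as-is and everything else as a dot.
        text = "".join(chr(b) if 32 <= b < 127 else "." for b in row)
        lines.append("%08X  %-47s  %s" % (offset, hex_part, text))
    return "\n".join(lines)


print(hexdump(bytes.fromhex("0700000200000002000000"), "Result packet:"))
```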

A failed login returns the following:

    Response packet:
    00000000  50 00 00 01 05 A6 EF 01  00 00 00 01 21 00 00 00  P....¦ï. ....!...
    00000010  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  ........ ........
    00000020  00 00 00 00 72 65 70 6C  00 14 AB 41 90 72 07 98  ....repl ..«A.r..
    00000030  0A 36 21 F8 FC CC 83 7B  8E 4E A3 75 A2 DB 6D 79  .6!øüÌ.{ .N£u¢Ûmy
    00000040  73 71 6C 5F 6E 61 74 69  76 65 5F 70 61 73 73 77  sql_nati ve_passw
    00000050  6F 72 64 00                                       ord.

    Result packet:
    00000000  4A 00 00 02 FF 15 04 23  32 38 30 30 30 41 63 63  J......# 28000Acc
    00000010  65 73 73 20 64 65 6E 69  65 64 20 66 6F 72 20 75  ess deni ed for u
    00000020  73 65 72 20 27 72 65 70  6C 27 40 27 31 39 32 2E  ser 'rep l'@'192.
    00000030  31 36 38 2E 31 2E 31 27  20 28 75 73 69 6E 67 20  168.1.1'  (using
    00000040  70 61 73 73 77 6F 72 64  3A 20 59 45 53 29        password : YES)
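The first payload byte tells the two replies apart: 0x00 marks an OK packet and 0xFF an ERR packet. Here is a minimal sketch that decodes both result packets from the dumps above. parse_result is a hypothetical helper, and it assumes the lenenc-int fields of the OK packet each fit in one byte, which holds for this login reply.

```python
import struct


def parse_result(packet):
    """Classify a reply packet (4-byte header included) as OK or ERR."""
    length = struct.unpack("<I", packet[:3] + b"\x00")[0]
    payload = packet[4:4 + length]
    marker = payload[0]
    if marker == 0x00:
        # Affected rows and last insert id are lenenc-ints; this sketch
        # assumes both fit in one byte, as in the dump above.
        status, warnings = struct.unpack("<HH", payload[3:7])
        return {"type": "OK", "affectedRows": payload[1],
                "lastInsertId": payload[2], "statusFlags": status,
                "warnings": warnings}
    if marker == 0xFF:
        code = struct.unpack("<H", payload[1:3])[0]
        # With CLIENT_PROTOCOL_41: '#' + 5-byte SQL state, then the message.
        return {"type": "ERR", "code": code,
                "sqlState": payload[4:9].decode("ascii"),
                "message": payload[9:].decode("utf-8")}
    return {"type": "UNKNOWN", "marker": marker}


# Bytes copied from the two "Result packet" dumps.
ok_packet = bytes.fromhex("07 00 00 02 00 00 00 02 00 00 00")
err_message = b"Access denied for user 'repl'@'192.168.1.1' (using password: YES)"
err_packet = bytes.fromhex("4A 00 00 02 FF 15 04 23") + b"28000" + err_message

print(parse_result(ok_packet))
print(parse_result(err_packet))
```

The bytes 15 04 decode to error code 1045, the familiar "Access denied" error, with SQL state 28000.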

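Running a query

After authentication the server keeps the connection open until it times out or the client quits. We can now send commands over the authenticated socket, just as the mysql client does, except that everything sent through the socket must be a bytes object. Here is the code: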

    # learn_packet3_query.py

    import socket
    import struct

    from py_mysql_binlogserver._tutorial.learn_packet1_greeting import get_greeting
    from py_mysql_binlogserver._tutorial.learn_packet2_auth import dump_packet, get_response


    def read_packet(skt):
        while True:
            # 4-byte header plus the first payload byte (the packet type)
            _header = skt.recv(5)
            _length = struct.unpack("<I", (_header[0:3] + b"\x00"))[0]
            _sequenceId = struct.unpack("<B", _header[3:4])[0]
            _packetType = struct.unpack("<B", _header[4:])[0]

            # One query produces several packets; stop at the EOF packet
            if _packetType == 0xfe:  # EOF
                break

            # The first payload byte has already been read
            _payload = skt.recv(_length - 1)
            dump_packet(_header + _payload, f"read packet [{_sequenceId}]")


    def get_query(sql):
        """
        https://dev.mysql.com/doc/internals/en/com-query.html
        1            [03] COM_QUERY
        string[EOF]  the query the server shall execute
        """
        query = b''
        query += struct.pack("<B", 3)
        query += sql.encode()

        # Prepend the packet header: 3-byte length + sequence id 0
        query = struct.pack('<I', len(query))[:-1] + struct.pack('<B', 0) + query
        return query


    if __name__ == "__main__":

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect(("192.168.1.100", 3306))

        greeting = get_greeting(s)
        username = 'repl'
        password = 'repl1234'
        response = get_response(s, username, password, greeting["challenge1"], greeting["challenge2"])
        s.send(response)
        result = s.recv(1024)

        sql = "select @@version_comment"
        query = get_query(sql)
        dump_packet(query, f"query packet:{sql}")
        s.send(query)

        read_packet(s)
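The packets dumped by read_packet rely on length-encoded integers and strings, the same lenenc-int type that appears in the response layout earlier. The sketch below shows how they could be decoded. read_lenenc_int and read_text_row are hypothetical helpers, and the sample row is a made-up value of the kind select @@version_comment might return, not captured output.

```python
import struct


def read_lenenc_int(buf, pos=0):
    """Return (value, next_pos) for a length-encoded integer."""
    first = buf[pos]
    if first < 0xFB:
        return first, pos + 1  # the byte is the value itself
    if first == 0xFC:
        return struct.unpack("<H", buf[pos + 1:pos + 3])[0], pos + 3
    if first == 0xFD:
        return struct.unpack("<I", buf[pos + 1:pos + 4] + b"\x00")[0], pos + 4
    if first == 0xFE:
        return struct.unpack("<Q", buf[pos + 1:pos + 9])[0], pos + 9
    raise ValueError("0x%02x is not a valid lenenc-int prefix here" % first)


def read_text_row(payload):
    """Split a text-protocol row into column values (None for NULL)."""
    values, pos = [], 0
    while pos < len(payload):
        if payload[pos] == 0xFB:  # NULL marker
            values.append(None)
            pos += 1
            continue
        size, pos = read_lenenc_int(payload, pos)
        values.append(payload[pos:pos + size].decode("utf-8"))
        pos += size
    return values


# Made-up row payload: one column, 0x1c = 28 bytes of text.
row = b"\x1cMySQL Community Server (GPL)"
print(read_text_row(row))
```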



III. Binlog File Format

First, a quick reminder of what a binlog file contains:

    mysql> show binlog events in 'mysql-bin.000009';
    +------------------+-----+----------------+-----------+-------------+--------------------------------------------------------------------+
    | Log_name         | Pos | Event_type     | Server_id | End_log_pos | Info                                                               |
    +------------------+-----+----------------+-----------+-------------+--------------------------------------------------------------------+
    | mysql-bin.000009 |   4 | Format_desc    |   3306100 |         123 | Server ver: 5.7.20-log, Binlog ver: 4                              |
    | mysql-bin.000009 | 123 | Previous_gtids |   3306100 |         190 | f0ea18e0-3cff-11e9-9488-0800275ae9e7:1-19                          |
    | mysql-bin.000009 | 190 | Gtid           |   3306100 |         251 | SET @@SESSION.GTID_NEXT= 'f0ea18e0-3cff-11e9-9488-0800275ae9e7:20' |
    | mysql-bin.000009 | 251 | Query          |   3306100 |         318 | BEGIN                                                              |
    | mysql-bin.000009 | 318 | Table_map      |   3306100 |         361 | table_id: 223 (db3.t3)                                             |
    | mysql-bin.000009 | 361 | Write_rows     |   3306100 |         402 | table_id: 223 flags: STMT_END_F                                    |
    | mysql-bin.000009 | 402 | Xid            |   3306100 |         429 | COMMIT /* xid=286 */                                               |
    | mysql-bin.000009 | 429 | Gtid           |   3306100 |         490 | SET @@SESSION.GTID_NEXT= 'f0ea18e0-3cff-11e9-9488-0800275ae9e7:21' |
    | mysql-bin.000009 | 490 | Query          |   3306100 |         557 | BEGIN                                                              |
    | mysql-bin.000009 | 557 | Table_map      |   3306100 |         600 | table_id: 223 (db3.t3)                                             |
    | mysql-bin.000009 | 600 | Write_rows     |   3306100 |         641 | table_id: 223 flags: STMT_END_F                                    |
    | mysql-bin.000009 | 641 | Xid            |   3306100 |         668 | COMMIT /* xid=287 */                                               |
    | mysql-bin.000009 | 668 | Rotate         |   3306100 |         711 | mysql-bin.000010;pos=4                                             |
    +------------------+-----+----------------+-----------+-------------+--------------------------------------------------------------------+
    13 rows in set (0.00 sec)

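As you can see, a binlog file is made up of a series of events. A complete binlog starts with a Format_desc event and ends with a Rotate event; these act as the file's metadata, while the events in between carry the actual data changes. Each event type has its own format and has to be parsed separately. Our BinlogServer, however, does not care what is inside an event. It only needs to treat the event as the basic unit to receive, store, and send. Put simply, it stores the events sent by the master in order, and when a slave points at it with CHANGE MASTER, it sends the events to that slave one by one, starting from the requested position. Nothing more.

Every event has its own header, which records its creation time, type, server id, size, the position of the next event, and flags. Using the event size and the next-event position, we can easily scan all the events in a binlog file in order: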

    # learn_bin2_binlog.py

    import struct
    from py_mysql_binlogserver.constants.EVENT_TYPE import event_type_name

    event_map = event_type_name()

    with open("mysql-bin.000009", mode="rb") as fr:
        # A binlog file starts with the 4-byte magic number \xfe 'bin'
        _file_header = fr.read(4)
        if _file_header != bytes.fromhex("fe62696e"):
            print("It is not a binlog file.")
            exit()

        '''
        https://dev.mysql.com/doc/internals/en/binlog-event-header.html
        4 timestamp
        1 event type
        4 server-id
        4 event-size
        4 log pos
        2 flags
        '''
        while True:
            event_header = fr.read(19)
            if len(event_header) == 0:
                break
            # '<IBIIIH' = 4 + 1 + 4 + 4 + 4 + 2 = 19 bytes, little-endian
            timestamp, event_type, server_id, event_size, log_pos, flags = struct.unpack('<IBIIIH', event_header)
            print("Binlog Event[%s]: [%s] %s %s" % (timestamp,
                                                     event_type,
                                                     event_map.get(event_type), log_pos))
            # Skip the event body; event_size includes the 19-byte header
            event_body = fr.read(event_size - 19)

Output:

    Binlog Event[1570889546]: [15] FORMAT_DESCRIPTION_EVENT 123
    Binlog Event[1570889546]: [35] PREVIOUS_GTIDS_LOG_EVENT 190
    Binlog Event[1570889807]: [33] GTID_LOG_EVENT 251
    Binlog Event[1570889807]: [2] QUERY_EVENT 318
    Binlog Event[1570889807]: [19] TABLE_MAP_EVENT 361
    Binlog Event[1570889807]: [30] WRITE_ROWS_EVENT 402
    Binlog Event[1570889807]: [16] XID_EVENT 429
    Binlog Event[1570889813]: [33] GTID_LOG_EVENT 490
    Binlog Event[1570889813]: [2] QUERY_EVENT 557
    Binlog Event[1570889813]: [19] TABLE_MAP_EVENT 600
    Binlog Event[1570889813]: [30] WRITE_ROWS_EVENT 641
    Binlog Event[1570889813]: [16] XID_EVENT 668
    Binlog Event[1570889820]: [4] ROTATE_EVENT 711
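Since the BinlogServer only has to replay whole events from a given position, the same scan can be wrapped in a generator that returns raw events. The sketch below is an illustration under assumptions, not the project's code: iter_events and fake_event are hypothetical names, and the two events are synthetic bytes held in memory, so their bodies are not valid MySQL event bodies.

```python
import io
import struct

BINLOG_MAGIC = b"\xfebin"
HEADER = struct.Struct("<IBIIIH")  # 19-byte event header


def iter_events(stream, start_pos=4):
    """Yield (header_fields, raw_event) from start_pos to end of file."""
    stream.seek(0)
    if stream.read(4) != BINLOG_MAGIC:
        raise ValueError("not a binlog file")
    stream.seek(start_pos)
    while True:
        raw_header = stream.read(HEADER.size)
        if len(raw_header) < HEADER.size:
            break  # clean end of file or a truncated trailing event
        fields = HEADER.unpack(raw_header)
        event_size = fields[3]
        body = stream.read(event_size - HEADER.size)
        yield fields, raw_header + body


def fake_event(timestamp, event_type, server_id, start, body):
    """Build a synthetic event for the demo; not a valid MySQL event body."""
    size = HEADER.size + len(body)
    return HEADER.pack(timestamp, event_type, server_id, size, start + size, 0) + body


# In-memory stand-in for a binlog file with two made-up events.
first = fake_event(1570889546, 15, 3306100, 4, b"x" * 10)
second = fake_event(1570889807, 4, 3306100, 4 + len(first), b"y" * 5)
binlog = io.BytesIO(BINLOG_MAGIC + first + second)

for fields, raw in iter_events(binlog):
    print(fields[1], fields[4], len(raw))
```

Passing start_pos=33 to iter_events resumes at the second event, which is the kind of "send from the requested position" behaviour described above.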

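Simpler than you expected? If you understood everything in the basics article, this code should give you no trouble. If it does not make sense yet, go back, reread that article, and practice a few more times before returning. For more about the binlog, see the official documentation.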


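IV. Summary

In this article we cleared the hurdle of socket communication: we completed the handshake with a MySQL server, logged in, and ran a query. Along the way we got a close look at the MySQL protocol, learned to pack and unpack simple structures with Python's struct, took a brief look at how a binlog is organized, and parsed a binlog file in Python. Most importantly, we learned to solve practical problems with the official documentation in hand.

The next article builds on this one and moves on to practice: over a socket, we will register with the master as a slave and send a BinlogDump command to fetch the master's binlog events and save them to a local file.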


Related documents

  • https://dev.mysql.com/doc/internals/en/mysql-packet.html

  • https://dev.mysql.com/doc/internals/en/binlog-file-header.html

  • https://dev.mysql.com/doc/internals/en/binlog-event-header.html

  • https://github.com/alvinzane/py-mysql-binlogserver/tree/master/pymysqlbinlogserver/_tutorial

(These two articles are this week's installment. Take the time to practice with them; the series continues next week.)



Article URL: http://www.python88.com/topic/50340
 