Python中国社区  »  Python

【临实战】使用 Python 从 Redis 中删除 4000W 个 KEY

临书 • 4 天前 • 13 次点击  
本文主要涉及 Redis 的以下两个操作和其 Python 实现,目录:
  • SCAN 命令
  • DEL 命令
  • 使用 Python SCAN
  • 使用 Python DEL
  • 成果展示

SCAN 命令

SCAN 命令及相关的 SSCAN、HSCAN 和 ZSCAN 命令都用于增量迭代(incrementally iterate)一个集合的元素(a collection of elements):
  • SCAN 用于迭代当前数据库中的数据库键
  • SSCAN 用于迭代集合键中的元素
  • HSCAN 用于迭代哈希键中的键值对
  • ZSCAN 用于迭代有序集合中的元素(包括元素分值和元素分值)
以上四列命令都支持增量迭代,每次执行都会返回少量元素,所以他们都可以用于生产环境,而不会出现像 KEYS、SMEMBERS 命令一样 -- 可能会阻塞服务器

不过,增量式迭代命令也不是没有缺点的:
举个例子,使用 SMEMBERS 命令可以返回集合键当前包含的所有元素,但是对于 SCAN 这类增量迭代命令来说,因为在堆键进行增量迭代的过程中,键可能会被改变,所以增量式迭代命令只能对被返回的元素提供有限的保证(offer limited guarantees about the returned elements)。

因为 SCAN、SSCAN、HSCAN 和 ZSCAN 命令的工作方式都非常相似,但是要记住:
  • SSCAN、HSCAN 和 ZSCAN 命令的第一个参数总是一个数据库键;
  • SCAN 命令则不需要在第一个参数提供任何数据库键 -- 因为它迭代的是当前数据库中的所有数据库键。

SCAN 命令的基本用法
SCAN 命令是一个基于游标的迭代器(cursor based iterator):
SCAN 命令每次被调用后,都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新游标作为 SCAN 命令的游标参数,以此来延续之前的迭代过程。
当 SCAN 命令的游标参数被设置为 0 时,服务器开始一次新的迭代,而当服务器向用户返回值为 0 的游标时,表示迭代结束。
示例:
redis 127.0.0.1:6379> scan 0
1) "17"
2)  1) "key:12"
    2) "key:8"
    3) "key:4"
    4) "key:14"
    5) "key:16"
    6) "key:17"
    7) "key:15"
    8) "key:10"
    9) "key:3"
    10) "key:7"
    11) "key:1"

redis 127.0.0.1:6379> scan 17
1) "0"
2) 1) "key:5"
   2) "key:18"
   3) "key:0"
   4) "key:2"
   5) "key:19"
   6) "key:13"
   7) "key:6"
   8) "key:9"
   9) "key:11"
上面的例子中,第一次迭代用 0 作为游标,表示开始第一次迭代。
第二次迭代使用第一次迭代时返回的游标,即:17。
从示例可以看出,SCAN 命令的返回是一个两个元素的数组,第一个元素是新游标,第二个元素也是一个数组,包含有所被包含的元素。
第二次调用 SCAN 命令时,返回游标 0,这表示迭代已经结束了,整个数据集(collection)已经被完整遍历过一遍了。
这个过程被称为一次完整遍历(full iteration)。

精简一下内容,补充三点:
  1. 因为 SCAN 命令仅仅使用游标来记录迭代状态,所以在迭代过程中,如果这个数据集的元素有增减,如果是减,不保证元素不返回;如果是增,也不保证一定返回;而且在某种情况下同一个元素还可能被返回多次。所以对迭代返回的元素所执行的操作最好可以重复执行多次(幂等)。
  2. 增量迭代命令不保证每次迭代所返回的元素数量(没扫到嘛),但是我们可以使用 COUNT 选项对命令的行为进行一定程度的调整。COUNT 参数的默认值为 10,在迭代一个足够大的、由哈希表实现的数据库、集合键、哈希键或者有序集合键时,如果用户没有使用 MATCH 选项,那么命令返回的数量通常和 COUNT 选项指定的一样,或者多一些(😓),在迭代编码为整数集合(intset:一个由整数值构成的小集合)或编码为压缩列表(ziplist:由不同值构成的一个小哈希或者一个小有序集合)时,会无视 COUNT 选项指定的值,在第一次迭代就将数据集的所有元素都返回给用户。
  3. MATCH 选项,直接看示例吧,如下
示例:
redis 127.0.0.1:6379> sadd myset 1 2 3 foo foobar feelsgood
(integer) 6

redis 127.0.0.1:6379> sscan myset 0 match f*
1) "0"
2) 1) "foo"
   2) "feelsgood"
   3) "foobar"
注意:对元素的模式匹配工作是在命令从数据集中取出元素之后,向客户端返回元素之前进行的,所以有可能返回空
示例:
redis 127.0.0.1:6379> scan 0 MATCH *11*
1) "288"
2) 1) "key:911"

redis 127.0.0.1:6379> scan 288 MATCH *11*
1) "224"
2) (empty list or set)

redis 127.0.0.1:6379> scan 224 MATCH *11*
1) "80"
2) (empty list or set)

redis 127.0.0.1:6379> scan 80 MATCH *11*
1) "176"
2) (empty list or set)

redis 127.0.0.1:6379> scan 176 MATCH *11* COUNT 1000
1) "0"
2)  1) "key:611"
    2) "key:711"
    3) "key:118"
    4) "key:117"
    5) "key:311"
    6) "key:112"
    7) "key:111"
    8) "key:110"
    9) "key:113"
   10) "key:211"
   11) "key:411"
   12) "key:115"
   13) "key:116"
   14) "key:114"
   15) "key:119"
   16) "key:811"
   17) "key:511"
   18) "key:11"
注意:最后一次迭代,通过 COUNT 选项指定为 1000 强制命令为本次迭代扫描更多元素,从而使返回的元素也变多了。

DEL 命令

这个比较简单,删除给定的一个或者多个 key
redis> SET name "redis"
OK
redis> SET type "key-value store"
OK
redis> SET website "redis.com"
OK
redis> DEL name type website
(integer) 3

使用 Python SCAN

安装 redis 包
pip install redis
完整代码示例:
import redis

pool=redis.ConnectionPool(host='redis_hostname', port=6379, max_connections=100)
r = redis.StrictRedis(connection_pool=pool)

cursor_number, keys = r.execute_command('scan', 0, "count", 200000)

while True:
    if cursor_number == 0:
        # 结束一次完整的比遍历
        break
    cursor_number, keys = r.execute_command('scan', cursor_number, "count", 200000)
    # do something with keys

我将需要删除的 key 存在一个文件里,有 2.2G,大概 4000W 个,下一步就是删除了

使用 Python DEL

因为文件很大,我们用到一个小技巧,分块读取
with open("/data/rediskeys") as kf:
    lines = kf.readlines(1024*1024)
调用 delete 方法时,用到一个小技巧就是『*』星号
r.delete(*taskkey_list)
我们看一下定义就清楚了:
delete method
放上完整代码:
import redis
import time

pool=redis.ConnectionPool(host='redis_hostname', port=6379, max_connections=100)
r = redis.StrictRedis(connection_pool=pool)

start_time = time.time()
SUCCESS_DELETED = 0

with open("/data/rediskeys") as kf:
    while True:
        lines = kf.readlines(1024*1024)
        if not lines:
            break
        else:
            taskkey_list = [i.strip() for i in lines if i.startswith("UCS:TASKKEY")]
            SUCCESS_DELETED += r.delete(*taskkey_list)

        print SUCCESS_DELETED

end_time = time.time()
print end_time - start_time, SUCCESS_DELETED

成果展示

结束,下篇再见



今天看啥 - 高品质阅读平台
本文地址:http://www.jintiankansha.me/t/1jmbXQWYOm
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/5273
 
13 次点击  
分享到微博
分享
社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
WEB开发
linux   NoSql   DATABASE   Jquery   Bootstrap   NGINX   js   其他Web框架   Git   peewee   web工具   bottle   zookeeper   IE   MongoDB   MQ   tornado   Redis  
机器学习
机器学习算法  
Python88.com
公告   社区推广   反馈