Py学习  »  Redis

python redis和celeriy客户端太多,每次执行时出现不同的错误任务使用pymsql连接到mysql

Roman • 5 年前 • 2123 次点击  

我目前正在开发一个应用程序,它必须处理几个长时间运行的任务。 我正在使用 python 3 , flask , celery , redis .

我在本地主机上有一个工作解决方案,但是在heroku上有很多错误,每次执行应用程序都会触发一组不同的错误。我知道这不可能是随机的,所以我想知道从哪里开始寻找。

我觉得redis一定是出了问题,我试图了解什么是客户,他们来自哪里,但我找不到关于这个话题的官方文档或解释。

问题:

如果redis服务器启动了(即使是在本地主机上),那么许多客户端都已连接,即使我什么也没做。关于Heroku(我正在使用 heroku-redis )我总是有6个客户机,在本地主机上有11个客户机。

我已经做了一些研究,并且我能够展示它们:

if 'DYNO' in os.environ:
    redis_db = redis.StrictRedis(host='HOST', port=15249, password='REDISDBPW')
else:
    redis_db = redis.StrictRedis()

# see what keys are in Redis
all_keys = redis_db.keys()
print (all_keys)

all_clients = redis_db.client_list()
print (all_clients)

我看到了所有这些客户,但那里的信息对我毫无帮助。它们是什么?他们为什么在那里?他们从哪里来?

所有的heroku redis插件都有一个客户端限制,所以我需要理解并优化这个限制。首先我想 clientsnumber == tasknumber ,但事实并非如此。

总共我定义了12个任务,但现在我正在测试2个任务(都在30秒内完成)。

当我在本地主机上执行任务时,客户机从11增加到16。如果我从16岁到18岁再执行一次,并且在这之后,他们总是停留在18岁,这与我执行任务的频率无关。

这是怎么回事?我有两个任务,为什么客户从11个增加到16个,然后从16个增加到18个?为什么任务完成后它们还没有关闭?

我现在正在为整个问题奋斗几天(尽管它在本地主机上总是工作得很好),所以欢迎任何帮助或想法。我需要找个地方,所以目前我正在努力了解客户。

编辑:

我安装了flower并尝试在本地主机上监视这两个任务,一切看起来都很好。它处理两个任务,两个任务都在几秒钟内成功。返回值是正确的(但它在localhost上总是工作得很好)。

但问题是,在我开始花客户数量跳到30。我还是不知道: 什么是客户 ?根据我生成的客户端数量,我需要一个100美元的插件来处理两个任务,这需要几秒钟来完成,这不可能是真的,我仍然认为redis有问题,即使是在本地主机上。

我的redis设置非常简单:

if 'DYNO' in os.environ:
    app.config['CELERY_BROKER_URL'] = 'redis://[the full URL from the redis add-on]'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://[the full URL from the redis add-on]'
else:
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])

下面是一个任务示例:

@celery.task(bind=True)
def get_users_deregistrations_task(self, g_start_date, g_end_date):

    start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
    end_date = datetime.strptime(g_end_date, '%d-%m-%Y')

    a1 = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
    a2 = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
    a3 = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()

    if a1 is None:
        a1 = 0

    if a2 is None:
        a2 = 0

    if a3 is None:
        a3 = 0

    amount = a1 + a2 + a3

    return {'some_value' : amount}

# Selects user deregistrations between selected dates
@app.route('/get-users-deregistration', methods=["POST"])
@basic_auth.required
@check_verified
def get_users_deregistrations():
    if request.method == "POST":

        # init task
        task = get_users_deregistrations_task.apply_async([session['g_start_date'], session['g_end_date']])
        return json.dumps({}), 202, {'Location': url_for('taskstatus_get_users_deregistrations', task_id=task.id)}

@app.route('/status/<task_id>')
def taskstatus_get_users_deregistrations(task_id):
    task = get_users_deregistrations_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {
            'state': task.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        response = {
            'state': task.state,
            'current': task.info['current'],
            'total': task.info['total'],
            'status': 'Finished',
            'statistic': task.info['statistic'],
            'final_dataset': task.info     
        }
        if 'result' in task.info:
            response['result'] = task.info['result']
    else:
        print ('in else')
        # something went wrong in the background job
        response = {
            'state': task.state,
            'current': 1,
            'total': 1,
            'status': str(task.info),  # this is the exception raised
        }
    return json.dumps(response)

编辑:

这是我为Heroku准备的程序文件:

web: gunicorn stats_main:app
worker: celery worker -A stats_main.celery --loglevel=info

编辑

我认为问题可能是连接池(在redis端),我没有正确使用它。

我还发现了芹菜的一些配置,并添加了它们:

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'], redis_max_connections=20, BROKER_TRANSPORT_OPTIONS = {
    'max_connections': 20,
}, broker_pool_limit=None)

我用这些配置将所有内容再次上传到Heroku。我仍然只测试了两个任务,都很快。

我已经在Heroku上连续执行了10次任务,7次。有3次他们看起来完成得太早:返回的结果是错误的(正确的结果是f.e.30000,返回的结果是3次18000)。

客户机很快跳到20个,但从未超过20个,因此至少最大客户机错误和与redis错误的连接丢失得到了解决。

现在最大的问题是任务完成得太早,返回的结果是否正确非常重要,性能根本不重要。

编辑

不管怎样,什么都没有解决,一切都是随机的。 我加了两个 print() 在其中一个任务中进一步调试并上传到heroku。在两次执行之后,我再次看到到redis的连接丢失,达到了最大客户端数(尽管我的redismonitor插件显示客户端从未超过20个)

编辑

大量的客户端可能是由空闲的客户端造成的,这些客户端由于某种原因从未关闭(在 heroku ):

默认情况下,redis不会关闭空闲连接,这意味着 如果不显式关闭redis连接,将锁定 你自己的情况。

为了确保不会发生这种情况,heroku redis设置了一个默认连接 超时300秒。此超时不适用于 非发布/订阅客户端和其他阻止操作。

我现在为空闲客户添加了一个kill函数,就在我的每个任务之前:

def kill_idle_clients():
    if 'DYNO' in os.environ:
        redis_db = redis.StrictRedis(host='HOST', port=15249, password='REDISDBPW')
    else:
        redis_db = redis.StrictRedis()

    all_clients = redis_db.client_list()
    counter = 0
    for client in all_clients:
        if int(client['idle']) >= 15:
            redis_db.client_kill(client['addr'])
            counter += 1

    print ('killing idle clients:', counter)

在任务开始之前,它会关闭所有空闲时间超过15秒的客户端。它在localhost上再次工作(但毫不奇怪,它总是在localhost上工作)。我的客户少了,但在Heroku上,现在只有10个客户中的2个。8次任务又太早完成了。也许那些懒散的客户并不是真的懒散,我不知道。

它也几乎不可能进行测试,因为每个任务的执行都有不同的结果(失去与redis的连接、达到客户端限制、过早完成、工作正常)。

编辑

似乎芹菜的设置一直被忽略。我一直对此持怀疑态度,决定通过添加一些随机参数并将值更改为无意义来测试它。我重新启动了芹菜工人办公室。

我本想看到一些错误,但它的工作好像什么都没发生。

对于这些非感官配置,一切都像以前一样工作:

celery = Celery(app.name, broker=app.config['REDIS_URL'], backend=app.config['REDIS_URL'], redis_max_connections='pups', BROKER_TRANSPORT_OPTIONS = {
    'max_connections': 20,
}, broker_pool_limit=None, broker_connection_timeout='pups', pups="pups")
celery.conf.broker_transport_options = {'visibility_timeout': 'pups'}

编辑

我改变了加载芹菜配置的方式(从一个单独的配置文件)。现在看来是可行的,但问题仍然是一样的。

celery_task = Celery(broker=app.config['REDIS_URL'], backend=app.config['REDIS_URL'])
celery_task.config_from_object('celeryconfig')

编辑

通过这些配置,我设法将本地主机上所有任务的客户端数量限制在18个(我尝试了所有12个任务)。然而,在英雄身上,它“不知何故”起作用。客户少了,但一次达到了20个,尽管我认为我不能超过18个。(我在Heroku上测试了4个任务)。

在heroku上测试所有12个任务会触发许多不同的sql错误。我现在比以前更困惑了。似乎同一个任务被多次执行,但我只看到12个任务url。

我认为因为sql错误是f.e.:

sqlalchemy.exc.InternalError: (pymysql.err.InternalError) Packet sequence number wrong - got 117 expected 1

sqlalchemy.exc.InterfaceError: (pymysql.err.InterfaceError) (0, '')

Multiple rows were found for one()

我用4个任务在heroku上测试了几次,任务结果返回了很多次,但是结果非常奇怪。

这次任务没有提前完成,但是返回了增加的值,看起来任务a已经返回了值2次并对其进行了汇总。

示例:任务A必须返回10K,但它返回20K,因此该任务已执行两次,结果已汇总。

这是我目前的配置。我仍然不完全理解这道数学题,但我认为(就客户数量而言):

max-conncurency * CELERYD_MAX_TASKS_PER_CHILD

在localhost上,我找到了一个新的cli命令来检查工作机状态,并且 max-conncurecy=3 CELERYD_MAX_TASKS_PER_CHILD=6

CLI命令:

celery -A stats_main.celery_task inspect stats

我当前的配置:

工作启动:

celery worker -A stats_main.celery_task --loglevel=info --autoscale=10,3

配置:

CELERY_REDIS_MAX_CONNECTIONS=20
BROKER_POOL_LIMIT=None
CELERYD_WORKER_LOST_WAIT=20
CELERYD_MAX_TASKS_PER_CHILD=6
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000} # 5 hours
CELERY_RESULT_DB_SHORT_LIVED_SESSIONS = True #useful if: For example, intermittent errors like (OperationalError) (2006, ‘MySQL server has gone away’)

编辑

现在看到所有这些sql错误,我决定研究一个完全不同的方向。我的新理论是 MySQL 问题。

我调整了与mysql服务器的连接,如 this question .

我还发现pymsql threadsafety=1 ,我还不知道这是否是一个问题,但似乎MySQL与连接和连接池有关。

目前,我还可以说内存不是问题,因为如果包太大,它不应该在本地主机上工作,这意味着我离开了 max_allowed_packet 默认值为4MB左右。

我还创建了3个虚拟任务,在不连接外部mysql数据库的情况下进行一些简单的计算。我已经在heroku上执行了5次,没有错误,结果总是正确的,所以我假设问题不是芹菜、redis,而是mysql,尽管我不知道它为什么会在本地主机上工作。也许这是三者的结合,导致了Heroku的问题。

编辑

我调整了js文件。现在每个任务都被一个接一个地调用,这意味着它们不是异步的(我仍然使用芹菜的 apply_async 因为 apply 不起作用)

所以这是一个艰难的解决办法。我只是创造了一个 var 对于每个任务,f.e. var task_1_rdy = false;

我还创建了一个函数,它每2秒运行一次,检查一个任务是否就绪,如果就绪,它将启动下一个任务。我想很容易理解我在这里做了什么。

在heroku上测试了这个,没有任何错误,即使有多个任务,所以这个问题可能已经解决了。我需要做更多的检查,但看起来很有希望。Ofc。我没有使用异步功能,运行一个接一个的任务可能会有最差的性能,但嘿,它现在工作了。我将对绩效差异进行基准测试,并在周一更新问题。

编辑

我今天做了很多测试。任务完成所需的时间是一样的(同步与异步),我不知道为什么,但它是一样的。

在Heroku上处理所有12个任务并选择一个大的时间范围(大的时间范围=任务需要更长的时间,因为要处理的数据更多):

任务结果再次不精确,返回的值是错误的,只是稍微有点错误,但是错误,因此不可靠,f.e.任务a必须返回20k,在heroku上返回19500。我不知道数据丢失/任务返回太早的可能性,但两周后我将放弃并尝试使用完全不同的系统。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/41244
 
2123 次点击  
文章 [ 2 ]  |  最新文章 5 年前