社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Redis

python redis和celeriy客户端太多,每次执行时出现不同的错误任务使用pymsql连接到mysql

Roman • 4 年前 • 1260 次点击  

我目前正在开发一个应用程序,它必须处理几个长时间运行的任务。 我正在使用 python 3 , flask , celery , redis .

我在本地主机上有一个工作解决方案,但是在heroku上有很多错误,每次执行应用程序都会触发一组不同的错误。我知道这不可能是随机的,所以我想知道从哪里开始寻找。

我觉得redis一定是出了问题,我试图了解什么是客户,他们来自哪里,但我找不到关于这个话题的官方文档或解释。

问题:

如果redis服务器启动了(即使是在本地主机上),那么许多客户端都已连接,即使我什么也没做。关于Heroku(我正在使用 heroku-redis )我总是有6个客户机,在本地主机上有11个客户机。

我已经做了一些研究,并且我能够展示它们:

if 'DYNO' in os.environ:
    redis_db = redis.StrictRedis(host='HOST', port=15249, password='REDISDBPW')
else:
    redis_db = redis.StrictRedis()

# see what keys are in Redis
all_keys = redis_db.keys()
print (all_keys)

all_clients = redis_db.client_list()
print (all_clients)

我看到了所有这些客户,但那里的信息对我毫无帮助。它们是什么?他们为什么在那里?他们从哪里来?

所有的heroku redis插件都有一个客户端限制,所以我需要理解并优化这个限制。首先我想 clientsnumber == tasknumber ,但事实并非如此。

总共我定义了12个任务,但现在我正在测试2个任务(都在30秒内完成)。

当我在本地主机上执行任务时,客户机从11增加到16。如果我从16岁到18岁再执行一次,并且在这之后,他们总是停留在18岁,这与我执行任务的频率无关。

这是怎么回事?我有两个任务,为什么客户从11个增加到16个,然后从16个增加到18个?为什么任务完成后它们还没有关闭?

我现在正在为整个问题奋斗几天(尽管它在本地主机上总是工作得很好),所以欢迎任何帮助或想法。我需要找个地方,所以目前我正在努力了解客户。

编辑:

我安装了flower并尝试在本地主机上监视这两个任务,一切看起来都很好。它处理两个任务,两个任务都在几秒钟内成功。返回值是正确的(但它在localhost上总是工作得很好)。

但问题是,在我开始花客户数量跳到30。我还是不知道: 什么是客户 ?根据我生成的客户端数量,我需要一个100美元的插件来处理两个任务,这需要几秒钟来完成,这不可能是真的,我仍然认为redis有问题,即使是在本地主机上。

我的redis设置非常简单:

if 'DYNO' in os.environ:
    app.config['CELERY_BROKER_URL'] = 'redis://[the full URL from the redis add-on]'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://[the full URL from the redis add-on]'
else:
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])

下面是一个任务示例:

@celery.task(bind=True)
def get_users_deregistrations_task(self, g_start_date, g_end_date):

    start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
    end_date = datetime.strptime(g_end_date, '%d-%m-%Y')

    a1 = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
    a2 = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
    a3 = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()

    if a1 is None:
        a1 = 0

    if a2 is None:
        a2 = 0

    if a3 is None:
        a3 = 0

    amount = a1 + a2 + a3

    return {'some_value' : amount}

# Selects user deregistrations between selected dates
@app.route('/get-users-deregistration', methods=["POST"])
@basic_auth.required
@check_verified
def get_users_deregistrations():
    if request.method == "POST":

        # init task
        task = get_users_deregistrations_task.apply_async([session['g_start_date'], session['g_end_date']])
        return json.dumps({}), 202, {'Location': url_for('taskstatus_get_users_deregistrations', task_id=task.id)}

@app.route('/status/<task_id>')
def taskstatus_get_users_deregistrations(task_id):
    task = get_users_deregistrations_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {
            'state': task.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        response = {
            'state': task.state,
            'current': task.info['current'],
            'total': task.info['total'],
            'status': 'Finished',
            'statistic': task.info['statistic'],
            'final_dataset': task.info     
        }
        if 'result' in task.info:
            response['result'] = task.info['result']
    else:
        print ('in else')
        # something went wrong in the background job
        response = {
            'state': task.state,
            'current': 1,
            'total': 1,
            'status': str(task.info),  # this is the exception raised
        }
    return json.dumps(response)

编辑:

这是我为Heroku准备的程序文件:

web: gunicorn stats_main:app
worker: celery worker -A stats_main.celery --loglevel=info

编辑

我认为问题可能是连接池(在redis端),我没有正确使用它。

我还发现了芹菜的一些配置,并添加了它们:

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'], redis_max_connections=20, BROKER_TRANSPORT_OPTIONS = {
    'max_connections': 20,
}, broker_pool_limit=None)

我用这些配置将所有内容再次上传到Heroku。我仍然只测试了两个任务,都很快。

我已经在Heroku上连续执行了10次任务,7次。有3次他们看起来完成得太早:返回的结果是错误的(正确的结果是f.e.30000,返回的结果是3次18000)。

客户机很快跳到20个,但从未超过20个,因此至少最大客户机错误和与redis错误的连接丢失得到了解决。

现在最大的问题是任务完成得太早,返回的结果是否正确非常重要,性能根本不重要。

编辑

不管怎样,什么都没有解决,一切都是随机的。 我加了两个 print() 在其中一个任务中进一步调试并上传到heroku。在两次执行之后,我再次看到到redis的连接丢失,达到了最大客户端数(尽管我的redismonitor插件显示客户端从未超过20个)

编辑

大量的客户端可能是由空闲的客户端造成的,这些客户端由于某种原因从未关闭(在 heroku ):

默认情况下,redis不会关闭空闲连接,这意味着 如果不显式关闭redis连接,将锁定 你自己的情况。

为了确保不会发生这种情况,heroku redis设置了一个默认连接 超时300秒。此超时不适用于 非发布/订阅客户端和其他阻止操作。

我现在为空闲客户添加了一个kill函数,就在我的每个任务之前:

def kill_idle_clients():
    if 'DYNO' in os.environ:
        redis_db = redis.StrictRedis(host='HOST', port=15249, password='REDISDBPW')
    else:
        redis_db = redis.StrictRedis()

    all_clients = redis_db.client_list()
    counter = 0
    for client in all_clients:
        if int(client['idle']) >= 15:
            redis_db.client_kill(client['addr'])
            counter += 1

    print ('killing idle clients:', counter)

在任务开始之前,它会关闭所有空闲时间超过15秒的客户端。它在localhost上再次工作(但毫不奇怪,它总是在localhost上工作)。我的客户少了,但在Heroku上,现在只有10个客户中的2个。8次任务又太早完成了。也许那些懒散的客户并不是真的懒散,我不知道。

它也几乎不可能进行测试,因为每个任务的执行都有不同的结果(失去与redis的连接、达到客户端限制、过早完成、工作正常)。

编辑

似乎芹菜的设置一直被忽略。我一直对此持怀疑态度,决定通过添加一些随机参数并将值更改为无意义来测试它。我重新启动了芹菜工人办公室。

我本想看到一些错误,但它的工作好像什么都没发生。

对于这些非感官配置,一切都像以前一样工作:

celery = Celery(app.name, broker=app.config['REDIS_URL'], backend=app.config['REDIS_URL'], redis_max_connections='pups', BROKER_TRANSPORT_OPTIONS = {
    'max_connections': 20,
}, broker_pool_limit=None, broker_connection_timeout='pups', pups="pups")
celery.conf.broker_transport_options = {'visibility_timeout': 'pups'}

编辑

我改变了加载芹菜配置的方式(从一个单独的配置文件)。现在看来是可行的,但问题仍然是一样的。

celery_task = Celery(broker=app.config['REDIS_URL'], backend=app.config['REDIS_URL'])
celery_task.config_from_object('celeryconfig')

编辑

通过这些配置,我设法将本地主机上所有任务的客户端数量限制在18个(我尝试了所有12个任务)。然而,在英雄身上,它“不知何故”起作用。客户少了,但一次达到了20个,尽管我认为我不能超过18个。(我在Heroku上测试了4个任务)。

在heroku上测试所有12个任务会触发许多不同的sql错误。我现在比以前更困惑了。似乎同一个任务被多次执行,但我只看到12个任务url。

我认为因为sql错误是f.e.:

sqlalchemy.exc.InternalError: (pymysql.err.InternalError) Packet sequence number wrong - got 117 expected 1

sqlalchemy.exc.InterfaceError: (pymysql.err.InterfaceError) (0, '')

Multiple rows were found for one()

我用4个任务在heroku上测试了几次,任务结果返回了很多次,但是结果非常奇怪。

这次任务没有提前完成,但是返回了增加的值,看起来任务a已经返回了值2次并对其进行了汇总。

示例:任务A必须返回10K,但它返回20K,因此该任务已执行两次,结果已汇总。

这是我目前的配置。我仍然不完全理解这道数学题,但我认为(就客户数量而言):

max-conncurency * CELERYD_MAX_TASKS_PER_CHILD

在localhost上,我找到了一个新的cli命令来检查工作机状态,并且 max-conncurecy=3 CELERYD_MAX_TASKS_PER_CHILD=6

CLI命令:

celery -A stats_main.celery_task inspect stats

我当前的配置:

工作启动:

celery worker -A stats_main.celery_task --loglevel=info --autoscale=10,3

配置:

CELERY_REDIS_MAX_CONNECTIONS=20
BROKER_POOL_LIMIT=None
CELERYD_WORKER_LOST_WAIT=20
CELERYD_MAX_TASKS_PER_CHILD=6
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000} # 5 hours
CELERY_RESULT_DB_SHORT_LIVED_SESSIONS = True #useful if: For example, intermittent errors like (OperationalError) (2006, ‘MySQL server has gone away’)

编辑

现在看到所有这些sql错误,我决定研究一个完全不同的方向。我的新理论是 MySQL 问题。

我调整了与mysql服务器的连接,如 this question .

我还发现pymsql threadsafety=1 ,我还不知道这是否是一个问题,但似乎MySQL与连接和连接池有关。

目前,我还可以说内存不是问题,因为如果包太大,它不应该在本地主机上工作,这意味着我离开了 max_allowed_packet 默认值为4MB左右。

我还创建了3个虚拟任务,在不连接外部mysql数据库的情况下进行一些简单的计算。我已经在heroku上执行了5次,没有错误,结果总是正确的,所以我假设问题不是芹菜、redis,而是mysql,尽管我不知道它为什么会在本地主机上工作。也许这是三者的结合,导致了Heroku的问题。

编辑

我调整了js文件。现在每个任务都被一个接一个地调用,这意味着它们不是异步的(我仍然使用芹菜的 apply_async 因为 apply 不起作用)

所以这是一个艰难的解决办法。我只是创造了一个 var 对于每个任务,f.e. var task_1_rdy = false;

我还创建了一个函数,它每2秒运行一次,检查一个任务是否就绪,如果就绪,它将启动下一个任务。我想很容易理解我在这里做了什么。

在heroku上测试了这个,没有任何错误,即使有多个任务,所以这个问题可能已经解决了。我需要做更多的检查,但看起来很有希望。Ofc。我没有使用异步功能,运行一个接一个的任务可能会有最差的性能,但嘿,它现在工作了。我将对绩效差异进行基准测试,并在周一更新问题。

编辑

我今天做了很多测试。任务完成所需的时间是一样的(同步与异步),我不知道为什么,但它是一样的。

在Heroku上处理所有12个任务并选择一个大的时间范围(大的时间范围=任务需要更长的时间,因为要处理的数据更多):

任务结果再次不精确,返回的值是错误的,只是稍微有点错误,但是错误,因此不可靠,f.e.任务a必须返回20k,在heroku上返回19500。我不知道数据丢失/任务返回太早的可能性,但两周后我将放弃并尝试使用完全不同的系统。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/41244
 
1260 次点击  
文章 [ 2 ]  |  最新文章 4 年前
will-hart 陈海栋
Reply   •   1 楼
will-hart 陈海栋    5 年前

现在我有60%的把握是你的任务太长,服务器无法在默认的web请求返回时间内响应。70%/30%的方法适用于网络速度非常快的本地计算机。在云平台上,延迟是个问题,有时它会影响您的程序。在那之前,如果 celery worker 失败,它将自动创建另一个工人来完成未完成的工作,因为 gunicon celery ,导致连接增加。

所以解决办法是:

  • 选项1使任务更快完成

  • 选项2首先返回一个确认,在后台计算,然后进行另一个api调用以发回结果

陈海栋
Reply   •   2 楼
陈海栋    5 年前

听起来像是使用芹菜工人redis作为消息队列的rest api。 以下是CHK列表:

1在您的客户机中,您是否在逻辑完成后关闭了连接?

2芹菜会新工人,工人可能会惹麻烦,试试用花监测芹菜

3确保你的客户机完成任务,试着用打印的东西调试,有时登台和本地有网络问题阻止你结束芹菜任务。

4如果你用redis做芹菜味精队列,试着监控队列的数量,也许它们会自动放大?