我目前正在开发一个应用程序,它必须处理几个长时间运行的任务。
我正在使用
python 3
,
flask
,
celery
,
redis
.
我在本地主机上有一个工作解决方案,但是在heroku上有很多错误,每次执行应用程序都会触发一组不同的错误。我知道这不可能是随机的,所以我想知道从哪里开始寻找。
我觉得redis一定是出了问题,我试图了解什么是客户,他们来自哪里,但我找不到关于这个话题的官方文档或解释。
问题:
如果redis服务器启动了(即使是在本地主机上),那么许多客户端都已连接,即使我什么也没做。关于Heroku(我正在使用
heroku-redis
)我总是有6个客户机,在本地主机上有11个客户机。
我已经做了一些研究,并且我能够展示它们:
if 'DYNO' in os.environ:
redis_db = redis.StrictRedis(host='HOST', port=15249, password='REDISDBPW')
else:
redis_db = redis.StrictRedis()
# see what keys are in Redis
all_keys = redis_db.keys()
print (all_keys)
all_clients = redis_db.client_list()
print (all_clients)
我看到了所有这些客户,但那里的信息对我毫无帮助。它们是什么?他们为什么在那里?他们从哪里来?
所有的heroku redis插件都有一个客户端限制,所以我需要理解并优化这个限制。首先我想
clientsnumber == tasknumber
,但事实并非如此。
总共我定义了12个任务,但现在我正在测试2个任务(都在30秒内完成)。
当我在本地主机上执行任务时,客户机从11增加到16。如果我从16岁到18岁再执行一次,并且在这之后,他们总是停留在18岁,这与我执行任务的频率无关。
这是怎么回事?我有两个任务,为什么客户从11个增加到16个,然后从16个增加到18个?为什么任务完成后它们还没有关闭?
我现在正在为整个问题奋斗几天(尽管它在本地主机上总是工作得很好),所以欢迎任何帮助或想法。我需要找个地方,所以目前我正在努力了解客户。
编辑:
我安装了flower并尝试在本地主机上监视这两个任务,一切看起来都很好。它处理两个任务,两个任务都在几秒钟内成功。返回值是正确的(但它在localhost上总是工作得很好)。
但问题是,在我开始花客户数量跳到30。我还是不知道:
什么是客户
?根据我生成的客户端数量,我需要一个100美元的插件来处理两个任务,这需要几秒钟来完成,这不可能是真的,我仍然认为redis有问题,即使是在本地主机上。
我的redis设置非常简单:
if 'DYNO' in os.environ:
app.config['CELERY_BROKER_URL'] = 'redis://[the full URL from the redis add-on]'
app.config['CELERY_RESULT_BACKEND'] = 'redis://[the full URL from the redis add-on]'
else:
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
下面是一个任务示例:
@celery.task(bind=True)
def get_users_deregistrations_task(self, g_start_date, g_end_date):
start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
end_date = datetime.strptime(g_end_date, '%d-%m-%Y')
a1 = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
a2 = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
a3 = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()
if a1 is None:
a1 = 0
if a2 is None:
a2 = 0
if a3 is None:
a3 = 0
amount = a1 + a2 + a3
return {'some_value' : amount}
# Selects user deregistrations between selected dates
@app.route('/get-users-deregistration', methods=["POST"])
@basic_auth.required
@check_verified
def get_users_deregistrations():
if request.method == "POST":
# init task
task = get_users_deregistrations_task.apply_async([session['g_start_date'], session['g_end_date']])
return json.dumps({}), 202, {'Location': url_for('taskstatus_get_users_deregistrations', task_id=task.id)}
@app.route('/status/<task_id>')
def taskstatus_get_users_deregistrations(task_id):
task = get_users_deregistrations_task.AsyncResult(task_id)
if task.state == 'PENDING':
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif task.state != 'FAILURE':
response = {
'state': task.state,
'current': task.info['current'],
'total': task.info['total'],
'status': 'Finished',
'statistic': task.info['statistic'],
'final_dataset': task.info
}
if 'result' in task.info:
response['result'] = task.info['result']
else:
print ('in else')
# something went wrong in the background job
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info), # this is the exception raised
}
return json.dumps(response)
编辑:
这是我为Heroku准备的程序文件:
web: gunicorn stats_main:app
worker: celery worker -A stats_main.celery --loglevel=info
编辑
我认为问题可能是连接池(在redis端),我没有正确使用它。
我还发现了芹菜的一些配置,并添加了它们:
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'], redis_max_connections=20, BROKER_TRANSPORT_OPTIONS = {
'max_connections': 20,
}, broker_pool_limit=None)
我用这些配置将所有内容再次上传到Heroku。我仍然只测试了两个任务,都很快。
我已经在Heroku上连续执行了10次任务,7次。有3次他们看起来完成得太早:返回的结果是错误的(正确的结果是f.e.30000,返回的结果是3次18000)。
客户机很快跳到20个,但从未超过20个,因此至少最大客户机错误和与redis错误的连接丢失得到了解决。
现在最大的问题是任务完成得太早,返回的结果是否正确非常重要,性能根本不重要。
编辑
不管怎样,什么都没有解决,一切都是随机的。
我加了两个
print()
在其中一个任务中进一步调试并上传到heroku。在两次执行之后,我再次看到到redis的连接丢失,达到了最大客户端数(尽管我的redismonitor插件显示客户端从未超过20个)
编辑
大量的客户端可能是由空闲的客户端造成的,这些客户端由于某种原因从未关闭(在
heroku
):
默认情况下,redis不会关闭空闲连接,这意味着
如果不显式关闭redis连接,将锁定
你自己的情况。
为了确保不会发生这种情况,heroku redis设置了一个默认连接
超时300秒。此超时不适用于
非发布/订阅客户端和其他阻止操作。
我现在为空闲客户添加了一个kill函数,就在我的每个任务之前:
def kill_idle_clients():
if 'DYNO' in os.environ:
redis_db = redis.StrictRedis(host='HOST', port=15249, password='REDISDBPW')
else:
redis_db = redis.StrictRedis()
all_clients = redis_db.client_list()
counter = 0
for client in all_clients:
if int(client['idle']) >= 15:
redis_db.client_kill(client['addr'])
counter += 1
print ('killing idle clients:', counter)
在任务开始之前,它会关闭所有空闲时间超过15秒的客户端。它在localhost上再次工作(但毫不奇怪,它总是在localhost上工作)。我的客户少了,但在Heroku上,现在只有10个客户中的2个。8次任务又太早完成了。也许那些懒散的客户并不是真的懒散,我不知道。
它也几乎不可能进行测试,因为每个任务的执行都有不同的结果(失去与redis的连接、达到客户端限制、过早完成、工作正常)。
编辑
似乎芹菜的设置一直被忽略。我一直对此持怀疑态度,决定通过添加一些随机参数并将值更改为无意义来测试它。我重新启动了芹菜工人办公室。
我本想看到一些错误,但它的工作好像什么都没发生。
对于这些非感官配置,一切都像以前一样工作:
celery = Celery(app.name, broker=app.config['REDIS_URL'], backend=app.config['REDIS_URL'], redis_max_connections='pups', BROKER_TRANSPORT_OPTIONS = {
'max_connections': 20,
}, broker_pool_limit=None, broker_connection_timeout='pups', pups="pups")
celery.conf.broker_transport_options = {'visibility_timeout': 'pups'}
编辑
我改变了加载芹菜配置的方式(从一个单独的配置文件)。现在看来是可行的,但问题仍然是一样的。
celery_task = Celery(broker=app.config['REDIS_URL'], backend=app.config['REDIS_URL'])
celery_task.config_from_object('celeryconfig')
编辑
通过这些配置,我设法将本地主机上所有任务的客户端数量限制在18个(我尝试了所有12个任务)。然而,在英雄身上,它“不知何故”起作用。客户少了,但一次达到了20个,尽管我认为我不能超过18个。(我在Heroku上测试了4个任务)。
在heroku上测试所有12个任务会触发许多不同的sql错误。我现在比以前更困惑了。似乎同一个任务被多次执行,但我只看到12个任务url。
我认为因为sql错误是f.e.:
sqlalchemy.exc.InternalError: (pymysql.err.InternalError) Packet sequence number wrong - got 117 expected 1
或
sqlalchemy.exc.InterfaceError: (pymysql.err.InterfaceError) (0, '')
或
Multiple rows were found for one()
我用4个任务在heroku上测试了几次,任务结果返回了很多次,但是结果非常奇怪。
这次任务没有提前完成,但是返回了增加的值,看起来任务a已经返回了值2次并对其进行了汇总。
示例:任务A必须返回10K,但它返回20K,因此该任务已执行两次,结果已汇总。
这是我目前的配置。我仍然不完全理解这道数学题,但我认为(就客户数量而言):
max-conncurency * CELERYD_MAX_TASKS_PER_CHILD
在localhost上,我找到了一个新的cli命令来检查工作机状态,并且
max-conncurecy=3
和
CELERYD_MAX_TASKS_PER_CHILD=6
CLI命令:
celery -A stats_main.celery_task inspect stats
我当前的配置:
工作启动:
celery worker -A stats_main.celery_task --loglevel=info --autoscale=10,3
配置:
CELERY_REDIS_MAX_CONNECTIONS=20
BROKER_POOL_LIMIT=None
CELERYD_WORKER_LOST_WAIT=20
CELERYD_MAX_TASKS_PER_CHILD=6
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000} # 5 hours
CELERY_RESULT_DB_SHORT_LIVED_SESSIONS = True #useful if: For example, intermittent errors like (OperationalError) (2006, âMySQL server has gone awayâ)
编辑
现在看到所有这些sql错误,我决定研究一个完全不同的方向。我的新理论是
MySQL
问题。
我调整了与mysql服务器的连接,如
this question
.
我还发现pymsql
threadsafety=1
,我还不知道这是否是一个问题,但似乎MySQL与连接和连接池有关。
目前,我还可以说内存不是问题,因为如果包太大,它不应该在本地主机上工作,这意味着我离开了
max_allowed_packet
默认值为4MB左右。
我还创建了3个虚拟任务,在不连接外部mysql数据库的情况下进行一些简单的计算。我已经在heroku上执行了5次,没有错误,结果总是正确的,所以我假设问题不是芹菜、redis,而是mysql,尽管我不知道它为什么会在本地主机上工作。也许这是三者的结合,导致了Heroku的问题。
编辑
我调整了js文件。现在每个任务都被一个接一个地调用,这意味着它们不是异步的(我仍然使用芹菜的
apply_async
因为
apply
不起作用)
所以这是一个艰难的解决办法。我只是创造了一个
var
对于每个任务,f.e.
var task_1_rdy = false;
我还创建了一个函数,它每2秒运行一次,检查一个任务是否就绪,如果就绪,它将启动下一个任务。我想很容易理解我在这里做了什么。
在heroku上测试了这个,没有任何错误,即使有多个任务,所以这个问题可能已经解决了。我需要做更多的检查,但看起来很有希望。Ofc。我没有使用异步功能,运行一个接一个的任务可能会有最差的性能,但嘿,它现在工作了。我将对绩效差异进行基准测试,并在周一更新问题。
编辑
我今天做了很多测试。任务完成所需的时间是一样的(同步与异步),我不知道为什么,但它是一样的。
在Heroku上处理所有12个任务并选择一个大的时间范围(大的时间范围=任务需要更长的时间,因为要处理的数据更多):
任务结果再次不精确,返回的值是错误的,只是稍微有点错误,但是错误,因此不可靠,f.e.任务a必须返回20k,在heroku上返回19500。我不知道数据丢失/任务返回太早的可能性,但两周后我将放弃并尝试使用完全不同的系统。