Py学习  »  Python

Python SQLalchemy-我可以在函数之间传递连接对象吗?

Phil • 3 年前 • 1322 次点击  

我有一个python应用程序,它正在从mysql/mariadb读取数据,用它从api获取数据,然后将结果插入另一个表中。

我设置了一个模块,带有连接数据库和 返回传递给其他函数/模块的连接对象 .然而,我认为这可能不是一个正确的方法。这个想法是要有一个小模块,只要我需要连接到数据库,我就可以随时调用它。

还要注意的是,我在循环期间(以及在传递给db_更新模块的循环内)和调用期间使用了相同的连接对象 close() 当一切都结束了。

我有时也会收到db的一些警告,这些警告大多发生在我打电话的地方 db_conn.close() ,因此我猜我没有正确处理连接或会话/引擎。此外,日志中的连接id警告不断增加,这是另一个提示,表明我做错了。

[Warning] Aborted connection 351 to db: 'some_db' user: 'some_user' host: '172.28.0.3' (Got an error reading communication packets)

这里有一些 伪代码 这代表了我目前的结构:


################
## db_connect.py
################
# imports ...
from sqlalchemy import create_engine
def db_connect():
    # get env ...
    db_string = f"mysql+pymysql://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
    try:
        engine = create_engine(db_string)
    except Exception as e:
        return None
    
    db_conn = engine.connect()
    return db_conn


################
## db_update.py
################
# imports ...
def db_insert(db_conn, api_result):
    # ...
    ins_qry = "INSERT INTO target_table (attr_a, attr_b) VALUES (:a, :b);"
    ins_qry = text(ins_qry)
    ins_qry = ins_qry.bindparams(a = value_a, b = value_b)

    try:
        db_conn.execute(ins_qry)
    except Exception as e:
        print(e)
        return None     
    return True


################
## main.py
################
from sqlalchemy import text
from db_connect import db_connect
from db_update import db_insert

def run():
    try:
        db_conn = db_connect()
        if not db_conn:
            return False
    except Exception as e:
        print(e)

    qry =  "SELECT *
            FROM some_table
            WHERE some_attr IN (:some_value);"
    qry = text(qry)
    search_run_qry = qry.bindparams(
            some_value  = 'abc'
    )
    result_list = db_conn.execute(qry).fetchall()

    for result_item in result_list:
        ## do stuff like fetching data from api for every record in the query result
        api_result = get_api_data(...)
        ## insert into db:
        db_ins_status = db_insert(db_conn, api_result)
        ## ...
    
    db_conn.close

run()

编辑: 另一个问题: a) 在循环中,每次迭代都会更新以使用同一个连接,这样可以吗?还是更明智的做法是将引擎传递给 run() 函数和调用 db_conn = engine.connect() db_conn.close() 每次更新前后?

b) 我正在考虑使用 ThreadPoolExecutor 而不是API调用的循环。这是否会对如何使用连接产生影响,也就是说,对于正在对同一个表进行更新的多个线程,我是否可以使用同一个连接?

注意:我没有使用ORM功能,主要是因为我有很强的DWH/SQL背景(虽然没有DBA那么好),而且我习惯于编写甚至复杂的SQL查询。出于这个原因,我正在考虑改用PyMySQL连接器。

提前谢谢!

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/131568
 
1322 次点击  
文章 [ 1 ]  |  最新文章 3 年前