社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

Python SQLalchemy-我可以在函数之间传递连接对象吗?

Phil • 3 年前 • 1321 次点击  

我有一个python应用程序,它正在从mysql/mariadb读取数据,用它从api获取数据,然后将结果插入另一个表中。

我设置了一个模块,带有连接数据库和 返回传递给其他函数/模块的连接对象 .然而,我认为这可能不是一个正确的方法。这个想法是要有一个小模块,只要我需要连接到数据库,我就可以随时调用它。

还要注意的是,我在循环期间(以及在传递给db_更新模块的循环内)和调用期间使用了相同的连接对象 close() 当一切都结束了。

我有时也会收到db的一些警告,这些警告大多发生在我打电话的地方 db_conn.close() ,因此我猜我没有正确处理连接或会话/引擎。此外,日志中的连接id警告不断增加,这是另一个提示,表明我做错了。

[Warning] Aborted connection 351 to db: 'some_db' user: 'some_user' host: '172.28.0.3' (Got an error reading communication packets)

这里有一些 伪代码 这代表了我目前的结构:


################
## db_connect.py
################
# imports ...
from sqlalchemy import create_engine
def db_connect():
    # get env ...
    db_string = f"mysql+pymysql://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
    try:
        engine = create_engine(db_string)
    except Exception as e:
        return None
    
    db_conn = engine.connect()
    return db_conn


################
## db_update.py
################
# imports ...
def db_insert(db_conn, api_result):
    # ...
    ins_qry = "INSERT INTO target_table (attr_a, attr_b) VALUES (:a, :b);"
    ins_qry = text(ins_qry)
    ins_qry = ins_qry.bindparams(a = value_a, b = value_b)

    try:
        db_conn.execute(ins_qry)
    except Exception as e:
        print(e)
        return None     
    return True


################
## main.py
################
from sqlalchemy import text
from db_connect import db_connect
from db_update import db_insert

def run():
    try:
        db_conn = db_connect()
        if not db_conn:
            return False
    except Exception as e:
        print(e)

    qry =  "SELECT *
            FROM some_table
            WHERE some_attr IN (:some_value);"
    qry = text(qry)
    search_run_qry = qry.bindparams(
            some_value  = 'abc'
    )
    result_list = db_conn.execute(qry).fetchall()

    for result_item in result_list:
        ## do stuff like fetching data from api for every record in the query result
        api_result = get_api_data(...)
        ## insert into db:
        db_ins_status = db_insert(db_conn, api_result)
        ## ...
    
    db_conn.close

run()

编辑: 另一个问题: a) 在循环中,每次迭代都会更新以使用同一个连接,这样可以吗?还是更明智的做法是将引擎传递给 run() 函数和调用 db_conn = engine.connect() db_conn.close() 每次更新前后?

b) 我正在考虑使用 ThreadPoolExecutor 而不是API调用的循环。这是否会对如何使用连接产生影响,也就是说,对于正在对同一个表进行更新的多个线程,我是否可以使用同一个连接?

注意:我没有使用ORM功能,主要是因为我有很强的DWH/SQL背景(虽然没有DBA那么好),而且我习惯于编写甚至复杂的SQL查询。出于这个原因,我正在考虑改用PyMySQL连接器。

提前谢谢!

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/131568
 
1321 次点击  
文章 [ 1 ]  |  最新文章 3 年前
user10949197 user10949197
Reply   •   1 楼
user10949197 user10949197    3 年前

是的,您可以返回/传递连接对象作为参数,但db_connect方法的目的是什么,除了测试连接?正如我所见,这种db_connect方法没有任何目的,因此我建议您像我以前那样做。

我想分享我的一个项目的代码片段。

def create_record(sql_query: str, data: tuple):
    try:
        connection = mysql_obj.connect()
        db_cursor = connection.cursor()
        db_cursor.execute(sql_query, data)
        connection.commit()
        return db_cursor, connection
    except Exception as error:
        print(f'Connection failed error message: {error}')

然后用这个作为另一个我的需要

db_cursor, connection, query_data = fetch_data(sql_query, query_data)

毕竟我需要关闭与这个方法和方法调用的连接。

def close_connection(connection, db_cursor):
    """
    This method used to close SQL server connection
    """
    db_cursor.close()
    connection.close()

以及调用方法

 close_connection(connection, db_cursor)

我不确定我能不能在这张支票上分享我的github link 请在模型下。py您可以查看数据库方法,并查看如何调用它们。py

最好的 哈桑。