
Using Airflow dynamic task mapping output in a single ExternalPythonOperator

Justin • 2 years ago • 238 views

I am trying to implement the example below from the Airflow documentation, but with the new ExternalPythonOperator.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task


with DAG(dag_id="simple_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 3])
    sum_it(added_values)

My code looks like this:

from airflow.decorators import dag, task
from airflow.sensors.filesystem import FileSensor
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

from datetime import datetime
import os

from company.my_task import task_one_func, task_two_func, task_three_func

default_args: dict = {
    "owner": "admin",
    "depends_on_past": False,
    "email": [],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
}

AIRFLOW_HOME = os.getcwd()
DROP_DIR = f"{AIRFLOW_HOME}/resources/drop_folder"
VENVS_DIR = "/opt/airflow/dags/company/venvs"
DAG_ENV = f"{VENVS_DIR}/my_env_name/bin/python"


@dag(
    schedule="@daily",
    start_date=datetime(2022, 11, 17, 8, 0, 0, 0),
    catchup=False,
    default_args=default_args,
    tags=[],
)
def my_dag():
    now: str = "{{ execution_date.strftime('%Y-%m-%dT%H:%M:%S') }}"

    # Wait for file into drop_folder
    wait_files = FileSensor(
        task_id="wait_file",
        fs_conn_id="fs_default",
        poke_interval=30*60,
        filepath=DROP_DIR,
        mode="reschedule",
        timeout=60*60*8
    )

    # External python operator manual decorators
    task_one_dec = task.external_python(
        task_id="t_one", python=os.fspath(DAG_ENV), retries=0, expect_airflow=False)(task_one_func)

    task_two_dec = task.external_python(
        task_id="t_two", python=os.fspath(DAG_ENV), retries=0, expect_airflow=False)(task_two_func)

    task_three_dec = task.external_python(
        task_id="t_three", python=os.fspath(DAG_ENV), trigger_rule="one_success", expect_airflow=False)(task_three_func)

    # Get the subdirectories of the drop folder
    # Return a list
    task_one = task_one_dec(DROP_DIR)

    # Expand for each subdirectory found
    # Verify each file and return its path
    task_two = task_two_dec.expand(
        file_path=task_one)

    # Loop through the task_two list and move the subdir to another folder.
    task_three = task_three_dec(
        task_two)

    trigger_next_dag = TriggerDagRunOperator(
        task_id="trigger_next",
        trigger_dag_id="next_dag_name",
        execution_date=now
    )

    wait_files >> task_one >> task_two >> task_three >> trigger_next_dag


my_dag()

My task file:

def task_one_func(drop_dir: str) -> list:
    # Analyze drop_dir

    return ['subdir1', 'subdir2']

def task_two_func(file_path: str) -> str:
    # Verify the file
    print("Verified", file_path)

    return file_path

def task_three_func(file_paths: list) -> None:
    for item in file_paths:
        "Move the file to another place"
        print(item)

    return None

[dag graph image]

task_three fails with this error:

_pickle.PicklingError: Can't pickle <class 'sqlalchemy.orm.session.Session'>: it's not the same object as sqlalchemy.orm.session.Session
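From what I can tell (an assumption on my part, sketched below with hypothetical stand-in classes, not Airflow's real internals): ExternalPythonOperator has to pickle every argument to hand it to the interpreter in the virtualenv, while PythonOperator runs inside the worker process and pickles nothing. The mapped output of task_two is not a plain list but a lazy XCom sequence that fetches each value from the metadata database through a live SQLAlchemy session, and that session seems to be what the pickler rejects:

import pickle

class FakeSession:
    """Hypothetical stand-in for sqlalchemy.orm.session.Session."""
    def __reduce__(self):
        # A real Session holds live DB connections and cannot be pickled either
        raise pickle.PicklingError("Can't pickle Session")

class LazyXComProxy:
    """Hypothetical stand-in for the lazy sequence a mapped task returns."""
    def __init__(self):
        self._session = FakeSession()  # kept open to fetch values on demand
    def __iter__(self):
        yield from ["subdir1", "subdir2"]

proxy = LazyXComProxy()
print(list(proxy))   # iterating in-process works -> plain PythonOperator is fine
pickle.dumps(proxy)  # raises PicklingError -> ExternalPythonOperator fails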

What I have tried:

  • Installing the Airflow version that matches the venv's Python version into the venv and adding expect_airflow=True to my decorators: the error persists

  • Swapping task_three from an ExternalPythonOperator to a plain PythonOperator: the error no longer appears and I can loop over the list

  • Adding a PythonOperator between task_two and task_three that copies the output into a new list fixes the problem, but why? (see the code block below)

  • Enabling XCom pickling inside airflow.cfg

    The list-copy workaround from the third bullet looks like this:

    # Declare task two here...
    @task
    def lazyXcomToList(li) -> list:
        new_list = []
        for item in li:
            new_list.append(item)
        return new_list

    lazyXcomToListTask = lazyXcomToList(task_two)

    # Declare task three here...

    wait_files >> task_one >> task_two >> lazyXcomToListTask >> task_three >> trigger_next_dag
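For reference, the same materialization can be written more compactly; materialize below is my own name for an in-process @task, not an Airflow API:

from airflow.decorators import task

@task
def materialize(values) -> list:
    # Runs inside the worker process, where the lazy XCom sequence can be
    # iterated; the returned plain list pickles cleanly for the external task.
    return list(values)

# Wiring inside my_dag() would then be:
#   task_three = task_three_dec(materialize(task_two))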

Original post: http://www.python88.com/topic/158643
 