I am trying to get the following to work.

The example below is from the Airflow documentation, but I want to use the new ExternalPythonOperator:
```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="simple_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 3])
    sum_it(added_values)
```
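For reference, this is how I understand the same mapping example would look once switched to the external Python decorator; the `dag_id` and the interpreter path below are placeholders I made up, not values from the documentation:

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

# Placeholder path to a pre-built virtualenv interpreter
VENV_PYTHON = "/path/to/venv/bin/python"

with DAG(dag_id="simple_mapping_external", start_date=datetime(2022, 3, 4)) as dag:

    @task.external_python(python=VENV_PYTHON)
    def add_one(x: int):
        return x + 1

    @task.external_python(python=VENV_PYTHON)
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    # Map add_one over the list, then reduce with sum_it
    added_values = add_one.expand(x=[1, 2, 3])
    sum_it(added_values)
```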
My code looks like this:
```python
from airflow.decorators import dag, task
from airflow.sensors.filesystem import FileSensor
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime
import os

from company.my_task import task_one_func, task_two_func, task_three_func

default_args: dict = {
    "owner": "admin",
    "depends_on_past": False,
    "email": [],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
}

AIRFLOW_HOME = os.getcwd()
DROP_DIR = f"{AIRFLOW_HOME}/resources/drop_folder"
VENVS_DIR = "/opt/airflow/dags/company/venvs"
DAG_ENV = f"{VENVS_DIR}/my_env_name/bin/python"


@dag(
    schedule="@daily",
    start_date=datetime(2022, 11, 17, 8, 0, 0, 0),
    catchup=False,
    default_args=default_args,
    tags=[],
)
def my_dag():
    now: str = "{{ execution_date.strftime('%Y-%m-%dT%H:%M:%S') }}"

    # Wait for a file to land in the drop folder
    wait_files = FileSensor(
        task_id="wait_file",
        fs_conn_id="fs_default",
        poke_interval=30 * 60,
        filepath=DROP_DIR,
        mode="reschedule",
        timeout=60 * 60 * 8,
    )

    # Manually decorate the imported functions as external Python tasks
    task_one_dec = task.external_python(
        task_id="t_one", python=os.fspath(DAG_ENV), retries=0, expect_airflow=False
    )(task_one_func)
    task_two_dec = task.external_python(
        task_id="t_two", python=os.fspath(DAG_ENV), retries=0, expect_airflow=False
    )(task_two_func)
    task_three_dec = task.external_python(
        task_id="t_three", python=os.fspath(DAG_ENV), trigger_rule="one_success", expect_airflow=False
    )(task_three_func)

    # Get the sub-directories of the drop folder; returns a list
    task_one = task_one_dec(DROP_DIR)

    # Expand over each sub-directory found:
    # verify each file and return its path
    task_two = task_two_dec.expand(file_path=task_one)

    # Loop through the task_two list and move each sub-directory to another folder
    task_three = task_three_dec(task_two)

    trigger_next_dag = TriggerDagRunOperator(
        task_id="trigger_next",
        trigger_dag_id="next_dag_name",
        execution_date=now,
    )

    wait_files >> task_one >> task_two >> task_three >> trigger_next_dag


my_dag()
```
My task file:
```python
def task_one_func(drop_dir: str) -> list:
    # Analyze drop_dir
    return ['subdir1', 'subdir2']


def task_two_func(file_path: str) -> str:
    # Verify the file
    print("Verified", file_path)
    return file_path


def task_three_func(file_paths: list) -> None:
    # Move each file to another place
    for item in file_paths:
        print(item)
    return None
```
task_three fails with the following error:
_pickle.PicklingError: Can't pickle <class 'sqlalchemy.orm.session.Session'>: it's not the same object as sqlalchemy.orm.session.Session
What I have tried:
- Installing the Airflow version matching the venv's Python version into the virtual environment and adding `expect_airflow=True` to my decorators: the error is still there.
- Replacing task_three (the ExternalPythonOperator) with a plain PythonOperator: the error no longer appears and I can loop over the list (a rough sketch of this swap is at the end of the post).
- Adding a PythonOperator between task_two and task_three that copies the output into a new list solves the problem, but why? (See the snippet below.)
- Enabling XCom pickling in airflow.cfg (`enable_xcom_pickling = True`).
The intermediate list-copying task from the third bullet looks like this:

```python
# task_two is declared here ...

@task
def lazyXcomToList(li) -> list:
    new_list = []
    for item in li:
        new_list.append(item)
    return new_list

lazyXcomToListTask = lazyXcomToList(task_two)

# task_three is declared here ...

wait_files >> task_one >> task_two >> lazyXcomToListTask >> task_three >> trigger_next_dag
```
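For completeness, the PythonOperator swap from the second bullet looked roughly like this; it is a sketch from memory rather than the exact code, and the task_id and trigger_rule simply mirror the ExternalPythonOperator version above:

```python
from airflow.operators.python import PythonOperator

# Sketch: run task_three_func in the main Airflow interpreter instead of the venv.
# task_two is the mapped output (an XComArg) and is resolved at runtime via op_args.
task_three = PythonOperator(
    task_id="t_three",
    python_callable=task_three_func,
    op_args=[task_two],
    trigger_rule="one_success",
)
```

With this version the mapped output of task_two can be iterated without the pickling error.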