Py学习  »  Python

YAML 焦虑再见:PythonSDK 助力大规模 Argo Workflows 构建

阿里云云原生 • 1 年前 • 174 次点击  

Argo Workflows 是一个开源的工作流管理系统,专为 Kubernetes 设计,旨在帮助用户创建和运行复杂的工作流程。它允许用户定义一系列的任务,这些任务可以按照特定的顺序执行,也可以设置任务间的依赖关系,从而实现自动化的工作流程编排。

使用 Argo Workflows 的场景非常广泛,包括定时任务、机器学习、仿真计算、科学计算、ETL数据处理、模型训练、CI/CD 等。

Argo Workflows 默认使用 YAML 格式进行编排,对于初次接触或者不熟悉 YAML 格式及 Argo Workflows 的人来说,使用 YAML 来编排复杂的工作流可能会显得有些挑战性。YAML 虽然简洁且易于阅读,但是编写大型或复杂的工作流配置时,确实可能因为其严格的缩进规则和较为繁琐的结构而显得有些棘手。

Hera 是一个用于构建和提交 Argo 工作流程的 Python SDK 框架,其主要目标是简化工作流程的构建和提交,尤其是对于数据科学家,使用 Python 能更好的兼容平时的使用习惯,克服 YAML 的阻碍。使用 Hera PythonSDK 具有以下优势:

1) 简洁性:编写代码简短易懂,大大提高编写效率。
2) 支持复杂工作流:在编写复杂工作流时,如果用 YAML 进行编辑的话,容易出现语法问题。
3) Python 生态集成:每个 Function 就是一个 Template,非常容易和 Python 生态的框架进行集成。
4) 可测试性:能够直接使用 Python 测试框架来提升代码质量。

ACK One Serverless Argo 工作流集群托管了 Argo Workflow,本文将介绍使用如何使用 Hera 和 ACK One Serveless Argo 集群进行交互,其架构如下所示:

01

开通 Argo 工作流

集群并获取访问认证 Token

Cloud Native

参考链接:

1)创建工作流集群:

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/create-a-workflow-cluster?spm=a2c4g.11186623.0.i2

2)开通 Argo Server:

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/enable-argo-server-for-a-workflow-cluster?spm=a2c4g.11186623.0.0.3548463fxRw5sf

3)开通 Argo Server 公网访问(专线用户可选):

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/enable-public-access-to-the-argo-server?spm=a2c4g.11186623.0.0.467217ea6qBU6z

4)创建并获取集群 Token:

kubectl create token default -n default
02

开启 Hera PythonSDK 之旅

Cloud Native


1) 安装 Hera

安装 Hera 非常简便,只需一条命令:

pip install hera-workflows

2) 编写并提交 Workflows

在 Argo Workflows 中,DAG(有向无环图)是一种常用的方式来定义复杂的任务依赖关系,其中"Diamond"结构是指一个常见的工作流模式,其中两个或多个任务并行执行后,它们的结果汇聚到一个共同的后续任务。这种结构在需要合并不同数据流或处理结果的场景中非常有用。

下面是一个具体的示例,展示如何使用 Hera 定义一个具有"Diamond"结构的工作流,即两个任务 taskA 和 taskB 并行运行,它们的输出都作为输入给到 taskC:

a. Simple DAG diamond

# 导入相关包from hera.workflows import DAG, Workflow, scriptfrom hera.shared import global_configimport urllib3
urllib3.disable_warnings()
# 配置访问地址和tokenglobal_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"global_config.token = "abcdefgxxxxxx" # 填入之前获取的tokenglobal_config.verify_ssl = ""
# 装饰器函数script是 Hera 实现近乎原生的 Python 函数编排的关键功能。# 它允许您在 Hera 上下文管理器(例如Workflow或Steps上下文)下调用该函数,# 该函数在任何 Hera 上下文之外仍将正常运行,这意味着您可以在给定函数上编写单元测试。# 该示例是打印输入的信息。@script()def echo(message: str): print(message)
# 构建workflow,Workflow是 Argo 中的主要资源,# 也是 Hera 的关键类,负责保存模板、设置入口点和运行模板。with Workflow( generate_name="dag-diamond-", entrypoint="diamond",) as w: with DAG(name="diamond"): A = echo(name="A", arguments={"message": "A"}) # 构建template B = echo(name="B", arguments={"message": "B"}) C = echo(name="C", arguments={"message": "C"}) D = echo(name="D", arguments={"message": "D"}) A >> [B, C] >> D # 构建依赖关系,B、C任务依赖A,D依赖B和C# 创建workfloww.create()
提交工作流:
python simpleDAG.py

在控制台查看工作流运行状态,可以看到任务运行成功:

b. Map-Reduce

在 Argo Workflows 中实现 MapReduce 风格的数据处理,关键在于如何有效利用其 DAG(有向无环图)模板来组织和协调多个任务,从而模拟 Map 和 Reduce 阶段。

以下是一个更加详细的示例,展示了如何使用 Hera 构建一个简单的 MapReduce 工作流,用于处理文本文件的单词计数任务,其中每一步都是一个 Python 函数,可以非常容易和 Python 生态进行集成。

from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, scriptfrom hera.shared import global_config


    
import urllib3
urllib3.disable_warnings()# 设置访问地址global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"global_config.token = "abcdefgxxxxxx" # 填入之前获取的tokenglobal_config.verify_ssl = ""
# 使用script装饰函数时,将script参数传递给script装饰器。这包括image、inputs、outputs、resources等。@script( image="python:alpine3.6", inputs=Parameter(name="num_parts"), outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),)def split(num_parts: int) -> None: # 根据输入参数num_parts创建多个文件,文件中写入foo字符和parts编号 import json import os import sys
os.mkdir("/mnt/out")
part_ids = list(map(lambda x: str(x), range(num_parts))) for i, part_id in enumerate(part_ids, start=1): with open("/mnt/out/" + part_id + ".json", "w") as f: json.dump({"foo": i}, f) json.dump(part_ids, sys.stdout)
# script中定义image、inputs、outputs@script( image="python:alpine3.6", inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),], outputs=OSSArtifact( name="part", path="/mnt/out/part.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json", ),)def map_() -> None: # 根据文件中foo字符的个数,生成新文件,将foo内容parts编号乘以2,写入bar内容 import json import os
os.mkdir("/mnt/out") with open("/mnt/in/part.json") as f: part = json.load(f) with open("/mnt/out/part.json", "w") as f: json.dump({"bar": part["foo"] * 2}, f)
# script中定义image、inputs、outputs、resources@script( image="python:alpine3.6", inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"), outputs=OSSArtifact( name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json" ),)def reduce() -> None: # 计算每个parts的bar的值的总和。 import json import os
os.mkdir("/mnt/out")
total = 0 for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))): result = json.load(f) total = total + result["bar"] with open("/mnt/out/total.json", "w") as f: json.dump({"total": total}, f)
# 构建workflow,输入name、设置入口点、namespace、全局参数等。with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="default", arguments=Parameter(name="num_parts", value="4")) as w: with DAG(name="main"): s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}")) # 构建templetes m = map_( with_param=s.result, arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),], ) # 输入参数并构建templetes, s >> m >> reduce() # 构建任务依赖关系# 创建工作流w.create()
提交工作流:
python map-reduce.py

控制台查看工作流状态,可以看到任务运行成功:

03

总结

Cloud Native

Hera 优雅的对接 Python 生态体系与 Argo Workflows 框架,将繁琐复杂的工作流设计转化为直观简明的创作体验。它不仅为大规模任务编排开创了一条免受 YAML 复杂性困扰的通途,还为数据工程师铺设了平滑的桥梁,让他们能够借助熟悉的 Python 语言,无缝构造和优化机器学习工作流,加速实现从创意到部署的高效迭代循环,推动智能应用的迅速落地与持续演进。

ACKOne Serveles Argo 团队是国内最早使用和维护 Argo Workflows 的团队之一,在 Argo Workflow 使用方面积累众多的最佳实践,欢迎加入钉钉群号一同交流:35688562

参考文档:

[1] 分布式工作流

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/overview-12?spm=a2c4g.11186623.0.0.2f066c0bcQJtWZ

[2] Argo Workflows

https://github.com/argoproj/argo-workflows

[3] Hera

https://hera.readthedocs.io/en/stable/

[4] Train LLM with Hera

https://www.youtube.com/watch?v=nRYf3GkKpss

[5] simple-diamond Yaml

https://github.com/argoproj/argo-workflows/blob/main/examples/dag-diamond.yaml

[6] map-reduce Yaml
https://github.com/argoproj/argo-workflows/blob/main/examples/map-reduce.yaml

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/171789
 
174 次点击