社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

YAML 焦虑再见:PythonSDK 助力大规模 Argo Workflows 构建

阿里云云原生 • 1 年前 • 185 次点击  

Argo Workflows 是一个开源的工作流管理系统,专为 Kubernetes 设计,旨在帮助用户创建和运行复杂的工作流程。它允许用户定义一系列的任务,这些任务可以按照特定的顺序执行,也可以设置任务间的依赖关系,从而实现自动化的工作流程编排。

使用 Argo Workflows 的场景非常广泛,包括定时任务、机器学习、仿真计算、科学计算、ETL数据处理、模型训练、CI/CD 等。

Argo Workflows 默认使用 YAML 格式进行编排,对于初次接触或者不熟悉 YAML 格式及 Argo Workflows 的人来说,使用 YAML 来编排复杂的工作流可能会显得有些挑战性。YAML 虽然简洁且易于阅读,但是编写大型或复杂的工作流配置时,确实可能因为其严格的缩进规则和较为繁琐的结构而显得有些棘手。

Hera 是一个用于构建和提交 Argo 工作流程的 Python SDK 框架,其主要目标是简化工作流程的构建和提交,尤其是对于数据科学家,使用 Python 能更好的兼容平时的使用习惯,克服 YAML 的阻碍。使用 Hera PythonSDK 具有以下优势:

1) 简洁性:编写代码简短易懂,大大提高编写效率。
2) 支持复杂工作流:在编写复杂工作流时,如果用 YAML 进行编辑的话,容易出现语法问题。
3) Python 生态集成:每个 Function 就是一个 Template,非常容易和 Python 生态的框架进行集成。
4) 可测试性:能够直接使用 Python 测试框架来提升代码质量。

ACK One Serverless Argo 工作流集群托管了 Argo Workflow,本文将介绍使用如何使用 Hera 和 ACK One Serveless Argo 集群进行交互,其架构如下所示:

01

开通 Argo 工作流

集群并获取访问认证 Token

Cloud Native

参考链接:

1)创建工作流集群:

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/create-a-workflow-cluster?spm=a2c4g.11186623.0.i2

2)开通 Argo Server:

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/enable-argo-server-for-a-workflow-cluster?spm=a2c4g.11186623.0.0.3548463fxRw5sf

3)开通 Argo Server 公网访问(专线用户可选):

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/enable-public-access-to-the-argo-server?spm=a2c4g.11186623.0.0.467217ea6qBU6z

4)创建并获取集群 Token:

kubectl create token default -n default
02

开启 Hera PythonSDK 之旅

Cloud Native


1) 安装 Hera

安装 Hera 非常简便,只需一条命令:

pip install hera-workflows

2) 编写并提交 Workflows

在 Argo Workflows 中,DAG(有向无环图)是一种常用的方式来定义复杂的任务依赖关系,其中"Diamond"结构是指一个常见的工作流模式,其中两个或多个任务并行执行后,它们的结果汇聚到一个共同的后续任务。这种结构在需要合并不同数据流或处理结果的场景中非常有用。

下面是一个具体的示例,展示如何使用 Hera 定义一个具有"Diamond"结构的工作流,即两个任务 taskA 和 taskB 并行运行,它们的输出都作为输入给到 taskC:

a. Simple DAG diamond

# 导入相关包from hera.workflows import DAG, Workflow, scriptfrom hera.shared import global_configimport urllib3
urllib3.disable_warnings()
# 配置访问地址和tokenglobal_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"global_config.token = "abcdefgxxxxxx" # 填入之前获取的tokenglobal_config.verify_ssl = ""
# 装饰器函数script是 Hera 实现近乎原生的 Python 函数编排的关键功能。# 它允许您在 Hera 上下文管理器(例如Workflow或Steps上下文)下调用该函数,# 该函数在任何 Hera 上下文之外仍将正常运行,这意味着您可以在给定函数上编写单元测试。# 该示例是打印输入的信息。@script()def echo(message: str): print(message)
# 构建workflow,Workflow是 Argo 中的主要资源,# 也是 Hera 的关键类,负责保存模板、设置入口点和运行模板。with Workflow( generate_name="dag-diamond-", entrypoint="diamond",) as w: with DAG(name="diamond"): A = echo(name="A", arguments={"message": "A"}) # 构建template B = echo(name="B", arguments={"message": "B"}) C = echo(name="C", arguments={"message": "C"}) D = echo(name="D", arguments={"message": "D"}) A >> [B, C] >> D # 构建依赖关系,B、C任务依赖A,D依赖B和C# 创建workfloww.create()
提交工作流:
python simpleDAG.py

在控制台查看工作流运行状态,可以看到任务运行成功:

b. Map-Reduce

在 Argo Workflows 中实现 MapReduce 风格的数据处理,关键在于如何有效利用其 DAG(有向无环图)模板来组织和协调多个任务,从而模拟 Map 和 Reduce 阶段。

以下是一个更加详细的示例,展示了如何使用 Hera 构建一个简单的 MapReduce 工作流,用于处理文本文件的单词计数任务,其中每一步都是一个 Python 函数,可以非常容易和 Python 生态进行集成。

from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, scriptfrom hera.shared import global_config


    
import urllib3
urllib3.disable_warnings()# 设置访问地址global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"global_config.token = "abcdefgxxxxxx" # 填入之前获取的tokenglobal_config.verify_ssl = ""
# 使用script装饰函数时,将script参数传递给script装饰器。这包括image、inputs、outputs、resources等。@script( image="python:alpine3.6", inputs=Parameter(name="num_parts"), outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),)def split(num_parts: int) -> None: # 根据输入参数num_parts创建多个文件,文件中写入foo字符和parts编号 import json import os import sys
os.mkdir("/mnt/out")
part_ids = list(map(lambda x: str(x), range(num_parts))) for i, part_id in enumerate(part_ids, start=1): with open("/mnt/out/" + part_id + ".json", "w") as f: json.dump({"foo": i}, f) json.dump(part_ids, sys.stdout)
# script中定义image、inputs、outputs@script( image="python:alpine3.6", inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),], outputs=OSSArtifact( name="part", path="/mnt/out/part.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json", ),)def map_() -> None: # 根据文件中foo字符的个数,生成新文件,将foo内容parts编号乘以2,写入bar内容 import json import os
os.mkdir("/mnt/out") with open("/mnt/in/part.json") as f: part = json.load(f) with open("/mnt/out/part.json", "w") as f: json.dump({"bar": part["foo"] * 2}, f)
# script中定义image、inputs、outputs、resources@script( image="python:alpine3.6", inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"), outputs=OSSArtifact( name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json" ),)def reduce() -> None: # 计算每个parts的bar的值的总和。 import json import os
os.mkdir("/mnt/out")
total = 0 for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))): result = json.load(f) total = total + result["bar"] with open("/mnt/out/total.json", "w") as f: json.dump({"total": total}, f)
# 构建workflow,输入name、设置入口点、namespace、全局参数等。with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="default", arguments=Parameter(name="num_parts", value="4")) as w: with DAG(name="main"): s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}")) # 构建templetes m = map_( with_param=s.result, arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),], ) # 输入参数并构建templetes, s >> m >> reduce() # 构建任务依赖关系# 创建工作流w.create()
提交工作流:
python map-reduce.py

控制台查看工作流状态,可以看到任务运行成功:

03

总结

Cloud Native

Hera 优雅的对接 Python 生态体系与 Argo Workflows 框架,将繁琐复杂的工作流设计转化为直观简明的创作体验。它不仅为大规模任务编排开创了一条免受 YAML 复杂性困扰的通途,还为数据工程师铺设了平滑的桥梁,让他们能够借助熟悉的 Python 语言,无缝构造和优化机器学习工作流,加速实现从创意到部署的高效迭代循环,推动智能应用的迅速落地与持续演进。

ACKOne Serveles Argo 团队是国内最早使用和维护 Argo Workflows 的团队之一,在 Argo Workflow 使用方面积累众多的最佳实践,欢迎加入钉钉群号一同交流:35688562

参考文档:

[1] 分布式工作流

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/overview-12?spm=a2c4g.11186623.0.0.2f066c0bcQJtWZ

[2] Argo Workflows

https://github.com/argoproj/argo-workflows

[3] Hera

https://hera.readthedocs.io/en/stable/

[4] Train LLM with Hera

https://www.youtube.com/watch?v=nRYf3GkKpss

[5] simple-diamond Yaml

https://github.com/argoproj/argo-workflows/blob/main/examples/dag-diamond.yaml

[6] map-reduce Yaml
https://github.com/argoproj/argo-workflows/blob/main/examples/map-reduce.yaml

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/171789
 
185 次点击