本文整理自 KubeCon China 2025(Argo Workflows:Intro、Updates and Deep Dive)
Argo 是首屈一指的开源 MLOps 和 GitOps CNCF 项目。它的目标是扩展 Kubernetes 的边界,让 Kubernetes 做更多的事情。
在 2024 年的 Kuberntees 开源十周年之际,CNCF 进行了一次用户调查。Argo 被选为了最广泛采用的 Top 5 个毕业项目,毕业意味着符合用户使用的最高标准,而 Top 5 的毕业项目凸显了终端用户对 Argo 的喜爱。
与此同时,Argo 社区也在蓬勃发展,在 2024 年,Argo 吸引了超过 890 个贡献者作出贡献,在整个 CNCF 排名第三,仅次于 Kubernetes 和 Opentelemetry,可见 Argo 社区的繁荣与健康。
可以看到,Argo 不管在对终端用户、还是贡献者方面,都有着极大吸引力。显示了 Argo 在 Kubernetes 中独特的一个生态地位。
接下来着重来介绍下,Argo Project 的核心项目,也是 Argo Project 开源的首个项目 - Argo Workflows。
Argo Workflows 是专为 Kubernetes 设计的工作流引擎。能够在 Kubernetes 上编排不同类型的 Job。主要的使用场景包括,机器学习的 Pipeline(比如数据预处理、训练、离线推理等)、批量数据处理(数据清洗,数据收集)、基础设施的自动化(集群自动化升级、迁移等)、以及 CI/CD(比如应用自动化、构建、发布)等。
覆盖了需要提升效率,自动化的各种场景,是企业研发提升效率的利器。
左图是一个数据处理 map reduce 的示例,每个绿色的圆圈,代表一个任务。中间的四个代表并行执行。当提交完任务之后,就可以在控制台来观测任务的执行情况。
接下来看一下 Argo Workflows 的部署架构,如图所示,主要有两个部分:Argo UI 和 Workflow-Controller,Argo UI 用于接收请求和观测任务的运行。 用户可以通过 Argo CLI、Python 的 SDK、或者 Argo UI 直接提交任务。
任务定义为 Kubernetes 的一个 CRD,当 Watch 到任务提交后,Workflow-Controller 即会根据逻辑关系,逐步的创建每个 Pod,每个 pod 中包含三个 Container,InitContainer 来收集前面步骤的输出,MainContainer 来执行真正的任务,waitContainer 来观测 MainContainer 执行任务的状态,上报任务执行是否成功,和收集 MainContainer 的输出。
这样的架构使得每个任务的状态上报、输出文件收集等从中心的 controller 中解耦出来,相比传统的 DAG 引擎,更容易扩展。 加上其本身具有的云原生、轻量化的特点,受到了广大用户的喜爱。
这里是一个仿真 workflow 的定义,Kind 是 Workflow,Spec 主要由两个方面组成,一个是依赖关系,可以是串行的 Steps 或者复杂的 DAG,另一个是任务的 Template 的定义,其中包括使用的镜像、启动参数等,对应一个真实的 Pod。
比如这个例子,提交任务之后首先会找工作流入口 entrypoint:main,然后从 main template 执行任务,首先执行 data-process,调用的模板是下边的实际定义。 由于 run-simulate 依赖第一步的任务,所以 run-simlation 第二步执行,调用 simulation template。依次类推,执行完这个 Workflow 的每个任务。
接下来看下最近的更新:
首先是扩展性和性能上有了长足的进步。
有了以下一些增强:
可以使用 Multiple Mutexes and Semaphores,更好的控制并发策略。
在持久化时增加队列,减小压力。并行的 Pod 清理,在 retry 超大的 workflow 时非常有用。
并行的 Artifacts 解析,加速任务执行。支持超大参数的执行,在复杂科学计算场景下非常有效。
提供是否传送 events 的选项,减少压力。支持 namespace 级别的并发控制。
这是在性能和规模上的增强。
第二个增强是 Cron Workflows 方面,是 Argo Workflows 使用非常广泛的能力之一。
支持了多个 cron scheduler 调度:可以在单个 Cron Workflow 中集成多个定时调度策略来进行工作流调度。
增加停止策略:可以设定策略在特定情况下停止调度,可以避免定时工作流持续失败,导致集群中失败工作流积压。
When 表达式:在每次调度之前检查表达式是否为 true,提供了和 cron scheduler 更灵活的组合机制。
这个增强给用户提供了更灵活的调度策略。
第三个重要的更新是,PythonSDK Hera 正式被 Argo Workflows 社区接受为官方推荐的 Python 项目,意味着 Argo Workflows 的 Python 语言支持来到了新的阶段。
使用 Python 可以让数据科学家、研究员等熟悉 Python 的用户直接构建 Python Native 的工作流,提升研发效率和运维效率。
根据今年首次的用户报告,使用 Hera 的用户中超过 50% 是机器学习工程师和研发工程师,使用的场景中,批量数据处理和机器学习的 Pipeline 超过 70%,这进一步促进了 Argo Workflows 在 AI、MLPipeline 中的采用率。
下边是一个简单使用 Python 语言编写 Argo Workflows 的例子:
如上所示,可以用 Python 原生的方式构建 workflow,其中 echo 是原生的 python 函数,代表一个任务,用户可以在这里定义自己的数据处理、训练逻辑。
在 DAG 里面可以用这种双箭头构建逻辑关系。提交之后,即可在控制台看到如上的 workflow。
第四个重要的更新的是关于 AI、Bigdata 的。
通常,基于 Argo Workflows 构建机器学习 Pipelines 有三种方式。原生方式,(将数据处理、训练任务打报到镜像)最常使用的方式,第二种是通过 ResourceTemplate(将 ML 的任务定义嵌入到 tempate 中),第三种就是 Plugin 方式。
Plugin 是扩展 Argo Workflows 一种方式,用户可以不用等待社区的发布,自己定义和其他系统的集成。 在 AI、大数据场景,我们增加了多个 Plugin,包括 Spark、Ray、Pytorch 等,实现通过 Plugin 方式对主要 AI 任务类型进行编排。
接下来我们看一个编排 PytorchJob 的示例。
如图所示,用户使用 Argo Workflows 构建了一个机器学习的流水线,包括数据预处理、训练、分析等。其中,训练部分可以采用 Pytorch Job,这个时候 Argo 会在集群中创建一个 PytorchJob,并持续 watch 这个 PytorchJob 的状态。在集群中的 Train-Operator 观测到 PytorchJob 创建之后,即会创建相应的 pod 完成任务的训练,更新 Job 状态。
Argo 感知到这个 PytorchJob 完成后,就会开始后续的分析操作。从而完成整个的端到端的数据处理、训练和推理等。
右边是一个实际的例子,在 Argo Workflows 的模板里面写入采用 pytorch 的 plugin、并写入 PytorchJob 的具体定义,即可完成对 PytorchJob 任务的编排。
Argo Workflows 在全球有超过 200+ 的大型组织采用,其中包括大的互联网公司、制造业巨头、通信设备商、软件提供商等,覆盖了各行各业。
与此同时,我们也观测到,一些新兴的行业、场景,采用 Argo Workflows 的企业在极速上升。包括自动驾驶仿真、复杂的科学计算、金融量化回测、大模型的微调、无人机/机器人仿真、芯片研发、具身智能等新兴行业。
Argo Workflows 帮助这些新兴产业用户构建高效的 AI、ML、Data Pipelines,加速并行,提升产品研发和迭代效率。
Argo Workflows 通过在性能优化、调度策略、Python 场景拓展及 AI/ML 任务的集成,进一步提升在大规模机器学习、数据处理场景的能力。在自动驾驶、科学计算、大模型等多种新兴行业的大范围采用,显示了其在仿真、模拟、训练、数据处理等多个AI场景受到了用户的青睐。可以看出:Argo Workflows 正在帮助各行各业用户加速在 Kubernetes 上构建 Al、ML、Data Pipelines。
最后,欢迎关注 Argo Workflows 项目,并加入阿里云的 Serverless Argo Workflows 的交流群(35688562),交流获取最佳实践!
Argo Workflows 项目:
https://github.com/argoproj/argo-workflows