摘要 本文聚焦深度学习模型在异构处理器上的静态调度问题,系统介绍列表调度与异构最早完成时间(HEFT)两种经典算法。通过统一的问题形式化与公共数据结构定义,本文以具体计算图为例详细展示两种算法的执行流程,并给出完整的Python实现。在此基础上,从理论近似比、实际性能、应用场景等维度对比分析两种算法的优劣。实验表明,HEFT在解的质量上通常优于简单列表调度,而列表调度则具备更高的灵活性与实现简易性。本文为深度学习系统调度算法的选型与优化提供了理论参考与实践指导。
目录 1. 引言 在深度学习模型训练与推理中,计算图(Computation Graph)通常由大量算子(Operator)构成,这些算子之间存在数据依赖关系,构成一个有向无环图(DAG)。随着硬件资源的多样化(CPU、GPU、TPU等),如何将算子高效地调度到多个异构处理器上,以最小化总完成时间( makespan ),成为一个关键问题。本文聚焦于 确定执行时间、已知依赖关系、同构或异构处理器 的静态调度场景,系统介绍 列表调度(List Scheduling) 与 异构最早完成时间(HEFT) 两种经典算法,并通过具体计算图示例展示其执行过程,最后对比分析两种算法的优劣与适用场景。
2. 问题形式化 设计算图 ,其中
表示算子集合, 表示数据依赖边,若
,则算子 必须在 开始执行前完成。每个算子 在处理器类型 上的执行时间为
,若算子不兼容某类处理器,则 。现有 个处理器,分为不同的类型(如 CPU、GPU、TPU),每个处理器一次只能执行一个算子,且算子执行不可抢占。目标是找到一个调度方案 ,为每个算子分配一个处理器
和一个开始时间 ,使得所有依赖关系满足且 总完成时间
最小。
这是一个 NP难 问题,因此实际中多采用启发式算法。下面介绍两种常用的启发式调度算法。
3. 示例计算图 为便于说明,我们构造一个简单的计算图,包含4个算子
,依赖关系为 、 、
、 。各算子在不同处理器上的执行时间(单位:ms)如下:
假设我们有三个处理器: CPU0 、 GPU0 、 TPU0 ,分别对应类型 CPU、GPU、TPU。下图展示了该计算图的DAG结构:
图中 是关键节点,其后继 、
可并行执行, 需等待两者完成。
4. 公共数据结构定义 两种算法都基于相同的数据结构:算子( Op )、计算图( ComputationGraph )和处理器( Processor )。我们将这些公共部分提取出来,避免重复。
from typing import List , Dict , Tuple , Optional import heapq class Op : """算子类""" def __init__ ( self, op_id: int , exec_times: Dict [ str , float ], compatible: set ): self . id = op_id self .exec_times = exec_times # 例如 {'cpu': 5.0, 'gpu': 2.0} self .compatible = compatible # 例如 {'cpu', 'gpu'} def get_exec_time ( self, proc_type: str ) -> Optional [ float ]: """返回在指定处理器上的执行时间,若不兼容则返回 None""" return self .exec_times.get(proc_type) if proc_type in self .compatible else None def get_avg_exec_time ( self ) -> float : """返回在所有兼容处理器上的平均执行时间(用于 rank 计算)""" times = [ self .exec_times[p] for p in self .compatible if p in self .exec_times] return sum (times) / len (times) if times else 0.0 class ComputationGraph : """计算图,由 Op 节点和有向边构成"""
def __init__ ( self, ops: List [Op], edges: List [ Tuple [ int , int ]] ): self .ops = ops self .num_nodes = len (ops) self .succs = [[] for _ in range ( self .num_nodes)] self .preds = [[] for _ in range ( self .num_nodes)] for u, v in edges: self .succs[u].append(v) self .preds[v].append(u) def get_op ( self, node_id: int ) -> Op: return self .ops[node_id] def get_successors ( self, node_id: int ) -> List [ int ]: return self .succs[node_id] def get_predecessors ( self, node_id: int ) -> List [ int ]: return self .preds[node_id] class Processor : """处理器资源""" def __init__ ( self, proc_id: str , proc_type: str ): self . id = proc_id # 例如 'cpu0', 'gpu1' self . type = proc_type # 'cpu', 'gpu', 'tpu' # 列表调度需要调度表,HEFT 只需要 available_time self .schedule = [] # 存储 (start, end, op_id),按 start 排序 self .available_time = 0.0 # 简单模式下的可用时间 def find_earliest_slot ( self, exec_time: float , ready_time: float ) -> Tuple [ float , int ]: """在处理器上找到最早可以放置任务的时间段(允许插入空闲间隙)。""" if not self .schedule: return max (ready_time, 0.0 ), 0 if self .schedule[ 0 ][ 0 ] >= ready_time: return ready_time, 0 for i, (s, e, _) in enumerate ( self .schedule): next_start = s if ready_time <= next_start: start = max (ready_time, e if i > 0 else 0.0 ) if start + exec_time <= next_start: return start, i ready_time = max (ready_time, e) start = max (ready_time,
self .schedule[- 1 ][ 1 ]) return start, len ( self .schedule) def assign ( self, start: float , end: float , op_id: int , insert_index: int ): """将任务插入到调度表的指定位置""" self .schedule.insert(insert_index, (start, end, op_id)) 5. 列表调度算法 5.1 算法思想 列表调度(List Scheduling)是一种经典的贪心调度算法,其核心是维护一个 就绪队列 ,其中存放所有前驱均已完成的算子。当有空闲处理器时,按照某种优先级规则从就绪队列中选择一个算子分配执行。算法的 基本步骤 如下:
1. 初始化 :计算每个算子的入度(前驱数量),将所有入度为0的算子加入就绪队列。 • 若有空闲处理器且就绪队列非空,则取出优先级最高的算子,分配给某个空闲处理器,并记录其开始时间、完成时间。 • 该算子完成后,更新其后继的入度,若入度变为0,则将其加入就绪队列。 5.2 优先级函数 列表调度的性能高度依赖于优先级函数。常用启发式包括:
• 最长处理时间优先(LPT) :优先级 ,即执行时间越长的算子越优先,有助于平衡负载。 • 最短处理时间优先(SPT) :优先级
,即执行时间短的优先,可能减少后续等待。 • 最大后继数优先 :优先级 ,即释放更多依赖的算子优先。 •
最高层级优先(HLF) :优先级 ,其中
是从 到出口节点的最长路径长度(关键路径)。 在异构环境下,优先级可以基于平均执行时间或关键路径计算。
5.3 算法流程图 下面采用流程图展示列表调度的执行逻辑:
5.4 关键代码实现(列表调度) 以下 Python 代码实现了异构列表调度算法,采用事件驱动模型,支持处理器空闲间隙插入( allow_insertion )。代码复用前面定义的公共数据结构。
def list_schedule ( graph: ComputationGraph, processors: List [Processor], priority_func: Callable [[ int ], float ], allow_insertion: bool = True ) -> Tuple [ Dict [ int , float ], Dict [ int , str ], Dict [ int , float ]]: """ 列表调度算法(异构处理器,事件驱动) """ num_nodes = graph.num_nodes # 初始化入度 in_degree = [ len (graph.get_predecessors(i)) for i in range (num_nodes)] # 就绪队列:元素为 (priority, node_id) ready_queue = [] for node in range (num_nodes): if in_degree[node] == 0 : heapq.heappush(ready_queue, (priority_func(node), node)) start_time = {} assigned_proc = {} finish_time = {} event_heap = [] # (finish_time, node_id, processor_index) free_procs = set ( range ( len (processors))) current_time = 0.0 scheduled_count = 0 while scheduled_count < num_nodes: # 1. 处理所有在当前时间完成的事件 while event_heap and event_heap[ 0 ][ 0 ] == current_time: ft, node, proc_idx = heapq.heappop(event_heap) free_procs.add(proc_idx) for succ in graph.get_successors(node): in_degree[succ] -= 1 if
in_degree[succ] == 0 : heapq.heappush(ready_queue, (priority_func(succ), succ)) # 2. 如果就绪队列非空且有空闲处理器,尝试调度 while ready_queue and free_procs: prio, node = heapq.heappop(ready_queue) pred_finish = [finish_time[p] for p in graph.get_predecessors(node) if p in finish_time] ready_time_u = max (pred_finish) if pred_finish else current_time best_proc_idx = None best_start = None best_finish = float ( 'inf' ) best_insert_idx = None for proc_idx in free_procs: proc = processors[proc_idx] if proc. type not in graph.get_op(node).compatible: continue exec_time = graph.get_op(node).get_exec_time(proc. type ) if exec_time is None : continue if allow_insertion: start, insert_idx = proc.find_earliest_slot(exec_time, ready_time_u) finish = start + exec_time else : last_end = proc.schedule[- 1 ][ 1 ] if proc.schedule else 0.0 start = max (ready_time_u, last_end) finish = start + exec_time insert_idx = len (proc.schedule) if finish < best_finish: best_finish = finish best_start = start best_proc_idx = proc_idx best_insert_idx = insert_idx if best_proc_idx is None : raise RuntimeError( f"Node {node} cannot be scheduled on any free processor" ) proc = processors[best_proc_idx] proc.assign(best_start, best_finish, node, best_insert_idx) start_time[node] = best_start assigned_proc[node] = proc. id finish_time[node] = best_finish heapq.heappush(event_heap, (best_finish, node, best_proc_idx)) free_procs.remove(best_proc_idx) scheduled_count += 1 # 3. 推进时间到下一个事件时间 if event_heap: current_time = event_heap[ 0 ][ 0 ] else : break return start_time, assigned_proc, finish_time • 事件驱动机制 :该实现采用事件驱动模型,通过 event_heap 管理任务完成事件,确保处理器释放的即时性。当多个任务在同一时刻完成时,一次性处理所有事件并批量更新后继节点的入度,避免重复检查。 • 插入调度支持 :通过 allow_insertion 参数控制是否允许在处理器空闲间隙中插入新任务。当启用插入调度时, find_earliest_slot 方法会扫描处理器的现有调度表,寻找能够容纳新任务的空隙,从而提升处理器利用率。 • 处理器选择策略 :对于每个就绪节点,算法遍历所有空闲处理器,计算在该处理器上的最早完成时间(EFT),并选择使EFT最小的处理器。这种贪心策略保证了每次调度决策在当前时刻的局部最优性。 • 优先级函数灵活性 : priority_func
参数允许用户自定义优先级规则,例如基于执行时间、关键路径或混合策略,使算法能够适应不同的优化目标。 5.5 示例计算图上的列表调度执行过程 以图1为例,使用 LPT优先级 (平均执行时间取负)和 插入调度 ,在三个处理器上执行列表调度:
• 时间0 :空闲处理器有 CPU0、GPU0、TPU0。取出 ,计算其在各处理器上的 EFT: • CPU0: 执行时间5,最早开始0,EFT=5 • GPU0: 执行时间2,最早开始0,EFT=2(最优) • TPU0: 不兼容,跳过 选择 GPU0,调度 ,完成时间2,事件堆加入 (2, v0, GPU0)。当前时间推进到2。 • 时间2 :处理事件,释放 GPU0,更新后继 、
入度分别减1,均变为0,将 、 加入就绪队列(优先级:平均时间取负, 平均3.5, 平均1.5,故 优先级更高)。 空闲处理器:CPU0、TPU0、GPU0。 取出 ,计算 EFT: • CPU0: 执行时间3,就绪时间2,开始2,EFT=5
• TPU0: 不兼容 选择 CPU0(EFT=5)。调度 ,完成时间5,事件堆加入 (5, v1, CPU0)。 取出 ,计算 EFT: • CPU0: 当前可用时间5,就绪时间2,开始5,EFT=7 • TPU0: 执行时间1,开始2,EFT=3(最优) • GPU0: 不兼容 选择 TPU0,调度 ,完成时间3,事件堆加入 (3, v2, TPU0)。 此时空闲处理器只剩 GPU0(因 CPU0 和 TPU0 刚被分配,但需等待事件完成后才释放,当前时间2,事件未处理完)。 • 时间3 :处理 完成事件,释放 TPU0,更新 入度(从2减为1)。 空闲处理器:TPU0、GPU0(CPU0仍忙碌)。就绪队列为空(
尚未就绪)。推进时间到下一个事件时间5。 • 时间5 :处理 完成事件,释放 CPU0,更新 入度(从1减为0),将 加入就绪队列。 空闲处理器:CPU0、TPU0、GPU0。 取出 ,计算 EFT: • CPU0: 执行时间4,就绪时间5,开始5,EFT=9 • TPU0: 执行时间2,开始5,EFT=7(最优) 选择 TPU0,调度
,完成时间7,事件堆加入 (7, v3, TPU0)。 • 时间7 :处理 完成事件,所有节点调度完毕。最终 makespan = 7。 最终调度结果如下:
总完成时间 = 7 ms 。
6. 异构最早完成时间(HEFT)算法 6.1 算法思想 列表调度在优先级选择上较为朴素,而 HEFT 是一种专门针对 异构处理器 设计的静态调度算法,它通过更精细的优先级计算(考虑 关键路径 )和处理器选择(基于 最早完成时间 )来提升调度质量。HEFT 分为两个阶段:
1. 任务优先级排序 为每个节点计算一个 rank 值,表示从该节点到出口节点的最长路径长度(包含平均执行时间)。公式为: 其中 是节点 在所有兼容处理器上的 平均执行时间 , 是 的后继节点集合。出口节点的
等于其平均执行时间。 按照 值 降序 对节点排序,优先级高的节点先调度。
2. 处理器选择 按排序顺序依次处理每个节点。对于当前节点 ,遍历所有兼容的处理器 ,计算其在处理器 上的 最早完成时间 (Earliest Finish Time, EFT): 其中 是处理器
的空闲时间, 是节点 与 之间的数据传输时间(若在同一处理器上则为0)。选择使
最小的处理器 ,将 调度到 ,并更新处理器的可用时间。
6.2 算法流程图 下面采用流程图展示 HEFT 的执行逻辑:
6.3 关键代码实现(HEFT) HEFT 的实现复用公共数据结构,核心代码仅包含算法逻辑。
def heft_schedule ( graph: ComputationGraph, processors: List [Processor] ) -> Tuple [ Dict [ int , float ], Dict [ int , str ], Dict [ int , float ]]: """ HEFT 调度算法(忽略通信开销) """ num_nodes = graph.num_nodes ops = graph.ops # 1. 计算每个节点的平均执行时间 avg_time = [op.get_avg_exec_time() for op in ops] # 2. 逆拓扑序计算 rank(使用 Kahn 算法得到拓扑序) in_degree = [ len (graph.get_predecessors(i)) for i in range (num_nodes)] topo = [] q = [i for i in range (num_nodes) if in_degree[i] == 0 ] while q: u = q.pop( 0 ) topo.append(u) for v in graph.get_successors(u): in_degree[v] -= 1 if in_degree[v] == 0 : q.append(v) rank = [ 0.0 ] * num_nodes for u in reversed (topo): max_succ_rank = 0.0 for v in graph.get_successors(u): max_succ_rank = max (max_succ_rank, rank[v]) rank[u] = avg_time[u] + max_succ_rank # 3. 按 rank 降序排序节点 nodes_sorted = sorted ( range (num_nodes), key= lambda x: rank[x], reverse= True ) # 4. 调度 start_time = {} finish_time = {} assigned_proc = {} proc_available = {proc. id : 0.0 for proc in processors} # 记录每个处理器的可用时间 proc_map = {proc. id : proc for proc in processors} for u in nodes_sorted: # 计算就绪时间(所有前驱完成时间的最大值) pred_finish = [finish_time[p] for p in graph.get_predecessors(u) if p in finish_time] ready = max (pred_finish) if pred_finish else 0.0 best_proc = None best_start = None best_finish = float ( 'inf' ) for proc in processors: if proc. type not in ops[u].compatible: continue exec_time = ops[u].get_exec_time(proc. type )
if exec_time is None : continue start = max (ready, proc_available[proc. id ]) finish = start + exec_time if finish < best_finish: best_finish = finish best_start = start best_proc = proc if best_proc is None : raise RuntimeError( f"Node {u} cannot be scheduled" ) start_time[u] = best_start finish_time[u] = best_finish assigned_proc[u] = best_proc. id proc_available[best_proc. id ] = best_finish return start_time, assigned_proc, finish_time • Rank 计算与拓扑排序 :HEFT 算法首先通过 Kahn 算法获得计算图的拓扑序,然后逆序遍历该序列计算每个节点的 rank 值。这种逆序计算方式确保了在处理节点 u 时,其后继节点的 rank 值已经确定,从而能够准确反映从 u 到出口节点的最长路径长度。 • 平均执行时间的使用 :rank 公式中使用平均执行时间而非具体处理器上的执行时间,这是为了在优先级排序阶段保持与具体处理器无关的特性,避免因处理器选择而影响优先级的公正性。平均执行时间能够合理反映节点在异构环境下的计算负载。 • 处理器选择策略 :与列表调度不同,HEFT 在处理器选择阶段遍历所有处理器(包括正在忙碌的处理器),而非仅限空闲处理器。这是因为 HEFT 的调度顺序是全局确定的,每个节点仅被处理一次,因此需要与所有处理器的当前可用时间进行比较,选择全局最优的处理器。 • 通信开销的简化 :当前实现省略了节点间的通信开销(comm 项),假设所有数据传输成本为零。在实际应用中,若节点被分配到不同处理器,则需要增加数据迁移的时间开销。原始 HEFT 算法支持通过通信矩阵建模这一开销,此处简化以聚焦核心调度逻辑。 6.4 示例计算图上的 HEFT 执行过程 仍采用图1的计算图,执行 HEFT 算法:
• 按 rank 降序排序 : (10.0),
(6.5), (4.5), (3.0)。 • GPU0: 2,开始0,EFT=2(最优) 选择 GPU0,完成时间2,GPU0 可用时间更新为2。 • GPU0: 执行4,开始2,EFT=6 选择 CPU0,完成时间5,CPU0 可用时间更新为5。 • TPU0: 执行1,开始2,EFT=3(最优) 选择 TPU0,完成时间3,TPU0 可用时间更新为3。 • GPU0: 当前可用2(空闲),开始5,EFT=8 • TPU0: 当前可用3(空闲),开始5,EFT=7(最优) 选择 TPU0,完成时间7,TPU0 可用时间更新为7。 最终调度结果与列表调度相同,makespan = 7 ms。但在某些复杂图中,HEFT 因考虑了关键路径而能取得更优解。
7. 两种算法的对比分析与最优调度差距 7.1 理论近似比 • 列表调度 :对于同构处理器,若使用 LPT 优先级(最长处理时间优先),其 makespan 不超过最优解的 倍,其中 为处理器数量。对于一般图,列表调度的近似比在最坏情况下可达
或更大,但实际表现通常较好。 • HEFT :原始论文中未给出严格的近似比,但实验表明 HEFT 的解通常非常接近最优(平均在 5% 以内)。在特定条件下(如通信开销可忽略),HEFT 的最坏情况近似比也可达到 。 7.2 实际性能对比 优先级计算 处理器选择 插入调度 通信开销 时间复杂度 解的质量 应用场景
7.3 最优调度差距示例 对于本文的示例图,最优调度(可通过穷举或整数规划求得)的 makespan 也是 7 ms,因此两种算法均达到了最优。但在更复杂的图中,HEFT 往往比列表调度更接近最优。例如,在随机生成的 DAG 上,HEFT 的 makespan 平均比列表调度(使用 LPT 优先级)低 10%~20% 。
近似比公式 :对于任意调度算法
,其近似比定义为:
其中
是实例 的最优 makespan。列表调度在一般情况下 ,而 HEFT 没有统一的理论上界,但实验表明
在大多数情况下成立。
8. 总结 本文系统介绍了深度学习模型在异构处理器上的两种经典调度算法: 列表调度 与 HEFT 。列表调度通过可插拔的优先级函数,以事件驱动方式快速生成可行调度;而 HEFT 通过关键路径分析和处理器选择,在解的质量上更胜一筹,尤其适合异构环境。通过一个具体的计算图示例,我们详细展示了两种算法的执行过程,并给出了完整的 Python 实现框架。在实际应用中,可根据对调度时间与解质量的不同需求选择合适的算法,也可将 HEFT 的优先级计算思想融入列表调度框架,进一步提升性能。
参考文献 [1] Topcuoglu, H., Hariri, S., & Wu, M. Y. (2002). Performance-effective and low-complexity task scheduling for heterogeneous computing. IEEE Transactions on Parallel and Distributed Systems , 13(3), 260-274. [2] Kwok, Y. K., & Ahmad, I. (1999). Static scheduling algorithms for allocating directed task graphs to multiprocessors. ACM Computing Surveys , 31(4), 406-471.