Py学习  »  机器学习算法

深度学习模型多处理器调度算法研究:从列表调度到HEFT

ai算法芯片与系统 • 2 周前 • 34 次点击  

 

摘要

本文聚焦深度学习模型在异构处理器上的静态调度问题,系统介绍列表调度与异构最早完成时间(HEFT)两种经典算法。通过统一的问题形式化与公共数据结构定义,本文以具体计算图为例详细展示两种算法的执行流程,并给出完整的Python实现。在此基础上,从理论近似比、实际性能、应用场景等维度对比分析两种算法的优劣。实验表明,HEFT在解的质量上通常优于简单列表调度,而列表调度则具备更高的灵活性与实现简易性。本文为深度学习系统调度算法的选型与优化提供了理论参考与实践指导。

目录

  1. 1. 引言
  2. 2. 问题形式化
  3. 3. 示例计算图
  4. 4. 公共数据结构定义
  5. 5. 列表调度算法
  6. 6. 异构最早完成时间(HEFT)算法
  7. 7. 两种算法的对比分析与最优调度差距
  8. 8. 总结

1. 引言

在深度学习模型训练与推理中,计算图(Computation Graph)通常由大量算子(Operator)构成,这些算子之间存在数据依赖关系,构成一个有向无环图(DAG)。随着硬件资源的多样化(CPU、GPU、TPU等),如何将算子高效地调度到多个异构处理器上,以最小化总完成时间(makespan),成为一个关键问题。本文聚焦于确定执行时间、已知依赖关系、同构或异构处理器的静态调度场景,系统介绍列表调度(List Scheduling) 与异构最早完成时间(HEFT) 两种经典算法,并通过具体计算图示例展示其执行过程,最后对比分析两种算法的优劣与适用场景。

2. 问题形式化

设计算图 ,其中  表示算子集合, 表示数据依赖边,若 ,则算子  必须在  开始执行前完成。每个算子  在处理器类型  上的执行时间为 ,若算子不兼容某类处理器,则 。现有  个处理器,分为不同的类型(如 CPU、GPU、TPU),每个处理器一次只能执行一个算子,且算子执行不可抢占。目标是找到一个调度方案 ,为每个算子分配一个处理器  和一个开始时间 ,使得所有依赖关系满足且总完成时间 最小。

这是一个NP难问题,因此实际中多采用启发式算法。下面介绍两种常用的启发式调度算法。

3. 示例计算图

为便于说明,我们构造一个简单的计算图,包含4个算子 ,依赖关系为 。各算子在不同处理器上的执行时间(单位:ms)如下:

算子
CPU
GPU
TPU
兼容集合
5.0
2.0
{CPU, GPU}
3.0
4.0
{CPU, GPU}
2.0
1.0
{CPU, TPU}
4.0
3.0
2.0
{CPU, GPU, TPU}

假设我们有三个处理器:CPU0GPU0TPU0,分别对应类型 CPU、GPU、TPU。下图展示了该计算图的DAG结构:

图中  是关键节点,其后继  可并行执行, 需等待两者完成。

4. 公共数据结构定义

两种算法都基于相同的数据结构:算子(Op)、计算图(ComputationGraph)和处理器(Processor)。我们将这些公共部分提取出来,避免重复。

from typing import List, Dict, Tuple, Optional
import
 heapq

class
 Op:
    """算子类"""

    def
 __init__(self, op_id: int, exec_times: Dict[str, float], compatible: set):
        self
.id = op_id
        self
.exec_times = exec_times      # 例如 {'cpu': 5.0, 'gpu': 2.0}
        self
.compatible = compatible      # 例如 {'cpu', 'gpu'}

    def
 get_exec_time(self, proc_type: str) -> Optional[float]:
        """返回在指定处理器上的执行时间,若不兼容则返回 None"""

        return
 self.exec_times.get(proc_type) if proc_type in self.compatible else None

    def
 get_avg_exec_time(self) -> float:
        """返回在所有兼容处理器上的平均执行时间(用于 rank 计算)"""

        times = [self.exec_times[p] for p in self.compatible if p in self.exec_times]
        return
 sum(times) / len(times) if times else 0.0


class
 ComputationGraph:
    """计算图,由 Op 节点和有向边构成"""

    def
 __init__(self, ops: List[Op], edges: List[Tuple[int, int]]):
        self
.ops = ops
        self
.num_nodes = len(ops)
        self
.succs = [[] for _ in range(self.num_nodes)]
        self
.preds = [[] for _ in range(self.num_nodes)]
        for
 u, v in edges:
            self
.succs[u].append(v)
            self
.preds[v].append(u)

    def
 get_op(self, node_id: int) -> Op:
        return
 self.ops[node_id]

    def
 get_successors(self, node_id: int) -> List[int]:
        return
 self.succs[node_id]

    def
 get_predecessors(self, node_id: int) -> List[int]:
        return
 self.preds[node_id]


class
 Processor:
    """处理器资源"""

    def
 __init__(self, proc_id: str, proc_type: str):
        self
.id = proc_id          # 例如 'cpu0', 'gpu1'
        self
.type = proc_type      # 'cpu', 'gpu', 'tpu'
        # 列表调度需要调度表,HEFT 只需要 available_time

        self
.schedule = []          # 存储 (start, end, op_id),按 start 排序
        self
.available_time = 0.0   # 简单模式下的可用时间

    def
 find_earliest_slot(self, exec_time: float, ready_time: float) -> Tuple[float, int]:
        """在处理器上找到最早可以放置任务的时间段(允许插入空闲间隙)。"""

        if
 not self.schedule:
            return
 max(ready_time, 0.0), 0

        if
 self.schedule[0][0] >= ready_time:
            return
 ready_time, 0

        for
 i, (s, e, _) in enumerate(self.schedule):
            next_start = s
            if
 ready_time <= next_start:
                start = max(ready_time, e if i > 0 else 0.0)
                if
 start + exec_time <= next_start:
                    return
 start, i
            ready_time = max(ready_time, e)

        start = max(ready_time,  self.schedule[-1][1])
        return
 start, len(self.schedule)

    def
 assign(self, start: float, end: float, op_id: int, insert_index: int):
        """将任务插入到调度表的指定位置"""

        self
.schedule.insert(insert_index, (start, end, op_id))

5. 列表调度算法

5.1 算法思想

列表调度(List Scheduling)是一种经典的贪心调度算法,其核心是维护一个就绪队列,其中存放所有前驱均已完成的算子。当有空闲处理器时,按照某种优先级规则从就绪队列中选择一个算子分配执行。算法的基本步骤如下:

  1. 1. 初始化:计算每个算子的入度(前驱数量),将所有入度为0的算子加入就绪队列。
  2. 2. 循环调度
  • • 若有空闲处理器且就绪队列非空,则取出优先级最高的算子,分配给某个空闲处理器,并记录其开始时间、完成时间。
  • • 该算子完成后,更新其后继的入度,若入度变为0,则将其加入就绪队列。
  • • 推进时间到下一个事件(完成时间)并继续。
  • 3. 终止:所有算子调度完毕,输出最大完成时间。
  • 5.2 优先级函数

    列表调度的性能高度依赖于优先级函数。常用启发式包括:

    • • 最长处理时间优先(LPT):优先级 ,即执行时间越长的算子越优先,有助于平衡负载。
    • • 最短处理时间优先(SPT):优先级 ,即执行时间短的优先,可能减少后续等待。
    • • 最大后继数优先:优先级 ,即释放更多依赖的算子优先。
    • •  最高层级优先(HLF):优先级 ,其中  是从  到出口节点的最长路径长度(关键路径)。

    在异构环境下,优先级可以基于平均执行时间或关键路径计算。

    5.3 算法流程图

    下面采用流程图展示列表调度的执行逻辑:

    5.4 关键代码实现(列表调度)

    以下 Python 代码实现了异构列表调度算法,采用事件驱动模型,支持处理器空闲间隙插入(allow_insertion)。代码复用前面定义的公共数据结构。

    def list_schedule(
        graph: ComputationGraph,
        processors: List[Processor],
        priority_func: Callable[[int], float],
        allow_insertion: bool = True
    ) -> Tuple[Dict[int, float], Dict[int, str], Dict[int, float]]:
        """
        列表调度算法(异构处理器,事件驱动)
        """

        num_nodes = graph.num_nodes
        # 初始化入度

        in_degree = [len(graph.get_predecessors(i)) for i in range(num_nodes)]
        # 就绪队列:元素为 (priority, node_id)

        ready_queue = []
        for
     node in range(num_nodes):
            if
     in_degree[node] == 0:
                heapq.heappush(ready_queue, (priority_func(node), node))

        start_time = {}
        assigned_proc = {}
        finish_time = {}

        event_heap = []               # (finish_time, node_id, processor_index)
        free_procs = set(range(len(processors)))
        current_time = 0.0
        scheduled_count = 0

        while
     scheduled_count < num_nodes:
            # 1. 处理所有在当前时间完成的事件

            while
     event_heap and event_heap[0][0] == current_time:
                ft, node, proc_idx = heapq.heappop(event_heap)
                free_procs.add(proc_idx)
                for
     succ in graph.get_successors(node):
                    in_degree[succ] -= 1
                    if
     in_degree[succ] == 0:
                        heapq.heappush(ready_queue, (priority_func(succ), succ))

            # 2. 如果就绪队列非空且有空闲处理器,尝试调度

            while
     ready_queue and free_procs:
                prio, node = heapq.heappop(ready_queue)

                pred_finish = [finish_time[p] for p in graph.get_predecessors(node) if p in finish_time]
                ready_time_u = max(pred_finish) if pred_finish else current_time

                best_proc_idx = None
                best_start = None
                best_finish = float('inf')
                best_insert_idx = None

                for
     proc_idx in free_procs:
                    proc = processors[proc_idx]
                    if
     proc.type not in graph.get_op(node).compatible:
                        continue

                    exec_time = graph.get_op(node).get_exec_time(proc.type)
                    if
     exec_time is None:
                        continue


                    if
     allow_insertion:
                        start, insert_idx = proc.find_earliest_slot(exec_time, ready_time_u)
                        finish = start + exec_time
                    else
    :
                        last_end = proc.schedule[-1][1] if proc.schedule else 0.0
                        start = max(ready_time_u, last_end)
                        finish = start + exec_time
                        insert_idx = len(proc.schedule)

                    if
     finish < best_finish:
                        best_finish = finish
                        best_start = start
                        best_proc_idx = proc_idx
                        best_insert_idx = insert_idx

                if
     best_proc_idx is None:
                    raise
     RuntimeError(f"Node {node} cannot be scheduled on any free processor")

                proc = processors[best_proc_idx]
                proc.assign(best_start, best_finish, node, best_insert_idx)
                start_time[node] = best_start
                assigned_proc[node] = proc.id
                finish_time[node] = best_finish

                heapq.heappush(event_heap, (best_finish, node, best_proc_idx))
                free_procs.remove(best_proc_idx)
                scheduled_count += 1

            # 3. 推进时间到下一个事件时间

            if
     event_heap:
                current_time = event_heap[0][0]
            else
    :
                break


        return
     start_time, assigned_proc, finish_time
    • • 事件驱动机制:该实现采用事件驱动模型,通过event_heap管理任务完成事件,确保处理器释放的即时性。当多个任务在同一时刻完成时,一次性处理所有事件并批量更新后继节点的入度,避免重复检查。
    • • 插入调度支持:通过allow_insertion参数控制是否允许在处理器空闲间隙中插入新任务。当启用插入调度时,find_earliest_slot方法会扫描处理器的现有调度表,寻找能够容纳新任务的空隙,从而提升处理器利用率。
    • • 处理器选择策略:对于每个就绪节点,算法遍历所有空闲处理器,计算在该处理器上的最早完成时间(EFT),并选择使EFT最小的处理器。这种贪心策略保证了每次调度决策在当前时刻的局部最优性。
    • • 优先级函数灵活性priority_func 参数允许用户自定义优先级规则,例如基于执行时间、关键路径或混合策略,使算法能够适应不同的优化目标。

    5.5 示例计算图上的列表调度执行过程

    以图1为例,使用 LPT优先级(平均执行时间取负)和 插入调度,在三个处理器上执行列表调度:

    • • 初始化:入度0的节点只有 ,就绪队列 = 
    • • 时间0:空闲处理器有 CPU0、GPU0、TPU0。取出 ,计算其在各处理器上的 EFT:
      • • CPU0: 执行时间5,最早开始0,EFT=5
      • • GPU0: 执行时间2,最早开始0,EFT=2(最优)
      • • TPU0: 不兼容,跳过
        选择 GPU0,调度 ,完成时间2,事件堆加入 (2, v0, GPU0)。当前时间推进到2。
    • • 时间2:处理事件,释放 GPU0,更新后继  入度分别减1,均变为0,将  加入就绪队列(优先级:平均时间取负,平均3.5,平均1.5,故  优先级更高)。
      空闲处理器:CPU0、TPU0、GPU0。
      取出 ,计算 EFT:
      • • CPU0: 执行时间3,就绪时间2,开始2,EFT=5
      • • GPU0: 执行时间4,开始2,EFT=6
      • • TPU0: 不兼容
        选择 CPU0(EFT=5)。调度 ,完成时间5,事件堆加入 (5, v1, CPU0)。
        取出 ,计算 EFT:
      • • CPU0: 当前可用时间5,就绪时间2,开始5,EFT=7
      • • TPU0: 执行时间1,开始2,EFT=3(最优)
      • • GPU0: 不兼容
        选择 TPU0,调度 ,完成时间3,事件堆加入 (3, v2, TPU0)。
        此时空闲处理器只剩 GPU0(因 CPU0 和 TPU0 刚被分配,但需等待事件完成后才释放,当前时间2,事件未处理完)。
    • • 时间3:处理  完成事件,释放 TPU0,更新  入度(从2减为1)。
      空闲处理器:TPU0、GPU0(CPU0仍忙碌)。就绪队列为空( 尚未就绪)。推进时间到下一个事件时间5。
    • • 时间5:处理  完成事件,释放 CPU0,更新  入度(从1减为0),将  加入就绪队列。
      空闲处理器:CPU0、TPU0、GPU0。
      取出 ,计算 EFT:
      • • CPU0: 执行时间4,就绪时间5,开始5,EFT=9
      • • GPU0: 执行时间3,开始5,EFT=8
      • • TPU0: 执行时间2,开始5,EFT=7(最优)
        选择 TPU0,调度 ,完成时间7,事件堆加入 (7, v3, TPU0)。
    • • 时间7:处理  完成事件,所有节点调度完毕。最终 makespan = 7。

    最终调度结果如下:

    算子
    开始时间
    结束时间
    处理器
    0
    2
    GPU0
    2
    5
    CPU0
    2
    3
    TPU0
    5
    7
    TPU0

    总完成时间 = 7 ms

    6. 异构最早完成时间(HEFT)算法

    6.1 算法思想

    列表调度在优先级选择上较为朴素,而 HEFT 是一种专门针对异构处理器设计的静态调度算法,它通过更精细的优先级计算(考虑关键路径)和处理器选择(基于最早完成时间)来提升调度质量。HEFT 分为两个阶段:

    1. 1. 任务优先级排序
      为每个节点计算一个 rank 值,表示从该节点到出口节点的最长路径长度(包含平均执行时间)。公式为:

      其中  是节点  在所有兼容处理器上的平均执行时间 是  的后继节点集合。出口节点的  等于其平均执行时间。
      按照  值降序对节点排序,优先级高的节点先调度。

    2. 2. 处理器选择
      按排序顺序依次处理每个节点。对于当前节点 ,遍历所有兼容的处理器 ,计算其在处理器  上的最早完成时间(Earliest Finish Time, EFT):

      其中  是处理器  的空闲时间, 是节点  与  之间的数据传输时间(若在同一处理器上则为0)。选择使  最小的处理器 ,将  调度到 ,并更新处理器的可用时间。

    6.2 算法流程图

    下面采用流程图展示 HEFT 的执行逻辑:

    6.3 关键代码实现(HEFT)

    HEFT 的实现复用公共数据结构,核心代码仅包含算法逻辑。

    
    
    
        
    def heft_schedule(
        graph: ComputationGraph,
        processors: List[Processor]
    ) -> Tuple[Dict[int, float], Dict[int, str], Dict[int, float]]:
        """
        HEFT 调度算法(忽略通信开销)
        """

        num_nodes = graph.num_nodes
        ops = graph.ops

        # 1. 计算每个节点的平均执行时间

        avg_time = [op.get_avg_exec_time() for op in ops]

        # 2. 逆拓扑序计算 rank(使用 Kahn 算法得到拓扑序)

        in_degree = [len(graph.get_predecessors(i)) for i in range(num_nodes)]
        topo = []
        q = [i for i in range(num_nodes) if in_degree[i] == 0]
        while
     q:
            u = q.pop(0)
            topo.append(u)
            for
     v in graph.get_successors(u):
                in_degree[v] -= 1
                if
     in_degree[v] == 0:
                    q.append(v)

        rank = [0.0] * num_nodes
        for
     u in reversed(topo):
            max_succ_rank = 0.0
            for
     v in graph.get_successors(u):
                max_succ_rank = max(max_succ_rank, rank[v])
            rank[u] = avg_time[u] + max_succ_rank

        # 3. 按 rank 降序排序节点

        nodes_sorted = sorted(range(num_nodes), key=lambda x: rank[x], reverse=True)

        # 4. 调度

        start_time = {}
        finish_time = {}
        assigned_proc = {}
        proc_available = {proc.id: 0.0 for proc in processors}   # 记录每个处理器的可用时间
        proc_map = {proc.id: proc for proc in processors}

        for
     u in nodes_sorted:
            # 计算就绪时间(所有前驱完成时间的最大值)

            pred_finish = [finish_time[p] for p in graph.get_predecessors(u) if p in finish_time]
            ready = max(pred_finish) if pred_finish else 0.0

            best_proc = None
            best_start = None
            best_finish = float('inf')
            for
     proc in processors:
                if
     proc.type not in ops[u].compatible:
                    continue

                exec_time = ops[u].get_exec_time(proc.type)
                if
     exec_time is None:
                    continue

                start = max(ready, proc_available[proc.id])
                finish = start + exec_time
                if
     finish < best_finish:
                    best_finish = finish
                    best_start = start
                    best_proc = proc

            if
     best_proc is None:
                raise
     RuntimeError(f"Node {u} cannot be scheduled")

            start_time[u] = best_start
            finish_time[u] = best_finish
            assigned_proc[u] = best_proc.id
            proc_available[best_proc.id] = best_finish

        return
     start_time, assigned_proc, finish_time
    • • Rank 计算与拓扑排序:HEFT 算法首先通过 Kahn 算法获得计算图的拓扑序,然后逆序遍历该序列计算每个节点的 rank 值。这种逆序计算方式确保了在处理节点 u 时,其后继节点的 rank 值已经确定,从而能够准确反映从 u 到出口节点的最长路径长度。
    • • 平均执行时间的使用:rank 公式中使用平均执行时间而非具体处理器上的执行时间,这是为了在优先级排序阶段保持与具体处理器无关的特性,避免因处理器选择而影响优先级的公正性。平均执行时间能够合理反映节点在异构环境下的计算负载。
    • • 处理器选择策略:与列表调度不同,HEFT 在处理器选择阶段遍历所有处理器(包括正在忙碌的处理器),而非仅限空闲处理器。这是因为 HEFT 的调度顺序是全局确定的,每个节点仅被处理一次,因此需要与所有处理器的当前可用时间进行比较,选择全局最优的处理器。
    • • 通信开销的简化:当前实现省略了节点间的通信开销(comm 项),假设所有数据传输成本为零。在实际应用中,若节点被分配到不同处理器,则需要增加数据迁移的时间开销。原始 HEFT 算法支持通过通信矩阵建模这一开销,此处简化以聚焦核心调度逻辑。

    6.4 示例计算图上的 HEFT 执行过程

    仍采用图1的计算图,执行 HEFT 算法:

    • • 计算平均执行时间
      • • : 兼容 CPU、GPU,平均 
      • • : 兼容 CPU、GPU,平均 
      • • : 兼容 CPU、TPU,平均 
      • • : 兼容 CPU、GPU、TPU,平均 
    • • 逆拓扑序计算 rank(出口节点  开始):
      • • 
      • • 
      • • 
      • • 
    • • 按 rank 降序排序 (10.0),   (6.5),  (4.5),  (3.0)。
    • • 调度
    1. 1. 调度 :就绪时间0,计算各处理器 EFT:
    • • CPU0: 5,开始0,EFT=5
    • • GPU0: 2,开始0,EFT=2(最优)
      选择 GPU0,完成时间2,GPU0 可用时间更新为2。
  • 2. 调度 :就绪时间 = 。计算 EFT:
    • • CPU0: 执行3,开始2,EFT=5
    • • GPU0: 执行4,开始2,EFT=6
      选择 CPU0,完成时间5,CPU0 可用时间更新为5。
  • 3. 调度 :就绪时间 =  。计算 EFT:
    • • CPU0: 当前可用5,开始5,EFT=7
    • • TPU0: 执行1,开始2,EFT=3(最优)
      选择 TPU0,完成时间3,TPU0 可用时间更新为3。
  • 4. 调度  :就绪时间 = 。计算 EFT:
    • • CPU0: 当前可用5,开始5,EFT=9
    • • GPU0: 当前可用2(空闲),开始5,EFT=8
    • • TPU0: 当前可用3(空闲),开始5,EFT=7(最优)
      选择 TPU0,完成时间7,TPU0 可用时间更新为7。

    最终调度结果与列表调度相同,makespan = 7 ms。但在某些复杂图中,HEFT 因考虑了关键路径而能取得更优解。

    7. 两种算法的对比分析与最优调度差距

    7.1 理论近似比

    • • 列表调度:对于同构处理器,若使用 LPT 优先级(最长处理时间优先),其 makespan 不超过最优解的  倍,其中  为处理器数量。对于一般图,列表调度的近似比在最坏情况下可达  或更大,但实际表现通常较好。
    • • HEFT:原始论文中未给出严格的近似比,但实验表明 HEFT 的解通常非常接近最优(平均在 5% 以内)。在特定条件下(如通信开销可忽略),HEFT 的最坏情况近似比也可达到 

    7.2 实际性能对比

    特性
    列表调度
    HEFT
    优先级计算
    用户自定义(如 LPT、SPT、HLF)
    基于关键路径的 rank(考虑平均执行时间)
    处理器选择
    遍历空闲处理器,选择 EFT 最小的
    遍历所有处理器(含忙碌),选择 EFT 最小的
    插入调度
    可选(通过 allow_insertion)
    通常不支持插入,但可扩展
    通信开销
    可扩展加入通信模型
    原始版本包含通信开销建模
    时间复杂度
    (O(
    V
    解的质量
    依赖启发式,可能较差
    通常优于简单列表调度,接近最优
    应用场景
    快速原型、简单依赖图
    静态调度、对质量要求高的场景

    7.3 最优调度差距示例

    对于本文的示例图,最优调度(可通过穷举或整数规划求得)的 makespan 也是 7 ms,因此两种算法均达到了最优。但在更复杂的图中,HEFT 往往比列表调度更接近最优。例如,在随机生成的 DAG 上,HEFT 的 makespan 平均比列表调度(使用 LPT 优先级)低 10%~20%

    近似比公式:对于任意调度算法 ,其近似比定义为:

    其中  是实例  的最优 makespan。列表调度在一般情况下 ,而 HEFT 没有统一的理论上界,但实验表明  在大多数情况下成立。

    8. 总结

    本文系统介绍了深度学习模型在异构处理器上的两种经典调度算法:列表调度与 HEFT。列表调度通过可插拔的优先级函数,以事件驱动方式快速生成可行调度;而 HEFT 通过关键路径分析和处理器选择,在解的质量上更胜一筹,尤其适合异构环境。通过一个具体的计算图示例,我们详细展示了两种算法的执行过程,并给出了完整的 Python 实现框架。在实际应用中,可根据对调度时间与解质量的不同需求选择合适的算法,也可将 HEFT 的优先级计算思想融入列表调度框架,进一步提升性能。


    参考文献
    [1] Topcuoglu, H., Hariri, S., & Wu, M. Y. (2002). Performance-effective and low-complexity task scheduling for heterogeneous computing. IEEE Transactions on Parallel and Distributed Systems, 13(3), 260-274.
    [2] Kwok, Y. K., & Ahmad, I. (1999). Static scheduling algorithms for allocating directed task graphs to multiprocessors. ACM Computing Surveys, 31(4), 406-471.

     


    Python社区是高质量的Python/Django开发社区
    本文地址:http://www.python88.com/topic/194327