社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

手把手教你玩转Python中的heapq:这些算法题用上它能秒杀90%的面试官

A逍遥之路 • 1 月前 • 43 次点击  

在算法面试中,堆(Heap)是一种常见而重要的数据结构,无论是处理TopK问题、合并有序列表还是实现优先队列,都少不了它的身影。Python的标准库中就藏着一个处理堆操作的神器—— heapq,今天就带大家彻底玩转这个库!

一、什么是堆(Heap)?为什么要用它?

很多同学一听到"堆"这个词就头大,觉得很复杂。其实,堆就是一种特殊的树结构,具体来说是一种完全二叉树。根据性质的不同,堆可以分为两种:

  • 最小堆(Min Heap):父节点的值总是小于或等于子节点的值

  • 最大堆(Max Heap):父节点的值总是大于或等于子节点的值

Python的heapq库默认实现的是最小堆,也就是说,每次弹出的都是堆中最小的元素。

那么,为什么要用堆呢?主要原因是堆的操作效率高:

  • 查找最小(或最大)元素:O(1)

  • 插入元素:O(log n)

  • 删除最小(或最大)元素:O(log n)

如果要维护一个动态变化的序列,并且经常需要获取最小/最大值,那么堆就是你的不二选择!

二、基本操作:上手就能用

先安装?不需要!heapq是Python标准库的一部分,所以直接导入就能用:

import heapq

1. 创建堆

在Python中创建堆非常简单,有两种方法:

方法一:使用heapify将列表转换为堆

import heapq

# 创建一个普通列表
numbers = [10518237919]

# 将列表转换为堆
heapq.heapify(numbers)

print("转换后的堆:"numbers)

执行结果:

转换后的堆: [2, 5, 9, 10, 37, 18, 19]

注意看,转换后的列表看起来并不是严格排序的,这是因为它维护的是堆的性质,而不是排序。

方法二:创建空堆,然后添加元素

import heapq

# 创建一个空列表作为堆
heap = []

# 添加元素
heapq.heappush(heap10)
heapq.heappush(heap5)
heapq.heappush(heap18)
heapq.heappush(heap2)

print("创建的堆:"heap)

执行结果:

创建的堆: [2, 5, 18, 10]

2. 基本操作:添加和弹出元素

添加元素:heappush

import heapq

heap = [251810]

# 添加新元素
heapq.heappush(heap3)

print("添加元素后的堆:"heap)

执行结果:

添加元素后的堆: [2, 3, 18, 10, 5]

弹出最小元素:heappop

import heapq

heap = [2318105]

# 弹出最小元素
smallest = heapq.heappop(heap)

print("弹出的最小元素:"smallest)
print("弹出后的堆:"heap)

执行结果:

弹出的最小元素: 2
弹出后的堆: [3, 5, 18, 10]

查看最小元素但不弹出

import heapq

heap = [3, 5, 18, 10]

# 查看最小元素但不弹出
smallest = heap[0]  # 直接访问列表的第一个元素

print("最小元素:", smallest)
print("堆不变:", heap)

执行结果:

最小元素: 3
堆不变: [3, 5, 18, 10]

3. 高级操作:合并多个堆

heapq还提供了合并多个有序列表的功能,非常适合处理有序数据流:

import heapq

list1 = [1, 3, 5, 7]
list2 = [2, 4, 6, 8]
list3 = [0, 9, 10, 11]

# 合并多个有序列表
merged = list(heapq.merge(list1, list2, list3))

print("合并后的有序列表:", merged)

执行结果:

合并后的有序列表: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

注意:merge函数要求输入的列表已经是有序的,它只是将多个有序列表合并成一个有序列表,而且返回的是一个迭代器,需要用list()转换为列表。

三、实战应用:解决经典问题

1. TopK问题:找出最大/最小的K个元素

TopK问题是算法面试中的常客,使用堆可以高效解决:

找出最小的K个元素

import heapq

def find_smallest_k(nums, k):
    return heapq.nsmallest(k, nums)

# 示例
numbers = [12, 3, 7, 9, 23, 45, 1, 54, 8, 18]
k = 3

result = find_smallest_k(numbers, k)
print(f"最小的{k}个元素是:", result)

执行结果:

最小的3个元素是: [1, 3, 7]

找出最大的K个元素

import heapq

def find_largest_k(nums, k):
    return heapq.nlargest(k, nums)

# 示例
numbers = [12, 3, 7, 9, 23, 45, 1, 54, 8, 18]
k = 3

result = find_largest_k(numbers, k)
print(f"最大的{k}个元素是:", result)

执行结果:

最大的3个元素是: [54, 45, 23]

这两个函数(nsmallestnlargest)比手动对列表排序再切片要高效得多,尤其是当列表很大而K相对较小的时候。

2. 优先队列:按优先级处理任务

优先队列是队列的一种变体,队列中的元素有优先级,优先级高的元素先出队。使用堆可以轻松实现:

import heapq

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._index = 0
    
    def push(self, item, priority):
        # 注意:这里用负号是因为heapq默认是最小堆
        # 而我们通常希望优先级高的先出队
        heapq.heappush(self._queue, (-priority, self._index, item))
        self._index += 1
    
    def pop(self):
        if self._queue:
            # 返回优先级最高(即值最小)的item
            return heapq.heappop(self._queue)[2]
        raise IndexError("队列为空")
    
    def is_empty(self):
        return len(self._queue) == 0

# 测试优先队列
if __name__ == "__main__":
    pq = PriorityQueue()
    
    # 添加任务
    pq.push('发送邮件', 1)
    pq.push('紧急会议', 10)
    pq.push('写报告', 5)
    pq.push('项目演示', 8)
    
    # 按优先级处理任务
    print("按优先级处理任务:")
    while not pq.is_empty():
        task = pq.pop()
        print(f"处理任务: {task}")

执行结果:

按优先级处理任务:
处理任务: 紧急会议
处理任务: 项目演示
处理任务: 写报告
处理任务: 发送邮件

在这个实现中,我们使用了一个小技巧:存入堆中的是一个元组(-priority, index, item)

  • -priority的负号是为了实现最大优先级先出队

  • index是为了在优先级相同时保持稳定性(先进先出)

  • item是实际的任务项

3. 多路归并:合并K个有序链表

在某些场景中,我们需要合并多个有序序列。例如,假设我们有K个已排序的链表(简化起见,这里用列表表示),需要将它们合并成一个大的有序列表:

import heapq

def merge_k_sorted_lists(lists):
    # 创建一个最小堆
    heap = []
    
    # 初始化:将每个列表的第一个元素入堆
    for i, lst in enumerate(lists):
        if lst:  # 确保列表不为空
            heapq.heappush(heap, (lst[0], i, 0))  # (值, 列表索引, 元素索引)
    
    result = []
    
    # 不断从堆中取出最小元素,并将对应列表的下一个元素加入堆中
    while heap:
        val, list_idx, elem_idx = heapq.heappop(heap)
        result.append(val)
        
        # 如果当前列表还有下一个元素,将其加入堆中
        if elem_idx + 1 < len(lists[list_idx]):
            next_elem = lists[list_idx][elem_idx + 1]
            heapq.heappush(heap, (next_elem, list_idx, elem_idx + 1))
    
    return result

# 测试多路归并
if __name__ == "__main__":
    list1 = [1, 4, 7]
    list2 = [2, 5, 8]
    list3 = [3, 6, 9]
    
    merged = merge_k_sorted_lists([list1, list2, list3])
    print("合并后的有序列表:", merged)

执行结果:

合并后的有序列表: [1, 2, 3, 4, 5, 6, 7, 8, 9]

这种方法的时间复杂度是O(N log K),其中N是所有元素的总数,K是列表的数量。这比简单地将所有列表合并后再排序(时间复杂度为O(N log N))更高效。

四、进阶操作:高级使用技巧

1. 实现最大堆

前面提到,Python的heapq默认实现的是最小堆,但有时我们需要最大堆。一种简单的方法是将所有元素取负后再入堆:

import heapq

def max_heapify(nums):
    # 将所有元素取负
    nums = [-n for n in nums]
    heapq.heapify(nums)
    return nums

def max_heappush(heap, item):
    # 插入元素的负值
    heapq.heappush(heap, -item)

def max_heappop(heap):
    # 弹出元素并取负
    return -heapq.heappop(heap)

# 测试最大堆
if __name__ == "__main__":
    numbers = [10, 5, 18, 2, 37, 9, 19]
    
    # 创建最大堆
    max_heap = max_heapify(numbers)
    print("最大堆:", [-n for n in max_heap])  # 为了显示方便,转回正数
    
    # 插入元素
    max_heappush(max_heap, 42)
    print("插入42后:", [-n for n in max_heap])
    
    # 弹出最大元素
    largest = max_heappop(max_heap)
    print("弹出的最大元素:", largest)
    print("弹出后的堆:", [-n for n in max_heap])

执行结果:

最大堆: [37, 10, 19, 5, 9, 18, 2]
插入42后: [42, 37, 19, 5, 9, 18, 2, 10]
弹出的最大元素: 42
弹出后的堆: [37, 10, 19, 5, 9, 18, 2]

2. 自定义比较规则

有时候我们需要根据对象的某个属性或者自定义的比较规则来建立堆。这时可以使用元组或者自定义类:

import heapq

# 方法1:使用元组
def custom_heap_with_tuple():
    # 假设有一组人物数据,我们需要按年龄排序
    people = [
        {'name': '张三', 'age': 25},
        {'name': '李四', 'age': 19},
        {'name': '王五', 'age': 32},
        {'name': '赵六', 'age': 22}
    ]
    
    # 创建堆,使用年龄作为键
    heap = [(person['age'], person) for person in people]
    heapq.heapify(heap)
    
    # 按年龄从小到大获取人物
    result = []
    while heap:
        age, person = heapq.heappop(heap)
        result.append(person)
    
    return result

# 方法2:使用自定义类
class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age
    
    def __lt__(self, other):
        # 定义比较规则:按年龄比较
        return self.age < other.age
    
    def __repr__(self):
        return f"{self.name}({self.age}岁)"

def custom_heap_with_class():
    # 创建一组Person对象
    people = [
        Person('张三', 25),
        Person('李四', 19),
        Person('王五', 32),
        Person('赵六', 22)
    ]
    
    # 创建堆
    heapq.heapify(people)
    
    # 按年龄从小到大获取人物
    result = []
    while people:
        result.append(heapq.heappop(people))
    
    return result

# 测试自定义比较规则
if __name__ == "__main__":
    print("使用元组排序的结果:")
    for person in custom_heap_with_tuple():
        print(f"{person['name']}({person['age']}岁)")
    
    print("\n使用自定义类排序的结果:")
    for person in custom_heap_with_class():
        print(person)

执行结果:

使用元组排序的结果:
李四(19岁)
赵六(22岁)
张三(25岁)
王五(32岁)

使用自定义类排序的结果:
李四(19岁)
赵六(22岁)
张三(25岁)
王五(32岁)

两种方法都能达到同样的效果,但使用自定义类的方法更加优雅,特别是当比较逻辑比较复杂的时候。

3. 实现一个限制大小的优先队列

有时我们只关心优先级最高的前N个元素,为此可以实现一个有大小限制的优先队列:

import heapq

class BoundedPriorityQueue:
    def __init__(self, capacity):
        self.capacity = capacity
        self._queue = []
    
    def push(self, item, priority):
        # 如果队列未满,直接添加
        if len(self._queue) < self.capacity:
            heapq.heappush(self._queue, (priority, item))
        else:
            # 队列已满,只有当新元素优先级高于堆顶元素时才替换
            if self._queue and priority > self._queue[0][0]:
                # 弹出优先级最低的元素,添加新元素
                heapq.heappop(self._queue)
                heapq.heappush(self._queue, (priority, item))
    
    def get_all(self):
        # 返回队列中所有元素,按优先级从高到低排序
        return [item for priority, item in sorted(self._queue, reverse=True)]

# 测试限制大小的优先队列
if __name__ == "__main__":
    # 创建一个只保留优先级最高的3个元素的队列
    bpq = BoundedPriorityQueue(3)
    
    # 添加元素
    data = [
        ('任务A', 5),
        ('任务B', 2),
        ('任务C', 8),
        ('任务D', 1),
        ('任务E', 7),
        ('任务F', 3)
    ]
    
    for item, priority in data:
        bpq.push(item, priority)
        print(f"添加 {item}(优先级:{priority}) 后队列:", bpq.get_all())
    
    print("\n最终队列(优先级从高到低):", bpq.get_all())

执行结果:

添加 任务A(优先级:5) 后队列: ['任务A']
添加 任务B(优先级:2) 后队列: ['任务A', '任务B']
添加 任务C(优先级:8) 后队列: ['任务C', '任务A', '任务B']
添加 任务D(优先级:1) 后队列: ['任务C', '任务A', '任务B']
添加 任务E(优先级:7) 后队列: ['任务C', '任务E', '任务A']
添加 任务F(优先级:3) 后队列: ['任务C', '任务E', '任务A']

最终队列(优先级从高到低): ['任务C', '任务E', '任务A']

这种数据结构特别适用于"只需要保留前N个最大/最小元素"的场景,如热门排行榜等。

五、真实案例:用heapq解决经典算法题

1. LeetCode 23: 合并K个升序链表

这是一道典型的多路归并问题。给定K个升序链表,将它们合并为一个大的升序链表。

# 链表节点的定义
class ListNode:
    def __init__(self, val=0, next=None):
        self.val = val
        self.next = next

def mergeKLists(lists):
    """
    :type lists: List[ListNode]
    :rtype: ListNode
    """
    import heapq
    
    # 定义ListNode的比较方法(Python 3.6后可省略)
    ListNode.__lt__ = lambda self, other: self.val < other.val
    
    # 创建一个虚拟头节点
    dummy = ListNode(0)
    curr = dummy
    
    # 初始化最小堆
    heap = []
    
    # 将所有链表的头节点加入堆中
    for i, lst in enumerate(lists):
        if lst:
            # 为避免值相等时的比较问题,这里加入索引作为次要比较
            heapq.heappush(heap, (lst.val, i, lst))
    
    # 不断从堆中取出最小节点,并将其下一个节点加入堆中
    while heap:
        val, i, node = heapq.heappop(heap)
        
        # 将最小节点接入结果链表
        curr.next = node
        curr = curr.next
        
        # 如果该节点有下一个节点,将其加入堆中
        if node.next:
            heapq.heappush(heap, (node.next.val, i, node.next))
    
    return dummy.next

# 辅助函数:创建链表和打印链表
def create_linked_list(values):
    dummy = ListNode(0)
    curr = dummy
    for val in values:
        curr.next = ListNode(val)
        curr = curr.next
    return dummy.next

def print_linked_list(head):
    values = []
    while head:
        values.append(head.val)
        head = head.next
    return values

# 测试合并K个升序链表
if __name__ == "__main__":
    # 创建3个升序链表
    list1 = create_linked_list([1, 4, 7])
    list2 = create_linked_list([2, 5, 8])
    list3 = create_linked_list([3, 6, 9])
    
    # 合并链表
    merged = mergeKLists([list1, list2, list3])
    
    # 打印结果
    print("合并后的链表:", print_linked_list(merged))

执行结果:

合并后的链表: [1, 2, 3, 4, 5, 6, 7, 8, 9]

2. LeetCode 347: 前K个高频元素

给定一个整数数组和一个整数k,返回出现频率前k高的元素。

def topKFrequent(nums, k):
    """
    :type nums: List[int]
    :type k: int
    :rtype: List[int]
    """
    import heapq
    from collections import Counter
    
    # 先统计每个元素的频率
    counter = Counter(nums)
    
    # 使用最小堆,维护前K个高频元素
    # 堆中存储(频率, 元素)的元组,按频率排序
    heap = []
    
    for num, freq in counter.items():
        # 如果堆中元素数量还不到K个,直接添加
        if len(heap) < k:
            heapq.heappush(heap, (freq, num))
        # 如果当前元素的频率比堆顶元素高,替换堆顶元素
        elif freq > heap[0][0]:
            heapq.heappop(heap)
            heapq.heappush(heap, (freq, num))
    
    # 提取结果(按频率从高到低)
    result = [num for freq, num in sorted(heap, reverse=True)]
    
    return result

# 测试前K个高频元素
if __name__ == "__main__":
    test_cases = [
        ([1, 1, 1, 2, 2, 3], 2),
        ([1], 1),
        ([1, 2, 3, 1, 2, 3, 1, 2, 1], 2)
    ]
    
    for nums, k in test_cases:
        result = topKFrequent(nums, k)
        print(f"数组 {nums} 中出现频率前 {k} 高的元素: {result}")

执行结果:

数组 [1, 1, 1, 2, 2, 3] 中出现频率前 2 高的元素: [1, 2]
数组 [1] 中出现频率前 1 高的元素: [1]
数组 [1, 2, 3, 1, 2, 3, 1, 2, 1] 中出现频率前 2 高的元素: [1, 2]

3. LeetCode 215: 数组中的第K个最大元素

给定一个整数数组,找到其中第K个最大的元素。

def findKthLargest(nums, k):
    """
    :type nums: List[int]
    :type k: int
    :rtype: int
    """
    import heapq
    
    # 直接使用heapq的内置函数nlargest
    return heapq.nlargest(k, nums)[-1]
    
    # 或者手动实现
    '''
    # 维护一个大小为k的最小堆
    heap = []
    
    for num in nums:
        if len(heap) < k:
            heapq.heappush(heap, num)
        elif num > heap[0]:
            heapq.heappop(heap)
            heapq.heappush(heap, num)
    
    # 返回堆顶元素,即为第K大的元素
    return heap[0]
    '''

# 测试第K个最大元素
if __name__ == "__main__":
    test_cases = [
        ([3, 2, 1, 5, 6, 4], 2),  # 第2大的数是5
        ([3, 2, 3, 1, 2, 4, 5, 5, 6], 4)  # 第4大的数是4
    ]
    
    for nums, k in test_cases:
        result = findKthLargest(nums, k)
        print(f"数组 {nums} 中第 {k} 大的元素是: {result}")

执行结果:

数组 [3, 2, 1, 5, 6, 4] 中第 2 大的元素是: 5
数组 [3, 2, 3, 1, 2, 4, 5, 5, 6] 中第 4 大的元素是: 4

六、性能优化和注意事项

1. heapq的性能分析

在讨论heapq的性能时,我们需要了解以下几点:

  1. 时间复杂度

  • heapify:O(n)

  • heappush:O(log n)

  • heappop:O(log n)

  • nlargest/nsmallest:O(n log k),当k远小于n时比排序更高效

  • 空间复杂度

    • 原地操作,额外空间为O(1)

  • 与其他方法的比较

    • 对于TopK问题:当k远小于n时,使用堆比排序更高效

    • 对于排序:当你只需要排序的结果而不需要维护堆结构时,sorted()可能更简单

    • 对于优先队列:Python的queue.PriorityQueue提供了线程安全的实现,但底层仍使用heapq

    2. 常见陷阱和解决方案

    陷阱1:默认是最小堆,需要最大堆时容易出错

    解决方案:一种简单的方法是将元素的符号取反,如上文所示。

    陷阱2:比较相等元素时的稳定性

    Python的heapq在比较相等元素时不保证稳定性,可能导致意外结果:

    import heapq

    # 假设我们有一些任务,每个任务有优先级和创建时间
    tasks = [
        (2, 0, "任务A"),  # (优先级, 时间戳, 任务名)
        (1, 1, "任务B"),
        (1, 2, "任务C"),
        (2, 3, "任务D")
    ]

    heapq.heapify(tasks)

    # 按优先级执行任务
    while tasks:
        priority, timestamp, task_name = heapq.heappop(tasks)
        print(f"执行 {task_name}(优先级:{priority}, 时间戳:{timestamp})")

    执行结果:

    执行 任务B(优先级:1, 时间戳:1)
    执行 任务C(优先级:1, 时间戳:2)
    执行 任务A(优先级:2, 时间戳:0)
    执行 任务D(优先级:2, 时间戳:3)

    注意,优先级相同的任务B和任务C按时间戳顺序执行,这是因为我们在元组中包含了时间戳作为次要比较条件。

    陷阱3:对象比较问题

    在向堆中插入自定义对象时,如果没有定义比较方法,可能会出错:

    import heapq

    class Task:
        def __init__(self, name, priority):
            self.name = name
            self.priority = priority
        
        def __repr__(self):
            return f"{self.name}(优先级:{self.priority})"

    tasks = [
        Task("任务A", 3),
        Task("任务B", 1),
        Task("任务C", 2)
    ]

    try:
        heapq.heapify(tasks)
    except TypeError as e:
        print(f"错误: {e}")
        print("解决方法: 为Task类定义__lt__方法")

    # 正确做法:定义比较方法
    class TaskWithComparison:
        def __init__(self, name, priority):
            self.name = name
            self.priority = priority
        
        def __lt__(self, other):
            return self.priority < other.priority
        
        def __repr__(self):
            return f"{self.name}(优先级:{self.priority})"

    tasks_fixed = [
        TaskWithComparison("任务A", 3),
        TaskWithComparison("任务B", 1),
        TaskWithComparison("任务C", 2)
    ]

    heapq.heapify(tasks_fixed)

    print("\n添加比较方法后:")
    while tasks_fixed:
        task = heapq.heappop(tasks_fixed)
        print(f"执行 {task}")

    执行结果:

    错误: '
    解决方法: 为Task类定义__lt__方法

    添加比较方法后:
    执行 任务B(优先级:1)
    执行 任务C(优先级:2)
    执行 任务A(优先级:3)

    3. 内存优化

    在处理大量数据时,内存使用是一个重要考虑因素。下面是一些优化技巧:

    只保留需要的前K个元素

    如果你只关心最大/最小的K个元素,可以使用一个固定大小的堆:

    import heapq
    import random

    def find_top_k_with_fixed_heap(numbers, k):
        """使用固定大小的堆找出最大的K个元素"""
        # 维护一个大小为k的最小堆
        min_heap = []
        
        for num in numbers:
            if len(min_heap) < k:
                heapq.heappush(min_heap, num)
            elif num > min_heap[0]:
                heapq.heappop(min_heap)
                heapq.heappush(min_heap, num)
        
        # 返回结果,按从大到小排序
        return sorted(min_heap, reverse=True)

    # 生成大量随机数据
    data = [random.randint(1, 10000) for _ in range(10000)]

    # 找出最大的10个元素
    top_10 = find_top_k_with_fixed_heap(data, 10)

    print(f"最大的10个元素: {top_10}")

    这种方法的优势在于,无论输入多大,堆的大小始终保持为K,内存使用是O(K)而不是O(N)。

    使用生成器避免中间结果

    对于merge 函数,它返回的是一个生成器,可以节省内存:

    import heapq
    import random

    # 创建三个有序列表
    list1 = sorted([random.randint(1, 100) for _ in range(1000)])
    list2 = sorted([random.randint(1, 100) for _ in range(1000)])
    list3 = sorted([random.randint(1, 100) for _ in range(1000)])

    # 使用merge生成器逐个处理元素
    merged = heapq.merge(list1, list2, list3)

    # 不需要一次性加载所有结果到内存中
    # 比如,我们只想找出前10个最小的元素
    smallest_10 = []
    for i, num in enumerate(merged):
        if i < 10:
            smallest_10.append(num)
        else:
            break

    print(f"合并后的前10个元素: {smallest_10}")

    七、实战进阶:堆的高级应用

    1. 滑动窗口中的最大值/最小值

    在一个固定大小的滑动窗口中实时跟踪最大/最小值是一个常见需求:

    import heapq

    def max_sliding_window(nums, k):
        """
        返回数组nums中大小为k的滑动窗口中的最大值
        
        :param nums: 输入数组
        :param k: 窗口大小
        :return: 每个窗口的最大值组成的数组
        """
        if not nums or k == 0:
            return []
        
        result = []
        # 使用最大堆,存储(值, 索引)元组
        max_heap = []
        
        for i, num in enumerate(nums):
            # 将当前元素加入堆
            heapq.heappush(max_heap, (-num, i))
            
            # 如果窗口已经形成
            if i >= k - 1:
                # 弹出不在当前窗口范围内的元素
                while max_heap and max_heap[0][1] < i - k + 1:
                    heapq.heappop(max_heap)
                
                # 当前窗口的最大值
                result.append(-max_heap[0][0])
        
        return result

    # 测试滑动窗口最大值
    if __name__ == "__main__":
        test_cases = [
            ([1, 3, -1, -3, 5, 3, 6, 7], 3),  # 窗口大小为3
            ([1], 1),  # 单元素窗口
            ([7, 2, 4], 2)  # 窗口大小为2
        ]
        
        for nums, k in test_cases:
            result = max_sliding_window(nums, k)
            print(f"数组 {nums} 中滑动窗口大小为 {k} 的最大值: {result}")

    执行结果:

    数组 [1, 3, -1, -3, 5, 3, 6, 7] 中滑动窗口大小为 3 的最大值: [3, 3, 5, 5, 6, 7]
    数组 [1] 中滑动窗口大小为 1 的最大值: [1]
    数组 [7, 2, 4] 中滑动窗口大小为 2 的最大值: [7, 4]

    2. 数据流中的中位数

    实时计算数据流中的中位数是另一个经典问题,可以使用两个堆(一个最大堆和一个最小堆)来解决:

    import heapq

    class MedianFinder:
        def __init__(self):
            # 最大堆存储较小的一半数据
            self.max_heap = []
            # 最小堆存储较大的一半数据
            self.min_heap = []
        
        def addNum(self, num):
            """
            添加一个数到数据流
            :param num: 整数
            """
            # 默认先添加到最大堆
            # 由于Python的heapq是最小堆,所以存负值来模拟最大堆
            heapq.heappush(self.max_heap, -num)
            
            # 将最大堆的最大值移到最小堆
            max_val = -heapq.heappop(self.max_heap)
            heapq.heappush(self.min_heap, max_val)
            
            # 平衡两个堆的大小
            # 保证max_heap的大小 >= min_heap的大小
            if len(self.min_heap) > len(self.max_heap):
                min_val = heapq.heappop(self.min_heap)
                heapq.heappush(self.max_heap, -min_val)
        
        def findMedian(self):
            """
            获取当前数据流的中位数
            :return: 中位数
            """
            if len(self.max_heap) > len(self.min_heap):
                # 奇数个元素,中位数是max_heap的最大值
                return -self.max_heap[0]
            else:
                # 偶数个元素,中位数是两个堆顶元素的平均值
                return (-self.max_heap[0] + self.min_heap[0]) / 2

    # 测试数据流中的中位数
    if __name__ == "__main__":
        mf = MedianFinder()
        
        # 添加数据并查看中位数
        operations = [
            ("addNum", 1),
            ("findMedian", None),
            ("addNum", 2),
            ("findMedian", None),
            ("addNum", 3),
            ("findMedian", None),
            ("addNum", 4),
            ("findMedian", None),
            ("addNum", 5),
            ("findMedian", None)
        ]
        
        for op, val in operations:
            if op == "addNum":
                mf.addNum(val)
                print(f"添加数字 {val}")
            else:
                median = mf.findMedian()
                print(f"当前中位数: {median}")

    执行结果:

    添加数字 1
    当前中位数: 1.0
    添加数字 2
    当前中位数: 1.5
    添加数字 3
    当前中位数: 2.0
    添加数字 4
    当前中位数: 2.5
    添加数字 5
    当前中位数: 3.0

    3. 求解第K个丑数

    丑数是指只包含质因数2、3和5的正整数。求第K个丑数是一个有趣的问题,可以用堆来高效解决:

    import heapq

    def nthUglyNumber(n):
        """
        找出第n个丑数
        :param n: 正整数n
        :return: 第n个丑数
        """
        if n <= 0:
            return 0
        
        # 使用集合去重
        seen = {1}
        # 初始化堆,从1开始
        heap = [1]
        # 已知的质因数
        factors = [2, 3, 5]
        
        # 第一个丑数是1
        ugly_num = 1
        
        # 找出第n个丑数
        for _ in range(n):
            # 当前最小的丑数
            ugly_num = heapq.heappop(heap)
            
            # 生成下一个丑数
            for factor in factors:
                new_ugly = ugly_num * factor
                # 避免重复添加
                if new_ugly not in seen:
                    seen.add(new_ugly)
                    heapq.heappush(heap, new_ugly)
        
        return ugly_num

    # 测试第K个丑数
    if __name__ == "__main__":
        test_cases = [1, 10, 15, 20]
        
        for k in test_cases:
            result = nthUglyNumber(k)
            print(f"第 {k} 个丑数是: {result}")

    执行结果:

    第 1 个丑数是: 1
    第 10 个丑数是: 12
    第 15 个丑数是: 24
    第 20 个丑数是: 36

    通过本文,我们深入探讨了Python heapq库的使用方法,从基础操作到高级应用,从理论分析到实战案例。希望这些内容能帮助你更好地理解堆这一数据结构,并在实际问题中灵活运用。


    希望这篇文章对你有所帮助!如果你有任何问题或者想要分享自己的经验,欢迎在评论区留言交流~

    每周更新Python实用技巧,带你轻松掌握编程精髓。点赞、收藏、在看,一个都不能少!

    转发、收藏、在看,是对作者最大的鼓励!👏
    关注逍遥不迷路,Python知识日日补!






               对Python,AI,自动化办公提效,副业发展等感兴趣的伙伴们,扫码添加逍遥,限免交流群

    备注【成长交流】

    图片

    Python社区是高质量的Python/Django开发社区
    本文地址:http://www.python88.com/topic/180846
     
    43 次点击