Py学习  »  Python

气象编程 | Python并行——速度+

气象学家 • 1 年前 • 195 次点击  

一直对python的多线程、多进程、分布式多进程比较好奇。今天浅浅地学习了一下,里面涉及的内容其实比较多,包括进程锁、进程间的通信、进程池、共享内存等等。这里给一个简单的、大家可能会常用到的例子——从多个wrfout文件中提取变量T2并单独保存输出为nc文件,一起感受下多进程的魅力。如果不妥之处,还望大家不吝赐教!


常规代码

这份代码是大家实际中经常使用的,通过循环来实现从多个wrfout文件中提取变量T2并单独保存输出为nc文件。

import xarray as xr
import numpy as np
import glob
import sys
import os
import argparse
import time 

def nc2pkl(args):
    st =time.time()
    if not os.path.exists(args.outdir):
            os.mkdir(args.outdir)
    files = glob.glob(args.files)
    
    for file in files:
        filename = file.split('/')[-1]
        print('Reading ',file)
        sys.stdout.flush()
        ds = xr.open_dataset(file)
        T2 = ds['T2']

        sys.stdout.flush()
        # Write out everything 
        ofile = args.outdir+'/'+filename+'.nc'
        T2.to_netcdf(ofile)
        print('Written to '+ofile)

    et = time.time()
    print(et -st)
  
def parse_args():
    '''
    Parser for nc2pkl
    '''

    parser = argparse.ArgumentParser()
    parser.add_argument('-t','--template',type=str,default="../data/wrfout_d01_2018-08-01_00:00:00")
    parser.add_argument('-f','--files',type=str,default="../data/wrfout_d01_2018-08*")
    parser.add_argument('-o','--outdir',type=str,default="../output/T2")
    return parser.parse_args()

if __name__ == '__main__':
    nc2pkl(parse_args())

多进程并行代码

这份代码里面使用了多进程并行,从num_processes = 4可以知道开了4个进程同时处理,可以简单理解为同一时间同时处理4个wrfout文件。其实能开多少进程取决于我们的计算机有多少核数,在linux上可以通过nproc命令查看核数。

如果大家想使用下面的并行代码满足自己的需求,只需要更改被我用-----框起来的函数定义中的操作即可,比如更改变量,或者增加计算等。

import xarray as xr
import numpy as np
import multiprocessing as mp
from functools import partial
import glob
import sys
import os
import time

#--------------------------------------
def nc2pkl(file_path, output_dir):
    if not os.path.exists(output_dir):
            os.mkdir(output_dir)
    
    filename = file_path.split('/')[-1]
    print('Reading ',file_path)
    sys.stdout.flush()
    ds = xr.open_dataset(file_path)
    T2 = ds['T2']

    sys.stdout.flush()
    # Write out everything 
    ofile = output_dir+'/'+filename+'.nc'
    T2.to_netcdf(ofile)
    print('Written to '+ofile)
#--------------------------------------

def parallel_nc2pkl(input_dir, output_dir, num_processes):
    st = time.time()
    # 获取所有需要处理的文件路径
    file_paths = glob.glob(os.path.join(input_dir, "wrfout_d01_2018-08*"))

    # 创建进程池
    with mp.Pool(processes=num_processes) as pool:
        # 使用partial函数创建一个只有一个参数的nc2pkl函数
        worker_func = partial(nc2pkl, output_dir=output_dir)

        # 将需要处理的文件路径传递给进程池
        pool.map(worker_func, file_paths)
    et = time.time()
    print(et -st)

if __name__ == "__main__":
    # 设置输入和输出目录
    input_dir = "../data/"
    output_dir = "../output/T2_multi"

    # 设置进程数量
    num_processes = 4

    # 并行处理文件
    parallel_nc2pkl(input_dir, output_dir, num_processes)

计算效率

常规代码耗时及CPU使用情况

并行代码耗时及CPU使用情况

从中可以看到,并行代码极大地提升了速度。

注意

无论是多进程还是多线程,一旦任务数量多到一个限度,就会消耗掉系统所有的资源,结果就是效率急剧下降,所有任务都做不好。

参考:

【1】https://mofanpy.com/tutorials/python-basic/multiprocessing/why

【2】https://www.liaoxuefeng.com/wiki/1016959663602400/1017627212385376









声明:欢迎转载、转发本号原创内容,可留言区留言或者后台联系小编(微信:gavin7675)进行授权。气象学家公众号转载信息旨在传播交流,其内容由作者负责,不代表本号观点。文中部分图片来源于网络,如涉及作品内容、版权和其他问题,请后台联系小编处理。


往期推荐

 获取ERA5-Land陆面高分辨率再分析数据(32TB)

★ 获取最新版本CMIP6降尺度数据集30TB

★ 获取ERA5常用变量再分析数据(26TB)

 EC数据商店推出Python在线处理工具箱

★ EC打造实用气象Python工具Metview

★ 机器学习简介及在短临天气预警中的应用

★ Nature-地球系统科学领域的深度学习及理解

★ 采用神经网络与深度学习来预报降水、温度


   欢迎加入气象学家交流群   

请备注:姓名/昵称-单位/学校-研究方向


Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/153267
 
195 次点击