社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

气象编程 | Python并行——速度+

气象学家 • 2 年前 • 591 次点击  

一直对python的多线程、多进程、分布式多进程比较好奇。今天浅浅地学习了一下,里面涉及的内容其实比较多,包括进程锁、进程间的通信、进程池、共享内存等等。这里给一个简单的、大家可能会常用到的例子——从多个wrfout文件中提取变量T2并单独保存输出为nc文件,一起感受下多进程的魅力。如果不妥之处,还望大家不吝赐教!


常规代码

这份代码是大家实际中经常使用的,通过循环来实现从多个wrfout文件中提取变量T2并单独保存输出为nc文件。

import xarray as xr
import numpy as np
import glob
import sys
import os
import argparse
import time 

def nc2pkl(args):
    st =time.time()
    if not os.path.exists(args.outdir):
            os.mkdir(args.outdir)
    files = glob.glob(args.files)
    
    for file in files:
        filename = file.split('/')[-1]
        print('Reading ',file)
        sys.stdout.flush()
        ds = xr.open_dataset(file)
        T2 = ds['T2']

        sys.stdout.flush()
        # Write out everything 
        ofile = args.outdir+'/'+filename+'.nc'
        T2.to_netcdf(ofile)
        print('Written to '+ofile)

    et = time.time()
    print(et -st)
  
def parse_args():
    '''
    Parser for nc2pkl
    '''

    parser = argparse.ArgumentParser()
    parser.add_argument('-t','--template',type=str,default="../data/wrfout_d01_2018-08-01_00:00:00")
    parser.add_argument('-f','--files',type=str,default="../data/wrfout_d01_2018-08*")
    parser.add_argument('-o','--outdir',type=str,default="../output/T2")
    return parser.parse_args()

if __name__ == '__main__':
    nc2pkl(parse_args())

多进程并行代码

这份代码里面使用了多进程并行,从num_processes = 4可以知道开了4个进程同时处理,可以简单理解为同一时间同时处理4个wrfout文件。其实能开多少进程取决于我们的计算机有多少核数,在linux上可以通过nproc命令查看核数。

如果大家想使用下面的并行代码满足自己的需求,只需要更改被我用-----框起来的函数定义中的操作即可,比如更改变量,或者增加计算等。

import xarray as xr
import numpy as np
import multiprocessing as mp
from functools import partial
import glob
import sys
import os
import time

#--------------------------------------
def nc2pkl(file_path, output_dir):
    if not os.path.exists(output_dir):
            os.mkdir(output_dir)
    
    filename = file_path.split('/')[-1]
    print('Reading ',file_path)
    sys.stdout.flush()
    ds = xr.open_dataset(file_path)
    T2 = ds['T2']

    sys.stdout.flush()
    # Write out everything 
    ofile = output_dir+'/'+filename+'.nc'
    T2.to_netcdf(ofile)
    print('Written to '+ofile)
#--------------------------------------

def parallel_nc2pkl(input_dir, output_dir, num_processes):
    st = time.time()
    # 获取所有需要处理的文件路径
    file_paths = glob.glob(os.path.join(input_dir, "wrfout_d01_2018-08*"))

    # 创建进程池
    with mp.Pool(processes=num_processes) as pool:
        # 使用partial函数创建一个只有一个参数的nc2pkl函数
        worker_func = partial(nc2pkl, output_dir=output_dir)

        # 将需要处理的文件路径传递给进程池
        pool.map(worker_func, file_paths)
    et = time.time()
    print(et -st)

if __name__ == "__main__":
    # 设置输入和输出目录
    input_dir = "../data/"
    output_dir = "../output/T2_multi"

    # 设置进程数量
    num_processes = 4

    # 并行处理文件
    parallel_nc2pkl(input_dir, output_dir, num_processes)

计算效率

常规代码耗时及CPU使用情况

并行代码耗时及CPU使用情况

从中可以看到,并行代码极大地提升了速度。

注意

无论是多进程还是多线程,一旦任务数量多到一个限度,就会消耗掉系统所有的资源,结果就是效率急剧下降,所有任务都做不好。

参考:

【1】https://mofanpy.com/tutorials/python-basic/multiprocessing/why

【2】https://www.liaoxuefeng.com/wiki/1016959663602400/1017627212385376









声明:欢迎转载、转发本号原创内容,可留言区留言或者后台联系小编(微信:gavin7675)进行授权。气象学家公众号转载信息旨在传播交流,其内容由作者负责,不代表本号观点。文中部分图片来源于网络,如涉及作品内容、版权和其他问题,请后台联系小编处理。


往期推荐

 获取ERA5-Land陆面高分辨率再分析数据(32TB)

★ 获取最新版本CMIP6降尺度数据集30TB

★ 获取ERA5常用变量再分析数据(26TB)

 EC数据商店推出Python在线处理工具箱

★ EC打造实用气象Python工具Metview

★ 机器学习简介及在短临天气预警中的应用

★ Nature-地球系统科学领域的深度学习及理解

★ 采用神经网络与深度学习来预报降水、温度


   欢迎加入气象学家交流群   

请备注:姓名/昵称-单位/学校-研究方向


Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/153267
 
591 次点击