Py学习  »  Python

基于Python自动定时从天擎下载中国气象局格点指导预报产品

气象学家 • 2 周前 • 74 次点击  


基于python自动定时从天擎下载中国气象局格点指导预报产品


作者:

第八星系-李智、第八星系-三水

邮箱:lizhi258147369@163.com


1.导入所需的库

import shutil #提供高级文件操作,如复制、移动、删除等import re #提供正则表达式操作,用于字符串匹配和提取import datetime as dt #提供日期和时间操作功能from concurrent.futures import ThreadPoolExecutor, as_completed #提供并发执行功能,允许在独立线程中执行任务并收集结果from glob import glob #用于文件路径匹配(通配符)from pathlib import Path #提供面向对象的文件系统路径操作from apscheduler.schedulers.blocking import BlockingScheduler #提供定时任务调度功能from apscheduler.triggers.cron import CronTrigger #提供Cron表达式支持的调度触发器



2.定义extract_timestamp函数:从文件名中提取时间戳并将其转换为datetime对象

def extract_timestamp(filename):    """    提取文件名中的第一个时间戳部分并解析为datetime对象。    时间戳格式为YYYYMMDDhhmmss。    """    match = re.search(r'Z_NWGD_C_BABJ_(\d{14})', filename)    if match:        return dt.datetime.strptime(match.group(1), '%Y%m%d%H%M%S')return None

具体说明

#文件名举例:Z_NWGD_C_BABJ_20230429102603_P_RFFC_SCMOC-ER03_202304290800_24003.GRB2

#第一个时间20230429102603表示文件生成时间,第二个时间202304290800表示起报时间

#降水预报场生成后,会持续人工订正,然后再次发布,因此,同样的起报时间,会有多个降水预报场文件存在

def extract_timestamp(filename):

# def 是定义函数的关键字。

# extract_timestamp 是函数的名称,通常反映函数的用途。

# filename 是传递给函数的参数,表示一个文件名字符串

    match = re.search(r'Z_NWGD_C_BABJ_(\d{14})', filename)

# re.search 是正则表达式模块 re 中的函数,用于在字符串中搜索模式。

# r'Z_NWGD_C_BABJ_(\d{14})' 是正则表达式模式:

# r 前缀表示原始字符串,不需要转义反斜杠。如果不加r 前缀,\d{14}会被识别为/d{14}

# Z_NWGD_C_BABJ_ 是固定的字符串部分。

# (\d{14}) 是一个捕获组,匹配14个连续的数字(\d 表示数字,{14} 表示重复14次)。

# filename 是要搜索的目标字符串。

# 如果找到匹配,re.search 返回一个匹配对象;否则,返回 None。

    if match:

# if条件语句

# 检查 match 是否不为 None,即是否找到了匹配。

# 如果 match 为真,表示找到了符合条件的时间戳

                                           return dt.datetime.strptime(match.group(1), '%Y%m%d%H%M%S')

# 提取匹配的时间戳并解析为 datetime 对象

# match.group(1) 提取第一个捕获组的内容,即匹配到的14位时间戳字符串。

# dt.datetime.strptime 将字符串解析为 datetime 对象:

# '%Y%m%d%H%M%S' 是时间字符串的格式:

# %Y 表示4位年份。

# %m 表示2位月份。

# %d 表示2位日期。

# %H 表示2位小时(24小时制)。

# %M 表示2位分钟。

# %S 表示2位秒钟。

# 解析后的 datetime 对象作为函数的返回值。

    return None

# 未找到匹配时返回 None

# 如果没有找到匹配,函数返回 None,表示无法从文件名中提取时间戳。

    # 示例

# 假设文件名是 Z_NWGD_C_BABJ_20240605123000_somefile.txt,函数 extract_timestamp 的执行过程如下:

# 调用 re.search(r'Z_NWGD_C_BABJ_(\d{14})', filename) 进行正则表达式匹配。

# 匹配成功,match 是一个包含匹配信息的对象。

# if match: 条件为真,进入条件块。

# 提取匹配的时间戳字符串 20240605123000。

# 调用 dt.datetime.strptime('20240605123000', '%Y%m%d%H%M%S') 将字符串解析为 datetime 对象,表示时间 2024-06-05 12:30:00。

# 返回解析后的 datetime 对象。

# 如果文件名是 somefile.txt,则:

# 调用 re.search(r'Z_NWGD_C_BABJ_(\d{14})', filename) 进行正则表达式匹配。

# 匹配失败,match 为 None。

# if match: 条件为假,跳过条件块。

# 返回 None,表示没有找到时间戳





3.定义find_latest_file函数:从文件名列表中找到最新的文件

def find_latest_file(file_names):    """    找到文件名列表中最新的文件。        file_names: 文件名列表    return: 最新的文件名    """    latest_file = max(file_names, key=extract_timestamp, default=None)return latest_file


具体说明

def find_latest_file(file_names):

# def 是定义函数的关键字。

# find_latest_file 是函数的名称,反映了函数的用途,即找到最新的文件。

# file_names 是传递给函数的参数,表示一个文件名列表。


    latest_file = max(file_names, key=extract_timestamp, default=None)

# 使用 max 函数找到最新文件

# max 是一个内置函数,用于找到可迭代对象中的最大值。

# file_names 是传递给 max 函数的可迭代对象,即文件名列表。

# key=extract_timestamp 指定了一个键函数 extract_timestamp,用于从文件名中提取比较值(即时间戳)。

# extract_timestamp 函数用于从文件名中提取时间戳并解析为 datetime 对象,这样可以直接比较 datetime 对象的大小。

# default=None 指定了在 file_names 为空时返回的默认值,即 None。


    return latest_file

# 返回最新文件名   

# return 关键字用于返回 latest_file,这是 max 函数找到的最新文件名。

# 如果 file_names 为空,返回值将是 None。


  # 示例

# 假设我们有一个文件名列表:

# file_names = [

#     "Z_NWGD_C_BABJ_20240605123000_file1.txt",

#     "Z_NWGD_C_BABJ_20240606123000_file2.txt",

#     "Z_NWGD_C_BABJ_20240604123000_file3.txt"

# ]

# 调用 find_latest_file(file_names) 的过程如下:


    # 1.调用 max 函数

# max(file_names, key=extract_timestamp, default=None) 逐一将 file_names 列表中的文件名传递给 extract_timestamp 函数。


    # 2.提取时间戳并进行比较

# 对于 "Z_NWGD_C_BABJ_20240605123000_file1.txt",extract_timestamp 提取到时间戳 20240605123000,解析为 datetime.datetime(2024, 6, 5, 12, 30, 0)。

# 对于 "Z_NWGD_C_BABJ_20240606123000_file2.txt",extract_timestamp 提取到时间戳 20240606123000,解析为 datetime.datetime(2024, 6, 6, 12, 30, 0)。

# 对于 "Z_NWGD_C_BABJ_20240604123000_file3.txt",extract_timestamp 提取到时间戳 20240604123000,解析为 datetime.datetime(2024, 6, 4, 12, 30, 0)。


    # 3.比较提取到的 datetime 对象

# max 函数比较这些 datetime 对象,找到最大的一个,即最晚的时间。

# datetime.datetime(2024, 6, 6, 12, 30, 0) 是最大的时间戳,对应的文件名是 "Z_NWGD_C_BABJ_20240606123000_file2.txt"。

 

    # 4.返回最新文件名

# max 函数返回 "Z_NWGD_C_BABJ_20240606123000_file2.txt"。

# 函数 find_latest_file 返回这个文件名。



4.定义download函数:从源路径复制文件到目标路径

# 从源路径下载文件到目标路径def download(src, dst):    """    从源路径下载文件到目标路径。        :param src: 源文件路径    :param dst: 目标文件路径    """    shutil.copy(src, dst)    print(f"Downloaded: {src} to {dst}")


具体说明

# 从源路径下载文件到目标路径

def download(src, dst):

# def 是定义函数的关键字。

# download 是函数的名称,表示该函数的用途是下载文件。

# src 和 dst 是传递给函数的参数,分别表示源文件路径和目标文件路径。


    shutil.copy(src, dst)

# shutil 是Python标准库中的一个模块,提供了高级的文件操作功能。

# shutil.copy 函数用于复制文件,从源路径 src 到目标路径 dst。

# src 是源文件路径。

# dst 是目标文件路径。

# 需要注意的是,在使用 shutil 模块之前,必须先导入它,例如 import shutil。


    print(f"Downloaded: {src} to {dst}")

# print 函数用于输出信息到控制台。

# f"Downloaded: {src} to {dst}" 是一个格式化字符串(f-string),用于插入变量值。

# f 字符串前缀表示这是一个格式化字符串。

# {src} 和 {dst} 分别表示插入 src 和 dst 变量的值。

# 输出的字符串格式为 Downloaded:to,例如 Downloaded: /path/to/source to /path/to/destination。





5.定义process_files函数:处理单个文件模板的文件下载

def process_files(src_base, dst_base, patterns, existing_files):    """    处理单个文件模板的文件下载。        :param src_base: 源文件基本路径    :param dst_base: 目标文件基本路径    :param patterns: 文件模式列表    :param existing_files: 现有文件列表    """    for pattern in patterns:        # 使用glob找到匹配模式的所有文件        srclist = glob(src_base + pattern)        if not srclist:            print(f"No files found for pattern: {pattern} in {src_base}")            continue
       # 找到最新的源文件        latest_src_file = find_latest_file(srclist)        if not latest_src_file:            print(f"No valid files found for pattern: {pattern} in {src_base}")            continue
       # 设置目标文件路径        dst_file = dst_base + Path(latest_src_file).name        if Path(dst_file).exists():            print(f"File {dst_file} already exists. Checking if it's the latest version.")            latest_dst_file = find_latest_file([dst_file])            if extract_timestamp(latest_dst_file) >= extract_timestamp(latest_src_file):                print(f"Local file {dst_file} is up-to-date. Skipping download.")                continue            else:                print(f"Local file {dst_file} is outdated. Downloading the latest version.")
       # 下载最新的源文件到目标路径        download(latest_src_file, dst_file)


具体说明

def process_files(src_base, dst_base, patterns, existing_files):

# def 是定义函数的关键字。

# process_files 是函数的名称,表示该函数的用途是处理文件下载。

# src_base, dst_base, patterns, existing_files 是传递给函数的参数,分别表示源文件基本路径、目标文件基本路径、文件模式列表和现有文件列表。

    for pattern in patterns:

# for 循环用于遍历 patterns 列表中的每个模式。

# pattern 是当前循环中正在处理的文件模式。

        srclist = glob(src_base + pattern)

# glob 函数用于查找与模式匹配的所有文件。

# src_base + pattern 将源文件基本路径和文件模式连接起来,形成完整的路径模式。

# srclist 是找到的文件列表。

        if not srclist:

            print(f"No files found for pattern: {pattern} in {src_base}")

            continue

# if not srclist: 检查 srclist 是否为空列表。

# 如果 srclist 为空,打印提示信息,并使用 continue 语句跳过当前循环,继续处理下一个模式。

        # 找到最新的源文件

        latest_src_file = find_latest_file(srclist)

        if not latest_src_file:

            print(f"No valid files found for pattern: {pattern} in {src_base}")

            continue

# latest_src_file = find_latest_file(srclist) 调用 find_latest_file 函数,从 srclist 中找到最新的文件。

# if not latest_src_file: 检查是否找到最新的文件。

# 如果没有找到,打印提示信息,并跳过当前循环。

        # 设置目标文件路径

        dst_file = dst_base + Path(latest_src_file).name

# dst_file 是目标文件路径。

# Path(latest_src_file).name 获取最新源文件的文件名。

# dst_base + Path(latest_src_file).name 将目标文件基本路径和文件名连接起来,形成完整的目标文件路径。

        if Path(dst_file).exists():

            print(f"File {dst_file} already exists. Checking if it's the latest version.")

            latest_dst_file = find_latest_file([dst_file])

            if extract_timestamp(latest_dst_file) >= extract_timestamp(latest_src_file):

                print(f"Local file {dst_file} is up-to-date. Skipping download.")

                continue

            else:

                print(f"Local file {dst_file} is outdated. Downloading the latest version.")

# if Path(dst_file).exists(): 检查目标文件是否存在。

# 如果存在,打印提示信息,并检查目标文件是否是最新版本。

# latest_dst_file = find_latest_file([dst_file]) 获取目标文件的最新版本。

# if extract_timestamp(latest_dst_file) >= extract_timestamp(latest_src_file): 比较目标文件和最新源文件的时间戳。

# 如果目标文件是最新版本,打印提示信息,并跳过当前循环。

# 否则,打印提示信息,准备下载最新版本。

        # 下载最新的源文件到目标路径

        download(latest_src_file, dst_file)

# download(latest_src_file, dst_file) 调用 download 函数,将最新源文件下载到目标路径。




6.定义ml_task函数:主任务函数,执行文件下载任务

# 主任务函数,用于执行文件下载任务def ml_task():    """    主任务函数,用于执行文件下载任务。    """    src_base = 'I:\\DATA\\NAFP\\NWFD\\SCMOC\\BEHT\\*\\*\\'    dst_base = 'E:\\DATA\\SCMOC\\BEHTNEW\\'    patterns = [        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-ER03_*0800_*03.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-ER03_*2000_*03.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMIN_*0800_*024.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMAX_*0800_*024.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMP_*0800_*03.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMIN_*2000_*024.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMAX_*2000_*024.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMP_*2000_*03.GRB2'    ]
   folders = glob(src_base)
   with ThreadPoolExecutor(max_workers=8) as executor:        futures = []        for folder in folders:            futures.append(executor.submit(process_files, folder, dst_base, patterns, None))
       for future in as_completed(futures):            future.result()


具体说明

def ml_task():

# def 是定义函数的关键字。

# ml_task 是函数的名称,这里表示某种任务,与机器学习(ML)有关,但具体任务是处理文件。





    src_base = 'I:\\DATA\\NAFP\\NWFD\\SCMOC\\BEHT\\*\\*\\'

    dst_base = 'E:\\DATA\\SCMOC\\BEHTNEW\\'

# src_base 和 dst_base 分别是源文件的基本路径和目标文件的基本路径。

# 使用 *\\*\\ 表示目录中的通配符,用于匹配任意子目录。

 

   

    patterns = [

        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-ER03_*0800_*03.GRB2',

        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-ER03_*2000_*03.GRB2',

        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMIN_*0800_*024.GRB2',

        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMAX_*0800_*024.GRB2',

        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMP_*0800_*03.GRB2',

        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMIN_*2000_*024.GRB2',

        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMAX_*2000_*024.GRB2',

        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMP_*2000_*03.GRB2'

    ]

# patterns 是一个列表,包含多个文件模式,表示需要处理的文件格式。



    folders = glob(src_base)

# glob(src_base) 查找并返回与 src_base 模式匹配的所有文件夹路径。

# folders 是找到的文件夹列表。


    with ThreadPoolExecutor(max_workers=8) as executor:

# ThreadPoolExecutor 是 concurrent.futures 模块中的一个类,用于创建线程池。

# max_workers=8 指定线程池的最大工作线程数为8。

 

 

        futures = []

        for folder in folders:

            futures.append(executor.submit(process_files, folder, dst_base, patterns, None))

# futures 是一个列表,用于存储提交的任务。

# for folder in folders: 循环遍历每个文件夹。

# executor.submit(process_files, folder, dst_base, patterns, None) 提交任务到线程池,调用 process_files 函数处理文件夹。




        for future in as_completed(futures):

            future.resul

t()

# as_completed(futures) 返回一个迭代器,当每个 future 完成时,迭代器会生成该 future。

# future.result() 等待任务完成并获取结果,如果任务抛出异常,会在此抛出。




7.定义download_init函数:初始化下载任务调度器。

# 初始化下载任务调度器def download_init():    """    初始化下载任务调度器。    """    ml_excutors = {        "default": ThreadPoolExecutor(1)    }
   ml_scheduler = BlockingScheduler(ml_excutors)    ml_trigger = CronTrigger.from_crontab('3 9,19 * * *')  # 北京时间的93分和193分自动开始运行    ml_scheduler.add_job(ml_task, trigger=ml_trigger)return ml_scheduler


具体说明

# 初始化下载任务调度器

def download_init():

# def 是定义函数的关键字。

# download_init 是函数的名称。



   

    ml_excutors = {

        "default": ThreadPoolExecutor(1)

    }


# ml_excutors 是一个字典,键为 "default",值为 ThreadPoolExecutor(1),表示创建一个包含单个线程的线程池执行器。

# ThreadPoolExecutor 是 concurrent.futures 模块中的一个类,用于创建线程池。参数 1 表示线程池中只有一个线程。


    ml_scheduler = BlockingScheduler(ml_excutors)

# ml_scheduler 是 BlockingScheduler 类的一个实例,用于创建一个阻塞调度器。

# BlockingScheduler 是 apscheduler.schedulers.blocking 模块中的一个类,用于创建一个阻塞的调度器。它接受一个执行器字典作为参数。



    ml_trigger = CronTrigger.from_crontab('3 9,19 * * *')  # 北京时间的9点3分和19点3分自动开始运行

# ml_trigger 是 CronTrigger 类的一个实例,用于创建一个基于 crontab 表达式的触发器。

# from_crontab('3 9,19 * * *') 方法从 crontab 表达式创建一个触发器。这个表达式表示在每天的 9 点 3 分和 19 点 3 分触发。


    ml_scheduler.add_job(ml_task, trigger=ml_trigger)

# ml_scheduler.add_job(ml_task, trigger=ml_trigger) 将任务 ml_task 添加到调度器中,并设置触发器 ml_trigger。

# ml_task 是先前定义的函数,它将在触发器指定的时间点运行。




    return ml_scheduler

# return ml_scheduler 返回创建的调度器实例。



8.

# 启动调度器ml_scheduler = download_init()ml_scheduler.start()




完整代码




import shutil #提供高级文件操作,如复制、移动、删除等import re #提供正则表达式操作,用于字符串匹配和提取import datetime as dt #提供日期和时间操作功能from concurrent.futures import ThreadPoolExecutor, as_completed #提供并发执行功能,允许在独立线程中执行任务并收集结果from glob import glob #用于文件路径匹配(通配符)from pathlib import Path #提供面向对象的文件系统路径操作from apscheduler.schedulers.blocking import BlockingScheduler #提供定时任务调度功能from apscheduler.triggers.cron import CronTrigger #提供Cron表达式支持的调度触发器def extract_timestamp(filename):    """    提取文件名中的第一个时间戳部分并解析为datetime对象。    时间戳格式为YYYYMMDDhhmmss。    """    match = re.search(r'Z_NWGD_C_BABJ_(\d{14})', filename)    if match:        return dt.datetime.strptime(match.group(1), '%Y%m%d%H%M%S')    return Nonedef find_latest_file(file_names):    """    找到文件名列表中最新的文件。        file_names: 文件名列表    return: 最新的文件名    """    latest_file = max(file_names, key=extract_timestamp, default=None)    return latest_file# 从源路径下载文件到目标路径def download(src, dst):    """    从源路径下载文件到目标路径。        :param src: 源文件路径    :param dst: 目标文件路径    """    shutil.copy(src, dst)    print(f"Downloaded: {src} to {dst}")def process_files(src_base, dst_base, patterns, existing_files):    """    处理单个文件模板的文件下载。        :param src_base: 源文件基本路径    :param dst_base: 目标文件基本路径    :param patterns: 文件模式列表    :param existing_files: 现有文件列表    """    for pattern in patterns:        # 使用glob找到匹配模式的所有文件        srclist = glob(src_base + pattern)        if not srclist:            print(f"No files found for pattern: {pattern} in {src_base}")            continue        # 找到最新的源文件        latest_src_file = find_latest_file(srclist)        if not latest_src_file:            print(f"No valid files found for pattern: {pattern} in {src_base}")            continue        # 设置目标文件路径        dst_file = dst_base + Path(latest_src_file).name        if Path(dst_file).exists():            print(f"File {dst_file} already exists. Checking if it's the latest version.")            latest_dst_file = find_latest_file([dst_file])            if extract_timestamp(latest_dst_file) >= extract_timestamp(latest_src_file):                print(f"Local file {dst_file} is up-to-date. Skipping download.")                continue            else:                print(f"Local file {dst_file} is outdated. Downloading the latest version.")        # 下载最新的源文件到目标路径        download(latest_src_file, dst_file)# 主任务函数,用于执行文件下载任务def ml_task():    """    主任务函数,用于执行文件下载任务。    """    src_base = 'I:\\DATA\\NAFP\\NWFD\\SCMOC\\BEHT\\*\\*\\'    dst_base = 'E:\\DATA\\SCMOC\\BEHTNEW\\'    patterns = [        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-ER03_*0800_*03.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-ER03_*2000_*03.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMIN_*0800_*024.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMAX_*0800_*024.GRB2',        


    
'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMP_*0800_*03.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMIN_*2000_*024.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMAX_*2000_*024.GRB2',        'Z_NWGD_C_BABJ_*_P_RFFC_SCMOC-TMP_*2000_*03.GRB2'    ]    folders = glob(src_base)    with ThreadPoolExecutor(max_workers=8) as executor:        futures = []        for folder in folders:            futures.append(executor.submit(process_files, folder, dst_base, patterns, None))        for future in as_completed(futures):            future.result()# 初始化下载任务调度器def download_init():    """    初始化下载任务调度器。    """    ml_excutors = {        "default": ThreadPoolExecutor(1)    }    ml_scheduler = BlockingScheduler(ml_excutors)    ml_trigger = CronTrigger.from_crontab('3 9,19 * * *')  # 北京时间的93分和193分自动开始运行    ml_scheduler.add_job(ml_task, trigger=ml_trigger)    return ml_scheduler# 启动调度器ml_scheduler = download_init()ml_scheduler.start()



END

本文编辑|Eva








声明:欢迎转载、转发。气象学家公众号转载信息旨在传播交流,其内容由作者负责,不代表本号观点。文中部分图片来源于网络,如涉及内容、版权和其他问题,请联系小编(微信:qxxjgzh)处理。


往期推荐
 获取ERA5/ERA5-Land再分析数据(36TB/32TB)
 获取全球GPM降水数据,半小时/逐日(4TB)
 获取1998-2019 TRMM 3B42逐日降水数据
 获取最新版本CMIP6降尺度数据集30TB
 EC数据商店推出Python在线处理工具箱
★ EC打造实用气象Python工具Metview
★ 机器学习简介及在短临天气预警中的应用
★ Nature-地球系统科学领域的深度学习及理解
★ 灵魂拷问:ChatGPT对气象人的饭碗是福是祸?
★ 气象局是做啥的?气象局的薪水多少?

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/171905
 
74 次点击