Skip to content

从NetCDF 格式数据文件进行站点提取

本文档详细说明基于Python语言对NetCDF格式数据进行站点插值,提取站点数据的操作方法。

1. 单站点数据提取:

该脚本可以实现从单个nc文件中提取单个站点的数据,并输出。

python
import scipy
import xarray as xr

def interp_grid_to_station(nc_path, var_name, STATION_LON, STATION_LAT):
    """Interpolate one gridded NetCDF variable to a single station location.

    A +/-1 degree window around the station is selected first so the linear
    interpolation only operates on nearby grid points, then the variable is
    bilinearly interpolated to the exact (STATION_LON, STATION_LAT) point.

    NOTE(review): the slice-based crop assumes the grid_xt/grid_yt
    coordinates are stored in ascending order -- confirm for this dataset.
    """
    lon_window = slice(STATION_LON - 1, STATION_LON + 1)
    lat_window = slice(STATION_LAT - 1, STATION_LAT + 1)
    with xr.open_dataset(nc_path) as ds:
        nearby = ds.sel(grid_xt=lon_window, grid_yt=lat_window)
        station_da = nearby[var_name].interp(
            grid_xt=STATION_LON, grid_yt=STATION_LAT, method="linear"
        )
    return station_da
    
# ===================== Example usage =====================
if __name__ == "__main__":
    nc_file = "./u100m.nc"   # NC file to extract from
    variable = "u100m"       # variable to extract; matches the file name
    lon = 115                # target station longitude
    lat = 25                 # target station latitude
    result = interp_grid_to_station(nc_file, variable, lon, lat)
    print(result)

2. 多站点批量提取(单时段)

针对指定时段(起报时间 + 预报时间),批量提取多个站点的多要素数据,并将UTC 时间转换为北京时间。每个站点数据输出对应站点的CSV 文件。

python
import numpy as np
import pandas as pd
from xarray import open_dataset as xropen_dataset
from pathlib import Path
from datetime import datetime, timedelta
from pytz import timezone, utc

# ========================== Configuration (edit as needed) ==========================
# 1. Station list: ID + latitude + longitude (multiple stations processed in batch)
STATIONS = [{"id": "STATION_A", "lat": 22.395607, "lon": 113.557377},
            {"id": "STATION_B", "lat": 22.505607, "lon": 112.557377},
            {"id": "STATION_C", "lat": 23.123456, "lon": 114.678901},]

# 2. Time settings (format: YYYYMMDDHH, 10-character string)
BASE_YYYYMMDDHH = "2025090100"   # base (initialization) time
FCST_YYYYMMDDHH = "2025090200"  # forecast valid time

# 3. Variable names (must match the NC file names, one file per variable)
VAR_LIST = ["UGRD10m", "VGRD10m", "u100m", "v100m", "TMP2m", "PRATEsfc", "rh2m", "psz", "DSWRFsfc"]

YYYY = BASE_YYYYMMDDHH[:4]
MM = BASE_YYYYMMDDHH[4:6]

# 4. Path settings; the {YYYY}/{MM}/{BASE_YYYYMMDDHH}/{FCST_YYYYMMDDHH} layout
#    follows the archive's directory structure -- adjust to your dataset.
INPUT_PATH = Path(f"./huazhong/arcgived_0p1/{YYYY}/{MM}/{BASE_YYYYMMDDHH}/{FCST_YYYYMMDDHH}")
OUTPUT_PATH = Path(f"./station_out/{YYYY}/{MM}/{BASE_YYYYMMDDHH}/{FCST_YYYYMMDDHH}") 
OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
# ========================== End of configuration ==========================


# Crop a small region around the station, then interpolate the variable
# to the exact station location.
def interp_grid_to_station(nc_path, var_name, lat, lon):
    """Bilinearly interpolate var_name from nc_path to the point (lat, lon).

    Only a +/-1 degree window around the station is selected, so the
    interpolation works on nearby grid points rather than the whole grid.
    """
    with xropen_dataset(nc_path) as ds:
        window = ds.sel(grid_xt=slice(lon - 1, lon + 1),
                        grid_yt=slice(lat - 1, lat + 1))
        result = window[var_name].interp(grid_xt=lon, grid_yt=lat, method="linear")
    return result

# Convert the data's UTC times to Beijing time.
def convert_time_to_bjt(time_array):
    """Convert an array of naive UTC timestamps to Beijing-time strings.

    Parameters
    ----------
    time_array : array-like of datetime64 / timestamps
        Naive timestamps assumed to be UTC (as read from the NC file).

    Returns
    -------
    list of str
        Timestamps shifted to Asia/Shanghai, formatted "YYYY/MM/DD HH:MM".
    """
    # One vectorized pass (localize as UTC, shift, format) instead of a
    # per-element pytz round-trip; tz strings let pandas resolve the zone.
    bjt = pd.to_datetime(time_array, utc=True).tz_convert("Asia/Shanghai")
    return bjt.strftime("%Y/%m/%d %H:%M").tolist()

# After all variables are processed, write the station's data to a CSV file.
def save_station_to_csv(data_dict, var_list, station_id, output_dir):
    """Write one station's extracted data to <output_dir>/<station_id>.csv.

    Columns are ordered as time(BJT) followed by var_list; floats are
    written with 4 decimals and the file gets a BOM (utf-8-sig) so that
    Excel opens it correctly.
    """
    columns = ["time(BJT)", *var_list]
    frame = pd.DataFrame(data_dict, columns=columns)
    target = output_dir / f"{station_id}.csv"
    frame.to_csv(target, index=False, float_format="%.4f", encoding="utf-8-sig")
    print(f"站点{station_id}数据已保存为{target}")
        
def main():
    """Extract every configured variable at every station and write one CSV per station."""
    for station in STATIONS:
        station_id = station["id"]
        lat, lon = station["lat"], station["lon"]
        station_data = {}
        for var_name in VAR_LIST:
            da = interp_grid_to_station(INPUT_PATH / f"{var_name}.nc", var_name, lat, lon)
            station_data[var_name] = da.values.flatten()
            # The time axis is shared by all variables; convert it only once.
            if "time(BJT)" not in station_data:
                station_data["time(BJT)"] = convert_time_to_bjt(da.time.values)
        save_station_to_csv(station_data, VAR_LIST, station_id, OUTPUT_PATH)


if __name__ == "__main__":
    main()

3. 处理多时段多站点并输出为CSV

python
import numpy as np
import pandas as pd
from xarray import open_dataset as xropen_dataset
from pathlib import Path
from datetime import datetime, timedelta
from pytz import timezone, utc

# ========================== Configuration (edit as needed) ==========================
# 1. Station list: ID + latitude + longitude (multiple stations processed in batch)
STATIONS = [{"id": "STATION_A", "lat": 22.395607, "lon": 113.557377},
            {"id": "STATION_B", "lat": 22.505607, "lon": 112.557377},]

# 2. Time-loop settings
START_BASE_TIME = "2025090100"  # first base (initialization) time
END_BASE_TIME = "2025090200"    # last base time (inclusive)
HOUR_OPTIONS = [0, 12]          # allowed cycle hours (00Z / 12Z only)
                                # NOTE(review): not referenced below -- generate_base_times
                                # steps a fixed 12 h; confirm these stay in sync.
FCST_DAYS = 10                  # number of daily forecast valid times per base time

# 3. Variable names (must match the NC file names, one file per variable)
VAR_LIST = ["UGRD10m", "VGRD10m", "u100m", "v100m", "TMP2m", "PRATEsfc", "rh2m", "psz", "DSWRFsfc"]

# 4. Path settings
base_input_dir = "./huadong"
base_output_dir = "./station_out"
# ========================== End of configuration ==========================

def generate_base_times(start, end, step_hours=12):
    """Enumerate base (initialization) times from start to end, inclusive.

    Parameters
    ----------
    start, end : str
        Times in "YYYYMMDDHH" format; end is included when it falls on a step.
    step_hours : int, optional
        Interval in hours between consecutive base times.  Defaults to 12,
        i.e. the 00Z/12Z cycles listed in HOUR_OPTIONS; previously this
        step was hard-coded, leaving that config entry unused.

    Returns
    -------
    list of str
        Base times in "YYYYMMDDHH" format; empty if start is after end.

    Raises
    ------
    ValueError
        If start/end do not match the expected format or step_hours <= 0.
    """
    if step_hours <= 0:
        raise ValueError("step_hours must be positive")
    curr_dt = datetime.strptime(start, "%Y%m%d%H")
    end_dt = datetime.strptime(end, "%Y%m%d%H")
    step = timedelta(hours=step_hours)
    base_times = []
    while curr_dt <= end_dt:
        base_times.append(curr_dt.strftime("%Y%m%d%H"))
        curr_dt += step
    return base_times
  
def generate_fcst_times(base_time):
    """Return FCST_DAYS daily forecast valid times, starting at the base time itself.

    Each entry is base_time + d days (d = 0 .. FCST_DAYS-1), formatted "YYYYMMDDHH".
    """
    start = datetime.strptime(base_time, "%Y%m%d%H")
    fcst_times = []
    for day in range(FCST_DAYS):
        fcst_times.append((start + timedelta(days=day)).strftime("%Y%m%d%H"))
    return fcst_times

def interp_grid_to_station(nc_path, var_name, lat, lon):
    """Bilinearly interpolate var_name from nc_path to the point (lat, lon).

    A +/-1 degree window around the station is selected first so the
    interpolation only touches nearby grid points.
    """
    lon_window = slice(lon - 1, lon + 1)
    lat_window = slice(lat - 1, lat + 1)
    with xropen_dataset(nc_path) as ds:
        nearby = ds.sel(grid_xt=lon_window, grid_yt=lat_window)
        station_da = nearby[var_name].interp(
            grid_xt=lon, grid_yt=lat, method="linear"
        )
    return station_da

def convert_time_to_bjt(time_array):
    """Convert an array of naive UTC timestamps to Beijing-time strings.

    Parameters
    ----------
    time_array : array-like of datetime64 / timestamps
        Naive timestamps assumed to be UTC (as read from the NC file).

    Returns
    -------
    list of str
        Timestamps shifted to Asia/Shanghai, formatted "YYYY/MM/DD HH:MM".
    """
    # One vectorized pass (localize as UTC, shift, format) instead of a
    # per-element pytz round-trip; tz strings let pandas resolve the zone.
    bjt = pd.to_datetime(time_array, utc=True).tz_convert("Asia/Shanghai")
    return bjt.strftime("%Y/%m/%d %H:%M").tolist()

def save_station_to_csv(data_dict, var_list, station_id, output_dir):
    """Write one station's extracted data to <output_dir>/<station_id>.csv.

    Columns are ordered as time(BJT) followed by var_list; floats are
    written with 4 decimals and the file gets a BOM (utf-8-sig) so that
    Excel opens it correctly.
    """
    columns = ["time(BJT)", *var_list]
    frame = pd.DataFrame(data_dict, columns=columns)
    target = output_dir / f"{station_id}.csv"
    frame.to_csv(target, index=False, float_format="%.4f", encoding="utf-8-sig")
    print(f"站点{station_id}数据已保存为{target}")


def main():
    """Walk every (base time, forecast time, station) combination and write CSVs."""
    for base_time in generate_base_times(START_BASE_TIME, END_BASE_TIME):
        yyyy, mm = base_time[:4], base_time[4:6]
        for fcst_time in generate_fcst_times(base_time):
            in_path = Path(base_input_dir) / yyyy / mm / base_time / fcst_time
            out_path = Path(base_output_dir) / yyyy / mm / base_time / fcst_time
            out_path.mkdir(parents=True, exist_ok=True)
            for station in STATIONS:
                station_id = station["id"]
                lat, lon = station["lat"], station["lon"]
                station_data = {}
                for var_name in VAR_LIST:
                    da = interp_grid_to_station(in_path / f"{var_name}.nc", var_name, lat, lon)
                    station_data[var_name] = da.values.flatten()
                    # The time axis is shared by all variables; convert it only once.
                    if "time(BJT)" not in station_data:
                        station_data["time(BJT)"] = convert_time_to_bjt(da.time.values)
                save_station_to_csv(station_data, VAR_LIST, station_id, out_path)


if __name__ == "__main__":
    main()