数据使用说明
>
从NetCDF格式数据文件进行站点提取
Appearance
Appearance
本文档详细说明基于Python语言对NetCDF格式数据进行站点插值,提取站点数据的操作方法。
该脚本可以实现从单个nc文件中提取单个站点的数据,并输出。
import scipy
import xarray as xr
def interp_grid_to_station(nc_path, var_name,STATION_LON,STATION_LAT):
with xr.open_dataset(nc_path) as ds:
ds_cropped = ds.sel(
grid_xt=slice(STATION_LON - 1, STATION_LON + 1),
grid_yt=slice(STATION_LAT - 1, STATION_LAT + 1)
)
ds_sta = ds_cropped[var_name].interp(
grid_xt=STATION_LON,
grid_yt=STATION_LAT,
method="linear"
)
return ds_sta
# ===================== 函数调用示例 =====================
if __name__ == "__main__":
input_nc = "./u100m.nc" #需要裁剪的nc数据
STATION_LON = 115 # 目标站点经度
STATION_LAT = 25 # 目标站点纬度
var_name = "u100m" # 需要裁剪的要素,和文件名一致
ds_sta = interp_grid_to_station(input_nc,var_name,STATION_LON,STATION_LAT)
print(ds_sta)针对指定时段(起报时间 + 预报时间),批量提取多个站点的多要素数据,并将UTC 时间转换为北京时间。每个站点数据输出对应站点的CSV 文件。
import numpy as np
import pandas as pd
from xarray import open_dataset as xropen_dataset
from pathlib import Path
from datetime import datetime, timedelta
from pytz import timezone, utc
# ========================== 配置区(请按需修改)==========================
# 1. 站点配置:ID+纬度+经度(支持多站点批量处理)
STATIONS = [{"id": "STATION_A", "lat": 22.395607, "lon": 113.557377},
{"id": "STATION_B", "lat": 22.505607, "lon": 112.557377},
{"id": "STATION_C", "lat": 23.123456, "lon": 114.678901},]
# 2. 时间配置(格式:YYYYMMDDHH,10位字符串)
BASE_YYYYMMDDHH = "2025090100" # 起报时间
FCST_YYYYMMDDHH = "2025090200" # 预报时间
#3. 要素变量配置(需与NC文件名一致)
VAR_LIST = ["UGRD10m", "VGRD10m", "u100m", "v100m", "TMP2m", "PRATEsfc", "rh2m", "psz", "DSWRFsfc"]
YYYY = BASE_YYYYMMDDHH[:4]
MM = BASE_YYYYMMDDHH[4:6]
# 4. 路径配置,其中{YYYY}/{MM}/{BASE_YYYYMMDDHH}/{FCST_YYYYMMDDHH}根据历史数据集的数据存放规则配置
INPUT_PATH = Path(f"./huazhong/arcgived_0p1/{YYYY}/{MM}/{BASE_YYYYMMDDHH}/{FCST_YYYYMMDDHH}")
OUTPUT_PATH = Path(f"./station_out/{YYYY}/{MM}/{BASE_YYYYMMDDHH}/{FCST_YYYYMMDDHH}")
OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
# ========================== 以上为配置区(请按需修改)==========================
# 对站点位置附近区域进行裁剪,并对站点进行插值,得到该站点的要素值
def interp_grid_to_station(nc_path, var_name, lat, lon):
with xropen_dataset(nc_path) as ds:
ds_cropped = ds.sel(
grid_xt=slice(lon - 1, lon + 1),
grid_yt=slice(lat - 1, lat + 1)
)
ds_sta = ds_cropped[var_name].interp(
grid_xt=lon,
grid_yt=lat,
method="linear"
)
return ds_sta
#将数据中的世界时替换为北京时
def convert_time_to_bjt(time_array):
bjt_tz = timezone("Asia/Shanghai")
return [
pd.Timestamp(t).tz_localize(utc).tz_convert(bjt_tz).strftime("%Y/%m/%d %H:%M")
for t in time_array
]
#所有要素处理好后,将站点数据输出成csv
def save_station_to_csv(data_dict, var_list, station_id, output_dir):
col_order = ["time(BJT)"] + var_list
df = pd.DataFrame(data_dict, columns=col_order)
output_csv = output_dir / f"{station_id}.csv"
df.to_csv(output_csv,index=False,float_format="%.4f",encoding="utf-8-sig")
print(f"站点{station_id}数据已保存为{output_csv}")
def main():
for station in STATIONS:
sid, lat, lon = station["id"], station["lat"], station["lon"]
data_dict = {}
for var in VAR_LIST:
nc_path = Path(INPUT_PATH)/f"{var}.nc"
ds_sta = interp_grid_to_station(nc_path, var, lat, lon)
data_dict[var] = ds_sta.values.flatten()
if "time(BJT)" not in data_dict:
data_dict["time(BJT)"] = convert_time_to_bjt(ds_sta.time.values)
save_station_to_csv(data_dict, VAR_LIST, sid, OUTPUT_PATH)
if __name__ == "__main__":
main()import numpy as np
import pandas as pd
from xarray import open_dataset as xropen_dataset
from pathlib import Path
from datetime import datetime, timedelta
from pytz import timezone, utc
# ========================== 配置区(请按需修改)==========================
# 1. 站点配置:ID+纬度+经度(支持多站点批量处理)
STATIONS = [{"id": "STATION_A", "lat": 22.395607, "lon": 113.557377},
{"id": "STATION_B", "lat": 22.505607, "lon": 112.557377},]
# 2. 时间循环配置
START_BASE_TIME = "2025090100" # 起始起报时间
END_BASE_TIME = "2025090200" # 结束起报时间
HOUR_OPTIONS = [0, 12] # 仅允许00/12时
FCST_DAYS = 10 # 预报未来10天
#3. 要素变量配置(需与NC文件名一致)
VAR_LIST = ["UGRD10m", "VGRD10m", "u100m", "v100m", "TMP2m", "PRATEsfc", "rh2m", "psz", "DSWRFsfc"]
# 4. 路径配置,
base_input_dir = "./huadong"
base_output_dir = "./station_out"
# ========================== 以上为配置区(请按需修改)==========================
def generate_base_times(start, end):
base_times = []
curr_dt = datetime.strptime(start, "%Y%m%d%H")
end_dt = datetime.strptime(end, "%Y%m%d%H")
while curr_dt <= end_dt:
base_times.append(curr_dt.strftime("%Y%m%d%H"))
curr_dt += timedelta(hours=12)
return base_times
def generate_fcst_times(base_time):
base_dt = datetime.strptime(base_time, "%Y%m%d%H")
return [(base_dt + timedelta(days=d)).strftime("%Y%m%d%H") for d in range(FCST_DAYS)]
def interp_grid_to_station(nc_path, var_name, lat, lon):
with xropen_dataset(nc_path) as ds:
ds_cropped = ds.sel(grid_xt=slice(lon - 1, lon + 1),
grid_yt=slice(lat - 1, lat + 1))
ds_sta = ds_cropped[var_name].interp(
grid_xt=lon,
grid_yt=lat,
method="linear")
return ds_sta
def convert_time_to_bjt(time_array):
bjt_tz = timezone("Asia/Shanghai")
return [pd.Timestamp(t).tz_localize(utc).tz_convert(bjt_tz).strftime("%Y/%m/%d %H:%M") for t in time_array]
def save_station_to_csv(data_dict, var_list, station_id, output_dir):
col_order = ["time(BJT)"] + var_list
df = pd.DataFrame(data_dict, columns=col_order)
output_csv = output_dir / f"{station_id}.csv"
df.to_csv(output_csv,index=False,float_format="%.4f",encoding="utf-8-sig")
print(f"站点{station_id}数据已保存为{output_csv}")
def main():
base_times = generate_base_times(START_BASE_TIME, END_BASE_TIME)
for base_time in base_times:
yyyy, mm = base_time[:4], base_time[4:6]
fcst_times = generate_fcst_times(base_time)
for fcst_time in fcst_times:
in_path = Path(f"{base_input_dir}/{yyyy}/{mm}/{base_time}/{fcst_time}")
out_path = Path(f"{base_output_dir}/{yyyy}/{mm}/{base_time}/{fcst_time}")
out_path.mkdir(parents=True, exist_ok=True)
for station in STATIONS:
sid, lat, lon = station["id"], station["lat"], station["lon"]
data_dict = {}
for var in VAR_LIST:
ds_sta = interp_grid_to_station(in_path/f"{var}.nc", var, lat, lon)
data_dict[var] = ds_sta.values.flatten()
if "time(BJT)" not in data_dict:
data_dict["time(BJT)"] = convert_time_to_bjt(ds_sta.time.values)
save_station_to_csv(data_dict, VAR_LIST, sid, out_path)
if __name__ == "__main__":
main()