Gluing and Merging#
Within this notebook, we glue the individual files, each of which contains a single sweep (elevation level), into full volume scans, so that each output file represents one complete volume scan. We downloaded the raw data from the ARM Data Portal using the gucxprecipradarS2.00
datastream.
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
import os
import time
import datetime
import numpy as np
import xarray as xr
import tempfile
from pathlib import Path
import shutil
from matplotlib import pyplot as plt
from dask.distributed import Client, LocalCluster, progress
import pyart
## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric
## Radiation Measurement (ARM) Climate Research Facility, an Office of
## Science user facility.
##
## If you use this software to prepare a publication, please cite:
##
## JJ Helmus and SM Collis, JORS 2016, doi: 10.5334/jors.119
Define Helper Functions#
def glue_fix(ds):
    """Replace sentinel missing values in each radar field with the CF fill value.

    Parameters
    ----------
    ds : xarray.Dataset
        Glued volume-scan dataset, opened with ``mask_and_scale=False`` so
        the raw sentinel values are visible.

    Returns
    -------
    xarray.Dataset
        The same dataset, with values at or below the -99900 sentinel
        replaced by the required _FillValue (-32768.0) in every expected
        field that is present.
    """
    # Required fill value for the fields written into the glue files.
    fill_value = -32768.0
    fields = ('DBZ', 'VEL', 'WIDTH', 'ZDR', 'PHIDP', 'RHOHV', 'NCP', 'DBZhv')
    for var in fields:
        # Guard against volumes that are missing one of the expected fields;
        # the original code raised KeyError here.
        if var not in ds:
            continue
        # DataArray.where(cond, other) KEEPS values where cond is True and
        # substitutes `other` where it is False — so anything at or below
        # the -99900 sentinel becomes the proper fill value.
        ds[var] = ds[var].where(ds[var] > -99800, fill_value)
    return ds
def radar_glue(b_radar, radar_list):
    """Join every radar in ``radar_list`` onto ``b_radar``, one sweep at a time.

    Returns the merged radar object, or ``None`` when ``radar_list`` is
    ``None`` (i.e. one of the sweeps could not be read upstream).
    """
    if radar_list is None:
        return None
    for sweep in radar_list:
        b_radar = pyart.util.join_radar(b_radar, sweep)
        # Drop the reference immediately to keep memory pressure down.
        del sweep
    return b_radar
def volume_from_list(base_radar, vlist, base_dir):
    """Read every sweep file after the base scan and glue them onto it.

    Parameters
    ----------
    base_radar : pyart.core.Radar
        Radar object for the first (base) sweep of the volume.
    vlist : list of str
        Filenames of all sweeps in the volume; ``vlist[0]`` is the base
        scan already held in ``base_radar`` and is skipped here.
    base_dir : str
        Directory (with trailing separator) containing the sweep files.

    Returns
    -------
    pyart.core.Radar or None
        The glued volume, or ``None`` when any sweep fails to read.
    """
    try:
        radars = [pyart.io.read(base_dir + sw) for sw in vlist[1:]]
    except Exception:
        # Deliberately best-effort: any unreadable sweep marks the whole
        # volume as bad.  (A bare `except:` would also swallow
        # KeyboardInterrupt/SystemExit, so catch Exception instead.)
        radars = None
    return radar_glue(base_radar, radars)
def fix_times(ds):
    """De-duplicate repeated timestamps by adding millisecond offsets in place.

    Rays glued from the same sweep share one timestamp; to make the time
    coordinate strictly increasing, each run of identical times has offsets
    of 0, 1, 2, ... milliseconds added to it.

    Parameters
    ----------
    ds : xarray.Dataset
        Glued volume-scan dataset with a ``time`` coordinate.

    Returns
    -------
    xarray.Dataset
        The same dataset with its time coordinate modified in place.
    """
    # Determine number of specific times within the file
    # (set iteration order is arbitrary, but each group of equal
    # timestamps is handled independently, so order does not matter).
    specific = set(ds.time.data)
    # Loop through the specific times, and add offset
    for value in specific:
        # NOTE(review): label-based slice(value, value) selects the run of
        # entries equal to `value` — assumes equal timestamps are
        # contiguous in the time coordinate; confirm against the glued files.
        dt = np.arange(0, len(ds.sel(time=slice(value, value)).time.data))
        dt = dt.astype('timedelta64[ms]')
        # add the time offset to the original times
        new_times = ds.sel(time=slice(value, value)).time.data + dt
        # NOTE(review): writing through `.sel(...).time.data[:]` relies on
        # the selection returning a VIEW of the underlying array so the
        # assignment propagates to `ds` — TODO confirm this holds for the
        # xarray version in use.
        ds.sel(time=slice(value, value)).time.data[:] = new_times
    # Send back to the main
    return ds
def granule(Dvolume, month="202203", n_tilts=8):
    """Glue one volume's sweep files into a single CF/Radial file on disk.

    Parameters
    ----------
    Dvolume : list of str
        Filenames of the sweeps making up one volume scan; the first entry
        is the base (lowest-elevation) scan.
    month : str, optional
        Month tag (YYYYMM) used to build the input/output directory paths.
        Defaults to "202203" for backward compatibility.
    n_tilts : int, optional
        Number of sweeps expected in a complete volume (default 8).

    Notes
    -----
    Side effects only: writes ``xprecipradar_guc_volume_<time>.b1.nc`` into
    the glue directory, then rewrites it with corrected times and fill
    values.  Incomplete or unreadable volumes are skipped silently.
    """
    data_dir = "/gpfs/wolf/atm124/proj-shared/gucxprecipradarS2.00/nc_files/" + month + "_nc/"
    out_dir = "/gpfs/wolf/atm124/proj-shared/gucxprecipradarS2.00/glue_files/" + month + "_glued/"
    # Skip incomplete volumes outright.
    if len(Dvolume) != n_tilts:
        return
    # Read the base scan to determine if it can be read in.
    try:
        base_rad = pyart.io.read(data_dir + Dvolume[0])
    except Exception:
        return
    # Read all remaining sweeps and join them with the base scan.
    out_radar = volume_from_list(base_rad, Dvolume, data_dir)
    if out_radar is None:
        return
    # Define the filename time from the radar object.
    ff = time.strptime(out_radar.time['units'][14:], '%Y-%m-%dT%H:%M:%SZ')
    dt = datetime.datetime.fromtimestamp(time.mktime(ff)) + datetime.timedelta(seconds=int(out_radar.time['data'][0]))
    strform = dt.strftime(out_dir + 'xprecipradar_guc_volume_%Y%m%d-%H%M%S.b1.nc')
    # FIX for join issue: sweep_mode is not tiled across sweeps by
    # pyart.util.join_radar (to be fixed upstream in Py-ART).
    out_radar.sweep_mode['data'] = np.tile(base_rad.sweep_mode['data'], n_tilts)
    try:
        pyart.io.write_cfradial(strform, out_radar, arm_time_variables=True)
        print('SUCCESS', strform)
    except Exception:
        # The original code printed FAILURE but still fell through and
        # tried to post-process a file that was never written; bail out.
        print('FAILURE', strform)
        return
    # Delete the radars to free up memory before the xarray pass.
    del base_rad
    del out_radar
    # Fix the times and encodings of the generated file, rewriting it via a
    # temporary directory so the destination is replaced in one copy.
    with tempfile.TemporaryDirectory() as tmpdir:
        tmp_path = Path(tmpdir)
        with xr.open_dataset(strform, mask_and_scale=False) as ds:
            ds = ds.load()
            ds = fix_times(ds)
            ds = glue_fix(ds)
            out_path = str(tmp_path) + '/' + strform.split('/')[-1]
            # Set time encoding for milliseconds so per-ray offsets survive.
            ds.time.encoding['units'] = 'milliseconds since 1970-01-01'
            ds.to_netcdf(path=out_path)
        # Copy after the source dataset handle is closed.
        shutil.copy(out_path, strform)
Locate all the PPI scans within the Desired Directories#
# Locate every PPI sweep file for the month and group them into volumes.
month = "202203"
data_dir = "/gpfs/wolf/atm124/proj-shared/gucxprecipradarS2.00/nc_files/" + month + "_nc/"
out_dir = "/gpfs/wolf/atm124/proj-shared/gucxprecipradarS2.00/glue_files/" + month + "_glued/"
all_files = os.listdir(data_dir)
all_files.sort()
# A volume starts at the lowest tilt ("..._1_PPI.nc"); every sweep file
# ends in "PPI.nc".
base_scan_ppi = '1_PPI.nc'
ppi_pattern = 'PPI.nc'
base_scans = []
ppis = []
for file in all_files:
    if ppi_pattern in file:
        ppis.append(file)
    if base_scan_ppi in file:
        base_scans.append(file)
n_tilts = 8
# Each volume is a base scan plus the next n_tilts-1 sweeps in sorted order.
# (The original cell initialized `volumes` twice and carried an unused
# `in_volume` flag; both removed.  `list.index` replaces the np.where scan.)
volumes = []
for base in base_scans:
    base_scan_index = ppis.index(base)
    volume = ppis[base_scan_index: base_scan_index + n_tilts]
    volumes.append(volume)
Try Single Volume to Verify Process#
volumes[0]
['gucxprecipradarS2.00.20220301.004845.raw.csu.sail-20220301-004845_278032_22_1_PPI.nc',
'gucxprecipradarS2.00.20220301.004916.raw.csu.sail-20220301-004916_278034_22_2_PPI.nc',
'gucxprecipradarS2.00.20220301.004949.raw.csu.sail-20220301-004949_278035_22_4_PPI.nc',
'gucxprecipradarS2.00.20220301.005020.raw.csu.sail-20220301-005020_278036_22_6_PPI.nc',
'gucxprecipradarS2.00.20220301.005053.raw.csu.sail-20220301-005053_278037_22_8_PPI.nc',
'gucxprecipradarS2.00.20220301.005124.raw.csu.sail-20220301-005124_278038_22_10_PPI.nc',
'gucxprecipradarS2.00.20220301.005156.raw.csu.sail-20220301-005156_278039_22_12_PPI.nc',
'gucxprecipradarS2.00.20220301.005229.raw.csu.sail-20220301-005229_278040_22_15_PPI.nc']
%%time
granule(volumes[0])
SUCCESS /gpfs/wolf/atm124/proj-shared/gucxprecipradarS2.00/glue_files/202203_glued/xprecipradar_guc_volume_20220301-004845.b1.nc
CPU times: user 14.4 s, sys: 1.66 s, total: 16 s
Wall time: 17.1 s
# Smaller subset just for testing; for the full month, map every entry of
# `volumes` onto granule().
subset = volumes[:16]
Start a Dask Cluster#
# Client and LocalCluster are already imported at the top of the file;
# re-imported here so this notebook cell can also run standalone.
from dask.distributed import Client, LocalCluster
# Spin up a local Dask cluster on this node and attach a client to it.
cluster = LocalCluster()
client = Client(cluster)
# Evaluating the client as the last expression renders the dashboard
# summary (link, workers, memory) in the notebook output.
client
Client
Client-6b38fc35-d1cd-11ee-aa6b-b8cb29b120a2
Connection method: Cluster object | Cluster type: distributed.LocalCluster |
Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCluster
d5f460ae
Dashboard: http://127.0.0.1:8787/status | Workers: 1 |
Total threads: 1 | Total memory: 251.48 GiB |
Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-8ee07010-a59b-4d9e-9820-fab289c28b05
Comm: tcp://127.0.0.1:34455 | Workers: 1 |
Dashboard: http://127.0.0.1:8787/status | Total threads: 1 |
Started: Just now | Total memory: 251.48 GiB |
Workers
Worker: 0
Comm: tcp://127.0.0.1:43873 | Total threads: 1 |
Dashboard: http://127.0.0.1:37303/status | Memory: 251.48 GiB |
Nanny: tcp://127.0.0.1:34077 | |
Local directory: /tmp/dask-scratch-space/worker-6_zfrvvd |
Glue the Files#
%%time
# Fan the volumes out to the Dask workers: one granule() task per volume.
future = client.map(granule, subset)
# Block until all tasks finish.  granule() works purely by side effect
# (it writes the glued files), so the gathered results are all None.
my_data = client.gather(future)
CPU times: user 3.33 s, sys: 2.07 s, total: 5.39 s
Wall time: 4min 33s