Source code for radtraq.proc.profile

"""
radtraq.proc.profile
--------------------

Module for various processing of profiles
"""
import warnings

import numpy as np
import pint
import xarray as xr
from act.utils.geo_utils import destination_azimuth_distance
from scipy import stats

from radtraq.proc.cloud_mask import calc_cloud_mask
from radtraq.utils.dataset_utils import get_height_variable_name
from radtraq.utils.utils import (
    calc_ground_range_and_height,
    calculate_azimuth_distance_from_lat_lon,
)


[docs] def calc_avg_profile( _obj, variable=None, mask_variable='cloud_mask_2', mask_value=None, mask_meaning='cloud', first_height=500.0, last_height=15000.0, step_height=50, height_variable=None, ygrid=None, ): """ Function for calculating average profiles from data after applying the cloud mask Parameters ---------- _obj : Xarray.Dataset xarray object with all the data variable : str or list List of variables to average mask_variable : string Name of mask variable to apply to data mask_value : int Value of mask to filter mask_meaning : string Mask meaning listed in mask variable flag_meanings. Ignored if mask_value set. first_height : float First height to start the analysis. Start at 500 to throw out non-meteorological echo last_height : float Last height to end the analysis. step_height : float Height step value to use between first_height and last_height to make ygrid. height_variable : string Name of the height variable to use. If set to None, will attempt to determine by using coordinate varible name. ygrid : numpy array or list Numpy array of ygrid values to interpolate. first_height, last_height, step_height ignored if set. Returns ------- obj : Xarray.Dataset xarray dataset with new variables added References ---------- Kollias, P., I. Jo, P. Borque, A. Tatarevic, K. Lamer, N. Bharadwaj, K. Widener, K. Johnson, and E.E. Clothiaux, 2014: Scanning ARM Cloud Radars. Part II: Data Quality Control and Processing. J. Atmos. Oceanic Technol., 31, 583–598, https://doi.org/10.1175/JTECH-D-13-00045.1 """ # Check for variables to process if variable is None: raise ValueError('Please Specify a Variable Name') if isinstance(variable, str): variable = [variable] # Determine height coordinate varible name. if height_variable is None: height_variable = get_height_variable_name(_obj, variable[0]) # Check for first_height, if none is passed used first in data if first_height is None: first_height = _obj[height_variable].values[0] # If ygrid is not passed, set one up if ygrid is None: ygrid = np.arange(first_height, last_height, step_height) # Get height attributes as they do disappear at some point ht_attrs = _obj[height_variable].attrs # Get variable name for calc_cloud_mask if mask_variable in _obj.keys(): try: cloud_mask_variable_name = _obj[mask_variable].attrs['variable_used'] except KeyError: cloud_mask_variable_name = variable[0] else: cloud_mask_variable_name = variable[0] # Get data types for all variables to double check later on if the int # type were up converted to float. dtypes = {var_name: _obj[var_name].dtype for var_name in _obj.keys()} # Interpolate the data to height grid # Note, standard methodology dictates that logarithmic data # should be converted to linear space before averaging # This will need to be added in the future obj = _obj.interp(coords={height_variable: ygrid}, method='nearest') # The interp method will convert the mask values to float and interpret. # So we need to correct the data type of the mask and add missing meaning. if mask_variable not in list(obj.keys()): obj = calc_cloud_mask(obj, cloud_mask_variable_name, height_variable=height_variable) # Add new variables to dtypes dictionary for var_name in list(obj.keys()): if var_name in list(dtypes.keys()): continue dtypes[var_name] = obj[var_name].dtype # Apply mask to data if mask_value is None: flag_values = obj[mask_variable].attrs['flag_values'] flag_meanings = obj[mask_variable].attrs['flag_meanings'] index = flag_meanings.index(mask_meaning) mask_value = flag_values[index] obj = obj.where(obj[mask_variable] == mask_value) # Mask data based on first_height to use obj = obj.where(obj[height_variable] >= first_height) # For each variable, calculate average profile and add back into the dataset prof_names = [] for var in variable: prof = obj[var].mean(axis=0, skipna=True) # Add data back to the object with warnings.catch_warnings(): warnings.filterwarnings( 'ignore', category=RuntimeWarning, message='.*invalid value encountered in true_divide.*', ) prof_name = var + '_avg_prof' prof_names.append(prof_name) long_name = 'Average profile of ' + var attrs = {'long_name': long_name, 'units': obj[var].attrs['units']} da = xr.DataArray( prof.values, dims=[height_variable], coords=[obj[height_variable].values], attrs=attrs, ) obj[prof_name] = da obj.attrs['_prof_names'] = prof_names # Fix height variable attributes obj[height_variable].attrs = ht_attrs # The data type of mask variables is changed in this process. Change the # mask from float to inteter. for var_name, dtype in dtypes.items(): try: if obj[var_name].dtype == dtype: continue flag_values = obj[var_name].attrs['flag_values'] flag_meanings = obj[var_name].attrs['flag_meanings'] index = 0 for ii in ['missing', 'no_cloud']: try: index = flag_meanings.index(ii) break except ValueError: pass data = obj[var_name].values data[np.isnan(data)] = flag_values[index] obj[var_name].values = data obj[var_name] = obj[var_name].astype(dtype) except KeyError: pass return obj
[docs] def extract_profile( obj, azimuth, ground_dist, append_obj=None, variables=None, azimuth_range=None, ground_dist_range=200, azimuth_name='azimuth', range_name='range', ground_range_units='m', elevation_name='elevation', ): """ Function for extracting vertical profile over a location from a PPI scan giving azimuth direction and ground range distance Parameters ---------- obj : Xarray.Dataset Xarray Dataset with all the data azimuth : float Azimuth direction to extract profile in degrees ground_dist : float Horizontal ground distance to extract profile append_obj : Xarray.Dataset Xarray Dataset to return with new profile appened. variables : str, list, None List of variables to extract profile azimuth_range : float, None Range to used to determine if close to correct profile location. If set to None will calculate using mode of difference. Assumed to be in degrees. ground_dist_range : float Distance range window size allowed to extract profile. If the profile location is off from lat/lon location by more than this distance, will not extract a profile and will return None. azimuth_name : string Variable name in Xarray object containing azimuth values. range_name : string Variable name in Xarray object containing range values. ground_range_units : string ground_dist units elevation_name : string Variable name in Xarray object containing elevation values. Assumed to be in degrees. Returns ------- obj : Xarray.Dataset or None Xarray Dataset with profile extracted and new coordinate variable height added or if unable to find profile returns None. """ profile_obj = None if append_obj is not None: profile_obj = append_obj # If no variable names provided get list of names by checking dimentions. if variables is None: variables = [] for var_name in list(obj.keys()): if ( len(obj[var_name].dims) == 2 and len(set(obj[var_name].dims) - set(['time', range_name])) == 0 ): variables.append(var_name) elif isinstance(variables, str): variables = [variables] unit_registry = pint.UnitRegistry() if azimuth_range is None: azimuth_range = np.floor(np.abs(np.diff(obj[azimuth_name].values))) azimuth_range = stats.mode(azimuth_range).mode if isinstance(azimuth_range, list): azimuth_range = azimuth_range[0] ground_dist = ground_dist * unit_registry.parse_expression(ground_range_units) ground_dist = ground_dist.to(obj[range_name].attrs['units']).magnitude ground_dist_range = ground_dist_range * unit_registry.parse_expression(ground_range_units) ground_dist_range = ground_dist_range.to(obj[range_name].attrs['units']).magnitude number_of_sweeps = obj['sweep_start_ray_index'].values.size height = np.empty(number_of_sweeps, dtype=np.float32) found_value = np.full(number_of_sweeps, False) range_index = [] time_index = [] true_range = None true_azimuth = None for sweep_number in range(0, number_of_sweeps): index = np.arange( obj['sweep_start_ray_index'].values[sweep_number], obj['sweep_end_ray_index'].values[sweep_number] + 1, dtype=int, ) index = index[0] + np.argmin(np.abs(obj[azimuth_name].values[index] - azimuth)) matched_azimuth = obj[azimuth_name].values[index] matched_elevation = obj[elevation_name].values[index] result = calc_ground_range_and_height(obj[range_name], matched_elevation) rng_index = np.nanargmin(np.abs(result['ground_range'].values - ground_dist)) if true_range is None: true_range = result['ground_range'].values[rng_index] range_index.append(rng_index) height[sweep_number] = result['height'].values[rng_index] time_index.append(index) if np.abs(matched_azimuth - azimuth) <= azimuth_range: found_value[sweep_number] = True if true_azimuth is None: true_azimuth = matched_azimuth # Check if azimuth values are within range if not np.all(found_value): return profile_obj # Check if the distance is within range if not np.abs(ground_dist - true_range) <= ground_dist_range: return profile_obj temp_obj = obj.isel(time=time_index) write_time = [ temp_obj['time'].values[0] + (temp_obj['time'].values[-1] - temp_obj['time'].values[0]) / 2 ] profile_obj = xr.Dataset() profile_obj = profile_obj.assign_coords(time=write_time) profile_obj = profile_obj.assign_coords(height=height) for var_name in variables: data = np.full((1, len(range_index)), np.nan) for ii, _ in enumerate(range_index): if not found_value[sweep_number]: continue data[0, ii] = temp_obj[var_name].values[ii, range_index[ii]] profile_obj[var_name] = xr.DataArray( data=data, dims=['time', 'height'], attrs=obj[var_name].attrs ) # Adding attributes for coordinate variables after subsetting data because # setting values to DataArray with dims defined clears the attributes. profile_obj['time'].attrs = temp_obj['time'].attrs profile_obj['height'].attrs = { 'long_name': 'Height above ground', 'units': temp_obj[range_name].attrs['units'], 'standard_name': 'height', } # Add location variables # Get latitude variable name lat_name = '' for var_name in ['lat', 'latitude']: try: temp_obj[var_name] lat_name = var_name break except KeyError: pass # Get longitude variable name lon_name = '' for var_name in ['lon', 'longitude']: try: temp_obj[var_name] lon_name = var_name break except KeyError: pass # Get altitude variable name alt_name = '' for var_name in ['alt', 'altitude']: try: temp_obj[var_name] alt_name = var_name break except KeyError: pass # Get location variables and ensure scalar value lat_value = temp_obj[lat_name].values lon_value = temp_obj[lon_name].values if len(lat_value.shape) > 0: lat_value = lat_value[0] if len(lon_value.shape) > 0: lon_value = lon_value[0] # Calcualte new lat/lon values from radar location and azimuth and range result = destination_azimuth_distance( lat_value, lon_value, true_azimuth, true_range, dist_units=temp_obj[range_name].attrs['units'], ) # Copy over DataArray for attributes profile_obj[lat_name] = temp_obj[lat_name] profile_obj[lon_name] = temp_obj[lon_name] profile_obj[alt_name] = temp_obj[alt_name] # Replace latitude values with calculated value if len(profile_obj[lat_name].values.shape) > 0: profile_obj[lat_name].values = np.full(profile_obj[lat_name].values.shape, result[0]) else: profile_obj[lat_name].values = result[0] # Replace longitude values with calculated value if len(profile_obj[lon_name].values.shape) > 0: profile_obj[lon_name].values = np.full(profile_obj[lon_name].values.shape, result[1]) else: profile_obj[lon_name].values = result[1] del temp_obj if isinstance(append_obj, xr.core.dataset.Dataset): profile_obj = xr.concat([append_obj, profile_obj], dim='time') return profile_obj
[docs] def extract_profile_at_lat_lon( obj, desired_lat, desired_lon, append_obj=None, azimuth_name='azimuth', range_name='range', elevation_name='elevation', azimuth_range=None, ground_dist_range=200, variables=None, lat_name_in_obj='lat', lon_name_in_obj='lon', ): """ Function for extracting vertical profile over a location defined by latitude and longitude from a PPI scan Parameters ---------- obj : Xarray.Dataset Xarray object with all the data desired_lat : float Latitude of desired profile in same units as latitued in obj desired_lon : float Longitude of desired profile in same units as longitude in obj append_obj : Xarray.Dataset Xarray Dataset to return with new profile appened. azimuth_name : str Name of azimuth variable in obj range_name : str Name of range variable in obj elevation_name : str Name of elevation variable in obj azimuth_range : float or None Range to use for tollerance in selecting azimuth to extract profile. If set to None will use the mode of azimuth differences. Assumed to be in degrees. ground_dist_range : float Distance range window size allowed to extract profile. If the profile location is off from lat/lon location by more than this distance, will not extract a profile and will return None. variables : str, list, None List of variables to extract profile lat_name_in_obj : str Name of latitude varible in object lon_name_in_obj : str Name of longitude varible in object Returns ------- obj : Xarray.Dataset Xarray Dataset with profile extracted at desired latitued and longitude location, and new coordinate variable height added. """ lat = obj[lat_name_in_obj].values lon = obj[lon_name_in_obj].values if len(lat.shape) > 0: lat = lat[0] if len(lon.shape) > 0: lon = lon[0] result = calculate_azimuth_distance_from_lat_lon(lat, lon, desired_lat, desired_lon) profile_obj = extract_profile( obj, result['azimuth'], result['distance'], append_obj=append_obj, variables=variables, azimuth_range=azimuth_range, azimuth_name=azimuth_name, range_name=range_name, ground_range_units='m', elevation_name=elevation_name, ground_dist_range=ground_dist_range, ) return profile_obj
[docs] def extract_rhi_profile( obj, append_obj=None, variables=None, elevation_range=[89, 91], elevation_name='elevation', sweep_variables=['sweep_start_ray_index', 'sweep_end_ray_index'], ): """ Function for extracting vertical profile over a location from a RHI scan Parameters ---------- obj : Xarray.Dataset Xarray object with all the data. Requires the additional variables in sweep_variables for finding sweeps. They will be removed from returned Dataset to allow concatination. append_obj : Xarray.Dataset If provided will append extracted profiles to this object. variables : str, list, None List of variables to return in extracted Dataset. If set to None returns all variables in the Dataset. elevation_range : list Range of elevation values to use in subsetting to find a vertical profile. Will return profile closest to 90 degrees for each RHI scan that fits within this range. If the scan does not have a value within this range, will skip that scan. Assumed to be in degrees. elevation_name : string Variable name in Xarray object containing elevation values. Assumed to be in degrees. sweep_variables : list of str Variable names used to determine sweeps to extract profile. Returns ------- obj : Xarray.Dataset or None Xarray Dataset with profile extracted and new coordinate variable height added or if unable to find profile returns None. """ if obj is None: return append_obj extract_index = [] for ii, _ in enumerate(obj[sweep_variables[0]].values): index = np.arange( obj[sweep_variables[0]].values[ii], obj[sweep_variables[1]].values[ii], dtype=int ) index = index[0] + np.argmin(np.abs(obj[elevation_name].values[index] - 90.0)) elevation = obj[elevation_name].values[index] if elevation >= elevation_range[0] and elevation <= elevation_range[1]: extract_index.append(index) if len(extract_index) > 0: obj = obj.isel(time=extract_index) for var_name in sweep_variables: del obj[var_name] if variables is not None: if isinstance(variables, str): variables = [variables] obj = obj[variables] if isinstance(append_obj, xr.core.dataset.Dataset): append_obj = xr.concat([append_obj, obj], dim='time') else: append_obj = obj return append_obj
[docs] def calc_zdr_offset(obj, zdr_var=None, thresh=None): """ Function for extracting vertical profile over a location from a RHI scan Parameters ---------- obj : Xarray.Dataset Xarray object with radar data zdr_var : string Variable name for differential reflectivity thresh : dict Disctionary of variables and values following the form of thresh = {'cross_correlation_ratio_hv': [0.995, 1], 'reflectivity': [10, 30], 'range': [1000, 3000]} Returns ------- obj : Xarray.Dataset or None Xarray Dataset with profile extracted and new coordinate variable height added or if unable to find profile returns None. """ height_var = get_height_variable_name(obj, variable=zdr_var) new = obj for k in thresh: new = new.where(obj[k] >= thresh[k][0]) new = new.where(obj[k] <= thresh[k][1]) bias = np.nanmean(new[zdr_var].values) results = { 'bias': bias, 'profile_zdr': new[zdr_var].mean(dim='time').values, 'range': new[height_var].values, } for k in thresh: if k != height_var: results['profile_' + k] = new[k].mean(dim='time').values return results