grib2io.tables
Functions for retrieving data from NCEP GRIB2 Tables.
1"""Functions for retrieving data from NCEP GRIB2 Tables.""" 2 3from functools import lru_cache 4from typing import Optional, Union, List 5from numpy.typing import ArrayLike 6import itertools 7import importlib 8 9from .section0 import * 10from .section1 import * 11from .section3 import * 12from .section4 import * 13from .section5 import * 14from .section6 import * 15from .originating_centers import * 16 17_varinfo_tables_datastore = {} 18 19GRIB2_DISCIPLINES = [0, 1, 2, 3, 4, 10, 20] 20 21AEROSOL_PDTNS = [46, 48] # 47, 49, 80, 81, 82, 83, 84, 85] <- these don't seem to be working 22AEROSOL_PARAMS = list(itertools.chain(range(0,19),range(50,82),range(100,113),range(192,197))) 23 24def _load_varinfo_tables(modname: str): 25 """ 26 Load variable information tables from sub modules into the local 27 varinfo table datastore (_varinfo_tables_datastore). 28 29 Parameters 30 ---------- 31 modname 32 Module name to extract variable info tables from. 33 """ 34 module = importlib.import_module(modname, package=__name__) 35 names = getattr(module, '__all__', [name for name in dir(module) if name.startswith('table_')]) 36 _varinfo_tables_datastore.update({name: getattr(module, name) for name in names}) 37 38 39def get_table(table: str, expand: bool=False) -> dict: 40 """ 41 Return GRIB2 code table as a dictionary. 42 43 Parameters 44 ---------- 45 table 46 Code table number (e.g. '1.0'). 47 48 NOTE: Code table '4.1' requires a 3rd value representing the product 49 discipline (e.g. '4.1.0'). 50 expand 51 If `True`, expand output dictionary wherever keys are a range. 52 53 Returns 54 ------- 55 get_table 56 GRIB2 code table as a dictionary. 57 """ 58 if len(table) == 3 and table == '4.1': 59 raise Exception('GRIB2 Code Table 4.1 requires a 3rd value representing the discipline.') 60 if len(table) == 3 and table.startswith('4.2'): 61 raise Exception('Use function get_varinfo_from_table() for GRIB2 Code Table 4.2') 62 try: 63 tbl = globals()['table_'+table.replace('.','_')] 64 if expand: 65 _tbl = {} 66 for k,v in tbl.items(): 67 if '-' in k: 68 irng = [int(i) for i in k.split('-')] 69 for i in range(irng[0],irng[1]+1): 70 _tbl[str(i)] = v 71 else: 72 _tbl[k] = v 73 tbl = _tbl 74 return tbl 75 except(KeyError): 76 return {} 77 78 79def get_value_from_table( 80 value: Union[int, str], 81 table: str, 82 expand: bool = False, 83) -> Optional[Union[float, int]]: 84 """ 85 Return the definition given a GRIB2 code table. 86 87 Parameters 88 ---------- 89 value 90 Code table value. 91 table 92 Code table number. 93 expand 94 If `True`, expand output dictionary where keys are a range. 95 96 Returns 97 ------- 98 get_value_from_table 99 Table value or `None` if not found. 100 """ 101 try: 102 tbl = get_table(table,expand=expand) 103 value = str(value) 104 return tbl[value] 105 except(KeyError): 106 for k in tbl.keys(): 107 if '-' in k: 108 bounds = k.split('-') 109 if value >= bounds[0] and value <= bounds[1]: 110 return tbl[k] 111 return None 112 113 114def get_varinfo_from_table( 115 discipline: Union[int, str], 116 parmcat: Union[int, str], 117 parmnum: Union[int, str], 118 isNDFD: bool = False, 119): 120 """ 121 Return the GRIB2 variable information. 122 123 NOTE: This functions allows for all arguments to be converted to a string 124 type if arguments are integer. 125 126 Parameters 127 ---------- 128 discipline 129 Discipline code value of a GRIB2 message. 130 parmcat 131 Parameter Category value of a GRIB2 message. 132 parmnum 133 Parameter Number value of a GRIB2 message. 134 isNDFD: optional 135 If `True`, signals function to try to get variable information from the 136 supplemental NDFD tables. 137 138 Returns 139 ------- 140 full_name 141 Full name of the GRIB2 variable. "Unknown" if variable is not found. 142 units 143 Units of the GRIB2 variable. "Unknown" if variable is not found. 144 shortName 145 Abbreviated name of the GRIB2 variable. "Unknown" if variable is not 146 found. 147 """ 148 tblname = f'table_4_2_{discipline}_{parmcat}' 149 if isNDFD: 150 tblname += '_ndfd' 151 modname = f'.section4_discipline{discipline}' 152 if tblname not in _varinfo_tables_datastore.keys(): 153 _load_varinfo_tables(modname) 154 try: 155 return _varinfo_tables_datastore[tblname][str(parmnum)] 156 except(KeyError): 157 return ['Unknown','Unknown','Unknown'] 158 159 160@lru_cache(maxsize=None) 161def get_shortnames( 162 discipline: Optional[Union[int, str]] = None, 163 parmcat: Optional[Union[int, str]] = None, 164 parmnum: Optional[Union[int, str]] = None, 165 isNDFD: bool = False, 166) -> List[str]: 167 """ 168 Return a list of variable shortNames. 169 170 If all 3 args are None, then shortNames from all disciplines, parameter 171 categories, and numbers will be returned. 172 173 Parameters 174 ---------- 175 discipline 176 GRIB2 discipline code value. 177 parmcat 178 GRIB2 parameter category value. 179 parmnum 180 Parameter Number value of a GRIB2 message. 181 isNDFD: optional 182 If `True`, signals function to try to get variable information from the 183 supplemental NDFD tables. 184 185 Returns 186 ------- 187 get_shortnames 188 list of GRIB2 shortNames. 189 """ 190 shortnames = list() 191 if discipline is None: 192 discipline = GRIB2_DISCIPLINES 193 else: 194 discipline = [discipline] 195 if parmcat is None: 196 parmcat = list() 197 for d in discipline: 198 parmcat += list(get_table(f'4.1.{d}').keys()) 199 else: 200 parmcat = [parmcat] 201 if parmnum is None: 202 parmnum = list(range(256)) 203 else: 204 parmnum = [parmnum] 205 for d in discipline: 206 207 for pc in parmcat: 208 for pn in parmnum: 209 shortnames.append(get_varinfo_from_table(d,pc,pn,isNDFD)[2]) 210 211 shortnames = sorted(set(shortnames)) 212 try: 213 shortnames.remove('unknown') 214 shortnames.remove('Unknown') 215 except(ValueError): 216 pass 217 return shortnames 218 219 220@lru_cache(maxsize=None) 221def get_metadata_from_shortname(shortname: str): 222 """ 223 Provide GRIB2 variable metadata attributes given a GRIB2 shortName. 224 225 Parameters 226 ---------- 227 shortname 228 GRIB2 variable shortName. 229 230 Returns 231 ------- 232 get_metadata_from_shortname 233 list of dictionary items where each dictionary item contains the 234 variable metadata key:value pairs. 235 236 NOTE: Some variable shortNames will exist in multiple parameter 237 category/number tables according to the GRIB2 discipline. 238 """ 239 metadata = [] 240 for d in GRIB2_DISCIPLINES: 241 parmcat = list(get_table(f'4.1.{d}').keys()) 242 for pc in parmcat: 243 for pn in range(256): 244 varinfo = get_varinfo_from_table(d,pc,pn,False) 245 if shortname == varinfo[2]: 246 metadata.append(dict(discipline=d,parameterCategory=pc,parameterNumber=pn, 247 fullName=varinfo[0],units=varinfo[1])) 248 return metadata 249 250 251def get_wgrib2_level_string(pdtn: int, pdt: ArrayLike) -> str: 252 """ 253 Return a string that describes the level or layer of the GRIB2 message. 254 255 The format and language of the string is an exact replica of how wgrib2 256 produces the level/layer string in its inventory output. 257 258 Contents of wgrib2 source, 259 [Level.c](https://github.com/NOAA-EMC/NCEPLIBS-wgrib2/blob/develop/wgrib2/Level.c), 260 were converted into a Python dictionary and stored in grib2io as table 261 'wgrib2_level_string'. 262 263 Parameters 264 ---------- 265 pdtn 266 GRIB2 Product Definition Template Number 267 pdt 268 Sequence containing GRIB2 Product Definition Template (Section 4). 269 270 Returns 271 ------- 272 get_wgrib2_level_string 273 wgrib2-formatted level/layer string. 274 """ 275 lvlstr = '' 276 if pdtn == 32: 277 return 'no_level' 278 elif pdtn == 48: 279 idxs = slice(20,26) 280 else: 281 idxs = slice(9,15) 282 type1, sfac1, sval1, type2, sfac2, sval2 = map(int,pdt[idxs]) 283 val1 = sval1/10**sfac1 284 if type1 in [100,108]: val1 *= 0.01 285 if type2 != 255: 286 # Layer 287 #assert type2 == type1, "Surface types are not equal: %g - %g" % (type1,type2) 288 val2 = sval2/10**sfac2 289 if type2 in [100,108]: val2 *= 0.01 290 lvlstr = get_value_from_table(type1,table='wgrib2_level_string')[1] 291 vals = (val1,val2) 292 else: 293 # Level 294 lvlstr = get_value_from_table(type1,table='wgrib2_level_string')[0] 295 vals = (val1) 296 if '%g' in lvlstr: lvlstr %= vals 297 return lvlstr 298 299 300def _build_aerosol_shortname(obj) -> str: 301 """ 302 """ 303 304 _OPTICAL_WAVELENGTH_MAPPING = get_table('aerosol_optical_wavelength') 305 _LEVEL_MAPPING = get_table('aerosol_level') 306 _PARAMETER_MAPPING = get_table('aerosol_parameter') 307 _AERO_TYPE_MAPPING = get_table('aerosol_type') 308 309 # Build shortname from aerosol components 310 parts = [] 311 312 # Get aerosol type 313 aero_type = str(obj.typeOfAerosol.value) if obj.typeOfAerosol is not None else "" 314 315 # Add size information if applicable 316 aero_size = "" 317 if hasattr(obj, 'scaledValueOfFirstSize'): 318 if float(obj.scaledValueOfFirstSize) > 0: 319 first_size = float(obj.scaledValueOfFirstSize) 320 321 # Map common PM sizes 322 size_map = {1: 'pm1', 25: 'pm25', 10: 'pm10', 20: 'pm20'} 323 aero_size = size_map.get(first_size, f"pm{int(first_size)}") 324 325 # Check for size intervals 326 if (hasattr(obj, 'scaledValueOfSecondSize') and 327 obj.scaledValueOfSecondSize is not None and 328 hasattr(obj, 'typeOfIntervalForAerosolSize') and 329 obj.typeOfIntervalForAerosolSize.value == 6): 330 331 second_size = float(obj.scaledValueOfSecondSize) 332 if second_size > 0: 333 if (first_size == 2.5 and second_size == 10): 334 aero_size = 'PM25to10' 335 elif (first_size == 10 and second_size == 20): 336 aero_size = 'PM10to20' 337 else: 338 aero_size = f"PM{int(first_size)}to{int(second_size)}" 339 340 # Add optical and wavelength information 341 var_wavelength = '' 342 if (hasattr(obj, 'parameterNumber') and 343 hasattr(obj, 'scaledValueOfFirstWavelength') and 344 hasattr(obj, 'scaledValueOfSecondWavelength')): 345 346 optical_type = str(obj.parameterNumber) 347 if obj.scaledValueOfFirstWavelength > 0: 348 first_wl = obj.scaledValueOfFirstWavelength 349 second_wl = obj.scaledValueOfSecondWavelength 350 351 # Special case for AE between 440-870nm 352 if optical_type == '111' and first_wl == 440 and second_wl == 870: 353 key = (optical_type, '440TO870') 354 else: 355 # Find matching wavelength band 356 for wl_key, wl_info in _OPTICAL_WAVELENGTH_MAPPING.items(): 357 if (wl_key[0] == optical_type and # Check optical_type first 358 int(wl_key[1]) == first_wl and 359 (second_wl is None or int(wl_key[2]) == second_wl)): 360 key = wl_key 361 break 362 else: 363 # If no match found, use raw values 364 key = (optical_type, str(first_wl), 365 str(second_wl) if second_wl is not None else '') 366 367 # FIX THIS... 368 if key in _OPTICAL_WAVELENGTH_MAPPING.keys(): 369 var_wavelength = _OPTICAL_WAVELENGTH_MAPPING[key] 370 371 # Add level information 372 level_str = '' 373 if hasattr(obj, 'typeOfFirstFixedSurface'): 374 first_level = str(obj.typeOfFirstFixedSurface.value) 375 first_value = str(obj.scaledValueOfFirstFixedSurface) if obj.scaledValueOfFirstFixedSurface > 0 else '' 376 if first_level in _LEVEL_MAPPING: 377 level_str = f"{_LEVEL_MAPPING[first_level]}{first_value}" 378 379 # Get parameter type 380 param = '' 381 if hasattr(obj, 'parameterNumber'): 382 param_num = str(obj.parameterNumber) 383 if param_num in _PARAMETER_MAPPING: 384 param = _PARAMETER_MAPPING[param_num] 385 386 # Build the final shortname 387 if var_wavelength and aero_type in _AERO_TYPE_MAPPING: 388 shortname = f"{_AERO_TYPE_MAPPING[aero_type]}{var_wavelength}" 389 elif aero_type in _AERO_TYPE_MAPPING: 390 parts = [] 391 if level_str: 392 parts.append(level_str) 393 parts.append(_AERO_TYPE_MAPPING[aero_type]) 394 if aero_size: 395 parts.append(aero_size) 396 if param: 397 parts.append(param) 398 shortname = '_'.join(parts) if len(parts) > 1 else parts[0] 399 else: 400 return get_varinfo_from_table(obj.section0[2], *obj.section4[2:4], isNDFD=obj._isNDFD)[2] 401 402 return shortname
40def get_table(table: str, expand: bool=False) -> dict: 41 """ 42 Return GRIB2 code table as a dictionary. 43 44 Parameters 45 ---------- 46 table 47 Code table number (e.g. '1.0'). 48 49 NOTE: Code table '4.1' requires a 3rd value representing the product 50 discipline (e.g. '4.1.0'). 51 expand 52 If `True`, expand output dictionary wherever keys are a range. 53 54 Returns 55 ------- 56 get_table 57 GRIB2 code table as a dictionary. 58 """ 59 if len(table) == 3 and table == '4.1': 60 raise Exception('GRIB2 Code Table 4.1 requires a 3rd value representing the discipline.') 61 if len(table) == 3 and table.startswith('4.2'): 62 raise Exception('Use function get_varinfo_from_table() for GRIB2 Code Table 4.2') 63 try: 64 tbl = globals()['table_'+table.replace('.','_')] 65 if expand: 66 _tbl = {} 67 for k,v in tbl.items(): 68 if '-' in k: 69 irng = [int(i) for i in k.split('-')] 70 for i in range(irng[0],irng[1]+1): 71 _tbl[str(i)] = v 72 else: 73 _tbl[k] = v 74 tbl = _tbl 75 return tbl 76 except(KeyError): 77 return {}
Return GRIB2 code table as a dictionary.
Parameters
- table: Code table number (e.g. '1.0').
NOTE: Code table '4.1' requires a 3rd value representing the product discipline (e.g. '4.1.0').
- expand: If
True
, expand output dictionary wherever keys are a range.
Returns
- get_table: GRIB2 code table as a dictionary.
80def get_value_from_table( 81 value: Union[int, str], 82 table: str, 83 expand: bool = False, 84) -> Optional[Union[float, int]]: 85 """ 86 Return the definition given a GRIB2 code table. 87 88 Parameters 89 ---------- 90 value 91 Code table value. 92 table 93 Code table number. 94 expand 95 If `True`, expand output dictionary where keys are a range. 96 97 Returns 98 ------- 99 get_value_from_table 100 Table value or `None` if not found. 101 """ 102 try: 103 tbl = get_table(table,expand=expand) 104 value = str(value) 105 return tbl[value] 106 except(KeyError): 107 for k in tbl.keys(): 108 if '-' in k: 109 bounds = k.split('-') 110 if value >= bounds[0] and value <= bounds[1]: 111 return tbl[k] 112 return None
Return the definition given a GRIB2 code table.
Parameters
- value: Code table value.
- table: Code table number.
- expand: If
True
, expand output dictionary where keys are a range.
Returns
- get_value_from_table: Table value or
None
if not found.
115def get_varinfo_from_table( 116 discipline: Union[int, str], 117 parmcat: Union[int, str], 118 parmnum: Union[int, str], 119 isNDFD: bool = False, 120): 121 """ 122 Return the GRIB2 variable information. 123 124 NOTE: This functions allows for all arguments to be converted to a string 125 type if arguments are integer. 126 127 Parameters 128 ---------- 129 discipline 130 Discipline code value of a GRIB2 message. 131 parmcat 132 Parameter Category value of a GRIB2 message. 133 parmnum 134 Parameter Number value of a GRIB2 message. 135 isNDFD: optional 136 If `True`, signals function to try to get variable information from the 137 supplemental NDFD tables. 138 139 Returns 140 ------- 141 full_name 142 Full name of the GRIB2 variable. "Unknown" if variable is not found. 143 units 144 Units of the GRIB2 variable. "Unknown" if variable is not found. 145 shortName 146 Abbreviated name of the GRIB2 variable. "Unknown" if variable is not 147 found. 148 """ 149 tblname = f'table_4_2_{discipline}_{parmcat}' 150 if isNDFD: 151 tblname += '_ndfd' 152 modname = f'.section4_discipline{discipline}' 153 if tblname not in _varinfo_tables_datastore.keys(): 154 _load_varinfo_tables(modname) 155 try: 156 return _varinfo_tables_datastore[tblname][str(parmnum)] 157 except(KeyError): 158 return ['Unknown','Unknown','Unknown']
Return the GRIB2 variable information.
NOTE: This functions allows for all arguments to be converted to a string type if arguments are integer.
Parameters
- discipline: Discipline code value of a GRIB2 message.
- parmcat: Parameter Category value of a GRIB2 message.
- parmnum: Parameter Number value of a GRIB2 message.
- isNDFD (optional):
If
True
, signals function to try to get variable information from the supplemental NDFD tables.
Returns
- full_name: Full name of the GRIB2 variable. "Unknown" if variable is not found.
- units: Units of the GRIB2 variable. "Unknown" if variable is not found.
- shortName: Abbreviated name of the GRIB2 variable. "Unknown" if variable is not found.
161@lru_cache(maxsize=None) 162def get_shortnames( 163 discipline: Optional[Union[int, str]] = None, 164 parmcat: Optional[Union[int, str]] = None, 165 parmnum: Optional[Union[int, str]] = None, 166 isNDFD: bool = False, 167) -> List[str]: 168 """ 169 Return a list of variable shortNames. 170 171 If all 3 args are None, then shortNames from all disciplines, parameter 172 categories, and numbers will be returned. 173 174 Parameters 175 ---------- 176 discipline 177 GRIB2 discipline code value. 178 parmcat 179 GRIB2 parameter category value. 180 parmnum 181 Parameter Number value of a GRIB2 message. 182 isNDFD: optional 183 If `True`, signals function to try to get variable information from the 184 supplemental NDFD tables. 185 186 Returns 187 ------- 188 get_shortnames 189 list of GRIB2 shortNames. 190 """ 191 shortnames = list() 192 if discipline is None: 193 discipline = GRIB2_DISCIPLINES 194 else: 195 discipline = [discipline] 196 if parmcat is None: 197 parmcat = list() 198 for d in discipline: 199 parmcat += list(get_table(f'4.1.{d}').keys()) 200 else: 201 parmcat = [parmcat] 202 if parmnum is None: 203 parmnum = list(range(256)) 204 else: 205 parmnum = [parmnum] 206 for d in discipline: 207 208 for pc in parmcat: 209 for pn in parmnum: 210 shortnames.append(get_varinfo_from_table(d,pc,pn,isNDFD)[2]) 211 212 shortnames = sorted(set(shortnames)) 213 try: 214 shortnames.remove('unknown') 215 shortnames.remove('Unknown') 216 except(ValueError): 217 pass 218 return shortnames
Return a list of variable shortNames.
If all 3 args are None, then shortNames from all disciplines, parameter categories, and numbers will be returned.
Parameters
- discipline: GRIB2 discipline code value.
- parmcat: GRIB2 parameter category value.
- parmnum: Parameter Number value of a GRIB2 message.
- isNDFD (optional):
If
True
, signals function to try to get variable information from the supplemental NDFD tables.
Returns
- get_shortnames: list of GRIB2 shortNames.
221@lru_cache(maxsize=None) 222def get_metadata_from_shortname(shortname: str): 223 """ 224 Provide GRIB2 variable metadata attributes given a GRIB2 shortName. 225 226 Parameters 227 ---------- 228 shortname 229 GRIB2 variable shortName. 230 231 Returns 232 ------- 233 get_metadata_from_shortname 234 list of dictionary items where each dictionary item contains the 235 variable metadata key:value pairs. 236 237 NOTE: Some variable shortNames will exist in multiple parameter 238 category/number tables according to the GRIB2 discipline. 239 """ 240 metadata = [] 241 for d in GRIB2_DISCIPLINES: 242 parmcat = list(get_table(f'4.1.{d}').keys()) 243 for pc in parmcat: 244 for pn in range(256): 245 varinfo = get_varinfo_from_table(d,pc,pn,False) 246 if shortname == varinfo[2]: 247 metadata.append(dict(discipline=d,parameterCategory=pc,parameterNumber=pn, 248 fullName=varinfo[0],units=varinfo[1])) 249 return metadata
Provide GRIB2 variable metadata attributes given a GRIB2 shortName.
Parameters
- shortname: GRIB2 variable shortName.
Returns
- get_metadata_from_shortname: list of dictionary items where each dictionary item contains the variable metadata key:value pairs.
NOTE: Some variable shortNames will exist in multiple parameter category/number tables according to the GRIB2 discipline.
252def get_wgrib2_level_string(pdtn: int, pdt: ArrayLike) -> str: 253 """ 254 Return a string that describes the level or layer of the GRIB2 message. 255 256 The format and language of the string is an exact replica of how wgrib2 257 produces the level/layer string in its inventory output. 258 259 Contents of wgrib2 source, 260 [Level.c](https://github.com/NOAA-EMC/NCEPLIBS-wgrib2/blob/develop/wgrib2/Level.c), 261 were converted into a Python dictionary and stored in grib2io as table 262 'wgrib2_level_string'. 263 264 Parameters 265 ---------- 266 pdtn 267 GRIB2 Product Definition Template Number 268 pdt 269 Sequence containing GRIB2 Product Definition Template (Section 4). 270 271 Returns 272 ------- 273 get_wgrib2_level_string 274 wgrib2-formatted level/layer string. 275 """ 276 lvlstr = '' 277 if pdtn == 32: 278 return 'no_level' 279 elif pdtn == 48: 280 idxs = slice(20,26) 281 else: 282 idxs = slice(9,15) 283 type1, sfac1, sval1, type2, sfac2, sval2 = map(int,pdt[idxs]) 284 val1 = sval1/10**sfac1 285 if type1 in [100,108]: val1 *= 0.01 286 if type2 != 255: 287 # Layer 288 #assert type2 == type1, "Surface types are not equal: %g - %g" % (type1,type2) 289 val2 = sval2/10**sfac2 290 if type2 in [100,108]: val2 *= 0.01 291 lvlstr = get_value_from_table(type1,table='wgrib2_level_string')[1] 292 vals = (val1,val2) 293 else: 294 # Level 295 lvlstr = get_value_from_table(type1,table='wgrib2_level_string')[0] 296 vals = (val1) 297 if '%g' in lvlstr: lvlstr %= vals 298 return lvlstr
Return a string that describes the level or layer of the GRIB2 message.
The format and language of the string is an exact replica of how wgrib2 produces the level/layer string in its inventory output.
Contents of wgrib2 source, Level.c, were converted into a Python dictionary and stored in grib2io as table 'wgrib2_level_string'.
Parameters
- pdtn: GRIB2 Product Definition Template Number
- pdt: Sequence containing GRIB2 Product Definition Template (Section 4).
Returns
- get_wgrib2_level_string: wgrib2-formatted level/layer string.