Skip to content

Commit

Permalink
more cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewEichmann-NOAA committed May 31, 2024
1 parent 85ae518 commit 64c323a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 133 deletions.
1 change: 0 additions & 1 deletion jobs/JGDAS_GLOBAL_OCEAN_ANALYSIS_LETKF
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ YMD=${gPDY} HH=${gcyc} declare_from_tmpl -rx \
COM_ICE_HISTORY_PREV:COM_ICE_HISTORY_TMPL

YMD=${PDY} HH=${cyc} declare_from_tmpl -rx COM_OBS
ROTDIR=${COMROOT} RUN=enkf${RUN} YMD=${gPDY} HH=${gcyc} declare_from_tmpl -rx COM_TOP_PREV_ENS:COM_TOP_TMPL

##############################################
# Begin JOB SPECIFIC work
Expand Down
167 changes: 35 additions & 132 deletions ush/python/pygfs/task/marine_letkf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from soca import bkg_utils
from typing import Dict
import ufsda
from wxflow import (Executable,
from wxflow import (AttrDict,
Executable,
FileHandler,
logit,
parse_j2yaml,
Expand Down Expand Up @@ -39,28 +40,23 @@ def __init__(self, config: Dict) -> None:
logger.info("init")
super().__init__(config)

PDY = self.runtime_config.PDY
cyc = self.runtime_config.cyc
gcyc = str(self.config.gcyc).zfill(2)
self.runtime_config.gcyc = gcyc
self.config.gcyc = self.runtime_config.previous_cycle.strftime('%H')
RUN = self.runtime_config.RUN
DATA = path.realpath(self.runtime_config.DATA)
cdate = PDY + timedelta(hours=cyc)
COM_TOP_PREV_ENS = self.config.COM_TOP_PREV_ENS
JEDI_BIN = self.config.JEDI_BIN

gdas_home = path.join(self.config.HOMEgfs, 'sorc', 'gdas.cd')

half_assim_freq = timedelta(hours=int(self.config.assim_freq) / 2)
window_begin = cdate - half_assim_freq
window_begin = self.runtime_config.current_cycle - half_assim_freq
window_begin_iso = window_begin.strftime('%Y-%m-%dT%H:%M:%SZ')
window_middle_iso = cdate.strftime('%Y-%m-%dT%H:%M:%SZ')
window_middle_iso = self.runtime_config.current_cycle.strftime('%Y-%m-%dT%H:%M:%SZ')
self.config.ATM_WINDOW_BEGIN = window_begin_iso
self.config.ATM_WINDOW_MIDDLE = window_middle_iso

letkf_exec = path.join(JEDI_BIN, 'gdas.x')
letkf_exec = path.join(self.config.JEDI_BIN, 'gdas.x')
letkf_yaml_dir = path.join(gdas_home, 'parm', 'soca', 'letkf')
self.config['letkf_yaml_template'] = path.join(letkf_yaml_dir, 'letkf.yaml.j2')
self.config['letkf_stage_yaml_template'] = path.join(letkf_yaml_dir, 'letkf_stage.yaml.j2')
letkf_yaml_file = path.join(DATA, 'letkf.yaml')
self.config.letkf_exec_args = [letkf_exec,
'fv3jedi',
Expand All @@ -72,7 +68,7 @@ def __init__(self, config: Dict) -> None:
self.config.BKG_LIST = 'bkg_list.yaml'
self.config.bkg_dir = path.join(DATA, 'bkg')

self.config.exec_name_gridgen = path.join(JEDI_BIN, 'gdas_soca_gridgen.x')
self.config.exec_name_gridgen = path.join(self.config.JEDI_BIN, 'gdas_soca_gridgen.x')
self.config.gridgen_yaml = path.join(gdas_home, 'parm', 'soca', 'gridgen', 'gridgen.yaml')

self.config.mom_input_nml_src = path.join(gdas_home, 'parm', 'soca', 'fms', 'input.nml')
Expand All @@ -84,38 +80,11 @@ def __init__(self, config: Dict) -> None:
self.config.obs_dir = path.join(DATA, 'obs')

# set up lists of files for ens background
ocn_ens_bkg_filename = f"enkf{RUN}.ocean.t{gcyc}z.inst.f009.nc"
ice_ens_bkg_filename = f"enkf{RUN}.ice.t{gcyc}z.inst.f009.nc"

# create list of subdirs to make in initialize, and list of some of the files to stage
ens_bkg_files_to_stage = []
dirs_to_make = [self.config.bkg_dir, self.config.data_output_dir, self.config.obs_dir]
for mem in range(1, self.config.NMEM_ENS + 1):
mem_dir = f'mem{str(mem).zfill(3)}' # will make pattern mem001
dirs_to_make.append(path.join(self.config.ens_dir, mem_dir))
ocn_file_path = path.join(COM_TOP_PREV_ENS,
mem_dir,
'model_data',
'ocean',
'history',
ocn_ens_bkg_filename)
ocn_file_dest = path.join(self.config.ens_dir,
mem_dir,
ocn_ens_bkg_filename)
ice_file_path = path.join(COM_TOP_PREV_ENS,
mem_dir,
'model_data',
'ice',
'history',
ice_ens_bkg_filename)
ice_file_dest = path.join(self.config.ens_dir,
mem_dir,
ice_ens_bkg_filename)
ens_bkg_files_to_stage.append((ocn_file_path, ocn_file_dest))
ens_bkg_files_to_stage.append((ice_file_path, ice_file_dest))

self.config.ens_bkg_files_to_stage = ens_bkg_files_to_stage
self.config.dirs_to_make = dirs_to_make
self.config.ocn_ens_bkg_filename = f"enkf{RUN}.ocean.t{self.config.gcyc}z.inst.f009.nc"
self.config.ice_ens_bkg_filename = f"enkf{RUN}.ice.t{self.config.gcyc}z.inst.f009.nc"

self.task_config = AttrDict(dict(**self.config, **self.runtime_config))


@logit(logger)
def initialize(self):
Expand All @@ -130,40 +99,28 @@ def initialize(self):

logger.info("initialize")

cyc = self.runtime_config.cyc
DATA = self.runtime_config.DATA
ens_dir = self.config.ens_dir
PDYstr = self.runtime_config.PDY.strftime("%Y%m%d")
RUN = self.runtime_config.RUN

# create directories under DATA
FileHandler({'mkdir': self.config.dirs_to_make}).sync()

# copy ensemble background to DATA/ens/mem???
FileHandler({'copy': self.config.ens_bkg_files_to_stage}).sync()

bkg_utils.gen_bkg_list(bkg_path=self.config.COM_OCEAN_HISTORY_PREV,
out_path=self.config.bkg_dir,
window_begin=self.config.window_begin,
yaml_name=self.config.BKG_LIST)
# make directories and stage ensemble background files
letkf_stage_list = parse_j2yaml(self.task_config.letkf_stage_yaml_template, self.task_config)
FileHandler(letkf_stage_list).sync()

# TODO(AFE): probably needs to be jinjafied
obs_list = YAMLFile(self.config.OBS_YAML)
obs_list = YAMLFile(self.task_config.OBS_YAML)

# get the list of observations
CDATE = self.runtime_config.current_cycle.strftime("%Y%m%d%H")
obs_files = []
for ob in obs_list['observers']:
obs_name = ob['obs space']['name'].lower()
obs_filename = f"{RUN}.t{cyc}z.{obs_name}.{PDYstr}{cyc}.nc4"
obs_filename = f"{self.task_config.RUN}.t{self.task_config.cyc}z.{obs_name}.{CDATE}.nc4"
obs_files.append((obs_filename, ob))

obs_files_to_copy = []
obs_to_use = []
# copy obs from COM_OBS to DATA/obs
for obs_file, ob in obs_files:
logger.info(f"******* {obs_file}")
obs_src = path.join(self.config.COM_OBS, obs_file)
obs_dst = path.join(DATA, self.config.obs_dir, obs_file)
obs_src = path.join(self.task_config.COM_OBS, obs_file)
obs_dst = path.join(self.task_config.DATA, self.task_config.obs_dir, obs_file)
logger.info(f"******* {obs_src}")
if path.exists(obs_src):
logger.info(f"******* fetching {obs_file}")
Expand All @@ -175,26 +132,22 @@ def initialize(self):
FileHandler({'copy': obs_files_to_copy}).sync()

# make the letkf.yaml
letkf_yaml = parse_j2yaml(self.config.letkf_yaml_template, self.config)
letkf_yaml = parse_j2yaml(self.task_config.letkf_yaml_template, self.task_config)
letkf_yaml.observations.observers = obs_to_use
letkf_yaml.save(self.config.letkf_yaml_file)

FileHandler({'copy': [[self.config.mom_input_nml_src,
self.config.mom_input_nml_tmpl]]}).sync()
letkf_yaml.save(self.task_config.letkf_yaml_file)

self.stage_fix_files()

bkg_utils.stage_ic(self.config.bkg_dir, self.runtime_config.DATA, self.runtime_config.gcyc)
FileHandler({'copy': [[self.task_config.mom_input_nml_src,
self.task_config.mom_input_nml_tmpl]]}).sync()

# swap date and stack size
domain_stack_size = self.config.DOMAIN_STACK_SIZE
ymdhms = [int(s) for s in self.config.window_begin.strftime('%Y,%m,%d,%H,%M,%S').split(',')]
with open(self.config.mom_input_nml_tmpl, 'r') as nml_file:
domain_stack_size = self.task_config.DOMAIN_STACK_SIZE
ymdhms = [int(s) for s in self.task_config.window_begin.strftime('%Y,%m,%d,%H,%M,%S').split(',')]
with open(self.task_config.mom_input_nml_tmpl, 'r') as nml_file:
nml = f90nml.read(nml_file)
nml['ocean_solo_nml']['date_init'] = ymdhms
nml['fms_nml']['domains_stack_size'] = int(domain_stack_size)
ufsda.disk_utils.removefile(self.config.mom_input_nml)
nml.write(self.config.mom_input_nml)
ufsda.disk_utils.removefile(self.task_config.mom_input_nml)
nml.write(self.task_config.mom_input_nml)

@logit(logger)
def run(self):
Expand All @@ -209,9 +162,9 @@ def run(self):

logger.info("run")

exec_cmd_gridgen = Executable(self.config.APRUN_OCNANALLETKF)
exec_cmd_gridgen.add_default_arg(self.config.exec_name_gridgen)
exec_cmd_gridgen.add_default_arg(self.config.gridgen_yaml)
exec_cmd_gridgen = Executable(self.task_config.APRUN_OCNANALLETKF)
exec_cmd_gridgen.add_default_arg(self.task_config.exec_name_gridgen)
exec_cmd_gridgen.add_default_arg(self.task_config.gridgen_yaml)

try:
logger.debug(f"Executing {exec_cmd_gridgen}")
Expand All @@ -222,8 +175,8 @@ def run(self):
raise WorkflowException(f"An error occurred during execution of {exec_cmd_gridgen}")
pass

exec_cmd_letkf = Executable(self.config.APRUN_OCNANALLETKF)
for letkf_exec_arg in self.config.letkf_exec_args:
exec_cmd_letkf = Executable(self.task_config.APRUN_OCNANALLETKF)
for letkf_exec_arg in self.task_config.letkf_exec_args:
exec_cmd_letkf.add_default_arg(letkf_exec_arg)

try:
Expand All @@ -247,53 +200,3 @@ def finalize(self):
"""

logger.info("finalize")

@logit(logger)
def stage_fix_files(self):
"""Stage fixed files for marine DA
Parameters:
------------
None
Returns:
--------
None
"""
# adapted from ufsda stage_fix
# TODO(AFE): this method maybe should go in a different class

logger.info("stage_fix_files")

DATA = self.runtime_config.DATA
SOCA_INPUT_FIX_DIR = self.config.SOCA_INPUT_FIX_DIR

fix_files = []
# copy Rossby Radius file
fix_files.append([path.join(SOCA_INPUT_FIX_DIR, 'rossrad.dat'),
path.join(DATA, 'rossrad.dat')])
# link name lists
fix_files.append([path.join(SOCA_INPUT_FIX_DIR, 'field_table'),
path.join(DATA, 'field_table')])
fix_files.append([path.join(SOCA_INPUT_FIX_DIR, 'diag_table'),
path.join(DATA, 'diag_table')])
fix_files.append([path.join(SOCA_INPUT_FIX_DIR, 'MOM_input'),
path.join(DATA, 'MOM_input')])
# link field metadata
fix_files.append([path.join(SOCA_INPUT_FIX_DIR, 'fields_metadata.yaml'),
path.join(DATA, 'fields_metadata.yaml')])

# link ufo <---> soca name variable mapping
fix_files.append([path.join(SOCA_INPUT_FIX_DIR, 'obsop_name_map.yaml'),
path.join(DATA, 'obsop_name_map.yaml')])

# INPUT
src_input_dir = path.join(SOCA_INPUT_FIX_DIR, 'INPUT')
dst_input_dir = path.join(DATA, 'INPUT')
FileHandler({'mkdir': [dst_input_dir]}).sync()

input_files = glob(f'{src_input_dir}/*')
for input_file in input_files:
fname = path.basename(input_file)
fix_files.append([path.join(src_input_dir, fname),
path.join(dst_input_dir, fname)])

FileHandler({'copy': fix_files}).sync()

0 comments on commit 64c323a

Please sign in to comment.