DRY: move GDALWarp.deinit() elsewhere
bossie committed Sep 16, 2024
1 parent 56ef03f commit a47292b
Showing 1 changed file with 60 additions and 51 deletions.
openeogeotrellis/deploy/batch_job.py
@@ -152,47 +152,66 @@ def context_with_retry(conf):
logger.info(f"Failed to create SparkContext, retrying {retry_counter} ... {repr(GeoPySparkBackendImplementation.summarize_exception_static(e))}")

with context_with_retry(conf) as sc:
-    principal = sc.getConf().get("spark.yarn.principal")
-    key_tab = sc.getConf().get("spark.yarn.keytab")
-
-    default_sentinel_hub_credentials = _get_sentinel_hub_credentials_from_spark_conf(sc.getConf())
-    vault_token = _get_vault_token(sc.getConf())
-    access_token = _get_access_token(sc.getConf())
-
-    if get_backend_config().setup_kerberos_auth:
-        setup_kerberos_auth(principal, key_tab)
-
-    def run_driver():
-        run_job(
-            job_specification=job_specification, output_file=output_file, metadata_file=metadata_file,
-            api_version=api_version, job_dir=job_dir, dependencies=dependencies, user_id=user_id,
-            max_soft_errors_ratio=max_soft_errors_ratio,
-            default_sentinel_hub_credentials=default_sentinel_hub_credentials,
-            sentinel_hub_client_alias=sentinel_hub_client_alias, vault_token=vault_token, access_token=access_token,
-        )
-
-    if sc.getConf().get('spark.python.profile', 'false').lower() == 'true':
-        # Including the driver in the profiling: a bit hacky solution but spark profiler api does not allow passing args&kwargs
-        driver_profile = BasicProfiler(sc)
-        driver_profile.profile(run_driver)
-        # running the driver code and adding driver's profiling results as "RDD==-1"
-        sc.profiler_collector.add_profiler(-1, driver_profile)
-        # collect profiles into a zip file
-        profile_dumps_dir = job_dir / 'profile_dumps'
-        sc.dump_profiles(profile_dumps_dir)
-
-        profile_zip = shutil.make_archive(base_name=str(profile_dumps_dir), format='gztar',
-                                          root_dir=profile_dumps_dir)
-        add_permissions(Path(profile_zip), stat.S_IWGRP)
-
-        shutil.rmtree(profile_dumps_dir,
-                      onerror=lambda func, path, exc_info:
-                      logger.warning(f"could not recursively delete {profile_dumps_dir}: {func} {path} failed",
-                                     exc_info=exc_info))
-
-        logger.info("Saved profiling info to: " + profile_zip)
-    else:
-        run_driver()
+    try:
+        principal = sc.getConf().get("spark.yarn.principal")
+        key_tab = sc.getConf().get("spark.yarn.keytab")
+
+        default_sentinel_hub_credentials = _get_sentinel_hub_credentials_from_spark_conf(sc.getConf())
+        vault_token = _get_vault_token(sc.getConf())
+        access_token = _get_access_token(sc.getConf())
+
+        if get_backend_config().setup_kerberos_auth:
+            setup_kerberos_auth(principal, key_tab)
+
+        def run_driver():
+            run_job(
+                job_specification=job_specification,
+                output_file=output_file,
+                metadata_file=metadata_file,
+                api_version=api_version,
+                job_dir=job_dir,
+                dependencies=dependencies,
+                user_id=user_id,
+                max_soft_errors_ratio=max_soft_errors_ratio,
+                default_sentinel_hub_credentials=default_sentinel_hub_credentials,
+                sentinel_hub_client_alias=sentinel_hub_client_alias,
+                vault_token=vault_token,
+                access_token=access_token,
+            )
+
+        if sc.getConf().get("spark.python.profile", "false").lower() == "true":
+            # Including the driver in the profiling: a bit hacky solution but spark profiler api does not allow passing args&kwargs
+            driver_profile = BasicProfiler(sc)
+            driver_profile.profile(run_driver)
+            # running the driver code and adding driver's profiling results as "RDD==-1"
+            sc.profiler_collector.add_profiler(-1, driver_profile)
+            # collect profiles into a zip file
+            profile_dumps_dir = job_dir / "profile_dumps"
+            sc.dump_profiles(profile_dumps_dir)
+
+            profile_zip = shutil.make_archive(
+                base_name=str(profile_dumps_dir), format="gztar", root_dir=profile_dumps_dir
+            )
+            add_permissions(Path(profile_zip), stat.S_IWGRP)
+
+            shutil.rmtree(
+                profile_dumps_dir,
+                onerror=lambda func, path, exc_info: logger.warning(
+                    f"could not recursively delete {profile_dumps_dir}: {func} {path} failed", exc_info=exc_info
+                ),
+            )
+
+            logger.info("Saved profiling info to: " + profile_zip)
+        else:
+            run_driver()
+    finally:
+        try:
+            get_jvm().com.azavea.gdal.GDALWarp.deinit()
+        except Py4JError as e:
+            if str(e) == "com.azavea.gdal.GDALWarp does not exist in the JVM":
+                logger.debug(f"intentionally swallowing exception {e}", exc_info=True)
+            else:
+                raise


@log_memory
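
The try/finally added above guarantees that the JVM-side GDAL cleanup runs on every exit path of the SparkContext block, not only via the old __main__ hook removed further down. A minimal sketch of the pattern, assuming PySpark's SparkContext._jvm gateway in place of the project's get_jvm() helper (which this diff does not show):

from py4j.protocol import Py4JError

def run_with_gdal_cleanup(sc, run_driver):
    # Run the driver, then always attempt to release the GDAL JNI bindings.
    try:
        run_driver()
    finally:
        try:
            # com.azavea.gdal.GDALWarp is the gdal-warp-bindings entry point.
            sc._jvm.com.azavea.gdal.GDALWarp.deinit()
        except Py4JError as e:
            if "does not exist in the JVM" in str(e):
                # The bindings were never loaded; there is nothing to deinitialize.
                pass
            else:
                raise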
@@ -393,15 +412,6 @@ def run_job(
finally:
write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, job_dir)

-if __name__ == "__main__":
-    try:
-        get_jvm().com.azavea.gdal.GDALWarp.deinit()
-    except Py4JError as e:
-        if str(e) == "com.azavea.gdal.GDALWarp does not exist in the JVM":
-            logger.debug(f"intentionally swallowing exception {e}", exc_info=True)
-        else:
-            raise


def write_metadata(metadata, metadata_file, job_dir):
with open(metadata_file, 'w') as f:
@@ -520,7 +530,6 @@ def start_main():
main(sys.argv)
except Exception as e:
error_summary = GeoPySparkBackendImplementation.summarize_exception_static(e)
-        most_recent_exception = sys.exc_info()[1]
fmt = Format(max_value_str_len=1000)
logger.exception("OpenEO batch job failed: " + error_summary.summary)
logger.error(

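The first hunk's context references context_with_retry, whose body is collapsed out of this diff except for its retry log line. A hypothetical reconstruction, purely for orientation; the real retry bound, backoff, and exception summarizing (via GeoPySparkBackendImplementation.summarize_exception_static) may differ:

import logging
from contextlib import contextmanager

from pyspark import SparkContext

logger = logging.getLogger(__name__)

@contextmanager
def context_with_retry(conf, max_retries: int = 3):
    # Create a SparkContext, retrying a bounded number of times, and make
    # sure it is stopped when the with-block exits.
    for retry_counter in range(max_retries):
        try:
            sc = SparkContext(conf=conf)
            break
        except Exception as e:
            if retry_counter == max_retries - 1:
                raise
            logger.info(f"Failed to create SparkContext, retrying {retry_counter} ... {e!r}")
    try:
        yield sc
    finally:
        sc.stop()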