Skip to content

Commit

Permalink
Merge pull request #7415 from aldbr/v8.0_FIX_PJA-failover
Browse files Browse the repository at this point in the history
[8.0] PushJobAgent do not send failover request anymore
  • Loading branch information
fstagni committed Feb 7, 2024
2 parents 2d5ab55 + 15ff95a commit 99f68cc
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 131 deletions.
164 changes: 88 additions & 76 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None):
self.timeLeftUtil = None
self.pilotInfoReportedFlag = False

# Submission results
self.submissionDict = {}
# Attributes related to the processed jobs, it should take the following form:
# {"<jobID>": {"jobReport": JobReport(), "taskID": "<taskID>"}}
# where taskID is the ID of the job as seen by the CE
# and jobReport is the JobReport instance for the job
# (one instance per job to avoid any discrepancy when communicating with the WMS)
self.jobs = {}

#############################################################################
def initialize(self):
Expand Down Expand Up @@ -135,7 +139,6 @@ def initialize(self):

# Utilities
self.timeLeftUtil = TimeLeft()
self.jobReport = JobReport(0, f"{self.__class__.__name__}@{self.siteName}")
return S_OK()

def _initializeComputingElement(self, localCE):
Expand Down Expand Up @@ -211,8 +214,11 @@ def execute(self):
matcherParams = ["JDL", "DN", "Group"]
matcherInfo = jobRequest["Value"]
jobID = matcherInfo["JobID"]
self.jobReport.setJob(jobID)
result = self._checkMatcherInfo(matcherInfo, matcherParams)

self.jobs[jobID] = {}
self.jobs[jobID]["JobReport"] = JobReport(jobID, f"{self.__class__.__name__}@{self.siteName}")

result = self._checkMatcherInfo(jobID, matcherInfo, matcherParams)
if not result["OK"]:
return self._finish(result["Message"])

Expand All @@ -235,30 +241,35 @@ def execute(self):
# Get JDL paramters
parameters = self._getJDLParameters(jobJDL)
if not parameters["OK"]:
self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus="Could Not Extract JDL Parameters")
self.jobs[jobID]["JobReport"].setJobStatus(
status=JobStatus.FAILED, minorStatus="Could Not Extract JDL Parameters"
)
self.log.warn("Could Not Extract JDL Parameters", parameters["Message"])
return self._finish("JDL Problem")
return self._finish("JDL Problem", self.stopOnApplicationFailure)

params = parameters["Value"]
result = self._extractValuesFromJobParams(params)
if not result["OK"]:
return self._finish(result["Value"])
self.jobs[jobID]["JobReport"].setJobStatus(status=JobStatus.FAILED, minorStatus=result["Message"])
return self._finish(result["Value"], self.stopOnApplicationFailure)
submissionParams = result["Value"]
jobID = submissionParams["jobID"]
jobType = submissionParams["jobType"]

self.log.verbose("Job request successful: \n", jobRequest["Value"])
self.log.info("Received", f"JobID={jobID}, JobType={jobType}, OwnerDN={ownerDN}, JobGroup={jobGroup}")
self.jobCount += 1
self.jobReport.setJobParameter(par_name="MatcherServiceTime", par_value=str(matchTime), sendFlag=False)
self.jobs[jobID]["JobReport"].setJobParameter(
par_name="MatcherServiceTime", par_value=str(matchTime), sendFlag=False
)
if "BOINC_JOB_ID" in os.environ:
# Report BOINC environment
for thisp in ("BoincUserID", "BoincHostID", "BoincHostPlatform", "BoincHostName"):
self.jobReport.setJobParameter(
self.jobs[jobID]["JobReport"].setJobParameter(
par_name=thisp, par_value=gConfig.getValue(f"/LocalSite/{thisp}", "Unknown"), sendFlag=False
)

self.jobReport.setJobStatus(minorStatus="Job Received by Agent", sendFlag=False)
self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Job Received by Agent", sendFlag=False)
result_setupProxy = self._setupProxy(ownerDN, jobGroup)
if not result_setupProxy["OK"]:
result = self._rescheduleFailedJob(jobID, result_setupProxy["Message"])
Expand All @@ -269,7 +280,8 @@ def execute(self):
self._saveJobJDLRequest(jobID, jobJDL)

# Check software and install them if required
software = self._checkInstallSoftware(jobID, params, ceDict)
self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Installing Software", sendFlag=False)
software = self._checkInstallSoftware(params, ceDict)
if not software["OK"]:
self.log.error("Failed to install software for job", f"{jobID}")
errorMsg = software["Message"]
Expand All @@ -280,14 +292,14 @@ def execute(self):

gridCE = gConfig.getValue("/LocalSite/GridCE", "")
if gridCE:
self.jobReport.setJobParameter(par_name="GridCE", par_value=gridCE, sendFlag=False)
self.jobs[jobID]["JobReport"].setJobParameter(par_name="GridCE", par_value=gridCE, sendFlag=False)

queue = gConfig.getValue("/LocalSite/CEQueue", "")
if queue:
self.jobReport.setJobParameter(par_name="CEQueue", par_value=queue, sendFlag=False)
self.jobs[jobID]["JobReport"].setJobParameter(par_name="CEQueue", par_value=queue, sendFlag=False)

if batchSystem := gConfig.getValue("/LocalSite/BatchSystem/Type", ""):
self.jobReport.setJobParameter(par_name="BatchSystem", par_value=batchSystem, sendFlag=False)
self.jobs[jobID]["JobReport"].setJobParameter(par_name="BatchSystem", par_value=batchSystem, sendFlag=False)

self.log.debug(f"Before self._submitJob() ({self.ceName}CE)")
result = self._submitJob(
Expand All @@ -307,32 +319,18 @@ def execute(self):
return self._finish(result["Message"])
self.log.debug(f"After {self.ceName}CE submitJob()")

# Committing the JobReport before evaluating the result of job submission
res = self.jobReport.commit()
if not res["OK"]:
resFD = self.jobReport.generateForwardDISET()
if not resFD["OK"]:
self.log.error("Error generating ForwardDISET operation", resFD["Message"])
elif resFD["Value"]:
# Here we create the Request.
op = resFD["Value"]
request = Request()
requestName = f"jobAgent_{jobID}"
request.RequestName = requestName.replace('"', "")
request.JobID = jobID
request.SourceComponent = f"JobAgent_{jobID}"
request.addOperation(op)
# This might fail, but only a message would be printed.
self._sendFailoverRequest(request)

# Checking errors that could have occurred during the job submission and/or execution
result = self._checkSubmittedJobs()
if not result["OK"]:
return result

submissionErrors = result["Value"][0]
payloadErrors = result["Value"][1]
if submissionErrors:
return self._finish("Error during the submission process")
# Stop the JobAgent if too many CE errors occurred
return self._finish(
"Error during the submission process", self.hostFailureCount > self.stopAfterHostFailures
)
if payloadErrors:
return self._finish("Error during a payload execution", self.stopOnApplicationFailure)

Expand Down Expand Up @@ -525,7 +523,7 @@ def _requestProxyFromProxyManager(self, ownerDN, ownerGroup):
return S_OK(chain)

#############################################################################
def _checkInstallSoftware(self, jobID, jobParams, resourceParams):
def _checkInstallSoftware(self, jobParams, resourceParams):
"""Checks software requirement of job and whether this is already present
before installing software locally.
"""
Expand All @@ -534,7 +532,6 @@ def _checkInstallSoftware(self, jobID, jobParams, resourceParams):
self.log.verbose(msg)
return S_OK(msg)

self.jobReport.setJobStatus(minorStatus="Installing Software", sendFlag=False)
softwareDist = jobParams["SoftwareDistModule"]
self.log.verbose("Found VO Software Distribution module", f": {softwareDist}")
argumentsDict = {"Job": jobParams, "CE": resourceParams}
Expand Down Expand Up @@ -586,15 +583,19 @@ def _checkMatchingIssues(self, jobRequest):
return self._finish("Nothing to do for more than %d cycles" % self.stopAfterFailedMatches)
return S_OK()

def _checkMatcherInfo(self, matcherInfo, matcherParams):
def _checkMatcherInfo(self, jobID, matcherInfo, matcherParams):
"""Check that all relevant information about the job are available"""
for param in matcherParams:
if param not in matcherInfo:
self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus=f"Matcher did not return {param}")
self.jobs[jobID]["JobReport"].setJobStatus(
status=JobStatus.FAILED, minorStatus=f"Matcher did not return {param}"
)
return S_ERROR("Matcher Failed")

if not matcherInfo[param]:
self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus=f"Matcher returned null {param}")
self.jobs[jobID]["JobReport"].setJobStatus(
status=JobStatus.FAILED, minorStatus=f"Matcher returned null {param}"
)
return S_ERROR("Matcher Failed")

self.log.verbose("Matcher returned", f"{param} = {matcherInfo[param]} ")
Expand Down Expand Up @@ -636,7 +637,7 @@ def _submitJob(

wrapperFile = result["Value"][0]
inputs = list(result["Value"][1:])
self.jobReport.setJobStatus(minorStatus="Submitting To CE")
self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Submitting To CE")

self.log.info("Submitting JobWrapper", f"{os.path.basename(wrapperFile)} to {self.ceName}CE")

Expand Down Expand Up @@ -666,7 +667,7 @@ def _submitJob(
taskID = 0
# We create a S_ERROR from the exception to compute it as a normal error
self.computingElement.taskResults[taskID] = S_ERROR(unexpectedSubmitException)
self.submissionDict[jobID] = taskID
self.jobs[jobID]["TaskID"] = taskID
return S_OK()

# Submission results are processed in _checkSubmittedJobs
Expand All @@ -684,7 +685,7 @@ def _submitJob(

self.log.info("Job being submitted", f"(DIRAC JobID: {jobID}; Task ID: {taskID})")

self.submissionDict[jobID] = taskID
self.jobs[jobID]["TaskID"] = taskID
time.sleep(self.jobSubmissionDelay)
return S_OK()

Expand All @@ -693,31 +694,26 @@ def _checkSubmittedJobs(self):
# We expect the computingElement to have a taskResult dictionary.
submissionErrors = []
payloadErrors = []
originalJobID = self.jobReport.jobID
# Loop over the jobIDs submitted to the CE
# Here we iterate over a copy of the keys because we are modifying the dictionary within the loop
for jobID in list(self.submissionDict.keys()):
taskID = self.submissionDict[jobID]
if taskID not in self.computingElement.taskResults:
for jobID in list(self.jobs.keys()):
taskID = self.jobs[jobID].get("TaskID")
if taskID is None or taskID not in self.computingElement.taskResults:
continue

result = self.computingElement.taskResults[taskID]
# jobReport will handle different jobIDs
# setJobParameter() and setJobStatus() should send status immediately (sendFlag=True by default)
self.jobReport.setJob(jobID)

# The submission process failed
if not result["OK"]:
self.log.error("Job submission failed", jobID)
self.jobReport.setJobParameter(par_name="ErrorMessage", par_value=f"{self.ceName} CE Submission Error")
self.jobs[jobID]["JobReport"].setJobParameter(
par_name="ErrorMessage", par_value=f"{self.ceName} CE Submission Error", sendFlag=False
)

self.log.error("Error in DIRAC JobWrapper or inner CE execution:", result["Message"])
submissionErrors.append(result["Message"])
self._rescheduleFailedJob(jobID, result["Message"])
# Stop the JobAgent if too many CE errors
self.hostFailureCount += 1
if self.hostFailureCount > self.stopAfterHostFailures:
return self._finish(result["Message"], self.stopAfterHostFailures)

# The payload failed (if result["Value"] is not 0)
elif result["Value"]:
Expand All @@ -726,19 +722,38 @@ def _checkSubmittedJobs(self):
if not res["OK"]:
return res
if res["Value"][int(jobID)]["Status"] == JobStatus.RUNNING:
self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus="Payload failed")
self.jobs[jobID]["JobReport"].setJobStatus(
status=JobStatus.FAILED, minorStatus="Payload failed", sendFlag=False
)

# Do not keep running and do not overwrite the Payload error
message = f"Payload execution failed with error code {result['Value']}"
payloadErrors.append(message)
self.log.info(message)

# The job has been treated, we can commit the JobReport
res = self.jobs[jobID]["JobReport"].commit()
if not res["OK"]:
resFD = self.jobs[jobID]["JobReport"].generateForwardDISET()
if not resFD["OK"]:
self.log.error("Error generating ForwardDISET operation", resFD["Message"])
elif resFD["Value"]:
# Here we create the Request.
op = resFD["Value"]
request = Request()
requestName = f"jobAgent_{jobID}"
request.RequestName = requestName.replace('"', "")
request.JobID = jobID
request.SourceComponent = f"JobAgent_{jobID}"
request.addOperation(op)
# This might fail, but only a message would be printed.
self._sendFailoverRequest(request)

# Remove taskID from computingElement.taskResults as it has been treated
# Remove jobID from submissionDict as it has been treated
# Remove jobID from jobs as it has been treated
del self.computingElement.taskResults[taskID]
del self.submissionDict[jobID]
del self.jobs[jobID]

self.jobReport.setJob(originalJobID)
return S_OK((submissionErrors, payloadErrors))

#############################################################################
Expand Down Expand Up @@ -777,9 +792,8 @@ def _extractValuesFromJobParams(self, params):
submissionDict["jobID"] = params.get("JobID")
if not submissionDict["jobID"]:
msg = "Job has not JobID defined in JDL parameters"
self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus=msg)
self.log.warn(msg)
return S_ERROR("JDL Problem")
return S_ERROR(msg)

submissionDict["jobType"] = params.get("JobType", "Unknown")
if submissionDict["jobType"] == "Unknown":
Expand Down Expand Up @@ -816,25 +830,19 @@ def _finish(self, message, stop=True):
return S_OK(message)

#############################################################################
def _rescheduleFailedJob(self, jobID, message, direct=False):
def _rescheduleFailedJob(self, jobID, message):
"""
Set Job Status to "Rescheduled" and issue a reschedule command to the Job Manager
"""

self.log.warn("Failure ==> rescheduling", f"(during {message})")

if direct:
JobStateUpdateClient().setJobStatus(
int(jobID), status=JobStatus.RESCHEDULED, applicationStatus=message, source="JobAgent@%s", force=True
)
else:
originalJobID = self.jobReport.jobID
self.jobReport.setJob(jobID)
# Setting a job parameter does not help since the job will be rescheduled,
# instead set the status with the cause and then another status showing the
# reschedule operation.
self.jobReport.setJobStatus(status=JobStatus.RESCHEDULED, applicationStatus=message, sendFlag=True)
self.jobReport.setJob(originalJobID)
# Setting a job parameter does not help since the job will be rescheduled,
# instead set the status with the cause and then another status showing the
# reschedule operation.
self.jobs[jobID]["JobReport"].setJobStatus(
status=JobStatus.RESCHEDULED, applicationStatus=message, sendFlag=True
)

self.log.info("Job will be rescheduled")
result = JobManagerClient().rescheduleJob(jobID)
Expand Down Expand Up @@ -882,11 +890,15 @@ def finalize(self):
if not res["OK"]:
self.log.error("CE could not be properly shut down", res["Message"])

# Check the submitted jobs a last time
result = self._checkSubmittedJobs()
if not result["OK"]:
self.log.error("Problem while trying to get status of the last submitted jobs")
# Check the latest submitted jobs
while self.jobs:
result = self._checkSubmittedJobs()
if not result["OK"]:
self.log.error("Problem while trying to get status of the last submitted jobs")
break
time.sleep(int(self.am_getOption("PollingTime")))

# Set the pilot status to Done
gridCE = gConfig.getValue("/LocalSite/GridCE", "")
queue = gConfig.getValue("/LocalSite/CEQueue", "")
result = PilotManagerClient().setPilotStatus(
Expand Down
Loading

0 comments on commit 99f68cc

Please sign in to comment.