diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py index 80dbd4c3f66..e0fb0d98005 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py @@ -88,8 +88,12 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None): self.timeLeftUtil = None self.pilotInfoReportedFlag = False - # Submission results - self.submissionDict = {} + # Attributes related to the processed jobs; the dictionary takes the following form: + # {jobID: {"JobReport": JobReport(), "TaskID": taskID}} + # where TaskID is the ID of the job as seen by the CE + # and JobReport is the JobReport instance for the job + # (one instance per job to avoid any discrepancy when communicating with the WMS) + self.jobs = {} ############################################################################# def initialize(self): @@ -135,7 +139,6 @@ def initialize(self): # Utilities self.timeLeftUtil = TimeLeft() - self.jobReport = JobReport(0, f"{self.__class__.__name__}@{self.siteName}") return S_OK() def _initializeComputingElement(self, localCE): @@ -211,8 +214,11 @@ def execute(self): matcherParams = ["JDL", "DN", "Group"] matcherInfo = jobRequest["Value"] jobID = matcherInfo["JobID"] - self.jobReport.setJob(jobID) - result = self._checkMatcherInfo(matcherInfo, matcherParams) + + self.jobs[jobID] = {} + self.jobs[jobID]["JobReport"] = JobReport(jobID, f"{self.__class__.__name__}@{self.siteName}") + + result = self._checkMatcherInfo(jobID, matcherInfo, matcherParams) if not result["OK"]: return self._finish(result["Message"]) @@ -235,14 +241,17 @@ def execute(self): # Get JDL paramters parameters = self._getJDLParameters(jobJDL) if not parameters["OK"]: - self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus="Could Not Extract JDL Parameters") + self.jobs[jobID]["JobReport"].setJobStatus( + status=JobStatus.FAILED, minorStatus="Could Not Extract JDL Parameters" +
) self.log.warn("Could Not Extract JDL Parameters", parameters["Message"]) - return self._finish("JDL Problem") + return self._finish("JDL Problem", self.stopOnApplicationFailure) params = parameters["Value"] result = self._extractValuesFromJobParams(params) if not result["OK"]: - return self._finish(result["Value"]) + self.jobs[jobID]["JobReport"].setJobStatus(status=JobStatus.FAILED, minorStatus=result["Message"]) + return self._finish(result["Message"], self.stopOnApplicationFailure) submissionParams = result["Value"] jobID = submissionParams["jobID"] jobType = submissionParams["jobType"] @@ -250,15 +259,17 @@ def execute(self): self.log.verbose("Job request successful: \n", jobRequest["Value"]) self.log.info("Received", f"JobID={jobID}, JobType={jobType}, OwnerDN={ownerDN}, JobGroup={jobGroup}") self.jobCount += 1 - self.jobReport.setJobParameter(par_name="MatcherServiceTime", par_value=str(matchTime), sendFlag=False) + self.jobs[jobID]["JobReport"].setJobParameter( + par_name="MatcherServiceTime", par_value=str(matchTime), sendFlag=False + ) if "BOINC_JOB_ID" in os.environ: # Report BOINC environment for thisp in ("BoincUserID", "BoincHostID", "BoincHostPlatform", "BoincHostName"): - self.jobReport.setJobParameter( + self.jobs[jobID]["JobReport"].setJobParameter( par_name=thisp, par_value=gConfig.getValue(f"/LocalSite/{thisp}", "Unknown"), sendFlag=False ) - self.jobReport.setJobStatus(minorStatus="Job Received by Agent", sendFlag=False) + self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Job Received by Agent", sendFlag=False) result_setupProxy = self._setupProxy(ownerDN, jobGroup) if not result_setupProxy["OK"]: result = self._rescheduleFailedJob(jobID, result_setupProxy["Message"]) @@ -269,7 +280,8 @@ def execute(self): self._saveJobJDLRequest(jobID, jobJDL) # Check software and install them if required - software = self._checkInstallSoftware(jobID, params, ceDict) + self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Installing Software",
sendFlag=False) + software = self._checkInstallSoftware(params, ceDict) if not software["OK"]: self.log.error("Failed to install software for job", f"{jobID}") errorMsg = software["Message"] @@ -280,14 +292,14 @@ def execute(self): gridCE = gConfig.getValue("/LocalSite/GridCE", "") if gridCE: - self.jobReport.setJobParameter(par_name="GridCE", par_value=gridCE, sendFlag=False) + self.jobs[jobID]["JobReport"].setJobParameter(par_name="GridCE", par_value=gridCE, sendFlag=False) queue = gConfig.getValue("/LocalSite/CEQueue", "") if queue: - self.jobReport.setJobParameter(par_name="CEQueue", par_value=queue, sendFlag=False) + self.jobs[jobID]["JobReport"].setJobParameter(par_name="CEQueue", par_value=queue, sendFlag=False) if batchSystem := gConfig.getValue("/LocalSite/BatchSystem/Type", ""): - self.jobReport.setJobParameter(par_name="BatchSystem", par_value=batchSystem, sendFlag=False) + self.jobs[jobID]["JobReport"].setJobParameter(par_name="BatchSystem", par_value=batchSystem, sendFlag=False) self.log.debug(f"Before self._submitJob() ({self.ceName}CE)") result = self._submitJob( @@ -307,32 +319,18 @@ def execute(self): return self._finish(result["Message"]) self.log.debug(f"After {self.ceName}CE submitJob()") - # Committing the JobReport before evaluating the result of job submission - res = self.jobReport.commit() - if not res["OK"]: - resFD = self.jobReport.generateForwardDISET() - if not resFD["OK"]: - self.log.error("Error generating ForwardDISET operation", resFD["Message"]) - elif resFD["Value"]: - # Here we create the Request. - op = resFD["Value"] - request = Request() - requestName = f"jobAgent_{jobID}" - request.RequestName = requestName.replace('"', "") - request.JobID = jobID - request.SourceComponent = f"JobAgent_{jobID}" - request.addOperation(op) - # This might fail, but only a message would be printed. 
- self._sendFailoverRequest(request) - # Checking errors that could have occurred during the job submission and/or execution result = self._checkSubmittedJobs() if not result["OK"]: return result + submissionErrors = result["Value"][0] payloadErrors = result["Value"][1] if submissionErrors: - return self._finish("Error during the submission process") + # Stop the JobAgent if too many CE errors occurred + return self._finish( + "Error during the submission process", self.hostFailureCount > self.stopAfterHostFailures + ) if payloadErrors: return self._finish("Error during a payload execution", self.stopOnApplicationFailure) @@ -525,7 +523,7 @@ def _requestProxyFromProxyManager(self, ownerDN, ownerGroup): return S_OK(chain) ############################################################################# - def _checkInstallSoftware(self, jobID, jobParams, resourceParams): + def _checkInstallSoftware(self, jobParams, resourceParams): """Checks software requirement of job and whether this is already present before installing software locally. 
""" @@ -534,7 +532,6 @@ def _checkInstallSoftware(self, jobID, jobParams, resourceParams): self.log.verbose(msg) return S_OK(msg) - self.jobReport.setJobStatus(minorStatus="Installing Software", sendFlag=False) softwareDist = jobParams["SoftwareDistModule"] self.log.verbose("Found VO Software Distribution module", f": {softwareDist}") argumentsDict = {"Job": jobParams, "CE": resourceParams} @@ -586,15 +583,19 @@ def _checkMatchingIssues(self, jobRequest): return self._finish("Nothing to do for more than %d cycles" % self.stopAfterFailedMatches) return S_OK() - def _checkMatcherInfo(self, matcherInfo, matcherParams): + def _checkMatcherInfo(self, jobID, matcherInfo, matcherParams): """Check that all relevant information about the job are available""" for param in matcherParams: if param not in matcherInfo: - self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus=f"Matcher did not return {param}") + self.jobs[jobID]["JobReport"].setJobStatus( + status=JobStatus.FAILED, minorStatus=f"Matcher did not return {param}" + ) return S_ERROR("Matcher Failed") if not matcherInfo[param]: - self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus=f"Matcher returned null {param}") + self.jobs[jobID]["JobReport"].setJobStatus( + status=JobStatus.FAILED, minorStatus=f"Matcher returned null {param}" + ) return S_ERROR("Matcher Failed") self.log.verbose("Matcher returned", f"{param} = {matcherInfo[param]} ") @@ -636,7 +637,7 @@ def _submitJob( wrapperFile = result["Value"][0] inputs = list(result["Value"][1:]) - self.jobReport.setJobStatus(minorStatus="Submitting To CE") + self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Submitting To CE") self.log.info("Submitting JobWrapper", f"{os.path.basename(wrapperFile)} to {self.ceName}CE") @@ -666,7 +667,7 @@ def _submitJob( taskID = 0 # We create a S_ERROR from the exception to compute it as a normal error self.computingElement.taskResults[taskID] = S_ERROR(unexpectedSubmitException) - self.submissionDict[jobID] = 
taskID + self.jobs[jobID]["TaskID"] = taskID return S_OK() # Submission results are processed in _checkSubmittedJobs @@ -684,7 +685,7 @@ def _submitJob( self.log.info("Job being submitted", f"(DIRAC JobID: {jobID}; Task ID: {taskID})") - self.submissionDict[jobID] = taskID + self.jobs[jobID]["TaskID"] = taskID time.sleep(self.jobSubmissionDelay) return S_OK() @@ -693,31 +694,26 @@ def _checkSubmittedJobs(self): # We expect the computingElement to have a taskResult dictionary. submissionErrors = [] payloadErrors = [] - originalJobID = self.jobReport.jobID # Loop over the jobIDs submitted to the CE # Here we iterate over a copy of the keys because we are modifying the dictionary within the loop - for jobID in list(self.submissionDict.keys()): - taskID = self.submissionDict[jobID] - if taskID not in self.computingElement.taskResults: + for jobID in list(self.jobs.keys()): + taskID = self.jobs[jobID].get("TaskID") + if taskID is None or taskID not in self.computingElement.taskResults: continue result = self.computingElement.taskResults[taskID] - # jobReport will handle different jobIDs - # setJobParameter() and setJobStatus() should send status immediately (sendFlag=True by default) - self.jobReport.setJob(jobID) # The submission process failed if not result["OK"]: self.log.error("Job submission failed", jobID) - self.jobReport.setJobParameter(par_name="ErrorMessage", par_value=f"{self.ceName} CE Submission Error") + self.jobs[jobID]["JobReport"].setJobParameter( + par_name="ErrorMessage", par_value=f"{self.ceName} CE Submission Error", sendFlag=False + ) self.log.error("Error in DIRAC JobWrapper or inner CE execution:", result["Message"]) submissionErrors.append(result["Message"]) self._rescheduleFailedJob(jobID, result["Message"]) - # Stop the JobAgent if too many CE errors self.hostFailureCount += 1 - if self.hostFailureCount > self.stopAfterHostFailures: - return self._finish(result["Message"], self.stopAfterHostFailures) # The payload failed (if result["Value"] is 
not 0) elif result["Value"]: @@ -726,19 +722,38 @@ def _checkSubmittedJobs(self): if not res["OK"]: return res if res["Value"][int(jobID)]["Status"] == JobStatus.RUNNING: - self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus="Payload failed") + self.jobs[jobID]["JobReport"].setJobStatus( + status=JobStatus.FAILED, minorStatus="Payload failed", sendFlag=False + ) # Do not keep running and do not overwrite the Payload error message = f"Payload execution failed with error code {result['Value']}" payloadErrors.append(message) self.log.info(message) + # The job has been treated, we can commit the JobReport + res = self.jobs[jobID]["JobReport"].commit() + if not res["OK"]: + resFD = self.jobs[jobID]["JobReport"].generateForwardDISET() + if not resFD["OK"]: + self.log.error("Error generating ForwardDISET operation", resFD["Message"]) + elif resFD["Value"]: + # Here we create the Request. + op = resFD["Value"] + request = Request() + requestName = f"jobAgent_{jobID}" + request.RequestName = requestName.replace('"', "") + request.JobID = jobID + request.SourceComponent = f"JobAgent_{jobID}" + request.addOperation(op) + # This might fail, but only a message would be printed. 
+ self._sendFailoverRequest(request) + # Remove taskID from computingElement.taskResults as it has been treated - # Remove jobID from submissionDict as it has been treated + # Remove jobID from jobs as it has been treated del self.computingElement.taskResults[taskID] - del self.submissionDict[jobID] + del self.jobs[jobID] - self.jobReport.setJob(originalJobID) return S_OK((submissionErrors, payloadErrors)) ############################################################################# @@ -777,9 +792,8 @@ def _extractValuesFromJobParams(self, params): submissionDict["jobID"] = params.get("JobID") if not submissionDict["jobID"]: msg = "Job has not JobID defined in JDL parameters" - self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus=msg) self.log.warn(msg) - return S_ERROR("JDL Problem") + return S_ERROR(msg) submissionDict["jobType"] = params.get("JobType", "Unknown") if submissionDict["jobType"] == "Unknown": @@ -816,25 +830,19 @@ def _finish(self, message, stop=True): return S_OK(message) ############################################################################# - def _rescheduleFailedJob(self, jobID, message, direct=False): + def _rescheduleFailedJob(self, jobID, message): """ Set Job Status to "Rescheduled" and issue a reschedule command to the Job Manager """ self.log.warn("Failure ==> rescheduling", f"(during {message})") - if direct: - JobStateUpdateClient().setJobStatus( - int(jobID), status=JobStatus.RESCHEDULED, applicationStatus=message, source="JobAgent@%s", force=True - ) - else: - originalJobID = self.jobReport.jobID - self.jobReport.setJob(jobID) - # Setting a job parameter does not help since the job will be rescheduled, - # instead set the status with the cause and then another status showing the - # reschedule operation. 
- self.jobReport.setJobStatus(status=JobStatus.RESCHEDULED, applicationStatus=message, sendFlag=True) - self.jobReport.setJob(originalJobID) + # Setting a job parameter does not help since the job will be rescheduled, + # instead set the status with the cause and then another status showing the + # reschedule operation. + self.jobs[jobID]["JobReport"].setJobStatus( + status=JobStatus.RESCHEDULED, applicationStatus=message, sendFlag=True + ) self.log.info("Job will be rescheduled") result = JobManagerClient().rescheduleJob(jobID) @@ -882,11 +890,15 @@ def finalize(self): if not res["OK"]: self.log.error("CE could not be properly shut down", res["Message"]) - # Check the submitted jobs a last time - result = self._checkSubmittedJobs() - if not result["OK"]: - self.log.error("Problem while trying to get status of the last submitted jobs") + # Wait only for jobs actually submitted to the CE: entries without a TaskID are never removed by _checkSubmittedJobs + while any("TaskID" in job for job in self.jobs.values()): + result = self._checkSubmittedJobs() + if not result["OK"]: + self.log.error("Problem while trying to get status of the last submitted jobs") + break + time.sleep(int(self.am_getOption("PollingTime"))) + + # Set the pilot status to Done gridCE = gConfig.getValue("/LocalSite/GridCE", "") queue = gConfig.getValue("/LocalSite/CEQueue", "") result = PilotManagerClient().setPilotStatus( diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py index ebb15b29238..bb36280b84a 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py @@ -20,6 +20,7 @@ from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager from DIRAC.WorkloadManagementSystem.Client import JobStatus +from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved from
DIRAC.WorkloadManagementSystem.Service.WMSUtilities import getGridEnv from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent @@ -187,8 +188,9 @@ def execute(self): matcherParams = ["JDL", "DN", "Group"] matcherInfo = jobRequest["Value"] jobID = matcherInfo["JobID"] - self.jobReport.setJob(jobID) - result = self._checkMatcherInfo(matcherInfo, matcherParams) + self.jobs[jobID] = {} + self.jobs[jobID]["JobReport"] = JobReport(jobID, f"{self.__class__.__name__}@{self.siteName}") + result = self._checkMatcherInfo(jobID, matcherInfo, matcherParams) if not result["OK"]: self.failedQueues[queueName] += 1 break @@ -207,7 +209,9 @@ def execute(self): # Get JDL paramters parameters = self._getJDLParameters(jobJDL) if not parameters["OK"]: - self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus="Could Not Extract JDL Parameters") + self.jobs[jobID]["JobReport"].setJobStatus( + status=JobStatus.FAILED, minorStatus="Could Not Extract JDL Parameters" + ) self.log.warn("Could Not Extract JDL Parameters", parameters["Message"]) self.failedQueues[queueName] += 1 break @@ -224,8 +228,10 @@ def execute(self): self.log.verbose("Job request successful: \n", jobRequest["Value"]) self.log.info("Received", f"JobID={jobID}, JobType={jobType}, OwnerDN={ownerDN}, JobGroup={jobGroup}") - self.jobReport.setJobParameter(par_name="MatcherServiceTime", par_value=str(matchTime), sendFlag=False) - self.jobReport.setJobStatus( + self.jobs[jobID]["JobReport"].setJobParameter( + par_name="MatcherServiceTime", par_value=str(matchTime), sendFlag=False + ) + self.jobs[jobID]["JobReport"].setJobStatus( status=JobStatus.MATCHED, minorStatus="Job Received by Agent", sendFlag=False ) @@ -238,7 +244,8 @@ def execute(self): proxyChain = result_setupProxy.get("Value") # Check software and install them if required - software = self._checkInstallSoftware(jobID, params, ceDict) + self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Installing Software", sendFlag=False) + software = 
self._checkInstallSoftware(params, ceDict) if not software["OK"]: self.log.error("Failed to install software for job", f"{jobID}") errorMsg = software["Message"] @@ -267,24 +274,6 @@ def execute(self): break self.log.debug(f"After {self.ceName}CE submitJob()") - # Committing the JobReport before evaluating the result of job submission - res = self.jobReport.commit() - if not res["OK"]: - resFD = self.jobReport.generateForwardDISET() - if not resFD["OK"]: - self.log.error("Error generating ForwardDISET operation", resFD["Message"]) - elif resFD["Value"]: - # Here we create the Request. - op = resFD["Value"] - request = Request() - requestName = f"jobAgent_{jobID}" - request.RequestName = requestName.replace('"', "") - request.JobID = jobID - request.SourceComponent = f"JobAgent_{jobID}" - request.addOperation(op) - # This might fail, but only a message would be printed. - self._sendFailoverRequest(request) - # Check that there is enough slots locally result = self._checkCEAvailability(self.computingElement) if not result["OK"] or result["Value"]: diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py index 2a53908c3a4..20c6a542f17 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py @@ -238,12 +238,15 @@ def test__checkMatcherInfo(mocker, matcherInfo, matcherParams, expectedResult): mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") mocker.patch("DIRAC.WorkloadManagementSystem.Client.JobReport.JobReport.setJobStatus") + jobID = 123 jobAgent = JobAgent("Test", "Test1") jobAgent.log = gLogger jobAgent.log.setLevel("DEBUG") - jobAgent.jobReport = JobReport(123) - result = jobAgent._checkMatcherInfo(matcherInfo, matcherParams) + jobAgent.jobs[jobID] = {} + jobAgent.jobs[jobID]["JobReport"] = JobReport(jobID) + + result = 
jobAgent._checkMatcherInfo(jobID, matcherInfo, matcherParams) assert result["OK"] == expectedResult["OK"] if "Value" in result: assert result["Value"] == expectedResult["Value"] @@ -353,12 +356,16 @@ def test__checkInstallSoftware(mocker): mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") + jobID = 123 + jobAgent = JobAgent("Test", "Test1") jobAgent.log = gLogger jobAgent.log.setLevel("DEBUG") - jobAgent.jobReport = JobReport(123) - result = jobAgent._checkInstallSoftware(101, {}, {}) + jobAgent.jobs[jobID] = {} + jobAgent.jobs[jobID]["JobReport"] = JobReport(jobID) + + result = jobAgent._checkInstallSoftware({}, {}) assert result["OK"], result["Message"] assert result["Value"] == "Job has no software installation requirement" @@ -418,32 +425,121 @@ def test__getJDLParameters(mocker): assert result["Value"]["Tags"] == ["16Processors", "MultiProcessor"] -@pytest.mark.parametrize( - "mockJMInput, expected", - [ - ({"OK": True}, {"OK": True, "Value": "Problem Rescheduling Job"}), - ( - {"OK": False, "Message": "Test"}, - {"OK": True, "Value": "Problem Rescheduling Job"}, - ), - ], -) -def test__rescheduleFailedJob(mocker, mockJMInput, expected): - """Testing JobAgent()._rescheduleFailedJob()""" +############################################################################# + + +def test__rescheduleFailedJob_success(mocker): + """Testing rescheduleFailedJob success""" + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.JobManagerClient.JobManagerClient.rescheduleJob", return_value=S_OK() + ) + + jobAgent = JobAgent("Test", "Test1") + + jobID = 101 + message = "An error occurred" + + jobAgent.log = gLogger + jobAgent.log.setLevel("DEBUG") + + jobAgent.jobs[jobID] = {} + jobAgent.jobs[jobID]["JobReport"] = JobReport(jobID) + + result = jobAgent._rescheduleFailedJob(jobID, message) + + assert not result["OK"], result + assert result["Message"] == "Job 
Rescheduled", result + + # Because the jobReport cannot communicate with the JobManager, we are supposed to have the report info here + jobReport = jobAgent.jobs[jobID]["JobReport"] + assert len(jobReport.jobStatusInfo) == 1, jobReport.jobStatusInfo + assert jobReport.jobStatusInfo[0][0] == "Rescheduled", jobReport.jobStatusInfo[0][0] + assert jobReport.jobStatusInfo[0][1] == "", jobReport.jobStatusInfo[0][1] + + assert len(jobReport.appStatusInfo) == 1, jobReport.appStatusInfo + assert jobReport.appStatusInfo[0][0] == message, jobReport.appStatusInfo[0][0] + + +def test__rescheduleFailedJob_fail(mocker): + """Testing rescheduleFailedJob when the job fails to be rescheduled.""" mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.JobManagerClient.JobManagerClient.rescheduleJob", + return_value=S_ERROR("Cannot contact JobManager"), + ) + jobAgent = JobAgent("Test", "Test1") jobID = 101 - message = "Test" + message = "An error occurred" jobAgent.log = gLogger jobAgent.log.setLevel("DEBUG") - jobAgent.jobReport = JobReport(jobID) + + jobAgent.jobs[jobID] = {} + jobAgent.jobs[jobID]["JobReport"] = JobReport(jobID) result = jobAgent._rescheduleFailedJob(jobID, message) - result = jobAgent._finish(result["Message"], False) - assert result == expected + assert not result["OK"], result + assert result["Message"] == "Problem Rescheduling Job" + + # The JobManager could not be contacted to reschedule the jobs + # In such a case, we do not expect any status in the jobReport job/appStatusInfo + # TODO: rescheduling is currently performed in 2 operations: setJobStatus and rescheduleJob + # This should be changed to a single operation in the future, then we can adjust this test + # jobReport = jobAgent.jobs[jobID]["JobReport"] + # assert len(jobReport.jobStatusInfo) == 0 + # assert len(jobReport.appStatusInfo) == 0 + + +def test__rescheduleFailedJob_multipleJobIDs(mocker): + """Testing 
rescheduleFailedJob() with multiple jobIDs""" + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.JobManagerClient.JobManagerClient.rescheduleJob", + return_value=S_ERROR("Cannot contact JobManager"), + ) + + jobAgent = JobAgent("Test", "Test1") + + jobID1 = 101 + jobID2 = 102 + message = "An error occurred" + + jobAgent.log = gLogger + jobAgent.log.setLevel("DEBUG") + + jobAgent.jobs[jobID1] = {} + jobAgent.jobs[jobID1]["JobReport"] = JobReport(jobID1) + + jobAgent.jobs[jobID2] = {} + jobAgent.jobs[jobID2]["JobReport"] = JobReport(jobID2) + + # First rescheduled job + result = jobAgent._rescheduleFailedJob(jobID1, message) + + assert not result["OK"], result + assert result["Message"] == "Problem Rescheduling Job" + + # Second rescheduled job + result = jobAgent._rescheduleFailedJob(jobID2, message) + + assert not result["OK"], result + assert result["Message"] == "Problem Rescheduling Job" + + # Here we want to make sure that rescheduleFailedJob created 2 distinct reports + jobReport1 = jobAgent.jobs[jobID1]["JobReport"] + assert len(jobReport1.jobStatusInfo) == 1 + assert len(jobReport1.appStatusInfo) == 1 + + jobReport2 = jobAgent.jobs[jobID2]["JobReport"] + assert len(jobReport2.jobStatusInfo) == 1 + assert len(jobReport2.appStatusInfo) == 1 + + +############################################################################# @pytest.mark.parametrize( @@ -507,6 +603,7 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult return_value=S_OK({int(jobID): {"Status": JobStatus.RUNNING}}), ) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper", return_value=S_OK([jobName])) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent._sendFailoverRequest", return_value=S_OK()) mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK()) mocker.patch( 
"DIRAC.Resources.Computing.SingularityComputingElement.SingularityComputingElement._SingularityComputingElement__hasSingularity", @@ -516,9 +613,11 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult jobAgent = JobAgent("JobAgent", "Test") jobAgent.log = gLogger.getSubLogger("JobAgent") jobAgent._initializeComputingElement(localCE) - jobAgent.jobReport = JobReport(jobID) jobAgent.jobSubmissionDelay = 3 + jobAgent.jobs[jobID] = {} + jobAgent.jobs[jobID]["JobReport"] = JobReport(jobID) + # Submit a job result = jobAgent._submitJob( jobID=jobID, jobParams={}, resourceParams={}, optimizerParams={}, proxyChain=X509Chain() @@ -527,9 +626,9 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult # at the level of the JobAgent assert result["OK"] - # Check that the job was added to jobAgent.submissionDict - assert len(jobAgent.submissionDict) == 1 - assert jobID in jobAgent.submissionDict + # Check that the job was added to jobAgent.jobs + assert len(jobAgent.jobs) == 1 + assert jobID in jobAgent.jobs # If the submission is synchronous jobAgent.computingElement.taskResults # should already contain the result @@ -545,8 +644,8 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult # Check errors that could have occurred in the innerCE result = jobAgent._checkSubmittedJobs() - assert result["OK"] - assert result["Value"] == expectedResult1 + assert result["OK"], result + assert result["Value"] == expectedResult1, result # If the submission is synchronous jobAgent.computingElement.taskResults # should not contain the result anymore: already processed by checkSubmittedJobs @@ -586,6 +685,7 @@ def test_submitAndCheck2Jobs(mocker): mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent.am_stopExecution") mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper", 
return_value=S_OK(["jobWrapper.py"])) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent._sendFailoverRequest", return_value=S_OK()) mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK()) mocker.patch( "DIRAC.Resources.Computing.InProcessComputingElement.InProcessComputingElement.submitJob", @@ -598,12 +698,12 @@ def test_submitAndCheck2Jobs(mocker): jobAgent.ceName = "InProcess" jobAgent.jobSubmissionDelay = 0 - jobAgent.jobReport = JobReport(0) - mocker.patch.object(jobAgent, "jobReport", autospec=True) mock_rescheduleFailedJob = mocker.patch.object(jobAgent, "_rescheduleFailedJob") # Submit a first job: should be successful jobID = "123" + jobAgent.jobs[jobID] = {} + jobAgent.jobs[jobID]["JobReport"] = JobReport(jobID) result = jobAgent._submitJob( jobID=jobID, jobParams={}, resourceParams={}, optimizerParams={}, proxyChain=X509Chain() ) @@ -611,9 +711,9 @@ def test_submitAndCheck2Jobs(mocker): # at the level of the JobAgent assert result["OK"] - # Check that the job was added to jobAgent.submissionDict - assert len(jobAgent.submissionDict) == 1 - assert jobID in jobAgent.submissionDict + # Check that the job was added to jobAgent.jobs + assert len(jobAgent.jobs) == 1 + assert jobID in jobAgent.jobs # The submission is synchronous taskResults should already contain the result assert len(jobAgent.computingElement.taskResults) == 1 @@ -627,6 +727,8 @@ def test_submitAndCheck2Jobs(mocker): # Submit a second job: should fail jobID = "456" + jobAgent.jobs[jobID] = {} + jobAgent.jobs[jobID]["JobReport"] = JobReport(jobID) result = jobAgent._submitJob( jobID=jobID, jobParams={}, resourceParams={}, optimizerParams={}, proxyChain=X509Chain() )