From 5d8c98c27acba509f5393a99d4d70a6f14eb18e1 Mon Sep 17 00:00:00 2001 From: aldbr Date: Tue, 16 Jan 2024 18:09:51 +0100 Subject: [PATCH 1/2] feat: add a test to make sure the correct job is rescheduled by JobAgent --- .../Agent/test/Test_Agent_JobAgent.py | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py index 11172103d04..0495fb495de 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py @@ -578,3 +578,73 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult # From here, taskResults should be empty assert jobID in jobAgent.submissionDict assert len(jobAgent.computingElement.taskResults) == 0 + + +def test_submitAndCheck2Jobs(mocker): + """Test the submission and the management of the job status. + + This time, a first job is successfully submitted, but the second submission fails. + We want to make sure that both jobs are correctly managed. 
+ """ + # Mock the JobAgent + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent.am_stopExecution") + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper", return_value=S_OK(["jobWrapper.py"])) + mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK()) + mocker.patch( + "DIRAC.Resources.Computing.InProcessComputingElement.InProcessComputingElement.submitJob", + side_effect=[S_OK(), S_ERROR("ComputingElement error")], + ) + + jobAgent = JobAgent("JobAgent", "Test") + jobAgent.log = gLogger.getSubLogger("JobAgent") + jobAgent._initializeComputingElement("InProcess") + jobAgent.ceName = "InProcess" + jobAgent.jobSubmissionDelay = 0 + + jobAgent.jobReport = JobReport(0) + mocker.patch.object(jobAgent, "jobReport", autospec=True) + mock_rescheduleFailedJob = mocker.patch.object(jobAgent, "_rescheduleFailedJob") + + # Submit a first job: should be successful + jobID = "123" + result = jobAgent._submitJob( + jobID=jobID, jobParams={}, resourceParams={}, optimizerParams={}, proxyChain=X509Chain() + ) + # Check that no error occurred during the submission process + # at the level of the JobAgent + assert result["OK"] + + # Check that the job was added to jobAgent.submissionDict + assert len(jobAgent.submissionDict) == 1 + assert jobID in jobAgent.submissionDict + + # The submission is synchronous: taskResults should already contain the result + assert len(jobAgent.computingElement.taskResults) == 1 + + # Check errors that could have occurred in the innerCE + result = jobAgent._checkSubmittedJobs() + assert result["OK"] + assert result["Value"] == ([], []) + + mock_rescheduleFailedJob.assert_not_called() + + # Submit a second job: should fail + jobID = "456" + result = jobAgent._submitJob( + jobID=jobID, jobParams={}, resourceParams={}, optimizerParams={}, proxyChain=X509Chain() + ) + # Check that no error occurred 
during the submission process + # at the level of the JobAgent + assert result["OK"] + + # Check errors that could have occurred in the innerCE + result = jobAgent._checkSubmittedJobs() + assert result["OK"] + assert result["Value"] == (["ComputingElement error"], []) + + # Make sure that the correct job is rescheduled + mock_rescheduleFailedJob.assert_called_with(jobID, "ComputingElement error") + + # From here, taskResults should be empty + assert len(jobAgent.computingElement.taskResults) == 0 From c1c49c4cc8d10366f4b5277962f78fa72b0beb4f Mon Sep 17 00:00:00 2001 From: aldbr Date: Wed, 17 Jan 2024 08:18:55 +0100 Subject: [PATCH 2/2] fix: reschedule the correct job in JobAgent --- src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py | 7 ++++++- .../Agent/test/Test_Agent_JobAgent.py | 6 +----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py index d88fe0ef78a..80dbd4c3f66 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py @@ -694,7 +694,10 @@ def _checkSubmittedJobs(self): submissionErrors = [] payloadErrors = [] originalJobID = self.jobReport.jobID - for jobID, taskID in self.submissionDict.items(): + # Loop over the jobIDs submitted to the CE + # Here we iterate over a copy of the keys because we are modifying the dictionary within the loop + for jobID in list(self.submissionDict.keys()): + taskID = self.submissionDict[jobID] if taskID not in self.computingElement.taskResults: continue @@ -731,7 +734,9 @@ def _checkSubmittedJobs(self): self.log.info(message) # Remove taskID from computingElement.taskResults as it has been treated + # Remove jobID from submissionDict as it has been treated del self.computingElement.taskResults[taskID] + del self.submissionDict[jobID] self.jobReport.setJob(originalJobID) return S_OK((submissionErrors, payloadErrors)) diff --git 
a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py index 0495fb495de..2a53908c3a4 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py @@ -517,6 +517,7 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult jobAgent.log = gLogger.getSubLogger("JobAgent") jobAgent._initializeComputingElement(localCE) jobAgent.jobReport = JobReport(jobID) + jobAgent.jobSubmissionDelay = 3 # Submit a job result = jobAgent._submitJob( @@ -547,10 +548,6 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult assert result["OK"] assert result["Value"] == expectedResult1 - # Check that the job is still present in jobAgent.submissionDict - assert len(jobAgent.submissionDict) == 1 - assert jobID in jobAgent.submissionDict - # If the submission is synchronous jobAgent.computingElement.taskResults # should not contain the result anymore: already processed by checkSubmittedJobs if not jobAgent.computingElement.ceParameters.get("AsyncSubmission", False): @@ -576,7 +573,6 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult assert result["Value"] == expectedResult2 # From here, taskResults should be empty - assert jobID in jobAgent.submissionDict assert len(jobAgent.computingElement.taskResults) == 0