Skip to content

Commit

Permalink
fix: reschedule the correct job in JobAgent
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Jan 17, 2024
1 parent 5d8c98c commit c1c49c4
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
7 changes: 6 additions & 1 deletion src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,10 @@ def _checkSubmittedJobs(self):
submissionErrors = []
payloadErrors = []
originalJobID = self.jobReport.jobID
for jobID, taskID in self.submissionDict.items():
# Loop over the jobIDs submitted to the CE
# Here we iterate over a copy of the keys because we are modifying the dictionary within the loop
for jobID in list(self.submissionDict.keys()):
taskID = self.submissionDict[jobID]
if taskID not in self.computingElement.taskResults:
continue

Expand Down Expand Up @@ -731,7 +734,9 @@ def _checkSubmittedJobs(self):
self.log.info(message)

# Remove taskID from computingElement.taskResults as it has been treated
# Remove jobID from submissionDict as it has been treated
del self.computingElement.taskResults[taskID]
del self.submissionDict[jobID]

self.jobReport.setJob(originalJobID)
return S_OK((submissionErrors, payloadErrors))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult
jobAgent.log = gLogger.getSubLogger("JobAgent")
jobAgent._initializeComputingElement(localCE)
jobAgent.jobReport = JobReport(jobID)
jobAgent.jobSubmissionDelay = 3

# Submit a job
result = jobAgent._submitJob(
Expand Down Expand Up @@ -547,10 +548,6 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult
assert result["OK"]
assert result["Value"] == expectedResult1

# Check that the job is still present in jobAgent.submissionDict
assert len(jobAgent.submissionDict) == 1
assert jobID in jobAgent.submissionDict

# If the submission is synchronous jobAgent.computingElement.taskResults
# should not contain the result anymore: already processed by checkSubmittedJobs
if not jobAgent.computingElement.ceParameters.get("AsyncSubmission", False):
Expand All @@ -576,7 +573,6 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult
assert result["Value"] == expectedResult2

# From here, taskResults should be empty
assert jobID in jobAgent.submissionDict
assert len(jobAgent.computingElement.taskResults) == 0


Expand Down

0 comments on commit c1c49c4

Please sign in to comment.