Skip to content

Commit

Permalink
Merge pull request #7751 from fstagni/cherry-pick-2-8715e1e0e-integra…
Browse files Browse the repository at this point in the history
…tion

[sweep:integration] Proper handling of waiting jobs when set to be killed
  • Loading branch information
fstagni committed Aug 13, 2024
2 parents 9f95f61 + f045505 commit 1cba046
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 43 deletions.
11 changes: 4 additions & 7 deletions src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from DIRAC.Core.Utilities.StateMachine import State, StateMachine
from DIRAC.Core.Utilities.Decorators import deprecated

from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient


#:
SUBMITTING = "Submitting"
Expand Down Expand Up @@ -129,7 +131,7 @@ def checkJobStateTransition(jobID, candidateState, currentStatus=None, jobMonito
return S_OK()


def filterJobStateTransition(jobIDs, candidateState, jobMonitoringClient=None):
def filterJobStateTransition(jobIDs, candidateState):
"""Given a list of jobIDs, return a list that are allowed to transition
to the given candidate state.
"""
Expand All @@ -138,12 +140,7 @@ def filterJobStateTransition(jobIDs, candidateState, jobMonitoringClient=None):
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]

if not jobMonitoringClient:
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient

jobMonitoringClient = JobMonitoringClient()

res = jobMonitoringClient.getJobsStatus(jobIDs)
res = JobMonitoringClient().getJobsStatus(jobIDs)
if not res["OK"]:
return res

Expand Down
66 changes: 30 additions & 36 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.StorageManagementSystem.Client.StorageManagerClient import StorageManagerClient
from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_DELETE,
Expand Down Expand Up @@ -305,6 +306,9 @@ def __getJobList(jobInput):
:return : a list of int job IDs
"""

if not jobInput:
return []

if isinstance(jobInput, int):
return [jobInput]
if isinstance(jobInput, str):
Expand Down Expand Up @@ -485,55 +489,44 @@ def __killJob(self, jobID, sendKillCommand=True):

return S_OK()

def __kill_delete_jobs(self, jobIDList, right):
def _kill_delete_jobs(self, jobIDList, right):
"""Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary
:param list jobIDList: job IDs
:param str right: right
:param str right: RIGHT_KILL or RIGHT_DELETE
:return: S_OK()/S_ERROR()
"""
jobList = self.__getJobList(jobIDList)
if not jobList:
return S_ERROR("Invalid job specification: " + str(jobIDList))
self.log.warn("No jobs specified")
return S_OK([])

validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights(jobList, right)

badIDs = []

killJobList = []
deleteJobList = []
if validJobList:
# Get job status to see what is to be killed or deleted
result = self.jobDB.getJobsAttributes(validJobList, ["Status"])
# Get the jobs allowed to transition to the Killed state
filterRes = filterJobStateTransition(validJobList, JobStatus.KILLED)
if not filterRes["OK"]:
return filterRes
killJobList.extend(filterRes["Value"])

if not right == RIGHT_KILL:
# Get the jobs allowed to transition to the Deleted state
filterRes = filterJobStateTransition(validJobList, JobStatus.DELETED)
if not filterRes["OK"]:
return filterRes
deleteJobList.extend(filterRes["Value"])

# Look for jobs that are in the Staging state to send kill signal to the stager
result = self.jobDB.getJobsAttributes(killJobList, ["Status"])
if not result["OK"]:
return result
killJobList = []
deleteJobList = []
markKilledJobList = []
stagingJobList = []
for jobID, sDict in result["Value"].items():
if sDict["Status"] in (JobStatus.RUNNING, JobStatus.MATCHED, JobStatus.STALLED):
killJobList.append(jobID)
elif sDict["Status"] in (
JobStatus.SUBMITTING,
JobStatus.RECEIVED,
JobStatus.CHECKING,
JobStatus.WAITING,
JobStatus.RESCHEDULED,
JobStatus.DONE,
JobStatus.FAILED,
JobStatus.KILLED,
):
if not right == RIGHT_KILL:
deleteJobList.append(jobID)
else:
markKilledJobList.append(jobID)
if sDict["Status"] in [JobStatus.STAGING]:
stagingJobList.append(jobID)

for jobID in markKilledJobList:
result = self.__killJob(jobID, sendKillCommand=False)
if not result["OK"]:
badIDs.append(jobID)
stagingJobList = [jobID for jobID, sDict in result["Value"].items() if sDict["Status"] == JobStatus.STAGING]

for jobID in killJobList:
result = self.__killJob(jobID)
Expand Down Expand Up @@ -562,7 +555,8 @@ def __kill_delete_jobs(self, jobIDList, right):
result["FailedJobIDs"] = badIDs
return result

result = S_OK(validJobList)
jobsList = killJobList if right == RIGHT_KILL else deleteJobList
result = S_OK(jobsList)
result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired()

if invalidJobList:
Expand All @@ -581,7 +575,7 @@ def export_deleteJob(self, jobIDs):
:return: S_OK/S_ERROR
"""

return self.__kill_delete_jobs(jobIDs, RIGHT_DELETE)
return self._kill_delete_jobs(jobIDs, RIGHT_DELETE)

###########################################################################
types_killJob = []
Expand All @@ -594,7 +588,7 @@ def export_killJob(self, jobIDs):
:return: S_OK/S_ERROR
"""

return self.__kill_delete_jobs(jobIDs, RIGHT_KILL)
return self._kill_delete_jobs(jobIDs, RIGHT_KILL)

###########################################################################
types_resetJob = []
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
""" unit test (pytest) of JobManager service
"""

from unittest.mock import MagicMock
import pytest

from DIRAC import gLogger

gLogger.setLevel("DEBUG")

from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_DELETE,
RIGHT_KILL,
)

# sut
from DIRAC.WorkloadManagementSystem.Service.JobManagerHandler import JobManagerHandlerMixin

# mocks
jobPolicy_mock = MagicMock()
jobDB_mock = MagicMock()
jobDB_mock.getJobsAttributes.return_value = {"OK": True, "Value": {}}


@pytest.mark.parametrize(
"jobIDs_list, right, lists, filteredJobsList, expected_res, expected_value",
[
([], RIGHT_KILL, ([], [], [], []), [], True, []),
([], RIGHT_DELETE, ([], [], [], []), [], True, []),
(1, RIGHT_KILL, ([], [], [], []), [], True, []),
(1, RIGHT_KILL, ([1], [], [], []), [], True, []),
([1, 2], RIGHT_KILL, ([], [], [], []), [], True, []),
([1, 2], RIGHT_KILL, ([1], [], [], []), [], True, []),
(1, RIGHT_KILL, ([1], [], [], []), [1], True, [1]),
([1, 2], RIGHT_KILL, ([1], [], [], []), [1], True, [1]),
([1, 2], RIGHT_KILL, ([1], [2], [], []), [1], True, [1]),
([1, 2], RIGHT_KILL, ([1], [2], [], []), [], True, []),
([1, 2], RIGHT_KILL, ([1, 2], [], [], []), [1, 2], True, [1, 2]),
],
)
def test___kill_delete_jobs(mocker, jobIDs_list, right, lists, filteredJobsList, expected_res, expected_value):
mocker.patch(
"DIRAC.WorkloadManagementSystem.Service.JobManagerHandler.filterJobStateTransition",
return_value={"OK": True, "Value": filteredJobsList},
)

JobManagerHandlerMixin.log = gLogger
JobManagerHandlerMixin.jobPolicy = jobPolicy_mock
JobManagerHandlerMixin.jobDB = jobDB_mock
JobManagerHandlerMixin.taskQueueDB = MagicMock()

jobPolicy_mock.evaluateJobRights.return_value = lists

jm = JobManagerHandlerMixin()

res = jm._kill_delete_jobs(jobIDs_list, right)
assert res["OK"] == expected_res
assert res["Value"] == expected_value

0 comments on commit 1cba046

Please sign in to comment.