From 8f88381da060bc053ecbc9836d78ade81ff90054 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Fri, 13 Sep 2024 14:51:45 +0200 Subject: [PATCH] sweep: #7789 feat: SD will always bundle proxy --- .../WorkloadManagement/tagsAndJobs.rst | 1 - .../Tutorials/installWMS.rst | 1 - .../Agent/SiteDirector.py | 22 +++++-------- .../Agent/test/Test_Agent_SiteDirector.py | 31 ++++++++++++++++--- .../Utilities/QueueUtilities.py | 4 --- 5 files changed, 33 insertions(+), 26 deletions(-) diff --git a/docs/source/AdministratorGuide/Systems/WorkloadManagement/tagsAndJobs.rst b/docs/source/AdministratorGuide/Systems/WorkloadManagement/tagsAndJobs.rst index 6c1e087cca4..1e85686cb96 100644 --- a/docs/source/AdministratorGuide/Systems/WorkloadManagement/tagsAndJobs.rst +++ b/docs/source/AdministratorGuide/Systems/WorkloadManagement/tagsAndJobs.rst @@ -35,7 +35,6 @@ Let's take an example:: maxCPUTime = 200 MaxTotalJobs = 5 MaxWaitingJobs = 10 - BundleProxy = True RemoveOutput = True } # This queue has Tag = GPU. So it will accept: diff --git a/docs/source/AdministratorGuide/Tutorials/installWMS.rst b/docs/source/AdministratorGuide/Tutorials/installWMS.rst index 6a1e6e510c3..d4fb341d9ae 100644 --- a/docs/source/AdministratorGuide/Tutorials/installWMS.rst +++ b/docs/source/AdministratorGuide/Tutorials/installWMS.rst @@ -227,7 +227,6 @@ Then, as ``diracuser`` with the ``dirac_admin`` proxy, we need to define a CE in CPUTime = 40000 MaxTotalJobs = 5 MaxWaitingJobs = 10 - BundleProxy = True BatchError = /home/diracpilot/localsite/error ExecutableArea = /home/diracpilot/localsite/submission RemoveOutput = True diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index 5e0ac9f834d..d9a480e0019 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -168,8 +168,7 @@ def beginExecution(self): self.log.always("MaxPilotsToSubmit:", self.maxPilotsToSubmit) # Build the dictionary of queues that are going to be used: self.queueDict - result = self._buildQueueDict(siteNames, ceTypes, ces, tags) - if not result: + if not (result := self._buildQueueDict(siteNames, ceTypes, ces, tags))["OK"]: return result # Stop the execution if there is no usable queue @@ -449,16 +448,11 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue: """ self.log.info("Going to submit pilots", f"(a maximum of {pilotsToSubmit} pilots to {queue} queue)") - # Get parameters to generate the pilot executable - bundleProxy = self.queueDict[queue].get("BundleProxy", False) - proxy = None - if bundleProxy: - proxy = ce.proxy jobExecDir = self.queueDict[queue]["ParametersDict"].get("JobExecDir", "") envVariables = self.queueDict[queue]["ParametersDict"].get("EnvironmentVariables", None) # Generate the executable - executable = self._getExecutable(queue, proxy=proxy, jobExecDir=jobExecDir, envVariables=envVariables) + executable = self._getExecutable(queue, proxy=ce.proxy, jobExecDir=jobExecDir, envVariables=envVariables) # Submit the job submitResult = ce.submitJob(executable, "", pilotsToSubmit) @@ -564,13 +558,11 @@ def _addPilotReferences(self, queue: str, pilotList: list[str], stampDict: dict[ return result return S_OK() - def _getExecutable( - self, queue: str, proxy: X509Chain = None, jobExecDir: str = "", envVariables: dict[str, str] = None - ): + def _getExecutable(self, queue: str, proxy: X509Chain, jobExecDir: str = "", envVariables: dict[str, str] = None): """Prepare the full executable for queue :param queue: queue name - :param proxy: flag that say if to bundle or not the proxy + :param proxy: proxy to bundle :param jobExecDir: pilot execution dir (normally an empty string) :returns: a string the options for the pilot @@ -580,6 +572,7 @@ def _getExecutable( if not pilotOptions: self.log.warn("Pilots will be submitted without additional options") pilotOptions = [] + pilotOptions = " ".join(pilotOptions) self.log.verbose(f"pilotOptions: {pilotOptions}") @@ -614,7 +607,7 @@ def _getPilotOptions(self, queue: str) -> list[str]: setup = gConfig.getValue("/DIRAC/Setup", "unknown") if setup == "unknown": self.log.error("Setup is not defined in the configuration") - return [None, None] + return [] pilotOptions.append(f"-S {setup}") opsHelper = Operations(vo=self.vo, setup=setup) @@ -687,7 +680,7 @@ def _writePilotScript( self, workingDirectory: str, pilotOptions: str, - proxy: X509Chain = None, + proxy: X509Chain, pilotExecDir: str = "", envVariables: dict[str, str] = None, ): @@ -717,7 +710,6 @@ def _writePilotScript( location=location, CVMFS_locations=CVMFS_locations, ) - return _writePilotWrapperFile(workingDirectory=workingDirectory, localPilot=localPilot) ##################################################################################### diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py index d005cf3bd87..a20c09e8cc7 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py @@ -4,18 +4,17 @@ import datetime import os +from unittest.mock import MagicMock + import pytest from diraccfg import CFG -from DIRAC import gLogger, gConfig +from DIRAC import S_OK, gConfig, gLogger from DIRAC.ConfigurationSystem.Client import ConfigurationData -from DIRAC.Core.Utilities.ProcessPool import S_OK from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus - from DIRAC.WorkloadManagementSystem.Agent.SiteDirector import SiteDirector from DIRAC.WorkloadManagementSystem.Client import PilotStatus - CONFIG = """ Registry { @@ -261,13 +260,35 @@ def test_getPilotWrapper(mocker, sd, pilotWrapperDirectory): "-e 1,2,3", } == set(pilotOptions) + proxyObject_mock = MagicMock() + proxyObject_mock.dumpAllToString.return_value = S_OK("aProxy") + # Write pilot script - res = sd._writePilotScript(pilotWrapperDirectory, pilotOptions) + res = sd._writePilotScript(pilotWrapperDirectory, pilotOptions, proxyObject_mock) # Make sure the file exists assert os.path.exists(res) and os.path.isfile(res) +def test__submitPilotsToQueue(sd): + """Testing SiteDirector()._submitPilotsToQueue()""" + # Create a MagicMock that does not have the workingDirectory + # attribute (https://cpython-test-docs.readthedocs.io/en/latest/library/unittest.mock.html#deleting-attributes) + # This is to use the SiteDirector's working directory, not the CE one + ceMock = MagicMock() + del ceMock.workingDirectory + proxyObject_mock = MagicMock() + proxyObject_mock.dumpAllToString.return_value = S_OK("aProxy") + ceMock.proxy = proxyObject_mock + + sd.queueCECache = {"ce1.site1.com_condor": {"CE": ceMock, "Hash": "3d0dd0c60fffa900c511d7442e9c7634"}} + sd.queueSlots = {"ce1.site1.com_condor": {"AvailableSlots": 10}} + sd._buildQueueDict() + sd.sendSubmissionAccounting = False + sd.sendSubmissionMonitoring = False + assert sd._submitPilotsToQueue(1, ceMock, "ce1.site1.com_condor")["OK"] + + def test_updatePilotStatus(sd): """Updating the status of some fake pilot references""" # 1. We have not submitted any pilots, there is nothing to update diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py b/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py index 226c4183a38..0aa1c34b5ea 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py @@ -81,10 +81,6 @@ def getQueuesResolved(siteDict, queueCECache, vo=None, checkPlatform=False, inst if checkPlatform: setPlatform(ceDict, queueDict[queueName]["ParametersDict"]) - bundleProxy = queueDict[queueName]["ParametersDict"].get("BundleProxy", ceDict.get("BundleProxy")) - if bundleProxy and bundleProxy.lower() in ["true", "yes", "1"]: - queueDict[queueName]["BundleProxy"] = True - return S_OK(queueDict)