Skip to content

Commit

Permalink
remove some deepcopy to speed up workflow conductor
Browse files Browse the repository at this point in the history
  • Loading branch information
guzzijones committed Jul 7, 2023
1 parent 2823b42 commit 6a6f0b8
Showing 1 changed file with 23 additions and 23 deletions.
46 changes: 23 additions & 23 deletions orquesta/conducting.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ def __init__(self, conductor=None):

def serialize(self):
data = {
"contexts": json_util.deepcopy(self.contexts),
"routes": json_util.deepcopy(self.routes),
"sequence": json_util.deepcopy(self.sequence),
"contexts": self.contexts,
"routes": self.routes,
"sequence": self.sequence,
"staged": json_util.deepcopy(self.staged),
"status": self.status,
"tasks": json_util.deepcopy(self.tasks),
"tasks": self.tasks,
}

if self.reruns:
Expand All @@ -65,13 +65,13 @@ def serialize(self):
@classmethod
def deserialize(cls, data):
instance = cls()
instance.contexts = json_util.deepcopy(data.get("contexts", list()))
instance.routes = json_util.deepcopy(data.get("routes", list()))
instance.sequence = json_util.deepcopy(data.get("sequence", list()))
instance.staged = json_util.deepcopy(data.get("staged", list()))
instance.contexts = data.get("contexts", list())
instance.routes = data.get("routes", list())
instance.sequence = data.get("sequence", list())
instance.staged = data.get("staged", list())
instance.status = data.get("status", statuses.UNSET)
instance.tasks = json_util.deepcopy(data.get("tasks", dict()))
instance.reruns = json_util.deepcopy(data.get("reruns", list()))
instance.tasks = data.get("tasks", dict())
instance.reruns = data.get("reruns", list())

return instance

Expand Down Expand Up @@ -281,8 +281,8 @@ def serialize(self):
"input": self.get_workflow_input(),
"context": self.get_workflow_parent_context(),
"state": self.workflow_state.serialize(),
"log": json_util.deepcopy(self.log),
"errors": json_util.deepcopy(self.errors),
"log": self.log,
"errors": self.errors,
"output": self.get_workflow_output(),
}

Expand All @@ -292,12 +292,12 @@ def deserialize(cls, data):
spec = spec_module.WorkflowSpec.deserialize(data["spec"])

graph = graphing.WorkflowGraph.deserialize(data["graph"])
inputs = json_util.deepcopy(data["input"])
context = json_util.deepcopy(data["context"])
inputs = data["input"]
context = data["context"]
state = WorkflowState.deserialize(data["state"])
log = json_util.deepcopy(data.get("log", []))
errors = json_util.deepcopy(data["errors"])
outputs = json_util.deepcopy(data["output"])
log = data.get("log", [])
errors = data["errors"]
outputs = data["output"]

instance = cls(spec)
instance.restore(graph, log, errors, state, inputs, outputs, context)
Expand Down Expand Up @@ -412,7 +412,7 @@ def get_workflow_parent_context(self):
return json_util.deepcopy(self._parent_ctx)

def get_workflow_input(self):
return json_util.deepcopy(self._inputs)
return self._inputs

def get_workflow_status(self):
return self.workflow_state.status
Expand Down Expand Up @@ -461,7 +461,7 @@ def request_workflow_status(self, status):
raise exc.InvalidWorkflowStatusTransition(current_status, wf_ex_event.name)

def get_workflow_initial_context(self):
return json_util.deepcopy(self.workflow_state.contexts[0])
return self.workflow_state.contexts[0]

def get_workflow_terminal_context(self):
if self.get_workflow_status() not in statuses.COMPLETED_STATUSES:
Expand Down Expand Up @@ -513,7 +513,7 @@ def render_workflow_output(self):
self.request_workflow_status(statuses.FAILED)

def get_workflow_output(self):
return json_util.deepcopy(self._outputs) if self._outputs else None
return self._outputs if self._outputs else None

def reset_workflow_output(self):
self._outputs = None
Expand Down Expand Up @@ -780,7 +780,7 @@ def setup_retry_in_task_state(self, task_state_entry, in_ctx_idxs):
# Setup the retry in the task state.
task_id = task_state_entry["id"]
task_retry_spec = self.graph.get_task_retry_spec(task_id)
task_state_entry["retry"] = json_util.deepcopy(task_retry_spec)
task_state_entry["retry"] = task_retry_spec
task_state_entry["retry"]["tally"] = 0

# Get task context for evaluating the expression in delay and count.
Expand Down Expand Up @@ -1186,8 +1186,8 @@ def get_task_transition_contexts(self, task_id, route):

def _request_task_rerun(self, task_id, route, reset_items=False):
task = self.workflow_state.get_task(task_id, route)
task_ctx = json_util.deepcopy(task["ctxs"]["in"])
task_prev = json_util.deepcopy(task["prev"])
task_ctx = task["ctxs"]["in"]
task_prev = task["prev"]
task_spec = self.spec.tasks.get_task(task_id)

# Reset terminal status for the rerunnable candidate.
Expand Down

0 comments on commit 6a6f0b8

Please sign in to comment.