Skip to content

Commit

Permalink
Use trio for process and scenario launching!
Browse files Browse the repository at this point in the history
After attempting to find an OS portable way to spawn subprocesses using
the stdlib and coming out unsatisfied, I've decided use the new
subprocess launching support in `trio`! This will of course require that
the project moves to python 3.6+ giving us access to a lot of neat
features of modern python including async/await support and adherence to
the structured concurrency principles prominent in `trio`. It turns out
this is a good fit since SIPp already has a built in cancellation
mechanism via the SIGUSR1 signal.

There's a lot of "core" changes to go over in this commit:
- drop the "run protocol" and "runner creation" related hooks since they
  really shouldn't be overridden until there's some need for it and it's
  likely smarter to keep those "machinery" details strictly internal for now
- the run "protocol" has now been relegated to an async function:
  `pysipp.launch.run_all_agents()`
- many routines have been converted to async functions particularly at the
  runner (`pysipp.TrioRunner.run()`, `.get()`) and scenario
  (`pysipp.Scenario.arun()`) levels allowing us to expose both a sync and
  async interface for running subprocesses / agents
- drop all the epoll/select loop stuff as this is entirely delegated to
  `trio.open_process()` and it's underlying machinery and APIs

Resolves #53
  • Loading branch information
goodboy committed Dec 18, 2022
1 parent a8f296e commit 0898b53
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 182 deletions.
60 changes: 3 additions & 57 deletions pysipp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Authors : Tyler Goodlet

"""
pysipp - a python wrapper for launching SIPp
pysipp - a Python wrapper for launching SIPp
"""
import sys
from os.path import dirname

from . import agent
from . import launch
from . import netplug
from . import plugin
from . import report
from .agent import client
from .agent import server
from .load import iter_scen_dirs
Expand Down Expand Up @@ -202,59 +202,5 @@ def pysipp_conf_scen(agents, scen):
ua.rtp_echo = True


@plugin.hookimpl
def pysipp_new_runner():
"""Provision and assign a default cmd runner"""
return launch.PopenRunner()


@plugin.hookimpl
def pysipp_run_protocol(scen, runner, block, timeout, raise_exc):
""" "Run all rendered commands with the provided runner or the built-in
PopenRunner which runs commands locally.
"""
# use provided runner or default provided by hook
runner = runner or plugin.mng.hook.pysipp_new_runner()
agents = scen.prepare()

def finalize(cmds2procs=None, timeout=180, raise_exc=True):
"""Wait for all remaining agents in the scenario to finish executing
and perform error and logfile reporting.
"""
cmds2procs = cmds2procs or runner.get(timeout=timeout)
agents2procs = list(zip(agents, cmds2procs.values()))
msg = report.err_summary(agents2procs)
if msg:
# report logs and stderr
report.emit_logfiles(agents2procs)
if raise_exc:
# raise RuntimeError on agent failure(s)
# (HINT: to rerun type `scen()` from the debugger)
raise SIPpFailure(msg)

return cmds2procs

try:
# run all agents (raises RuntimeError on timeout)
cmds2procs = runner(
(ua.render() for ua in agents), block=block, timeout=timeout
)
except launch.TimeoutError: # sucessful timeout
cmds2procs = finalize(timeout=0, raise_exc=False)
if raise_exc:
raise
else:
# async
if not block:
# XXX async run must bundle up results for later processing
scen.finalize = finalize
return finalize

# sync
finalize(cmds2procs, raise_exc=raise_exc)

return runner


# register the default hook set
plugin.mng.register(sys.modules[__name__])
61 changes: 36 additions & 25 deletions pysipp/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
import tempfile
from collections import namedtuple
from collections import OrderedDict
from functools import partial
from copy import deepcopy
from os import path

from distutils import spawn

import trio

from . import command
from . import plugin
from . import utils
from . import launch

log = utils.get_logger()

Expand Down Expand Up @@ -66,8 +69,13 @@ def name(self):
ipcaddr = tuple_property(("ipc_host", "ipc_port"))
call_load = tuple_property(("rate", "limit", "call_count"))

def __call__(
self, block=True, timeout=180, runner=None, raise_exc=True, **kwargs
def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)

def run(
self,
timeout=180,
**kwargs
):

# create and configure a temp scenario
Expand All @@ -76,16 +84,7 @@ def __call__(
confpy=None,
scenkwargs={},
)
# run the standard protocol
# (attach allocted runner for reuse/post-portem)
return plugin.mng.hook.pysipp_run_protocol(
scen=scen,
block=block,
timeout=timeout,
runner=runner,
raise_exc=raise_exc,
**kwargs
)
return scen.run(timeout=timeout, **kwargs)

def is_client(self):
return "uac" in self.name.lower()
Expand Down Expand Up @@ -277,6 +276,9 @@ def __init__(
confpy=None,
enable_screen_file=True,
):
# placeholder for process "runner"
self._runner = None

# agents iterable in launch-order
self._agents = agents
ua_attrs = UserAgent.keys()
Expand Down Expand Up @@ -452,21 +454,30 @@ def from_agents(self, agents=None, autolocalsocks=True, **scenkwargs):
self.prepare(agents), self._defaults, confpy=self.mod
)

def __call__(
async def arun(
self,
agents=None,
block=True,
timeout=180,
runner=None,
raise_exc=True,
copy_agents=False,
):
agents = self.prepare()
runner = runner or launch.TrioRunner()

return await launch.run_all_agents(runner, agents, timeout=timeout)

def run(
self,
timeout=180,
**kwargs
):
return plugin.mng.hook.pysipp_run_protocol(
scen=self,
block=block,
timeout=timeout,
runner=runner,
raise_exc=raise_exc,
**kwargs
"""Run scenario blocking to completion."""
return trio.run(
partial(
self.arun,
timeout=timeout,
**kwargs
)
)

def __call__(self, *args, **kwargs):
# TODO: deprecation warning here
return self.run(*args, **kwargs)
Loading

0 comments on commit 0898b53

Please sign in to comment.