Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop py27 more #63

Merged
merged 18 commits into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ nosetests.xml
coverage.xml
*,cover

# virtual environment
venv

# Translations
*.mo
*.pot
Expand Down
25 changes: 20 additions & 5 deletions pysipp/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

SocketAddr = namedtuple('SocketAddr', 'ip port')

DEFAULT_RUNNER_TIMEOUT = 180

def tuple_property(attrs):
def getter(self):
Expand All @@ -27,7 +28,6 @@ def getter(self):
return None

def setter(self, pair):

if not isinstance(pair, tuple):
if pair is None:
pair = (None, None)
Expand Down Expand Up @@ -269,6 +269,7 @@ def __init__(

# agents iterable in launch-order
self._agents = agents
self._prepared_agents = None
ua_attrs = UserAgent.keys()

# default settings
Expand Down Expand Up @@ -444,13 +445,27 @@ def from_agents(self, agents=None, autolocalsocks=True, **scenkwargs):

async def arun(
self,
timeout=180,
timeout=DEFAULT_RUNNER_TIMEOUT,
runner=None,
block=True,
):
agents = self.prepare()
runner = runner or launch.TrioRunner()
self._prepared_agents = agents = self.prepare()
self._runner = runner = runner or launch.TrioRunner()

return await launch.run_all_agents(runner, agents, timeout=timeout)
return await launch.run_all_agents(runner, agents, timeout=timeout, block=block)

def finalize(self, *, timeout=DEFAULT_RUNNER_TIMEOUT):
assert (
self._prepared_agents and self._runner
), "Must run scenario before finalizing."
return trio.run(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh right this is triggered from sync code yes?

Sorry trying to get my head back in this change set!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's not the most elegant solution, do you want me to try something that doesn't require abusing objects attributes and be more functional? (it may change the API a bit though)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have something functional that you think is cleaner then yeah for sure.

I have no real problems with this I just forgot this is how we have to do it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can just return a handle to finalize from Scenario.run(), atm we return the runner which is exposing internals.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is a good idea - feels like a continuation/future then.

partial(
launch.finalize,
self._runner,
self._prepared_agents,
timeout=timeout,
)
)

def run(
self,
Expand Down
45 changes: 22 additions & 23 deletions pysipp/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ async def run(
for cmd in cmds:
log.debug(
"launching cmd:\n\"{}\"\n".format(cmd))
# proc = await trio.open_process(
proc = trio.Process(
proc = await trio.open_process(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want the async context manager interface here?

I'm pretty sure there's some teardown we may find useful?

I'm not sure how this is going to change though given python-trio/trio#1568.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aren't changes in this trio PR going to make it so you can use trio.open_process inside a nursery?
in pysipp we want the Scenario API to not be async right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want the async context manager interface here?

what do you have in mind? isn't the teardown part of finalize that will be called from sync code?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in pysipp we want the Scenario API to not be async right?

Right, I think that's what we have to aim for at the outset.

isn't the teardown part of finalize that will be called from sync code?

Yes you're right, so I guess I was thinking we should trigger the shutdown code I linked to, but because it's getting called from sync code we might have to either call Process.aclose() explicitly or we use an async exit stack. I'm thinking the former is simpler and cleaner?

shlex.split(cmd),
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE
Expand Down Expand Up @@ -157,32 +156,32 @@ def clear(self):
self._procs.clear()


async def run_all_agents(runner, agents, timeout=180):
"""Run a sequencec of agents using a ``TrioRunner``.
"""
async def finalize():
# this might raise TimeoutError
cmds2procs = await runner.get(timeout=timeout)
agents2procs = list(zip(agents, cmds2procs.values()))
msg = report.err_summary(agents2procs)
if msg:
# report logs and stderr
await report.emit_logfiles(agents2procs)
raise SIPpFailure(msg)

return cmds2procs
async def run_all_agents(runner, agents, timeout=180, block=True):
"""Run a sequencec of agents using a ``TrioRunner``."""

try:
await runner.run(
(ua.render() for ua in agents),
timeout=timeout
)
await finalize()
await runner.run((ua.render() for ua in agents), timeout=timeout)
if block:
await finalize(runner, agents, timeout)
return runner
except TimeoutError as terr:
# print error logs even when we timeout
try:
await finalize()
await finalize(runner, agents, timeout)
except SIPpFailure as err:
assert 'exit code -9' in str(err)
assert "exit code -9" in str(err)
raise terr


async def finalize(runner, agents, timeout):
"""Block up to `timeout` seconds for all agents to complete."""
# this might raise TimeoutError
cmds2procs = await runner.get(timeout=timeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm could be maybe do this in more trionic style using with trio.fail_after(timeout): ?

Not sure what will have to get factored out of .get()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will give it a try and see if we can refactor finalize() and .get()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yah I mentioned the spot from my original branch here. Not sure if I'm convinced though given a lot of users might not be familiar with this style?

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yes i would prefer a common global timeout, we can also document it

users might not be familiar with this style

which style are you referring to?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with trio.fail_after(1):
    scen.finalize()

I'm actually now thinking we can't do this anyway..
If we want to keep a sync api this won't work I don't think (fail_after() won't work outside a call to trio.run() pretty sure). I just don't have my head in this code so I'm probably making bad suggestions.

This should work if we offer an async .aclose() method to do finalization.

Copy link
Member

@goodboy goodboy Jan 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait, nm .get() is async now - see what I mean, obviously haven't looked at this in a while.

And, duh, we await that in the above code.
So yeah I think maybe pulling out the timeout is the more trionic way to do this thus giving the user more control on waiting / cancellation. The only down side is we'll have to document this style for async users.

agents2procs = list(zip(agents, cmds2procs.values()))
msg = report.err_summary(agents2procs)
if msg:
# report logs and stderr
await report.emit_logfiles(agents2procs)
raise SIPpFailure(msg)

return cmds2procs
1 change: 0 additions & 1 deletion tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ def test_pass_bad_socket_addr():
with pytest.raises(ValueError):
pysipp.client(proxyaddr='10.10.8.88')


def test_authentication_arguments():
client = agent.client(auth_username='username', auth_password='passw0rd')

Expand Down