Skip to content

Commit

Permalink
feat: (opt-in): terminate handling of work when the request has alrea…
Browse files Browse the repository at this point in the history
…dy timed out (#328)

Overhead-free (or at least very cheap).

The “timeout” gunicorn config means drastically different things for
sync and non-sync workers:

> Workers silent for more than this many seconds are killed and restarted.
> 
> Value is a positive number or 0. Setting it to 0 has the effect of
> infinite timeouts by disabling timeouts for all workers entirely.
> 
> Generally, the default of thirty seconds should suffice. Only set this
> noticeably higher if you’re sure of the repercussions for sync workers.
> For the non sync workers it just means that the worker process is still
> communicating and is not tied to the length of time required to handle a
> single request.

So.  For cases where threads = 1 (user set or our defaults), we’ll use
the sync worker and let the regular timeout functionality do its thing.

For cases where threads > 1, we’re using the gthread worker, and timeout
means something completely different and not really user-observable.  So
we’ll leave the communication timeout (default gunicorn “timeout”) at 30
seconds, but create our own gthread-derived worker class to use instead,
which terminates request handling (with no mind to gunicorn’s “graceful
shutdown” config), to emulate GCFv1.

The arbiter spawns these workers, so we have to maintain some sort of
global timeout state for us to read in our custom gthread worker.

In the future, we should consider letting the user adjust the graceful
shutdown seconds.  But the default of 30 seems like it’s worked fine
historically, so it’s hard to argue for changing it.  IIUC, this means
that on gen 2, there’s a small behavior difference for the sync workers
compared to gen 1, in that gen 2 sync worker workloads will get an extra
30 seconds of timeout to gracefully shut down.  I don’t think monkeying
with this config and opting-in to sync workers is very common, though,
so let’s not worry about it here; everyone should be on the gthread path
outlined above.

* fix tests

* small test fixes

give up on coverage support for things that are tested in different
processes, or in gthread, because it looks like pytest-cov gave up on
support for these, where as coverage has out-of-the-box support

* format

* isort everything

* skip tests on mac

there's something test-specific about how mac pickles functions for
execution in multiprocessing.Process which is causing problems.  it
seems somewhere in the innards of flask and gunicorn and macos...

since this feature is opt-in anyway, let's just skip testing darwin.

* sort tuple of dicts in async tests before asserting

causes flakes sometimes in workflows

* use double-quotes

* also skip tests on windows - this is all built for gunicorn, there's no
value adding it for windows anyway

* skip import on windows

* easy stuff

* add a few tests for sync worker timeouts

these shouldn't have changed with this commit
  • Loading branch information
jrmfg committed May 17, 2024
1 parent fff38ae commit 2601975
Show file tree
Hide file tree
Showing 9 changed files with 381 additions and 8 deletions.
2 changes: 1 addition & 1 deletion examples/cloud_run_cloud_events/send_cloud_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cloudevents.http import CloudEvent, to_structured
import requests

from cloudevents.http import CloudEvent, to_structured

# Create a cloudevent using https://github.com/cloudevents/sdk-python
# Note we only need source and type because the cloudevents constructor by
Expand Down
16 changes: 16 additions & 0 deletions playground/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import logging
import time

import functions_framework

logger = logging.getLogger(__name__)


@functions_framework.http
def main(request):
timeout = 2
for _ in range(timeout * 10):
time.sleep(0.1)
logger.info("logging message after timeout elapsed")
return "Hello, world!"

42 changes: 37 additions & 5 deletions src/functions_framework/_http/gunicorn.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Google LLC
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,17 +16,43 @@

import gunicorn.app.base

from gunicorn.workers.gthread import ThreadWorker

from ..request_timeout import ThreadingTimeout

# global for use in our custom gthread worker; the gunicorn arbiter spawns these
# and it's not possible to inject (and self.timeout means something different to
# async workers!)
# set/managed in gunicorn application init for test-friendliness
TIMEOUT_SECONDS = None


class GunicornApplication(gunicorn.app.base.BaseApplication):
def __init__(self, app, host, port, debug, **options):
threads = int(os.environ.get("THREADS", (os.cpu_count() or 1) * 4))

global TIMEOUT_SECONDS
TIMEOUT_SECONDS = int(os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0))

self.options = {
"bind": "%s:%s" % (host, port),
"workers": os.environ.get("WORKERS", 1),
"threads": os.environ.get("THREADS", (os.cpu_count() or 1) * 4),
"timeout": os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0),
"loglevel": "error",
"workers": int(os.environ.get("WORKERS", 1)),
"threads": threads,
"loglevel": os.environ.get("GUNICORN_LOG_LEVEL", "error"),
"limit_request_line": 0,
}

if (
TIMEOUT_SECONDS > 0
and threads > 1
and (os.environ.get("THREADED_TIMEOUT_ENABLED", "False").lower() == "true")
): # pragma: no cover
self.options["worker_class"] = (
"functions_framework._http.gunicorn.GThreadWorkerWithTimeoutSupport"
)
else:
self.options["timeout"] = TIMEOUT_SECONDS

self.options.update(options)
self.app = app

Expand All @@ -38,3 +64,9 @@ def load_config(self):

def load(self):
return self.app


class GThreadWorkerWithTimeoutSupport(ThreadWorker): # pragma: no cover
def handle_request(self, req, conn):
with ThreadingTimeout(TIMEOUT_SECONDS):
super(GThreadWorkerWithTimeoutSupport, self).handle_request(req, conn)
6 changes: 5 additions & 1 deletion src/functions_framework/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Google LLC
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,3 +35,7 @@ class MissingTargetException(FunctionsFrameworkException):

class EventConversionException(FunctionsFrameworkException):
pass


class RequestTimeoutException(FunctionsFrameworkException):
pass
42 changes: 42 additions & 0 deletions src/functions_framework/request_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import ctypes
import logging
import threading

from .exceptions import RequestTimeoutException

logger = logging.getLogger(__name__)


class ThreadingTimeout(object): # pragma: no cover
def __init__(self, seconds):
self.seconds = seconds
self.target_tid = threading.current_thread().ident
self.timer = None

def __enter__(self):
self.timer = threading.Timer(self.seconds, self._raise_exc)
self.timer.start()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.timer.cancel()
if exc_type is RequestTimeoutException:
logger.warning(
"Request handling exceeded {0} seconds timeout; terminating request handling...".format(
self.seconds
),
exc_info=(exc_type, exc_val, exc_tb),
)
return False

def _raise_exc(self):
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(self.target_tid), ctypes.py_object(RequestTimeoutException)
)
if ret == 0:
raise ValueError("Invalid thread ID {}".format(self.target_tid))
elif ret > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(self.target_tid), None
)
raise SystemError("PyThreadState_SetAsyncExc failed")
3 changes: 2 additions & 1 deletion tests/test_execution_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,4 +378,5 @@ async def test_maintains_execution_id_for_concurrent_requests(monkeypatch, capsy
logs = record.err.strip().split("\n")
logs_as_json = tuple(json.loads(log) for log in logs)

assert logs_as_json == expected_logs
sort_key = lambda d: d["message"]
assert sorted(logs_as_json, key=sort_key) == sorted(expected_logs, key=sort_key)
12 changes: 12 additions & 0 deletions tests/test_functions/timeout/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import logging
import time

logger = logging.getLogger(__name__)


def function(request):
# sleep for 1200 total ms (1.2 sec)
for _ in range(12):
time.sleep(0.1)
logger.info("some extra logging message")
return "success", 200
Loading

0 comments on commit 2601975

Please sign in to comment.