refactor: slice the two-headed application.py monster in half #3643

Merged (11 commits, May 19, 2020)
2 changes: 2 additions & 0 deletions tensorboard/BUILD
@@ -167,6 +167,7 @@ py_library(
"//tensorboard:expect_absl_flags_installed",
"//tensorboard:expect_absl_logging_installed",
"//tensorboard/backend:application",
"//tensorboard/backend/event_processing:data_ingester",
"//tensorboard/backend/event_processing:event_file_inspector",
"//tensorboard/util:argparse_util",
"@org_pocoo_werkzeug",
@@ -189,6 +190,7 @@ py_test(
"//tensorboard/plugins/core:core_plugin",
"@org_pocoo_werkzeug",
"@org_pythonhosted_mock",
"@org_pythonhosted_six",
],
)

9 changes: 1 addition & 8 deletions tensorboard/backend/BUILD
@@ -65,14 +65,7 @@ py_library(
":security_validator",
"//tensorboard:errors",
"//tensorboard:plugin_util",
"//tensorboard/backend/event_processing:data_provider",
"//tensorboard/backend/event_processing:event_multiplexer",
"//tensorboard/backend/event_processing:tag_types",
"//tensorboard/plugins/core:core_plugin",
"//tensorboard/plugins/histogram:metadata",
"//tensorboard/plugins/image:metadata",
"//tensorboard/plugins/pr_curve:metadata",
"//tensorboard/plugins/scalar:metadata",
"//tensorboard/util:tb_logging",
"@org_pocoo_werkzeug",
"@org_pythonhosted_six",
@@ -83,7 +76,7 @@ py_test(
name = "application_test",
size = "small",
srcs = ["application_test.py"],
srcs_version = "PY2AND3",
srcs_version = "PY3",
tags = ["support_notf"],
deps = [
":application",
296 changes: 38 additions & 258 deletions tensorboard/backend/application.py
@@ -17,22 +17,12 @@
Provides TensorBoardWSGIApp for building a TensorBoard WSGI app.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import atexit
import base64
import collections
import contextlib
import hashlib
import json
import os
import re
import shutil
import tempfile
import textwrap
import threading
import time

import six
@@ -50,37 +40,11 @@
from tensorboard.backend import http_util
from tensorboard.backend import path_prefix
from tensorboard.backend import security_validator
from tensorboard.backend.event_processing import (
data_provider as event_data_provider,
)
from tensorboard.backend.event_processing import (
plugin_event_multiplexer as event_multiplexer,
)
from tensorboard.backend.event_processing import tag_types
from tensorboard.plugins import base_plugin
from tensorboard.plugins.audio import metadata as audio_metadata
from tensorboard.plugins.core import core_plugin
from tensorboard.plugins.histogram import metadata as histogram_metadata
from tensorboard.plugins.image import metadata as image_metadata
from tensorboard.plugins.pr_curve import metadata as pr_curve_metadata
from tensorboard.plugins.scalar import metadata as scalar_metadata
from tensorboard.util import tb_logging


DEFAULT_SIZE_GUIDANCE = {
tag_types.TENSORS: 10,
}

# TODO(@wchargin): Once SQL mode is in play, replace this with an
# alternative that does not privilege first-party plugins.
DEFAULT_TENSOR_SIZE_GUIDANCE = {
scalar_metadata.PLUGIN_NAME: 1000,
image_metadata.PLUGIN_NAME: 10,
audio_metadata.PLUGIN_NAME: 10,
histogram_metadata.PLUGIN_NAME: 500,
pr_curve_metadata.PLUGIN_NAME: 100,
}

DATA_PREFIX = "/data"
PLUGIN_PREFIX = "/plugin"
PLUGINS_LISTING_ROUTE = "/plugins_listing"
@@ -96,75 +60,6 @@
logger = tb_logging.get_logger()


def _apply_tensor_size_guidance(sampling_hints):
"""Apply user per-summary size guidance overrides."""
tensor_size_guidance = dict(DEFAULT_TENSOR_SIZE_GUIDANCE)
tensor_size_guidance.update(sampling_hints)
return tensor_size_guidance
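

# Illustrative sketch, not part of the actual change: user sampling hints
# (from --samples_per_plugin) override the defaults above, plugin by plugin.
# The specific numbers below are hypothetical.
def _example_size_guidance():
    merged = _apply_tensor_size_guidance({"scalars": 500, "images": 0})
    # merged["scalars"] == 500 and merged["images"] == 0; the remaining
    # plugins keep their DEFAULT_TENSOR_SIZE_GUIDANCE values.
    return merged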


def standard_tensorboard_wsgi(flags, plugin_loaders, assets_zip_provider):
"""Construct a TensorBoardWSGIApp with standard plugins and multiplexer.

Args:
flags: An argparse.Namespace containing TensorBoard CLI flags.
plugin_loaders: A list of TBLoader instances.
assets_zip_provider: See TBContext documentation for more information.

Returns:
The new TensorBoard WSGI application.

:type plugin_loaders: list[base_plugin.TBLoader]
:rtype: TensorBoardWSGI
"""
data_provider = None
multiplexer = None
reload_interval = flags.reload_interval
# Regular logdir loading mode.
sampling_hints = flags.samples_per_plugin
multiplexer = event_multiplexer.EventMultiplexer(
size_guidance=DEFAULT_SIZE_GUIDANCE,
tensor_size_guidance=_apply_tensor_size_guidance(sampling_hints),
purge_orphaned_data=flags.purge_orphaned_data,
max_reload_threads=flags.max_reload_threads,
event_file_active_filter=_get_event_file_active_filter(flags),
)
data_provider = event_data_provider.MultiplexerDataProvider(
multiplexer, flags.logdir or flags.logdir_spec
)

if reload_interval >= 0:
# We either reload the multiplexer once when TensorBoard starts up, or we
# continuously reload the multiplexer.
if flags.logdir:
path_to_run = {os.path.expanduser(flags.logdir): None}
else:
path_to_run = parse_event_files_spec(flags.logdir_spec)
start_reloading_multiplexer(
multiplexer, path_to_run, reload_interval, flags.reload_task
)
return TensorBoardWSGIApp(
flags, plugin_loaders, data_provider, assets_zip_provider, multiplexer
)
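

# Illustrative sketch, not part of the actual change: roughly how the flags
# consumed above could be wired together by hand. In practice TensorBoard's
# CLI layer builds the flags namespace; every value below is hypothetical,
# `my_loaders` / `my_assets_zip_provider` are placeholders, and only the
# flags read by the code shown here are included (the real namespace carries
# more).
def _example_standard_wsgi(my_loaders, my_assets_zip_provider):
    import argparse

    flags = argparse.Namespace(
        logdir="/tmp/logs",            # or "" when using logdir_spec
        logdir_spec="",
        reload_interval=5.0,           # seconds between reloads; 0 = load once
        samples_per_plugin={},         # e.g. {"scalars": 500}
        purge_orphaned_data=True,
        max_reload_threads=1,
        reload_task="auto",            # "thread", "process", "blocking", or "auto"
        reload_multifile=False,
        reload_multifile_inactive_secs=86400,
    )
    return standard_tensorboard_wsgi(flags, my_loaders, my_assets_zip_provider)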


def _handling_errors(wsgi_app):
def wrapper(*args):
(environ, start_response) = (args[-2], args[-1])
try:
return wsgi_app(*args)
except errors.PublicError as e:
request = wrappers.Request(environ)
error_app = http_util.Respond(
request, str(e), "text/plain", code=e.http_code
)
return error_app(environ, start_response)
# Let other exceptions be handled by the server, as an opaque
# internal server error.

return wrapper


def TensorBoardWSGIApp(
flags,
plugins,
@@ -227,6 +122,30 @@ def TensorBoardWSGIApp(
)


def make_plugin_loader(plugin_spec):
"""Returns a plugin loader for the given plugin.

Args:
plugin_spec: A TBPlugin subclass, or a TBLoader instance or subclass.

Returns:
A TBLoader for the given plugin.

:type plugin_spec:
Type[base_plugin.TBPlugin] | Type[base_plugin.TBLoader] |
base_plugin.TBLoader
:rtype: base_plugin.TBLoader
"""
if isinstance(plugin_spec, base_plugin.TBLoader):
return plugin_spec
if isinstance(plugin_spec, type):
if issubclass(plugin_spec, base_plugin.TBLoader):
return plugin_spec()
if issubclass(plugin_spec, base_plugin.TBPlugin):
return base_plugin.BasicLoader(plugin_spec)
raise TypeError("Not a TBLoader or TBPlugin subclass: %r" % (plugin_spec,))


class TensorBoardWSGI(object):
"""The TensorBoard WSGI app that delegates to a set of TBPlugin."""

@@ -598,119 +517,21 @@ def _route_request(self, environ, start_response):
# pylint: enable=too-many-function-args


def parse_event_files_spec(logdir_spec):
"""Parses `logdir_spec` into a map from paths to run group names.

The `--logdir_spec` flag format is a comma-separated list of path
specifications. A path spec looks like 'group_name:/path/to/directory' or
'/path/to/directory'; in the latter case, the group is unnamed. Group names
cannot start with a forward slash: /foo:bar/baz will be interpreted as a spec
with no name and path '/foo:bar/baz'.

Globs are not supported.

Args:
logdir_spec: A comma-separated list of run specifications.
Returns:
A dict mapping directory paths to names like {'/path/to/directory': 'name'}.
Groups without an explicit name are named after their path. If `logdir_spec`
is None, returns an empty dict, which is helpful for testing things that don't
require any valid runs.
"""
files = {}
if logdir_spec is None:
return files
# Keep this consistent with ParseURI in core/lib/io/path.cc.
uri_pattern = re.compile("[a-zA-Z][0-9a-zA-Z.]*://.*")
for specification in logdir_spec.split(","):
# Check whether the spec contains a group. A spec starting with xyz:// is
# treated as a URI path spec rather than a group spec. If the spec looks
# like /foo:bar/baz, we assume it is a path with a colon. If the spec looks
# like [a-zA-Z]:\foo, we assume it is a Windows path rather than a
# single-letter group.
if (
uri_pattern.match(specification) is None
and ":" in specification
and specification[0] != "/"
and not os.path.splitdrive(specification)[0]
):
# We split at most once so run_name:/path:with/a/colon will work.
run_name, _, path = specification.partition(":")
else:
run_name = None
path = specification
if uri_pattern.match(path) is None:
path = os.path.realpath(os.path.expanduser(path))
files[path] = run_name
return files
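

# Illustrative sketch, not part of the actual change: expected behavior for a
# hypothetical --logdir_spec value. Local paths are expanded and resolved, so
# the real keys come back as absolute (possibly symlink-resolved) paths.
def _example_logdir_spec():
    spec = "train:/tmp/logs/train,/tmp/logs/eval,gs://bucket/logs"
    return parse_event_files_spec(spec)
    # Roughly: {"/tmp/logs/train": "train",   # named group
    #           "/tmp/logs/eval": None,       # unnamed; treated as named after its path
    #           "gs://bucket/logs": None}     # URI spec, left unresolved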


def start_reloading_multiplexer(
multiplexer, path_to_run, load_interval, reload_task
):
"""Starts automatically reloading the given multiplexer.

If `load_interval` is positive, the multiplexer is reloaded every
`load_interval` seconds, starting immediately. Otherwise, the multiplexer is
reloaded once and never again.

Args:
multiplexer: The `EventMultiplexer` to add runs to and reload.
path_to_run: A dict mapping from paths to run names, where `None` as the run
name is interpreted as a run name equal to the path.
load_interval: An integer greater than or equal to 0. If positive, how many
seconds to wait after one load before starting the next load. Otherwise,
reloads the multiplexer once and never again (no continuous reloading).
reload_task: Indicates the type of background task to reload with.

Raises:
ValueError: If `load_interval` is negative.
"""
if load_interval < 0:
raise ValueError("load_interval is negative: %d" % load_interval)

def _reload():
while True:
start = time.time()
logger.info("TensorBoard reload process beginning")
for path, name in six.iteritems(path_to_run):
multiplexer.AddRunsFromDirectory(path, name)
logger.info(
"TensorBoard reload process: Reload the whole Multiplexer"
)
multiplexer.Reload()
duration = time.time() - start
logger.info(
"TensorBoard done reloading. Load took %0.3f secs", duration
)
if load_interval == 0:
# Only load the multiplexer once. Do not continuously reload.
break
time.sleep(load_interval)

if reload_task == "process":
logger.info("Launching reload in a child process")
import multiprocessing

process = multiprocessing.Process(target=_reload, name="Reloader")
# Best-effort cleanup; on exit, the main TB parent process will attempt to
# kill all its daemonic children.
process.daemon = True
process.start()
elif reload_task in ("thread", "auto"):
logger.info("Launching reload in a daemon thread")
thread = threading.Thread(target=_reload, name="Reloader")
# Make this a daemon thread, which won't block TB from exiting.
thread.daemon = True
thread.start()
    elif reload_task == "blocking":
        if load_interval != 0:
            raise ValueError(
                "blocking reload only allowed with load_interval=0"
            )
        _reload()
    else:
        raise ValueError("unrecognized reload_task: %s" % reload_task)


def _handling_errors(wsgi_app):
    def wrapper(*args):
        (environ, start_response) = (args[-2], args[-1])
        try:
            return wsgi_app(*args)
        except errors.PublicError as e:
            request = wrappers.Request(environ)
            error_app = http_util.Respond(
                request, str(e), "text/plain", code=e.http_code
            )
            return error_app(environ, start_response)
        # Let other exceptions be handled by the server, as an opaque
        # internal server error.

    return wrapper

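# Illustrative sketch, not part of the actual change: how the wrapper behaves
# around a toy WSGI app. `errors.NotFoundError` is assumed to be one of the
# public error types; any other exception would still surface as an opaque
# internal server error.
@_handling_errors
def _example_wsgi_app(environ, start_response):
    # A PublicError raised here is converted into a plain-text HTTP response
    # with the corresponding status code (404 in this case).
    raise errors.NotFoundError("no such run")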

def _clean_path(path):
@@ -725,44 +546,3 @@ def _clean_path(path):
if path != "/" and path.endswith("/"):
return path[:-1]
return path
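

# Illustrative sketch, not part of the actual change: trailing slashes are
# stripped, except that the bare root path is preserved.
def _example_clean_path():
    assert _clean_path("/data/plugin/scalars/") == "/data/plugin/scalars"
    assert _clean_path("/data/plugin/scalars") == "/data/plugin/scalars"
    assert _clean_path("/") == "/"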


def _get_event_file_active_filter(flags):
"""Returns a predicate for whether an event file load timestamp is active.

Returns:
A predicate function accepting a single UNIX timestamp float argument, or
None if multi-file loading is not enabled.
"""
if not flags.reload_multifile:
return None
inactive_secs = flags.reload_multifile_inactive_secs
if inactive_secs == 0:
return None
if inactive_secs < 0:
return lambda timestamp: True
return lambda timestamp: timestamp + inactive_secs >= time.time()
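

# Illustrative sketch, not part of the actual change: with multi-file loading
# enabled and an inactivity window of one hour, only event files written to
# within the last hour count as active. `flags` is a hypothetical namespace
# carrying just the two flags this helper reads.
def _example_active_filter():
    import argparse
    import time

    flags = argparse.Namespace(
        reload_multifile=True, reload_multifile_inactive_secs=3600
    )
    is_active = _get_event_file_active_filter(flags)
    return is_active(time.time() - 60)  # written a minute ago -> True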


def make_plugin_loader(plugin_spec):
"""Returns a plugin loader for the given plugin.

Args:
plugin_spec: A TBPlugin subclass, or a TBLoader instance or subclass.

Returns:
A TBLoader for the given plugin.

:type plugin_spec:
Type[base_plugin.TBPlugin] | Type[base_plugin.TBLoader] |
base_plugin.TBLoader
:rtype: base_plugin.TBLoader
"""
if isinstance(plugin_spec, base_plugin.TBLoader):
return plugin_spec
if isinstance(plugin_spec, type):
if issubclass(plugin_spec, base_plugin.TBLoader):
return plugin_spec()
if issubclass(plugin_spec, base_plugin.TBPlugin):
return base_plugin.BasicLoader(plugin_spec)
raise TypeError("Not a TBLoader or TBPlugin subclass: %r" % (plugin_spec,))