Skip to content

Commit

Permalink
Infinite Tracing Compression (#758)
Browse files Browse the repository at this point in the history
* Initial commit

* Add compression option in StreamingRPC

* Add compression default to tests

* Add test to confirm compression settings

* Remove commented out code

* Set compression settings from settings override
  • Loading branch information
lrafeei committed Feb 7, 2023
1 parent 99f89cc commit 8fe288b
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 37 deletions.
12 changes: 9 additions & 3 deletions newrelic/core/agent_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class StreamingRpc(object):
)
OPTIONS = [("grpc.enable_retries", 0)]

def __init__(self, endpoint, stream_buffer, metadata, record_metric, ssl=True):
def __init__(self, endpoint, stream_buffer, metadata, record_metric, ssl=True, compression=None):
self._endpoint = endpoint
self._ssl = ssl
self.metadata = metadata
Expand All @@ -57,15 +57,21 @@ def __init__(self, endpoint, stream_buffer, metadata, record_metric, ssl=True):
self.notify = self.condition()
self.record_metric = record_metric
self.closed = False
# If this is not set, None is still a falsy value.
self.compression_setting = grpc.Compression.Gzip if compression else grpc.Compression.NoCompression

self.create_channel()

def create_channel(self):
if self._ssl:
credentials = grpc.ssl_channel_credentials()
self.channel = grpc.secure_channel(self._endpoint, credentials, options=self.OPTIONS)
self.channel = grpc.secure_channel(
self._endpoint, credentials, compression=self.compression_setting, options=self.OPTIONS
)
else:
self.channel = grpc.insecure_channel(self._endpoint, options=self.OPTIONS)
self.channel = grpc.insecure_channel(
self._endpoint, compression=self.compression_setting, options=self.OPTIONS
)

self.rpc = self.channel.stream_stream(self.PATH, Span.SerializeToString, RecordStatus.FromString)

Expand Down
22 changes: 7 additions & 15 deletions newrelic/core/data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def connect_span_stream(self, span_iterator, record_metric):

port = self.configuration.infinite_tracing.trace_observer_port
ssl = self.configuration.infinite_tracing.ssl
compression_setting = self.configuration.infinite_tracing.compression
endpoint = "{}:{}".format(host, port)

if (
Expand All @@ -75,7 +76,7 @@ def connect_span_stream(self, span_iterator, record_metric):
)

rpc = self._rpc = StreamingRpc(
endpoint, span_iterator, metadata, record_metric, ssl=ssl
endpoint, span_iterator, metadata, record_metric, ssl=ssl, compression=compression_setting
)
rpc.connect()
return rpc
Expand Down Expand Up @@ -135,9 +136,7 @@ def send_log_events(self, sampling_info, log_event_data):
return self._protocol.send("log_event_data", payload)

def get_agent_commands(self):
"""Receive agent commands from the data collector.
"""
"""Receive agent commands from the data collector."""

payload = (self.agent_run_id,)
return self._protocol.send("get_agent_commands", payload)
Expand Down Expand Up @@ -180,8 +179,7 @@ def send_agent_command_results(self, cmd_results):
return self._protocol.send("agent_command_results", payload)

def send_profile_data(self, profile_data):
"""Called to submit Profile Data.
"""
"""Called to submit Profile Data."""

payload = (self.agent_run_id, profile_data)
return self._protocol.send("profile_data", payload)
Expand All @@ -206,9 +204,7 @@ class DeveloperModeSession(Session):

def connect_span_stream(self, span_iterator, record_metric):
if self.configuration.debug.connect_span_stream_in_developer_mode:
super(DeveloperModeSession, self).connect_span_stream(
span_iterator, record_metric
)
super(DeveloperModeSession, self).connect_span_stream(span_iterator, record_metric)


class ServerlessModeSession(Session):
Expand All @@ -231,12 +227,8 @@ def shutdown_session():
def create_session(license_key, app_name, linked_applications, environment):
settings = global_settings()
if settings.serverless_mode.enabled:
return ServerlessModeSession(
app_name, linked_applications, environment, settings
)
return ServerlessModeSession(app_name, linked_applications, environment, settings)
elif settings.developer_mode:
return DeveloperModeSession(
app_name, linked_applications, environment, settings
)
return DeveloperModeSession(app_name, linked_applications, environment, settings)
else:
return Session(app_name, linked_applications, environment, settings)
15 changes: 8 additions & 7 deletions tests/agent_streaming/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import random
import threading

import pytest
from testing_support.fixtures import collector_available_fixture # noqa
from testing_support.fixtures import (
code_coverage_fixture,
collector_agent_registration_fixture,
collector_available_fixture,
)
from testing_support.mock_external_grpc_server import MockExternalgRPCServer

from newrelic.common.streaming_utils import StreamBuffer
import threading

CONDITION_CLS = type(threading.Condition())

Expand All @@ -40,18 +41,18 @@
"agent_limits.errors_per_harvest": 100,
"distributed_tracing.enabled": True,
"infinite_tracing.trace_observer_host": "nr-internal.aws-us-east-2.tracing.staging-edge.nr-data.net",
"infinite_tracing.compression": True,
"debug.connect_span_stream_in_developer_mode": True,
}

collector_agent_registration = collector_agent_registration_fixture(
app_name="Python Agent Test (agent_streaming)",
default_settings=_default_settings
app_name="Python Agent Test (agent_streaming)", default_settings=_default_settings
)


@pytest.fixture(scope="module")
def grpc_app_server():
port = random.randint(50000, 50099)
port = random.randint(50000, 50099) # nosec: B311
with MockExternalgRPCServer(port=port) as server:
yield server, port

Expand Down Expand Up @@ -83,5 +84,5 @@ def buffer_empty_event(monkeypatch):
def condition(*args, **kwargs):
return SetEventOnWait(event, *args, **kwargs)

monkeypatch.setattr(StreamBuffer, 'condition', condition)
monkeypatch.setattr(StreamBuffer, "condition", condition)
return event
64 changes: 52 additions & 12 deletions tests/agent_streaming/test_streaming_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

import threading

from newrelic.core.agent_streaming import StreamingRpc
from newrelic.common.streaming_utils import StreamBuffer
from newrelic.core.infinite_tracing_pb2 import Span, AttributeValue
import pytest
from testing_support.fixtures import override_generic_settings

from newrelic.common.streaming_utils import StreamBuffer
from newrelic.core.agent_streaming import StreamingRpc
from newrelic.core.config import global_settings
from newrelic.core.infinite_tracing_pb2 import AttributeValue, Span

CONDITION_CLS = type(threading.Condition())
DEFAULT_METADATA = (("agent_run_token", ""), ("license_key", ""))
Expand All @@ -27,13 +30,54 @@ def record_metric(*args, **kwargs):
pass


# This enumeration is taken from gRPC's implementation for compression:
# https://grpc.github.io/grpc/python/grpc.html#compression
@pytest.mark.parametrize(
"compression_setting, gRPC_compression_val",
(
(None, 0),
(True, 2),
(False, 0),
),
)
def test_correct_settings(mock_grpc_server, compression_setting, gRPC_compression_val):
settings = global_settings()

@override_generic_settings(
settings,
{
"distributed_tracing.enabled": True,
"infinite_tracing.trace_observer_host": "localhost",
"infinite_tracing.trace_observer_port": mock_grpc_server,
"infinite_tracing.ssl": False,
"infinite_tracing.compression": compression_setting,
},
)
def _test():
endpoint = "localhost:%s" % mock_grpc_server
stream_buffer = StreamBuffer(1)

rpc = StreamingRpc(
endpoint,
stream_buffer,
DEFAULT_METADATA,
record_metric,
ssl=False,
compression=settings.infinite_tracing.compression,
)

rpc.connect()
assert rpc.compression_setting.value == gRPC_compression_val
rpc.close()

_test()


def test_close_before_connect(mock_grpc_server):
endpoint = "localhost:%s" % mock_grpc_server
stream_buffer = StreamBuffer(0)

rpc = StreamingRpc(
endpoint, stream_buffer, DEFAULT_METADATA, record_metric, ssl=False
)
rpc = StreamingRpc(endpoint, stream_buffer, DEFAULT_METADATA, record_metric, ssl=False)

# Calling close will close the grpc channel
rpc.close()
Expand All @@ -48,9 +92,7 @@ def test_close_while_connected(mock_grpc_server, buffer_empty_event):
endpoint = "localhost:%s" % mock_grpc_server
stream_buffer = StreamBuffer(1)

rpc = StreamingRpc(
endpoint, stream_buffer, DEFAULT_METADATA, record_metric, ssl=False
)
rpc = StreamingRpc(endpoint, stream_buffer, DEFAULT_METADATA, record_metric, ssl=False)

rpc.connect()
# Check the processing thread is alive and spans are being sent
Expand Down Expand Up @@ -91,9 +133,7 @@ def condition(*args, **kwargs):
endpoint = "localhost:%s" % mock_grpc_server
stream_buffer = StreamBuffer(1)

rpc = StreamingRpc(
endpoint, stream_buffer, DEFAULT_METADATA, record_metric, ssl=False
)
rpc = StreamingRpc(endpoint, stream_buffer, DEFAULT_METADATA, record_metric, ssl=False)

rpc.connect()
# Send a span to trigger reconnect
Expand Down

0 comments on commit 8fe288b

Please sign in to comment.