Skip to content

Commit

Permalink
Added support for OpenTelemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
ezimuel committed Aug 12, 2024
1 parent bbeba20 commit db9cfbc
Show file tree
Hide file tree
Showing 6 changed files with 494 additions and 7 deletions.
9 changes: 6 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
"psr/log": "^1 || ^2 || ^3",
"php-http/discovery": "^1.14",
"php-http/httplug": "^2.3",
"composer-runtime-api": "^2.0"
"composer-runtime-api": "^2.0",
"open-telemetry/api": "^1.0"
},
"require-dev": {
"phpunit/phpunit": "^9.5",
"phpstan/phpstan": "^1.4",
"php-http/mock-client": "^1.5",
"nyholm/psr7": "^1.5"
"nyholm/psr7": "^1.5",
"open-telemetry/sdk": "^1.0",
"symfony/http-client": "^7.1"
},
"autoload": {
"psr-4": {
Expand All @@ -39,7 +42,7 @@
},
"scripts": {
"test": [
"vendor/bin/phpunit"
"vendor/bin/phpunit --testdox"
],
"phpstan": [
"vendor/bin/phpstan analyse"
Expand Down
136 changes: 136 additions & 0 deletions src/OpenTelemetry.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
<?php
/**
* Elastic Transport
*
* @link https://github.com/elastic/elastic-transport-php
* @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
* @license https://opensource.org/licenses/MIT MIT License
*
* Licensed to Elasticsearch B.V under one or more agreements.
* Elasticsearch B.V licenses this file to you under the MIT License.
* See the LICENSE file in the project root for more information.
*/
declare(strict_types=1);

namespace Elastic\Transport;

use Elastic\Transport\Exception\InvalidArgumentException;
use Elastic\Transport\Serializer\JsonSerializer;
use OpenTelemetry\API\Trace\TracerInterface;
use OpenTelemetry\API\Trace\TracerProviderInterface;

class OpenTelemetry
{
const OTEL_TRACER_NAME = 'elasticsearch-api';
// Valid values for the enabled config are 'true' and 'false'
const ENV_VARIABLE_ENABLED = 'OTEL_PHP_INSTRUMENTATION_ELASTICSEARCH_ENABLED';
/**
* Describes how to handle search queries in the request body when assigned to
* span attribute.
* Valid values are 'raw', 'omit', 'sanitize'. Default is 'omit'
*/
const ALLOWED_BODY_STRATEGIES = ['raw', 'omit', 'sanitize'];
const ENV_VARIABLE_BODY_STRATEGY = 'OTEL_PHP_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY';
const DEFAULT_BODY_STRATEGY = 'omit';
/**
* A string list of keys whose values are redacted. This is only relevant if the body strategy is
* 'sanitize'. For example, a config 'sensitive-key,other-key' will redact the values at
* 'sensitive-key' and 'other-key' in addition to the default keys
*/
const ENV_VARIABLE_BODY_SANITIZE_KEYS = 'OTEL_PHP_INSTRUMENTATION_ELASTICSEARCH_SEARCH_QUERY_SANITIZE_KEYS';

const SEARCH_ENDPOINTS = [
'search',
'async_search.submit',
'msearch',
'eql.search',
'terms_enum',
'search_template',
'msearch_template',
'render_search_template',
'esql.query'
];

const DEFAULT_SANITIZER_KEY_PATTERNS = [
'password',
'passwd',
'pwd',
'secret',
'key',
'token',
'session',
'credit',
'card',
'auth',
'set-cookie',
'email',
'tel',
'phone'
];
const REDACTED_STRING = 'REDACTED';

private array $sanitizeKeys = [];
private string $bodyStrategy;

public function __construct()
{
$strategy = getenv(self::ENV_VARIABLE_BODY_STRATEGY);
if (false === $strategy) {
$strategy = self::DEFAULT_BODY_STRATEGY;
}
if (!in_array($strategy, self::ALLOWED_BODY_STRATEGIES)) {
throw new InvalidArgumentException(sprintf(
'The body strategy specified %s is not valid. The available strategies are %s',
$strategy,
implode(',', self::ALLOWED_BODY_STRATEGIES)
));
}
$this->bodyStrategy = $strategy;
$sanitizeKeys = getenv(self::ENV_VARIABLE_BODY_SANITIZE_KEYS);
if (false !== $sanitizeKeys) {
$this->sanitizeKeys = explode(',', $sanitizeKeys);
}
}

public function processBody(string $body): string
{
switch ($this->bodyStrategy) {
case 'sanitize':
return $this->sanitizeBody($body, $this->sanitizeKeys);
case 'raw':
return $body;
default:
return '';
}
}

public static function getTracer(TracerProviderInterface $tracerProvider): TracerInterface
{
return $tracerProvider->getTracer(
self::OTEL_TRACER_NAME,
Transport::VERSION
);
}

private function sanitizeBody(string $body, array $sanitizeKeys): string
{
if (empty($body)) {
return '';
}
$json = json_decode($body, true);
if (!is_array($json)) {
return '';
}
$patterns = array_merge(self::DEFAULT_SANITIZER_KEY_PATTERNS, $sanitizeKeys);

// Convert the patterns array into a regex
$regex = sprintf('/%s/', implode('|', $patterns));
// Recursively traverse the array and redact the specified keys
array_walk_recursive($json, function (&$value, $key) use ($regex) {
if (preg_match($regex, $key, $matches)) {
$value = self::REDACTED_STRING;
}
});
return JsonSerializer::serialize($json);
}
}
72 changes: 69 additions & 3 deletions src/Transport.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
use Http\Client\HttpAsyncClient;
use Http\Discovery\HttpAsyncClientDiscovery;
use Http\Promise\Promise;
use OpenTelemetry\API\Globals;
use OpenTelemetry\API\Trace\TracerInterface;
use Psr\Http\Client\ClientExceptionInterface;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Client\NetworkExceptionInterface;
Expand All @@ -48,7 +50,7 @@

final class Transport implements ClientInterface, HttpAsyncClient
{
const VERSION = "8.8.0";
const VERSION = "8.9.0";

private ClientInterface $client;
private LoggerInterface $logger;
Expand All @@ -63,7 +65,8 @@ final class Transport implements ClientInterface, HttpAsyncClient
private HttpAsyncClient $asyncClient;
private OnSuccessInterface $onAsyncSuccess;
private OnFailureInterface $onAsyncFailure;

private TracerInterface $otelTracer;

public function __construct(
ClientInterface $client,
NodePoolInterface $nodePool,
Expand All @@ -89,6 +92,22 @@ public function getLogger(): LoggerInterface
return $this->logger;
}

public function getOTelTracer(): TracerInterface
{
if (empty($this->otelTracer)) {
$this->otelTracer = OpenTelemetry::getTracer(
Globals::tracerProvider()
);
}
return $this->otelTracer;
}

public function setOTelTracer(TracerInterface $tracer): self
{
$this->otelTracer = $tracer;
return $this;
}

public function setHeader(string $name, string $value): self
{
$this->headers[$name] = $value;
Expand Down Expand Up @@ -285,7 +304,7 @@ private function logResponse(string $title, ResponseInterface $response, int $re
* @throws NoNodeAvailableException
* @throws ClientExceptionInterface
*/
public function sendRequest(RequestInterface $request): ResponseInterface
public function sendRequest(RequestInterface $request, array $opts = []): ResponseInterface
{
if (empty($request->getUri()->getHost())) {
$node = $this->nodePool->nextNode();
Expand All @@ -294,11 +313,27 @@ public function sendRequest(RequestInterface $request): ResponseInterface
$request = $this->decorateRequest($request);
$this->lastRequest = $request;
$this->logRequest("Request", $request);

// OpenTelemetry get tracer
if (getenv(OpenTelemetry::ENV_VARIABLE_ENABLED)) {
$tracer = $this->getOTelTracer();
}

$count = -1;
while ($count < $this->getRetries()) {
try {
$count++;
// OpenTelemetry span start
if (!empty($tracer)) {
$spanName = $opts['db.operation.name'] ?? $request->getUri()->getPath();
$span = $tracer->spanBuilder($spanName)->startSpan();
$span->setAttribute('http.request.method', $request->getMethod());
$span->setAttribute('url.full', $this->getFullUrl($request));
$span->setAttribute('server.address', $request->getUri()->getHost());
$span->setAttribute('server.port', $request->getUri()->getPort());
// @phpstan-ignore argument.type
$span->setAttributes($opts);
}
$response = $this->client->sendRequest($request);

$this->lastResponse = $response;
Expand All @@ -307,14 +342,25 @@ public function sendRequest(RequestInterface $request): ResponseInterface
return $response;
} catch (NetworkExceptionInterface $e) {
$this->logger->error(sprintf("Retry %d: %s", $count, $e->getMessage()));
if (!empty($span)) {
$span->setAttribute('error.type', $e->getMessage());
}
if (isset($node)) {
$node->markAlive(false);
$node = $this->nodePool->nextNode();
$request = $this->setupConnectionUri($node, $request);
}
} catch (ClientExceptionInterface $e) {
$this->logger->error(sprintf("Retry %d: %s", $count, $e->getMessage()));
if (!empty($span)) {
$span->setAttribute('error.type', $e->getMessage());
}
throw $e;
} finally {
// OpenTelemetry span end
if (!empty($span)) {
$span->end();
}
}
}
$exceededMsg = sprintf("Exceeded maximum number of retries (%d)", $this->getRetries());
Expand Down Expand Up @@ -464,4 +510,24 @@ private function getClientLibraryInfo(): array
}
return [];
}

/**
* Return the full URL in the format
* scheme://host:port/path?query_string
*/
private function getFullUrl(RequestInterface $request): string
{
$fullUrl = sprintf(
"%s://%s:%s%s",
$request->getUri()->getScheme(),
$request->getUri()->getHost(),
$request->getUri()->getPort(),
$request->getUri()->getPath()
);
$queryString = $request->getUri()->getQuery();
if (!empty($queryString)) {
$fullUrl .= '?' . $queryString;
}
return $fullUrl;
}
}
2 changes: 2 additions & 0 deletions src/TransportBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Elastic\Transport\NodePool\Resurrect\NoResurrect;
use Elastic\Transport\NodePool\Selector\RoundRobin;
use Http\Discovery\Psr18ClientDiscovery;
use OpenTelemetry\API\Trace\TracerInterface;
use Psr\Http\Client\ClientInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
Expand All @@ -31,6 +32,7 @@ class TransportBuilder
protected NodePoolInterface $nodePool;
protected LoggerInterface $logger;
protected array $hosts = [];
protected TracerInterface $OTelTracer;

final public function __construct()
{
Expand Down
Loading

0 comments on commit db9cfbc

Please sign in to comment.