Skip to content

Commit

Permalink
Fix for sub-span overflow
Browse files Browse the repository at this point in the history
  • Loading branch information
pekhota committed Sep 27, 2021
1 parent e261065 commit a48cde0
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/Jaeger/ReporterFactory/JaegerHttpReporterFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public function createReporter() : ReporterInterface
$client = new HttpAgentClient($protocol);
$this->config->getLogger()->debug('Initializing HTTP Jaeger Tracer with Jaeger.Thrift over Binary protocol');
$sender = new JaegerSender($client, $this->config->getLogger());
$sender->setMaxBufferLength($this->config->getMaxBufferLength());
return new JaegerReporter($sender);
}
}
1 change: 1 addition & 0 deletions src/Jaeger/ReporterFactory/JaegerReporterFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public function createReporter() : ReporterInterface
$client = new AgentClient($protocol);
$this->config->getLogger()->debug('Initializing UDP Jaeger Tracer with Jaeger.Thrift over Binary protocol');
$sender = new JaegerSender($client, $this->config->getLogger());
$sender->setMaxBufferLength($this->config->getMaxBufferLength());
return new JaegerReporter($sender);
}
}
65 changes: 65 additions & 0 deletions src/Jaeger/Sender/JaegerSender.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
use Jaeger\Tracer;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Thrift\Protocol\TBinaryProtocol;
use Thrift\Protocol\TCompactProtocol;
use Thrift\Transport\TMemoryBuffer;
use const Jaeger\JAEGER_HOSTNAME_TAG_KEY;

class JaegerSender implements SenderInterface
Expand Down Expand Up @@ -44,6 +47,17 @@ class JaegerSender implements SenderInterface
*/
private $mapper;

/**
* @var int
*/
private $jaegerBatchOverheadLength = 512;

/**
* The maximum length of the thrift-objects for a jaeger-batch.
*
* @var int
*/
private $maxBufferLength = 64000;

/**
* @param AgentIf $agentClient
Expand Down Expand Up @@ -81,6 +95,11 @@ public function flush(): int
return $count;
}

public function setMaxBufferLength($maxBufferLength)
{
$this->maxBufferLength = $maxBufferLength;
}

/**
* @param JaegerSpan[] $spans
* @return array
Expand Down Expand Up @@ -110,6 +129,52 @@ private function send(array $spans)
return ;
}

$chunks = $this->chunkSplit($spans);
foreach ($chunks as $chunk) {
/** @var JaegerThriftSpan[] $chunk */
$this->emitJaegerBatch($chunk);
}
}

/**
* @param JaegerThriftSpan $span
*/
private function getBufferLength($span)
{
$memoryBuffer = new TMemoryBuffer();
$span->write(new TBinaryProtocol($memoryBuffer));
return $memoryBuffer->available();
}

private function chunkSplit(array $spans): array
{
$actualBufferSize = $this->jaegerBatchOverheadLength;
$chunkId = 0;
$chunks = [];

foreach ($spans as $span) {
$spanBufferLength = $this->getBufferLength($span);
if (!empty($chunks[$chunkId]) && ($actualBufferSize + $spanBufferLength) > $this->maxBufferLength) {
// point to next chunk
++$chunkId;

// reset buffer size
$actualBufferSize = $this->jaegerBatchOverheadLength;
}

if (!isset($chunks[$chunkId])) {
$chunks[$chunkId] = [];
}

$chunks[$chunkId][] = $span;
$actualBufferSize += $spanBufferLength;
}

return $chunks;
}

protected function emitJaegerBatch(array $spans)
{
/** @var Tag[] $tags */
$tags = [];

Expand Down
41 changes: 41 additions & 0 deletions tests/Jaeger/Sender/JaegerThriftSenderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Jaeger\Tests\Sender;

use Jaeger\Sender\JaegerSender;
use Jaeger\Sender\UdpSender;
use Jaeger\Span;
use Jaeger\SpanContext;
use Jaeger\Thrift\Agent\AgentClient;
Expand Down Expand Up @@ -41,6 +42,7 @@ public function testFlush(): void

$client = $this->createMock(AgentClient::class);
$sender = new JaegerSender($client);
$sender->setMaxBufferLength(64000);

$client
->expects(self::exactly(1))
Expand All @@ -56,6 +58,7 @@ public function testFlush(): void
public function testEmitBatch() {
$client = $this->createMock(AgentClient::class);
$sender = new JaegerSender($client);
$sender->setMaxBufferLength(64000);

$span = $this->createMock(Span::class);
$span->method('getOperationName')->willReturn('dummy-operation');
Expand All @@ -82,4 +85,42 @@ public function testEmitBatch() {
$sender->append($span);
$this->assertEquals(1, $sender->flush());
}

public function testMaxBufferLength() {
$tracer = $this->createMock(Tracer::class);
$tracer->method('getIpAddress')->willReturn('');
$tracer->method('getServiceName')->willReturn('');

$context = $this->createMock(SpanContext::class);

$span = $this->createMock(Span::class);
$span->method('getOperationName')->willReturn('dummy-operation');
$span->method('getTracer')->willReturn($tracer);
$span->method('getContext')->willReturn($context);

$client = $this->createMock(AgentClient::class);

$mockBuilder = $this->getMockBuilder(JaegerSender::class);
$mockMethods = ['emitJaegerBatch'];
if (method_exists($mockBuilder, "onlyMethods")) {
$mockBuilder = $mockBuilder->onlyMethods($mockMethods);
} else {
$mockBuilder = $mockBuilder->setMethods($mockMethods);
}
$sender = $mockBuilder->setConstructorArgs([$client])->getMock();
$sender->setMaxBufferLength(800);
$sender->expects(self::exactly(2))
->method('emitJaegerBatch')
->withConsecutive(
[self::countOf(2)],
[self::countOf(1)]
);

// jaeger batch overhead parameter = 512
$sender->append($span); // 512 + 143 < 800 - chunk 1
$sender->append($span); // 512 + 143*2 => 798 < 800 - chunk 1
$sender->append($span); // 512 + 143*3 > 800 - chunk 2

self::assertEquals(3, $sender->flush());
}
}

0 comments on commit a48cde0

Please sign in to comment.