Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a request time metric to redis upstream #7890

Merged
merged 60 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
e6731a9
Redo test
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 12, 2019
35b6128
formatting
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 12, 2019
028924c
Merge branch 'master' into upstream_rq_time-metric
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 13, 2019
bad7ea4
fix ws
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 13, 2019
a1288f1
add docs
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 13, 2019
44cc44d
Add skeleton of stats stuff
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 14, 2019
723a8fd
fix
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 14, 2019
2c169e2
record command value
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 14, 2019
8b05c9b
fix formatting
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 14, 2019
e74161a
update
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 15, 2019
8336e68
update
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 15, 2019
359679e
patched up a bit
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 15, 2019
506e724
fix counter
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 15, 2019
054400b
Revert moving RedisClusterStats
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 15, 2019
e5770f1
disable timer
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 15, 2019
a110c26
Add config setting to enable stats
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 15, 2019
4f72856
mend
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 15, 2019
7af7c7c
fix
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 16, 2019
e06eb57
try to fix
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 16, 2019
04314f5
revert
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 16, 2019
3e8636e
fix stats
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 16, 2019
6c5a8ca
TODO: Remove prefix?
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 16, 2019
89c1b26
improve stat prefix?
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 16, 2019
b4e5944
fix ws/format
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 16, 2019
ab8047b
fix prefixing
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 16, 2019
2441b00
update to get command latency
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 20, 2019
90afb6f
update per henry
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 20, 2019
94c0a7e
Merge branch 'master' into upstream_rq_time-metric
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 20, 2019
c945058
remove uneeded millis option, make latency timer have latency suffix
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 20, 2019
54b168f
lowercase commands for metrics
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 20, 2019
0f93e03
bit more docs
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 20, 2019
7f0001c
more docs
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 20, 2019
d04ef29
fix docs
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 20, 2019
4a028ae
address henry comments
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 21, 2019
b25b4d2
simplify slightly
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 21, 2019
becaba2
use builtins
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 22, 2019
f6ec8bf
updated
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 22, 2019
703d0fe
Kick CI
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 22, 2019
fe218a7
Kick CI
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 22, 2019
7f70392
Per jmarantz comments
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 23, 2019
66cb5fd
respond to comments
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 26, 2019
3ce76bf
Merge branch 'master' into upstream_rq_time-metric
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Aug 26, 2019
7877ee7
Merge branch 'master' into upstream_rq_time-metric
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 5, 2019
6d7c4b5
Update for jmarantz new comments, part 1, basics
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 5, 2019
8d9d3d1
Update for jmarantz new comments, part 2, use stat_name_pool and stat…
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 5, 2019
bc5a96f
fix formatting
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 5, 2019
4e82245
update stat_name to command
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 5, 2019
abf2297
attempt #1, move stats to config
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 5, 2019
841b4a3
guard command timer on stats enabled
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 6, 2019
2520b40
fix test override
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 6, 2019
f76c164
Update command stats ptr name and remove redundant temp
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 6, 2019
140d9e5
Pass scope around, and store symboltable on command stats
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 6, 2019
414e5b3
Remove the move part
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 6, 2019
9ec13ee
refactor boolean enable flag, since now a global redis stats
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 6, 2019
4ad0e2c
remove some unneeded moves
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 6, 2019
228af54
fix clang tidy errors
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 6, 2019
f2b036f
Merge branch 'master' into upstream_rq_time-metric
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 9, 2019
d578a4b
Add TODO comment for singleton and remove duplicate params
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 10, 2019
6904da0
fix format
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 10, 2019
4fe9265
Update per mklein comments
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg Sep 10, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ message RedisProxy {
// downstream unchanged. This limit defaults to 100.
google.protobuf.UInt32Value max_upstream_unknown_connections = 6;

// Enable per-command statistics per upstream cluster, in addition to the filter level aggregate
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved
// count.
bool enable_command_stats = 8;

// ReadPolicy controls how Envoy routes read commands to Redis nodes. This is currently
// supported for Redis Cluster. All ReadPolicy settings except MASTER may return stale data
// because replication is asynchronous and requires some delay. You need to ensure that your
Expand Down
12 changes: 12 additions & 0 deletions docs/root/intro/arch_overview/other_protocols/redis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ Every Redis cluster has its own extra statistics tree rooted at *cluster.<name>.

max_upstream_unknown_connections_reached, Counter, Total number of times that an upstream connection to an unknown host is not created after redirection having reached the connection pool's max_upstream_unknown_connections limit
upstream_cx_drained, Counter, Total number of upstream connections drained of active requests before being closed
upstream_commands.upstream_rq_time, Histogram, Histogram of upstream request times for all types of requests
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved

Per-cluster command statistics can be enabled via the setting :ref:`enable_command_stats <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings.enable_command_stats>`:

.. csv-table::
:header: Name, Type, Description
:widths: 1, 1, 2

upstream_commands.[command].success, Counter, Total number of successful requests for a specific Redis command
upstream_commands.[command].error, Counter, Total number of failed or cancelled requests for a specific Redis command
upstream_commands.[command].total, Counter, Total number of requests for a specific Redis command (sum of success and error)
upstream_commands.[command].latency, Histogram, Latency of requests for a specific Redis command

Supported commands
------------------
Expand Down
3 changes: 3 additions & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ Version history
* http: remove h2c upgrade headers for HTTP/1 as h2c upgrades are currently not supported.
* listeners: added :ref:`continue_on_listener_filters_timeout <envoy_api_field_Listener.continue_on_listener_filters_timeout>` to configure whether a listener will still create a connection when listener filters time out.
* listeners: added :ref:`HTTP inspector listener filter <config_listener_filters_http_inspector>`.
* redis: added :ref:`read_policy <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings.read_policy>` to allow reading from redis replicas for Redis Cluster deployments.
* redis: added :ref:`enable_command_stats <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings.enable_command_stats>` to enable per command statistics for upstream clusters.
* rbac: added support for DNS SAN as :ref:`principal_name <envoy_api_field_config.rbac.v2.Principal.Authenticated.principal_name>`.
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved
* lua: extended `httpCall()` and `respond()` APIs to accept headers with entry values that can be a string or table of strings.
* outlier_detector: added :ref:`support for the grpc-status response header <arch_overview_outlier_detection_grpc>` by mapping it to HTTP status. Guarded by envoy.reloadable_features.outlier_detection_support_for_grpc_status which defaults to true.
* performance: new buffer implementation enabled by default (to disable add "--use-libevent-buffers 1" to the command-line arguments when starting Envoy).
Expand Down
8 changes: 6 additions & 2 deletions source/extensions/clusters/redis/redis_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ RedisCluster::RedisDiscoverySession::RedisDiscoverySession(
NetworkFilters::Common::Redis::Client::ClientFactory& client_factory)
: parent_(parent), dispatcher_(parent.dispatcher_),
resolve_timer_(parent.dispatcher_.createTimer([this]() -> void { startResolveRedis(); })),
client_factory_(client_factory), buffer_timeout_(0) {}
client_factory_(client_factory), buffer_timeout_(0),
redis_command_stats_(
NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats(
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved
parent_.info()->statsScope().symbolTable())) {}

// Convert the cluster slot IP/Port response to and address, return null if the response does not
// match the expected type.
Expand Down Expand Up @@ -249,7 +252,8 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() {
if (!client) {
client = std::make_unique<RedisDiscoveryClient>(*this);
client->host_ = current_host_address_;
client->client_ = client_factory_.create(host, dispatcher_, *this);
client->client_ = client_factory_.create(host, dispatcher_, *this, redis_command_stats_,
parent_.info()->statsScope());
client->client_->addConnectionCallbacks(*client);
std::string auth_password =
Envoy::Config::DataSource::read(parent_.auth_password_datasource_, true, parent_.api_);
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/clusters/redis/redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl {
uint32_t maxBufferSizeBeforeFlush() const override { return 0; }
std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_timeout_; }
uint32_t maxUpstreamUnknownConnections() const override { return 0; }
bool enableCommandStats() const override { return false; }
// This is effectively not in used for making the "Cluster Slots" calls.
// since we call cluster slots on both the master and slaves, ANY is more appropriate here.
Extensions::NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const override {
Expand Down Expand Up @@ -241,6 +242,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl {
Event::TimerPtr resolve_timer_;
NetworkFilters::Common::Redis::Client::ClientFactory& client_factory_;
const std::chrono::milliseconds buffer_timeout_;
NetworkFilters::Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_;
};

Upstream::ClusterManager& cluster_manager_;
Expand Down
17 changes: 17 additions & 0 deletions source/extensions/filters/network/common/redis/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ envoy_cc_library(
hdrs = ["client.h"],
deps = [
":codec_lib",
":redis_command_stats_lib",
"//include/envoy/upstream:cluster_manager_interface",
],
)
Expand All @@ -58,6 +59,7 @@ envoy_cc_library(
":client_interface",
":codec_lib",
"//include/envoy/router:router_interface",
"//include/envoy/stats:timespan",
"//include/envoy/thread_local:thread_local_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/buffer:buffer_lib",
Expand All @@ -78,3 +80,18 @@ envoy_cc_library(
":codec_lib",
],
)

envoy_cc_library(
name = "redis_command_stats_lib",
srcs = ["redis_command_stats.cc"],
hdrs = ["redis_command_stats.h"],
deps = [
":codec_interface",
":supported_commands_lib",
"//include/envoy/stats:stats_interface",
"//include/envoy/stats:timespan",
"//source/common/common:to_lower_table_lib",
"//source/common/common:utility_lib",
"//source/common/stats:symbol_table_lib",
],
)
10 changes: 9 additions & 1 deletion source/extensions/filters/network/common/redis/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "envoy/upstream/cluster_manager.h"

#include "extensions/filters/network/common/redis/codec_impl.h"
#include "extensions/filters/network/common/redis/redis_command_stats.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -163,6 +164,11 @@ class Config {
*/
virtual uint32_t maxUpstreamUnknownConnections() const PURE;

/**
* @return when enabled, upstream cluster per-command statistics will be recorded.
*/
virtual bool enableCommandStats() const PURE;

/**
* @return the read policy the proxy should use.
*/
Expand All @@ -184,7 +190,9 @@ class ClientFactory {
* @return ClientPtr a new connection pool client.
*/
virtual ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
const Config& config) PURE;
const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope) PURE;
};

} // namespace Client
Expand Down
58 changes: 45 additions & 13 deletions source/extensions/filters/network/common/redis/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ ConfigImpl::ConfigImpl(
3)), // Default timeout is 3ms. If max_buffer_size_before_flush is zero, this is not used
// as the buffer is flushed on each request immediately.
max_upstream_unknown_connections_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_upstream_unknown_connections, 100)) {
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_upstream_unknown_connections, 100)),
enable_command_stats_(config.enable_command_stats()) {
switch (config.read_policy()) {
case envoy::config::filter::network::redis_proxy::v2::
RedisProxy_ConnPoolSettings_ReadPolicy_MASTER:
Expand Down Expand Up @@ -48,10 +49,11 @@ ConfigImpl::ConfigImpl(

ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
EncoderPtr&& encoder, DecoderFactory& decoder_factory,
const Config& config) {

std::unique_ptr<ClientImpl> client(
new ClientImpl(host, dispatcher, std::move(encoder), decoder_factory, config));
const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope) {
auto client = std::make_unique<ClientImpl>(host, dispatcher, std::move(encoder), decoder_factory,
config, redis_command_stats, scope);
client->connection_ = host->createConnection(dispatcher, nullptr, nullptr).connection_;
client->connection_->addConnectionCallbacks(*client);
client->connection_->addReadFilter(Network::ReadFilterSharedPtr{new UpstreamReadFilter(*client)});
Expand All @@ -61,11 +63,14 @@ ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatche
}

ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config)
EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope)
: host_(host), encoder_(std::move(encoder)), decoder_(decoder_factory.create(*this)),
config_(config),
connect_or_op_timer_(dispatcher.createTimer([this]() -> void { onConnectOrOpTimeout(); })),
flush_timer_(dispatcher.createTimer([this]() -> void { flushBufferAndResetTimer(); })) {
connect_or_op_timer_(dispatcher.createTimer([this]() { onConnectOrOpTimeout(); })),
flush_timer_(dispatcher.createTimer([this]() { flushBufferAndResetTimer(); })),
time_source_(dispatcher.timeSource()), redis_command_stats_(redis_command_stats),
scope_(scope) {
host->cluster().stats().upstream_cx_total_.inc();
host->stats().cx_total_.inc();
host->cluster().stats().upstream_cx_active_.inc();
Expand Down Expand Up @@ -94,7 +99,17 @@ PoolRequest* ClientImpl::makeRequest(const RespValue& request, PoolCallbacks& ca

const bool empty_buffer = encoder_buffer_.length() == 0;

pending_requests_.emplace_back(*this, callbacks);
Stats::StatName command;
if (config_.enableCommandStats()) {
// Only lowercase command and get StatName if we enable command stats
command = redis_command_stats_->getCommandFromRequest(request);
redis_command_stats_->updateStatsTotal(scope_, command);
} else {
// If disabled, we use a placeholder stat name "unused" that is not used
command = redis_command_stats_->getUnusedStatName();
}

pending_requests_.emplace_back(*this, callbacks, command);
encoder_->encode(request, encoder_buffer_);

// If buffer is full, flush. If the buffer was empty before the request, start the timer.
Expand Down Expand Up @@ -186,6 +201,14 @@ void ClientImpl::onRespValue(RespValuePtr&& value) {
ASSERT(!pending_requests_.empty());
PendingRequest& request = pending_requests_.front();
const bool canceled = request.canceled_;

if (config_.enableCommandStats()) {
bool success = !canceled && (value->type() != Common::Redis::RespType::Error);
redis_command_stats_->updateStats(scope_, request.command_, success);
request.command_request_timer_->complete();
}
request.aggregate_request_timer_->complete();

PoolCallbacks& callbacks = request.callbacks_;

// We need to ensure the request is popped before calling the callback, since the callback might
Expand Down Expand Up @@ -225,8 +248,15 @@ void ClientImpl::onRespValue(RespValuePtr&& value) {
putOutlierEvent(Upstream::Outlier::Result::EXT_ORIGIN_REQUEST_SUCCESS);
}

ClientImpl::PendingRequest::PendingRequest(ClientImpl& parent, PoolCallbacks& callbacks)
: parent_(parent), callbacks_(callbacks) {
ClientImpl::PendingRequest::PendingRequest(ClientImpl& parent, PoolCallbacks& callbacks,
Stats::StatName command)
: parent_(parent), callbacks_(callbacks), command_{command},
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved
aggregate_request_timer_(parent_.redis_command_stats_->createAggregateTimer(
parent_.scope_, parent_.time_source_)) {
if (parent_.config_.enableCommandStats()) {
command_request_timer_ = parent_.redis_command_stats_->createCommandTimer(
parent_.scope_, command_, parent_.time_source_);
}
parent.host_->cluster().stats().upstream_rq_total_.inc();
parent.host_->stats().rq_total_.inc();
parent.host_->cluster().stats().upstream_rq_active_.inc();
Expand All @@ -248,9 +278,11 @@ void ClientImpl::PendingRequest::cancel() {
ClientFactoryImpl ClientFactoryImpl::instance_;

ClientPtr ClientFactoryImpl::create(Upstream::HostConstSharedPtr host,
Event::Dispatcher& dispatcher, const Config& config) {
Event::Dispatcher& dispatcher, const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope) {
return ClientImpl::create(host, dispatcher, EncoderPtr{new EncoderImpl()}, decoder_factory_,
config);
config, redis_command_stats, scope);
}

} // namespace Client
Expand Down
23 changes: 18 additions & 5 deletions source/extensions/filters/network/common/redis/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <chrono>

#include "envoy/config/filter/network/redis_proxy/v2/redis_proxy.pb.h"
#include "envoy/stats/timespan.h"
#include "envoy/thread_local/thread_local.h"
#include "envoy/upstream/cluster_manager.h"

Expand Down Expand Up @@ -49,6 +50,7 @@ class ConfigImpl : public Config {
uint32_t maxUpstreamUnknownConnections() const override {
return max_upstream_unknown_connections_;
}
bool enableCommandStats() const override { return enable_command_stats_; }
ReadPolicy readPolicy() const override { return read_policy_; }

private:
Expand All @@ -58,15 +60,21 @@ class ConfigImpl : public Config {
const uint32_t max_buffer_size_before_flush_;
const std::chrono::milliseconds buffer_flush_timeout_;
const uint32_t max_upstream_unknown_connections_;
const bool enable_command_stats_;
ReadPolicy read_policy_;
};

class ClientImpl : public Client, public DecoderCallbacks, public Network::ConnectionCallbacks {
public:
static ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
EncoderPtr&& encoder, DecoderFactory& decoder_factory,
const Config& config);
const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope);

ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder,
DecoderFactory& decoder_factory, const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope);
~ClientImpl() override;

// Client
Expand Down Expand Up @@ -94,19 +102,20 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
};

struct PendingRequest : public PoolRequest {
PendingRequest(ClientImpl& parent, PoolCallbacks& callbacks);
PendingRequest(ClientImpl& parent, PoolCallbacks& callbacks, Stats::StatName stat_name);
~PendingRequest() override;

// PoolRequest
void cancel() override;

ClientImpl& parent_;
PoolCallbacks& callbacks_;
Stats::StatName command_;
bool canceled_{};
Stats::CompletableTimespanPtr aggregate_request_timer_;
Stats::CompletableTimespanPtr command_request_timer_;
};

ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder,
DecoderFactory& decoder_factory, const Config& config);
void onConnectOrOpTimeout();
void onData(Buffer::Instance& data);
void putOutlierEvent(Upstream::Outlier::Result result);
Expand All @@ -129,13 +138,17 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
Event::TimerPtr connect_or_op_timer_;
bool connected_{};
Event::TimerPtr flush_timer_;
Envoy::TimeSource& time_source_;
const RedisCommandStatsSharedPtr redis_command_stats_;
Stats::Scope& scope_;
};

class ClientFactoryImpl : public ClientFactory {
public:
// RedisProxy::ConnPool::ClientFactoryImpl
ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
const Config& config) override;
const Config& config, const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope) override;

static ClientFactoryImpl instance_;

Expand Down
Loading