redis: add a request time metric to redis upstream (envoyproxy#7890)
Signed-off-by: Nicolas Flacco <nflacco@lyft.com>
1 parent 2603cf1 commit 1e16f9c
Showing 21 changed files with 346 additions and 35 deletions.
@@ -88,6 +88,10 @@ message RedisProxy {
// downstream unchanged. This limit defaults to 100.
google.protobuf.UInt32Value max_upstream_unknown_connections = 6;

// Enable per-command statistics per upstream cluster, in addition to the filter level aggregate
// count.
bool enable_command_stats = 8;

// ReadPolicy controls how Envoy routes read commands to Redis nodes. This is currently
// supported for Redis Cluster. All ReadPolicy settings except MASTER may return stale data
// because replication is asynchronous and requires some delay. You need to ensure that your
16 changes: 16 additions & 0 deletions docs/root/intro/arch_overview/other_protocols/redis.rst
@@ -60,6 +60,8 @@ If passive healthchecking is desired, also configure
For the purposes of passive healthchecking, connect timeouts, command timeouts, and connection
close map to 5xx. All other responses from Redis are counted as a success.

.. _arch_overview_redis_cluster_support:

Redis Cluster Support (Experimental)
----------------------------------------

@@ -90,6 +92,20 @@ Every Redis cluster has its own extra statistics tree rooted at *cluster.<name>.

max_upstream_unknown_connections_reached, Counter, Total number of times that an upstream connection to an unknown host is not created after redirection having reached the connection pool's max_upstream_unknown_connections limit
upstream_cx_drained, Counter, Total number of upstream connections drained of active requests before being closed
upstream_commands.upstream_rq_time, Histogram, Histogram of upstream request times for all types of requests

.. _arch_overview_redis_cluster_command_stats:

Per-cluster command statistics can be enabled via the setting :ref:`enable_command_stats <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings.enable_command_stats>`:

.. csv-table::
:header: Name, Type, Description
:widths: 1, 1, 2

upstream_commands.[command].success, Counter, Total number of successful requests for a specific Redis command
upstream_commands.[command].error, Counter, Total number of failed or cancelled requests for a specific Redis command
upstream_commands.[command].total, Counter, Total number of requests for a specific Redis command (sum of success and error)
upstream_commands.[command].latency, Histogram, Latency of requests for a specific Redis command
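
The naming scheme in the table above can be illustrated with a small sketch. The helper `commandStatName` is hypothetical and is not part of Envoy; it only assumes, based on the "Only lowercase command" comment in `client_impl.cc`, that the command is lowercased before it is substituted for `[command]`. Envoy itself builds these names through its symbol table rather than plain strings.

```cpp
#include <algorithm>
#include <cctype>
#include <string>

// Hypothetical helper (not an Envoy API): composes the per-command stat name
// described in the table, e.g. "upstream_commands.get.success".
std::string commandStatName(std::string command, const std::string& suffix) {
  // Lowercase the command so that "GET" and "get" map to the same stat.
  std::transform(command.begin(), command.end(), command.begin(),
                 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
  return "upstream_commands." + command + "." + suffix;
}
```

Under these assumptions, calling `commandStatName("GET", "latency")` yields the name of the per-command latency histogram for `GET`.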

Supported commands
------------------
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
@@ -38,6 +38,7 @@ Version history
* performance: new buffer implementation enabled by default (to disable add "--use-libevent-buffers 1" to the command-line arguments when starting Envoy).
* performance: stats symbol table implementation (disabled by default; to test it, add "--use-fake-symbol-table 0" to the command-line arguments when starting Envoy).
* rbac: added support for DNS SAN as :ref:`principal_name <envoy_api_field_config.rbac.v2.Principal.Authenticated.principal_name>`.
* redis: added :ref:`enable_command_stats <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings.enable_command_stats>` to enable :ref:`per command statistics <arch_overview_redis_cluster_command_stats>` for upstream clusters.
* redis: added :ref:`read_policy <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings.read_policy>` to allow reading from redis replicas for Redis Cluster deployments.
* regex: introduce new :ref:`RegexMatcher <envoy_api_msg_type.matcher.RegexMatcher>` type that
provides a safe regex implementation for untrusted user input. This type is now used in all
8 changes: 6 additions & 2 deletions source/extensions/clusters/redis/redis_cluster.cc
@@ -168,7 +168,10 @@ RedisCluster::RedisDiscoverySession::RedisDiscoverySession(
NetworkFilters::Common::Redis::Client::ClientFactory& client_factory)
: parent_(parent), dispatcher_(parent.dispatcher_),
resolve_timer_(parent.dispatcher_.createTimer([this]() -> void { startResolveRedis(); })),
client_factory_(client_factory), buffer_timeout_(0) {}
client_factory_(client_factory), buffer_timeout_(0),
redis_command_stats_(
NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats(
parent_.info()->statsScope().symbolTable())) {}

// Convert the cluster slot IP/Port response to an address, return null if the response does not
// match the expected type.
@@ -249,7 +252,8 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() {
if (!client) {
client = std::make_unique<RedisDiscoveryClient>(*this);
client->host_ = current_host_address_;
client->client_ = client_factory_.create(host, dispatcher_, *this);
client->client_ = client_factory_.create(host, dispatcher_, *this, redis_command_stats_,
parent_.info()->statsScope());
client->client_->addConnectionCallbacks(*client);
std::string auth_password =
Envoy::Config::DataSource::read(parent_.auth_password_datasource_, true, parent_.api_);
2 changes: 2 additions & 0 deletions source/extensions/clusters/redis/redis_cluster.h
@@ -214,6 +214,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl {
uint32_t maxBufferSizeBeforeFlush() const override { return 0; }
std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_timeout_; }
uint32_t maxUpstreamUnknownConnections() const override { return 0; }
bool enableCommandStats() const override { return false; }
// This is effectively not in use when making the "Cluster Slots" calls.
// Since we call cluster slots on both the master and slaves, ANY is more appropriate here.
Extensions::NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const override {
@@ -241,6 +242,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl {
Event::TimerPtr resolve_timer_;
NetworkFilters::Common::Redis::Client::ClientFactory& client_factory_;
const std::chrono::milliseconds buffer_timeout_;
NetworkFilters::Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_;
};

Upstream::ClusterManager& cluster_manager_;
17 changes: 17 additions & 0 deletions source/extensions/filters/network/common/redis/BUILD
@@ -46,6 +46,7 @@ envoy_cc_library(
hdrs = ["client.h"],
deps = [
":codec_lib",
":redis_command_stats_lib",
"//include/envoy/upstream:cluster_manager_interface",
],
)
@@ -58,6 +59,7 @@ envoy_cc_library(
":client_interface",
":codec_lib",
"//include/envoy/router:router_interface",
"//include/envoy/stats:timespan",
"//include/envoy/thread_local:thread_local_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/buffer:buffer_lib",
@@ -78,3 +80,18 @@ envoy_cc_library(
":codec_lib",
],
)

envoy_cc_library(
name = "redis_command_stats_lib",
srcs = ["redis_command_stats.cc"],
hdrs = ["redis_command_stats.h"],
deps = [
":codec_interface",
":supported_commands_lib",
"//include/envoy/stats:stats_interface",
"//include/envoy/stats:timespan",
"//source/common/common:to_lower_table_lib",
"//source/common/common:utility_lib",
"//source/common/stats:symbol_table_lib",
],
)
10 changes: 9 additions & 1 deletion source/extensions/filters/network/common/redis/client.h
@@ -5,6 +5,7 @@
#include "envoy/upstream/cluster_manager.h"

#include "extensions/filters/network/common/redis/codec_impl.h"
#include "extensions/filters/network/common/redis/redis_command_stats.h"

namespace Envoy {
namespace Extensions {
@@ -163,6 +164,11 @@ class Config {
*/
virtual uint32_t maxUpstreamUnknownConnections() const PURE;

/**
* @return when enabled, upstream cluster per-command statistics will be recorded.
*/
virtual bool enableCommandStats() const PURE;

/**
* @return the read policy the proxy should use.
*/
@@ -184,7 +190,9 @@ class ClientFactory {
* @return ClientPtr a new connection pool client.
*/
virtual ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
const Config& config) PURE;
const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope) PURE;
};

} // namespace Client
58 changes: 45 additions & 13 deletions source/extensions/filters/network/common/redis/client_impl.cc
@@ -19,7 +19,8 @@ ConfigImpl::ConfigImpl(
3)), // Default timeout is 3ms. If max_buffer_size_before_flush is zero, this is not used
// as the buffer is flushed on each request immediately.
max_upstream_unknown_connections_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_upstream_unknown_connections, 100)) {
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_upstream_unknown_connections, 100)),
enable_command_stats_(config.enable_command_stats()) {
switch (config.read_policy()) {
case envoy::config::filter::network::redis_proxy::v2::
RedisProxy_ConnPoolSettings_ReadPolicy_MASTER:
@@ -48,10 +49,11 @@ ConfigImpl::ConfigImpl(

ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
EncoderPtr&& encoder, DecoderFactory& decoder_factory,
const Config& config) {

std::unique_ptr<ClientImpl> client(
new ClientImpl(host, dispatcher, std::move(encoder), decoder_factory, config));
const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope) {
auto client = std::make_unique<ClientImpl>(host, dispatcher, std::move(encoder), decoder_factory,
config, redis_command_stats, scope);
client->connection_ = host->createConnection(dispatcher, nullptr, nullptr).connection_;
client->connection_->addConnectionCallbacks(*client);
client->connection_->addReadFilter(Network::ReadFilterSharedPtr{new UpstreamReadFilter(*client)});
@@ -61,11 +63,14 @@ ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatche
}

ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config)
EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope)
: host_(host), encoder_(std::move(encoder)), decoder_(decoder_factory.create(*this)),
config_(config),
connect_or_op_timer_(dispatcher.createTimer([this]() -> void { onConnectOrOpTimeout(); })),
flush_timer_(dispatcher.createTimer([this]() -> void { flushBufferAndResetTimer(); })) {
connect_or_op_timer_(dispatcher.createTimer([this]() { onConnectOrOpTimeout(); })),
flush_timer_(dispatcher.createTimer([this]() { flushBufferAndResetTimer(); })),
time_source_(dispatcher.timeSource()), redis_command_stats_(redis_command_stats),
scope_(scope) {
host->cluster().stats().upstream_cx_total_.inc();
host->stats().cx_total_.inc();
host->cluster().stats().upstream_cx_active_.inc();
@@ -94,7 +99,17 @@ PoolRequest* ClientImpl::makeRequest(const RespValue& request, PoolCallbacks& ca

const bool empty_buffer = encoder_buffer_.length() == 0;

pending_requests_.emplace_back(*this, callbacks);
Stats::StatName command;
if (config_.enableCommandStats()) {
// Only lowercase command and get StatName if we enable command stats
command = redis_command_stats_->getCommandFromRequest(request);
redis_command_stats_->updateStatsTotal(scope_, command);
} else {
// If disabled, we use a placeholder stat name "unused" that is not used
command = redis_command_stats_->getUnusedStatName();
}

pending_requests_.emplace_back(*this, callbacks, command);
encoder_->encode(request, encoder_buffer_);

// If buffer is full, flush. If the buffer was empty before the request, start the timer.
@@ -186,6 +201,14 @@ void ClientImpl::onRespValue(RespValuePtr&& value) {
ASSERT(!pending_requests_.empty());
PendingRequest& request = pending_requests_.front();
const bool canceled = request.canceled_;

if (config_.enableCommandStats()) {
bool success = !canceled && (value->type() != Common::Redis::RespType::Error);
redis_command_stats_->updateStats(scope_, request.command_, success);
request.command_request_timer_->complete();
}
request.aggregate_request_timer_->complete();

PoolCallbacks& callbacks = request.callbacks_;

// We need to ensure the request is popped before calling the callback, since the callback might
@@ -225,8 +248,15 @@ void ClientImpl::onRespValue(RespValuePtr&& value) {
putOutlierEvent(Upstream::Outlier::Result::EXT_ORIGIN_REQUEST_SUCCESS);
}

ClientImpl::PendingRequest::PendingRequest(ClientImpl& parent, PoolCallbacks& callbacks)
: parent_(parent), callbacks_(callbacks) {
ClientImpl::PendingRequest::PendingRequest(ClientImpl& parent, PoolCallbacks& callbacks,
Stats::StatName command)
: parent_(parent), callbacks_(callbacks), command_{command},
aggregate_request_timer_(parent_.redis_command_stats_->createAggregateTimer(
parent_.scope_, parent_.time_source_)) {
if (parent_.config_.enableCommandStats()) {
command_request_timer_ = parent_.redis_command_stats_->createCommandTimer(
parent_.scope_, command_, parent_.time_source_);
}
parent.host_->cluster().stats().upstream_rq_total_.inc();
parent.host_->stats().rq_total_.inc();
parent.host_->cluster().stats().upstream_rq_active_.inc();
@@ -248,9 +278,11 @@ void ClientImpl::PendingRequest::cancel() {
ClientFactoryImpl ClientFactoryImpl::instance_;

ClientPtr ClientFactoryImpl::create(Upstream::HostConstSharedPtr host,
Event::Dispatcher& dispatcher, const Config& config) {
Event::Dispatcher& dispatcher, const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope) {
return ClientImpl::create(host, dispatcher, EncoderPtr{new EncoderImpl()}, decoder_factory_,
config);
config, redis_command_stats, scope);
}

} // namespace Client
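
The request lifecycle changed in this file can be summarized with a simplified, standalone sketch. All types and function names below (`SketchStats`, `SketchRequest`, `startRequest`, `finishRequest`) are hypothetical stand-ins and not Envoy APIs; the real code uses `Stats::Scope`, `Stats::StatName`, and `Stats::CompletableTimespan`. The sketch assumes the command has already been lowercased, and it shows the two behaviors visible in the diff: the aggregate `upstream_rq_time` histogram is always recorded, while per-command counters and latency are recorded only when `enable_command_stats` is set.

```cpp
#include <chrono>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Illustrative stand-in for a stats scope: counters and histograms keyed by name.
struct SketchStats {
  std::map<std::string, uint64_t> counters;
  std::map<std::string, std::vector<std::chrono::nanoseconds>> histograms;
};

// Illustrative stand-in for PendingRequest: remembers the command and start time.
struct SketchRequest {
  std::string command;
  std::chrono::steady_clock::time_point start;
};

// Roughly mirrors makeRequest(): bump the per-command total (if enabled) and
// start timing the request.
SketchRequest startRequest(SketchStats& stats, const std::string& command,
                           bool enable_command_stats) {
  if (enable_command_stats) {
    ++stats.counters["upstream_commands." + command + ".total"];
  }
  return {command, std::chrono::steady_clock::now()};
}

// Roughly mirrors onRespValue(): a request succeeds only if it was not
// canceled and the reply is not a Redis error.
void finishRequest(SketchStats& stats, const SketchRequest& request,
                   bool enable_command_stats, bool canceled, bool is_error_reply) {
  const auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(
      std::chrono::steady_clock::now() - request.start);
  if (enable_command_stats) {
    const bool success = !canceled && !is_error_reply;
    ++stats.counters["upstream_commands." + request.command +
                     (success ? ".success" : ".error")];
    stats.histograms["upstream_commands." + request.command + ".latency"].push_back(elapsed);
  }
  // The aggregate request-time histogram is recorded regardless of the flag.
  stats.histograms["upstream_commands.upstream_rq_time"].push_back(elapsed);
}
```

Keeping the aggregate histogram unconditional while gating the per-command stats matches the intent stated in the docs: per-command statistics add one set of stats per Redis command per cluster, so they are opt-in.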
23 changes: 18 additions & 5 deletions source/extensions/filters/network/common/redis/client_impl.h
@@ -3,6 +3,7 @@
#include <chrono>

#include "envoy/config/filter/network/redis_proxy/v2/redis_proxy.pb.h"
#include "envoy/stats/timespan.h"
#include "envoy/thread_local/thread_local.h"
#include "envoy/upstream/cluster_manager.h"

@@ -49,6 +50,7 @@ class ConfigImpl : public Config {
uint32_t maxUpstreamUnknownConnections() const override {
return max_upstream_unknown_connections_;
}
bool enableCommandStats() const override { return enable_command_stats_; }
ReadPolicy readPolicy() const override { return read_policy_; }

private:
@@ -58,15 +60,21 @@ class ConfigImpl : public Config {
const uint32_t max_buffer_size_before_flush_;
const std::chrono::milliseconds buffer_flush_timeout_;
const uint32_t max_upstream_unknown_connections_;
const bool enable_command_stats_;
ReadPolicy read_policy_;
};

class ClientImpl : public Client, public DecoderCallbacks, public Network::ConnectionCallbacks {
public:
static ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
EncoderPtr&& encoder, DecoderFactory& decoder_factory,
const Config& config);
const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope);

ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder,
DecoderFactory& decoder_factory, const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope);
~ClientImpl() override;

// Client
@@ -94,19 +102,20 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
};

struct PendingRequest : public PoolRequest {
PendingRequest(ClientImpl& parent, PoolCallbacks& callbacks);
PendingRequest(ClientImpl& parent, PoolCallbacks& callbacks, Stats::StatName stat_name);
~PendingRequest() override;

// PoolRequest
void cancel() override;

ClientImpl& parent_;
PoolCallbacks& callbacks_;
Stats::StatName command_;
bool canceled_{};
Stats::CompletableTimespanPtr aggregate_request_timer_;
Stats::CompletableTimespanPtr command_request_timer_;
};

ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder,
DecoderFactory& decoder_factory, const Config& config);
void onConnectOrOpTimeout();
void onData(Buffer::Instance& data);
void putOutlierEvent(Upstream::Outlier::Result result);
@@ -129,13 +138,17 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
Event::TimerPtr connect_or_op_timer_;
bool connected_{};
Event::TimerPtr flush_timer_;
Envoy::TimeSource& time_source_;
const RedisCommandStatsSharedPtr redis_command_stats_;
Stats::Scope& scope_;
};

class ClientFactoryImpl : public ClientFactory {
public:
// RedisProxy::ConnPool::ClientFactoryImpl
ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
const Config& config) override;
const Config& config, const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope) override;

static ClientFactoryImpl instance_;
