Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metadata Exchange Filter #2325

Merged
merged 12 commits into from
Oct 8, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/envoy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ envoy_cc_binary(
"//src/envoy/http/authn:filter_lib",
"//src/envoy/http/jwt_auth:http_filter_factory",
"//src/envoy/http/mixer:filter_lib",
"//src/envoy/tcp/alpn_proxy:config_lib",
"//src/envoy/tcp/forward_downstream_sni:config_lib",
"//src/envoy/tcp/mixer:filter_lib",
"//src/envoy/tcp/sni_verifier:config_lib",
Expand Down
87 changes: 87 additions & 0 deletions src/envoy/tcp/alpn_proxy/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Copyright 2019 Istio Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
################################################################################
#

# Alpn Proxy filter

load(
"@envoy//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_cc_test",
"envoy_package",
)

envoy_package()

envoy_cc_library(
name = "alpn_proxy",
srcs = [
"alpn_proxy.cc",
"alpn_proxy_initial_header.cc",
],
hdrs = [
"alpn_proxy.h",
"alpn_proxy_initial_header.h",
],
repository = "@envoy",
deps = [
"//src/envoy/tcp/alpn_proxy/config:alpn_proxy_cc_proto",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/base:endian",
"@com_google_absl//absl/strings",
"@envoy//include/envoy/local_info:local_info_interface",
"@envoy//include/envoy/network:connection_interface",
"@envoy//include/envoy/network:filter_interface",
"@envoy//include/envoy/runtime:runtime_interface",
"@envoy//include/envoy/stats:stats_macros",
"@envoy//source/common/http:utility_lib",
"@envoy//source/common/network:utility_lib",
"@envoy//source/common/protobuf",
"@envoy//source/common/protobuf:utility_lib",
"@envoy//source/extensions/filters/network:well_known_names",
],
)

envoy_cc_library(
name = "config_lib",
srcs = ["config.cc"],
hdrs = ["config.h"],
repository = "@envoy",
visibility = ["//visibility:public"],
deps = [
":alpn_proxy",
"//src/envoy/tcp/alpn_proxy/config:alpn_proxy_cc_proto",
"//src/envoy/utils:utils_lib",
"@envoy//include/envoy/registry",
"@envoy//source/extensions/filters/network/common:factory_base_lib",
],
)

envoy_cc_test(
name = "alpnproxy_test",
srcs = [
"alpn_proxy_test.cc",
],
repository = "@envoy",
deps = [
":alpn_proxy",
":config_lib",
"@envoy//source/common/protobuf",
"@envoy//test/mocks/local_info:local_info_mocks",
"@envoy//test/mocks/network:network_mocks",
"@envoy//test/mocks/protobuf:protobuf_mocks",
],
)
259 changes: 259 additions & 0 deletions src/envoy/tcp/alpn_proxy/alpn_proxy.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/* Copyright 2019 Istio Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cstdint>
#include <string>

#include "absl/base/internal/endian.h"
#include "absl/strings/string_view.h"
#include "common/buffer/buffer_impl.h"
#include "common/protobuf/utility.h"
#include "envoy/network/connection.h"
#include "envoy/stats/scope.h"
#include "src/envoy/tcp/alpn_proxy/alpn_proxy.h"
#include "src/envoy/tcp/alpn_proxy/alpn_proxy_initial_header.h"

namespace Envoy {
namespace Tcp {
namespace AlpnProxy {
namespace {

std::unique_ptr<::Envoy::Buffer::OwnedImpl> constructProxyHeaderData(
const Envoy::ProtobufWkt::Any& proxy_data) {
AlpnProxyInitialHeader initial_header;
std::string proxy_data_str = proxy_data.SerializeAsString();
// Converting from host to network byte order so that most significant byte is
// placed first.
initial_header.magic = absl::ghtonl(AlpnProxyInitialHeader::magic_number);
kyessenov marked this conversation as resolved.
Show resolved Hide resolved
initial_header.data_size = absl::ghtonl(proxy_data_str.length());

::Envoy::Buffer::OwnedImpl initial_header_buffer{
absl::string_view(reinterpret_cast<const char*>(&initial_header),
sizeof(AlpnProxyInitialHeader))};
auto proxy_data_buffer =
std::make_unique<::Envoy::Buffer::OwnedImpl>(proxy_data_str);
proxy_data_buffer->prepend(initial_header_buffer);
return proxy_data_buffer;
}

} // namespace

AlpnProxyConfig::AlpnProxyConfig(const std::string& stat_prefix,
const std::string& protocol,
const std::string& node_metadata_id,
const FilterDirection filter_direction,
Stats::Scope& scope)
: scope_(scope),
stat_prefix_(stat_prefix),
protocol_(protocol),
node_metadata_id_(node_metadata_id),
filter_direction_(filter_direction),
stats_(generateStats(stat_prefix, scope)) {}

Network::FilterStatus AlpnProxyFilter::onData(Buffer::Instance& data, bool) {
switch (conn_state_) {
case Invalid:
case Done:
// No work needed if connection state is Done or Invalid.
return Network::FilterStatus::Continue;
case ConnProtocolNotRead: {
// If Alpn protocol is not the expected one, then return.
// Else find and write node metadata.
if (read_callbacks_->connection().nextProtocol() != config_->protocol_) {
conn_state_ = Invalid;
config_->stats().alpn_protocol_not_found_.inc();
return Network::FilterStatus::Continue;
}
conn_state_ = WriteMetadata;
config_->stats().alpn_protocol_found_.inc();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this has to fall-through to WriteMetadata. Can you make that explicit or add a comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.. removed the else stmt.

case WriteMetadata: {
// TODO(gargnupur): Try to move this just after alpn protocol is
// determined and first onData is called in Downstream filter.
// If downstream filter, write metadata.
// Otherwise, go ahead and try to read initial header and proxy data.
writeNodeMetadata();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, this is intended to fall through I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

case ReadingInitialHeader:
case NeedMoreDataInitialHeader: {
tryReadInitialProxyHeader(data);
if (conn_state_ == NeedMoreDataInitialHeader) {
return Network::FilterStatus::StopIteration;
}
if (conn_state_ == Invalid) {
return Network::FilterStatus::Continue;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. If buffer is large, then we need to drain the proxy header.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought "return Network::FilterStatus::StopIteration;" is doing that and we don't need to drain "data" and buffer bytes read till now..

case ReadingProxyHeader:
case NeedMoreDataProxyHeader: {
tryReadProxyData(data);
if (conn_state_ == NeedMoreDataProxyHeader) {
return Network::FilterStatus::StopIteration;
}
if (conn_state_ == Invalid) {
return Network::FilterStatus::Continue;
}
}
default:
conn_state_ = Done;
return Network::FilterStatus::Continue;
}

return Network::FilterStatus::Continue;
}

Network::FilterStatus AlpnProxyFilter::onNewConnection() {
return Network::FilterStatus::Continue;
}

Network::FilterStatus AlpnProxyFilter::onWrite(Buffer::Instance&, bool) {
switch (conn_state_) {
case Invalid:
case Done:
// No work needed if connection state is Done or Invalid.
return Network::FilterStatus::Continue;
case ConnProtocolNotRead: {
if (read_callbacks_->connection().nextProtocol() != config_->protocol_) {
conn_state_ = Invalid;
config_->stats().alpn_protocol_not_found_.inc();
return Network::FilterStatus::Continue;
} else {
conn_state_ = WriteMetadata;
config_->stats().alpn_protocol_found_.inc();
}
}
case WriteMetadata: {
// TODO(gargnupur): Try to move this just after alpn protocol is
// determined and first onWrite is called in Upstream filter.
writeNodeMetadata();
}
case ReadingInitialHeader:
case ReadingProxyHeader:
case NeedMoreDataInitialHeader:
case NeedMoreDataProxyHeader:
// These are to be handled in Reading Pipeline.
return Network::FilterStatus::Continue;
}

return Network::FilterStatus::Continue;
}

void AlpnProxyFilter::writeNodeMetadata() {
if (conn_state_ != WriteMetadata) {
return;
}

std::unique_ptr<const google::protobuf::Struct> metadata =
getMetadata(config_->node_metadata_id_);
if (metadata != nullptr) {
Envoy::ProtobufWkt::Any metadata_any_value;
*metadata_any_value.mutable_type_url() = StructTypeUrl;
*metadata_any_value.mutable_value() = metadata->SerializeAsString();
std::unique_ptr<::Envoy::Buffer::OwnedImpl> buf =
constructProxyHeaderData(metadata_any_value);
write_callbacks_->injectWriteDataToFilterChain(*buf, false);

if (config_->filter_direction_ == FilterDirection::Downstream) {
setMetadata(DownstreamDynamicDataKey, *metadata);
} else {
setMetadata(UpstreamDynamicDataKey, *metadata);
}
config_->stats().metadata_added_.inc();
}

conn_state_ = ReadingInitialHeader;
}

void AlpnProxyFilter::tryReadInitialProxyHeader(Buffer::Instance& data) {
if (conn_state_ != ReadingInitialHeader &&
conn_state_ != NeedMoreDataInitialHeader) {
return;
}
const uint32_t initial_header_length = sizeof(AlpnProxyInitialHeader);
if (data.length() < initial_header_length) {
config_->stats().initial_header_not_found_.inc();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seem right to increment header on buffering. This is mostly random depending how TCP data is chunked, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check replied above...
I thought "return Network::FilterStatus::StopIteration;" is doing that and we don't need to drain "data" and buffer bytes read till now..

// Not enough data to read. Wait for it to come.
conn_state_ = NeedMoreDataInitialHeader;
return;
}
std::string initial_header_buf = std::string(
static_cast<const char*>(data.linearize(initial_header_length)),
initial_header_length);
const AlpnProxyInitialHeader* initial_header =
reinterpret_cast<const AlpnProxyInitialHeader*>(
initial_header_buf.c_str());
if (absl::gntohl(initial_header->magic) !=
AlpnProxyInitialHeader::magic_number) {
config_->stats().initial_header_not_found_.inc();
conn_state_ = Invalid;
return;
}
proxy_data_length_ = absl::gntohl(initial_header->data_size);
// Drain the initial header length bytes read.
data.drain(initial_header_length);
conn_state_ = ReadingProxyHeader;
}

void AlpnProxyFilter::tryReadProxyData(Buffer::Instance& data) {
if (conn_state_ != ReadingProxyHeader &&
conn_state_ != NeedMoreDataProxyHeader) {
return;
}
if (data.length() < proxy_data_length_) {
// Not enough data to read. Wait for it to come.
conn_state_ = NeedMoreDataProxyHeader;
return;
}
std::string proxy_data_buf =
std::string(static_cast<const char*>(data.linearize(proxy_data_length_)),
proxy_data_length_);
Envoy::ProtobufWkt::Any proxy_data;
if (!proxy_data.ParseFromString(proxy_data_buf)) {
config_->stats().header_not_found_.inc();
conn_state_ = Invalid;
return;
}
data.drain(proxy_data_length_);

Envoy::ProtobufWkt::Struct struct_metadata =
Envoy::MessageUtil::anyConvert<Envoy::ProtobufWkt::Struct>(proxy_data);
if (config_->filter_direction_ == FilterDirection::Downstream) {
setMetadata(UpstreamDynamicDataKey, struct_metadata);
} else {
setMetadata(DownstreamDynamicDataKey, struct_metadata);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set conn_state_ = Done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

State Machine fall through will take care of it.. in default section?

}

void AlpnProxyFilter::setMetadata(const std::string key,
const ProtobufWkt::Struct& value) {
read_callbacks_->connection().streamInfo().setDynamicMetadata(key, value);
}

std::unique_ptr<const google::protobuf::Struct> AlpnProxyFilter::getMetadata(
const std::string& key) {
if (local_info_.node().has_metadata()) {
auto metadata_fields = local_info_.node().metadata().fields();
auto node_metadata = metadata_fields.find(key);
if (node_metadata != metadata_fields.end()) {
return std::make_unique<const google::protobuf::Struct>(
node_metadata->second.struct_value());
}
}
return nullptr;
}

} // namespace AlpnProxy
} // namespace Tcp
} // namespace Envoy
Loading