Skip to content

Commit

Permalink
expose grpc as http endpoint (#18221)
Browse files Browse the repository at this point in the history
expose resource grpc endpoints as http endpoints
  • Loading branch information
wangxinyi7 authored Aug 4, 2023
1 parent 0a48a24 commit 1f28ac2
Show file tree
Hide file tree
Showing 7 changed files with 563 additions and 5 deletions.
29 changes: 25 additions & 4 deletions agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1601,14 +1601,35 @@ func TestAgent_Metrics_ACLDeny(t *testing.T) {
})
}

func newDefaultBaseDeps(t *testing.T) BaseDeps {
dataDir := testutil.TempDir(t, "acl-agent")
logBuffer := testutil.NewLogBuffer(t)
logger := hclog.NewInterceptLogger(nil)
loader := func(source config.Source) (config.LoadResult, error) {
dataDir := fmt.Sprintf(`data_dir = "%s"`, dataDir)
opts := config.LoadOpts{
HCL: []string{TestConfigHCL(NodeID()), "", dataDir},
DefaultConfig: source,
}
result, err := config.Load(opts)
if result.RuntimeConfig != nil {
result.RuntimeConfig.Telemetry.Disable = true
}
return result, err
}
bd, err := NewBaseDeps(loader, logBuffer, logger)
require.NoError(t, err)
return bd
}

func TestHTTPHandlers_AgentMetricsStream_ACLDeny(t *testing.T) {
bd := BaseDeps{}
bd := newDefaultBaseDeps(t)
bd.Tokens = new(tokenStore.Store)
sink := metrics.NewInmemSink(30*time.Millisecond, time.Second)
bd.MetricsConfig = &lib.MetricsConfig{
Handler: sink,
}
d := fakeResolveTokenDelegate{authorizer: acl.DenyAll()}
d := fakeResolveTokenDelegate{delegate: &delegateMock{}, authorizer: acl.DenyAll()}
agent := &Agent{
baseDeps: bd,
delegate: d,
Expand All @@ -1631,13 +1652,13 @@ func TestHTTPHandlers_AgentMetricsStream_ACLDeny(t *testing.T) {
}

func TestHTTPHandlers_AgentMetricsStream(t *testing.T) {
bd := BaseDeps{}
bd := newDefaultBaseDeps(t)
bd.Tokens = new(tokenStore.Store)
sink := metrics.NewInmemSink(20*time.Millisecond, time.Second)
bd.MetricsConfig = &lib.MetricsConfig{
Handler: sink,
}
d := fakeResolveTokenDelegate{authorizer: acl.ManageAll()}
d := fakeResolveTokenDelegate{delegate: &delegateMock{}, authorizer: acl.ManageAll()}
agent := &Agent{
baseDeps: bd,
delegate: d,
Expand Down
11 changes: 11 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/internal/go-sso/oidcauth/oidcauthtest"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/proto/private/pbautoconf"
Expand Down Expand Up @@ -322,6 +323,7 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
Tokens: new(token.Store),
TLSConfigurator: tlsConf,
GRPCConnPool: &fakeGRPCConnPool{},
Registry: resource.NewRegistry(),
},
RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{
Expand All @@ -344,6 +346,7 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
require.NoError(t, err)

a, err := New(bd)
a.delegate = &delegateMock{}
require.NoError(t, err)

a.startLicenseManager(testutil.TestContext(t))
Expand Down Expand Up @@ -5477,6 +5480,7 @@ func TestAgent_ListenHTTP_MultipleAddresses(t *testing.T) {
Tokens: new(token.Store),
TLSConfigurator: tlsConf,
GRPCConnPool: &fakeGRPCConnPool{},
Registry: resource.NewRegistry(),
},
RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{
Expand All @@ -5499,6 +5503,7 @@ func TestAgent_ListenHTTP_MultipleAddresses(t *testing.T) {
require.NoError(t, err)

agent, err := New(bd)
agent.delegate = &delegateMock{}
require.NoError(t, err)

agent.startLicenseManager(testutil.TestContext(t))
Expand Down Expand Up @@ -6073,6 +6078,7 @@ func TestAgent_startListeners(t *testing.T) {
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
GRPCConnPool: &fakeGRPCConnPool{},
Registry: resource.NewRegistry(),
},
RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{},
Expand All @@ -6091,6 +6097,7 @@ func TestAgent_startListeners(t *testing.T) {
require.NoError(t, err)

agent, err := New(bd)
agent.delegate = &delegateMock{}
require.NoError(t, err)

// use up an address
Expand Down Expand Up @@ -6213,6 +6220,7 @@ func TestAgent_startListeners_scada(t *testing.T) {
HCP: hcp.Deps{
Provider: pvd,
},
Registry: resource.NewRegistry(),
},
RuntimeConfig: &config.RuntimeConfig{},
Cache: cache.New(cache.Options{}),
Expand All @@ -6230,6 +6238,7 @@ func TestAgent_startListeners_scada(t *testing.T) {
require.NoError(t, err)

agent, err := New(bd)
agent.delegate = &delegateMock{}
require.NoError(t, err)

_, err = agent.startListeners([]net.Addr{c})
Expand Down Expand Up @@ -6273,6 +6282,7 @@ func TestAgent_checkServerLastSeen(t *testing.T) {
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
GRPCConnPool: &fakeGRPCConnPool{},
Registry: resource.NewRegistry(),
},
RuntimeConfig: &config.RuntimeConfig{},
Cache: cache.New(cache.Options{}),
Expand All @@ -6284,6 +6294,7 @@ func TestAgent_checkServerLastSeen(t *testing.T) {
Config: leafcert.Config{},
})
agent, err := New(bd)
agent.delegate = &delegateMock{}
require.NoError(t, err)

// Test that an ErrNotExist OS error is treated as ok.
Expand Down
34 changes: 33 additions & 1 deletion agent/grpc-external/services/resource/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,50 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/hashicorp/go-uuid"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
svc "github.com/hashicorp/consul/agent/grpc-external/services/resource"
internal "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/storage/inmem"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil"
)

func randomACLIdentity(t *testing.T) structs.ACLIdentity {
id, err := uuid.GenerateUUID()
require.NoError(t, err)

return &structs.ACLToken{AccessorID: id}
}

func AuthorizerFrom(t *testing.T, policyStrs ...string) resolver.Result {
policies := []*acl.Policy{}
for _, policyStr := range policyStrs {
policy, err := acl.NewPolicyFromSource(policyStr, nil, nil)
require.NoError(t, err)
policies = append(policies, policy)
}

authz, err := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), policies, nil)
require.NoError(t, err)

return resolver.Result{
Authorizer: authz,
ACLIdentity: randomACLIdentity(t),
}
}

// RunResourceService runs a Resource Service for the duration of the test and
// returns a client to interact with it. ACLs will be disabled.
func RunResourceService(t *testing.T, registerFns ...func(resource.Registry)) pbresource.ResourceServiceClient {
return RunResourceServiceWithACL(t, resolver.DANGER_NO_AUTH{}, registerFns...)
}

func RunResourceServiceWithACL(t *testing.T, aclResolver svc.ACLResolver, registerFns ...func(resource.Registry)) pbresource.ResourceServiceClient {
t.Helper()

backend, err := inmem.NewBackend()
Expand All @@ -40,7 +72,7 @@ func RunResourceService(t *testing.T, registerFns ...func(resource.Registry)) pb
Backend: backend,
Registry: registry,
Logger: testutil.Logger(t),
ACLResolver: resolver.DANGER_NO_AUTH{},
ACLResolver: aclResolver,
}).Register(server)

pipe := internal.NewPipeListener()
Expand Down
12 changes: 12 additions & 0 deletions agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/uiserver"
"github.com/hashicorp/consul/api"
resourcehttp "github.com/hashicorp/consul/internal/resource/http"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/private/pbcommon"
Expand Down Expand Up @@ -259,6 +260,17 @@ func (s *HTTPHandlers) handler() http.Handler {
handlePProf("/debug/pprof/symbol", pprof.Symbol)
handlePProf("/debug/pprof/trace", pprof.Trace)

mux.Handle("/api/",
http.StripPrefix("/api",
resourcehttp.NewHandler(
s.agent.delegate.ResourceServiceClient(),
s.agent.baseDeps.Registry,
s.parseToken,
s.agent.logger.Named(logging.HTTP),
),
),
)

if s.IsUIEnabled() {
// Note that we _don't_ support reloading ui_config.{enabled, content_dir,
// content_path} since this only runs at initial startup.
Expand Down
170 changes: 170 additions & 0 deletions internal/resource/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package http

import (
"context"
"encoding/json"
"fmt"
"net/http"
"path"
"strings"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/anypb"

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/proto-public/pbresource"
)

func NewHandler(
client pbresource.ResourceServiceClient,
registry resource.Registry,
parseToken func(req *http.Request, token *string),
logger hclog.Logger) http.Handler {
mux := http.NewServeMux()
for _, t := range registry.Types() {
// Individual Resource Endpoints.
prefix := strings.ToLower(fmt.Sprintf("/%s/%s/%s/", t.Type.Group, t.Type.GroupVersion, t.Type.Kind))
logger.Info("Registered resource endpoint", "endpoint", prefix)
mux.Handle(prefix, http.StripPrefix(prefix, &resourceHandler{t, client, parseToken, logger}))
}

return mux
}

type writeRequest struct {
Metadata map[string]string `json:"metadata"`
Data json.RawMessage `json:"data"`
Owner *pbresource.ID `json:"owner"`
}

type resourceHandler struct {
reg resource.Registration
client pbresource.ResourceServiceClient
parseToken func(req *http.Request, token *string)
logger hclog.Logger
}

func (h *resourceHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var token string
h.parseToken(r, &token)
ctx := metadata.AppendToOutgoingContext(r.Context(), "x-consul-token", token)
switch r.Method {
case http.MethodPut:
h.handleWrite(w, r, ctx)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}

func (h *resourceHandler) handleWrite(w http.ResponseWriter, r *http.Request, ctx context.Context) {
var req writeRequest
// convert req body to writeRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Request body didn't follow schema."))
}
// convert data struct to proto message
data := h.reg.Proto.ProtoReflect().New().Interface()
if err := protojson.Unmarshal(req.Data, data); err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Request body didn't follow schema."))
}
// proto message to any
anyProtoMsg, err := anypb.New(data)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
h.logger.Error("Failed to convert proto message to any type", "error", err)
return
}

tenancyInfo, resourceName, version := checkURL(r)

rsp, err := h.client.Write(ctx, &pbresource.WriteRequest{
Resource: &pbresource.Resource{
Id: &pbresource.ID{
Type: h.reg.Type,
Tenancy: tenancyInfo,
Name: resourceName,
},
Owner: req.Owner,
Version: version,
Metadata: req.Metadata,
Data: anyProtoMsg,
},
})
if err != nil {
handleResponseError(err, w, h)
return
}

output, err := jsonMarshal(rsp.Resource)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
h.logger.Error("Failed to unmarshal GRPC resource response", "error", err)
return
}
w.Write(output)
}

func checkURL(r *http.Request) (tenancy *pbresource.Tenancy, resourceName string, version string) {
params := r.URL.Query()
tenancy = &pbresource.Tenancy{
Partition: params.Get("partition"),
PeerName: params.Get("peer_name"),
Namespace: params.Get("namespace"),
}
resourceName = path.Base(r.URL.Path)
if resourceName == "." || resourceName == "/" {
resourceName = ""
}
version = params.Get("version")

return
}

func jsonMarshal(res *pbresource.Resource) ([]byte, error) {
output, err := protojson.Marshal(res)
if err != nil {
return nil, err
}

var stuff map[string]any
if err := json.Unmarshal(output, &stuff); err != nil {
return nil, err
}

delete(stuff["data"].(map[string]any), "@type")
return json.MarshalIndent(stuff, "", " ")
}

func handleResponseError(err error, w http.ResponseWriter, h *resourceHandler) {
if e, ok := status.FromError(err); ok {
switch e.Code() {
case codes.InvalidArgument:
w.WriteHeader(http.StatusBadRequest)
h.logger.Info("User has mal-formed request", "error", err)
case codes.NotFound:
w.WriteHeader(http.StatusNotFound)
h.logger.Info("Failed to write to GRPC resource: Not found", "error", err)
case codes.PermissionDenied:
w.WriteHeader(http.StatusForbidden)
h.logger.Info("Failed to write to GRPC resource: User not authenticated", "error", err)
case codes.Aborted:
w.WriteHeader(http.StatusConflict)
h.logger.Info("Failed to write to GRPC resource: the request conflict with the current state of the target resource", "error", err)
default:
w.WriteHeader(http.StatusInternalServerError)
h.logger.Error("Failed to write to GRPC resource", "error", err)
}
} else {
w.WriteHeader(http.StatusInternalServerError)
h.logger.Error("Failed to write to GRPC resource: not able to parse error returned", "error", err)
}
w.Write([]byte(err.Error()))
}
Loading

0 comments on commit 1f28ac2

Please sign in to comment.