Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manual backport of #17775 #17781

Merged
merged 1 commit into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/17775.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
connect: Fix issue where changes to service exports were not reflected in proxies.
```
6 changes: 5 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4555,7 +4555,11 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
sources.ExportedPeeredServices = proxycfgglue.ServerExportedPeeredServices(deps)
sources.FederationStateListMeshGateways = proxycfgglue.ServerFederationStateListMeshGateways(deps)
sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps)
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
// We do not use this health check currently due to a bug with the way that service exports
// interact with ACLs and the streaming backend. See comments in `proxycfgglue.ServerHealthBlocking`
// for more details.
// sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth), server.FSM().State())
sources.HTTPChecks = proxycfgglue.ServerHTTPChecks(deps, a.config.NodeName, proxycfgglue.CacheHTTPChecks(a.cache), a.State)
sources.Intentions = proxycfgglue.ServerIntentions(deps)
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
Expand Down
14 changes: 11 additions & 3 deletions agent/consul/watch/server_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
)

var (
ErrorNotFound = errors.New("no data found for query")
ErrorNotChanged = errors.New("data did not change for query")
ErrorNotFound = errors.New("no data found for query")
ErrorNotChanged = errors.New("data did not change for query")
ErrorACLResetData = errors.New("an acl update forced a state reset")

errNilContext = errors.New("cannot call ServerLocalNotify with a nil context")
errNilGetStore = errors.New("cannot call ServerLocalNotify without a callback to get a StateStore")
Expand Down Expand Up @@ -320,8 +321,15 @@ func serverLocalNotifyRoutine[ResultType any, StoreType StateStore](
return
}

// An ACL reset error can be raised so that the index greater-than check is
// bypassed. We should not propagate it to the caller.
forceReset := errors.Is(err, ErrorACLResetData)
if forceReset {
err = nil
}

// Check the index to see if we should call notify
if minIndex == 0 || minIndex < index {
if minIndex == 0 || minIndex < index || forceReset {
notify(ctx, correlationID, result, err)
minIndex = index
}
Expand Down
164 changes: 164 additions & 0 deletions agent/proxycfg-glue/health_blocking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package proxycfgglue

import (
"context"
"fmt"
"time"

"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/structs/aclfilter"
)

// ServerHealthBlocking exists due to a bug with the streaming backend and its interaction with ACLs.
// Whenever an exported-services config entry is modified, this is effectively an ACL change.
// Assume the following situation:
// - no services are exported
// - an upstream watch to service X is spawned
// - the streaming backend filters out data for service X (because it's not exported yet)
// - service X is finally exported
//
// In this situation, the streaming backend does not trigger a refresh of its data.
// This means that any events that were supposed to have been received prior to the export are NOT backfilled,
// and the watches never see service X spawning.
//
// We currently have decided to not trigger a stream refresh in this situation due to the potential for a
// thundering herd effect (touching exports would cause a re-fetch of all watches for that partition, potentially).
// Therefore, this local blocking-query approach exists for agentless.
//
// It's also worth noting that the streaming subscription is currently bypassed most of the time with agentful,
// because proxycfg has a `req.Source.Node != ""` which prevents the `streamingEnabled` check from passing.
// This means that while agents should technically have this same issue, they don't experience it with mesh health
// watches.
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health, state *state.Store) *serverHealthBlocking {
return &serverHealthBlocking{deps, remoteSource, state, 5 * time.Minute}
}

type serverHealthBlocking struct {
deps ServerDataSourceDeps
remoteSource proxycfg.Health
state *state.Store
watchTimeout time.Duration
}

// Notify is mostly a copy of the function in `agent/consul/health_endpoint.go` with a few minor tweaks.
// Most notably, some query features unnecessary for mesh have been stripped out.
func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
if args.Datacenter != h.deps.Datacenter {
return h.remoteSource.Notify(ctx, args, correlationID, ch)
}

// Verify the arguments
if args.ServiceName == "" {
return fmt.Errorf("Must provide service name")
}
if args.EnterpriseMeta.PartitionOrDefault() == acl.WildcardName {
return fmt.Errorf("Wildcards are not allowed in the partition field")
}

// Determine the function we'll call
var f func(memdb.WatchSet, *state.Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
switch {
case args.Connect:
f = serviceNodesConnect
case args.Ingress:
f = serviceNodesIngress
default:
f = serviceNodesDefault
}

filter, err := bexpr.CreateFilter(args.Filter, nil, structs.CheckServiceNode{})
if err != nil {
return err
}

var hadResults bool = false
return watch.ServerLocalNotify(ctx, correlationID, h.deps.GetStore,
func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedCheckServiceNodes, error) {
// This is necessary so that service export changes are eventually picked up, since
// they won't trigger the watch themselves.
timeoutCh := make(chan struct{})
time.AfterFunc(h.watchTimeout, func() {
close(timeoutCh)
})
ws.Add(timeoutCh)

authzContext := acl.AuthorizerContext{
Peer: args.PeerName,
}
authz, err := h.deps.ACLResolver.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return 0, nil, err
}
// If we're doing a connect or ingress query, we need read access to the service
// we're trying to find proxies for, so check that.
if args.Connect || args.Ingress {
if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
// If access was somehow revoked (via token deletion or unexporting), then we clear the
// last-known results before triggering an error. This way, the proxies will actually update
// their data, rather than holding onto the last-known list of healthy nodes indefinitely.
if hadResults {
hadResults = false
return 0, &structs.IndexedCheckServiceNodes{}, watch.ErrorACLResetData
}
return 0, nil, acl.ErrPermissionDenied
}
}

var thisReply structs.IndexedCheckServiceNodes
thisReply.Index, thisReply.Nodes, err = f(ws, h.state, args)
if err != nil {
return 0, nil, err
}

raw, err := filter.Execute(thisReply.Nodes)
if err != nil {
return 0, nil, err
}
thisReply.Nodes = raw.(structs.CheckServiceNodes)

// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
// results that would be filtered out even if the user did have permission.
if err := h.filterACL(&authzContext, args.Token, &thisReply); err != nil {
return 0, nil, err
}

hadResults = true
return thisReply.Index, &thisReply, nil
},
dispatchBlockingQueryUpdate[*structs.IndexedCheckServiceNodes](ch),
)
}

func (h *serverHealthBlocking) filterACL(authz *acl.AuthorizerContext, token string, subj *structs.IndexedCheckServiceNodes) error {
// Get the ACL from the token
var entMeta acl.EnterpriseMeta
authorizer, err := h.deps.ACLResolver.ResolveTokenAndDefaultMeta(token, &entMeta, authz)
if err != nil {
return err
}
aclfilter.New(authorizer, h.deps.Logger).Filter(subj)
return nil
}

func serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}

func serviceNodesIngress(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckIngressServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta)
}

func serviceNodesDefault(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}
183 changes: 183 additions & 0 deletions agent/proxycfg-glue/health_blocking_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package proxycfgglue

import (
"context"
"errors"
"testing"
"time"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
)

func TestServerHealthBlocking(t *testing.T) {
t.Run("remote queries are delegated to the remote source", func(t *testing.T) {
var (
ctx = context.Background()
req = &structs.ServiceSpecificRequest{Datacenter: "dc2"}
correlationID = "correlation-id"
ch = make(chan<- proxycfg.UpdateEvent)
result = errors.New("KABOOM")
)

remoteSource := newMockHealth(t)
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)

store := state.NewStateStore(nil)
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource, store)
err := dataSource.Notify(ctx, req, correlationID, ch)
require.Equal(t, result, err)
})

t.Run("services notify correctly", func(t *testing.T) {
const (
datacenter = "dc1"
serviceName = "web"
)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

store := state.NewStateStore(nil)
aclResolver := newStaticResolver(acl.ManageAll())
dataSource := ServerHealthBlocking(ServerDataSourceDeps{
GetStore: func() Store { return store },
Datacenter: datacenter,
ACLResolver: aclResolver,
Logger: testutil.Logger(t),
}, nil, store)
dataSource.watchTimeout = 1 * time.Second

// Watch for all events
eventCh := make(chan proxycfg.UpdateEvent)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
Datacenter: datacenter,
ServiceName: serviceName,
}, "", eventCh))

// Watch for a subset of events
filteredCh := make(chan proxycfg.UpdateEvent)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
Datacenter: datacenter,
ServiceName: serviceName,
QueryOptions: structs.QueryOptions{
Filter: "Service.ID == \"web1\"",
},
}, "", filteredCh))

testutil.RunStep(t, "initial state", func(t *testing.T) {
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
require.Empty(t, result.Nodes)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
require.Empty(t, result.Nodes)
})

testutil.RunStep(t, "register services", func(t *testing.T) {
require.NoError(t, store.EnsureRegistration(10, &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: serviceName + "1",
Service: serviceName,
Port: 80,
},
}))
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
require.Len(t, result.Nodes, 1)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
require.Len(t, result.Nodes, 1)

require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: serviceName + "2",
Service: serviceName,
Port: 81,
},
}))
result = getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
require.Len(t, result.Nodes, 2)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
require.Len(t, result.Nodes, 1)
require.Equal(t, "web1", result.Nodes[0].Service.ID)
})

testutil.RunStep(t, "deregister service", func(t *testing.T) {
require.NoError(t, store.DeleteService(12, "foo", serviceName+"1", nil, ""))
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
require.Len(t, result.Nodes, 1)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
require.Len(t, result.Nodes, 0)
})

testutil.RunStep(t, "acl enforcement", func(t *testing.T) {
require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: serviceName + "-sidecar-proxy",
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: serviceName,
},
},
}))

authzDeny := policyAuthorizer(t, ``)
authzAllow := policyAuthorizer(t, `
node_prefix "" { policy = "read" }
service_prefix "web" { policy = "read" }
`)

// Start a stream where insufficient permissions are denied
aclDenyCh := make(chan proxycfg.UpdateEvent)
aclResolver.SwapAuthorizer(authzDeny)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
Connect: true,
Datacenter: datacenter,
ServiceName: serviceName,
}, "", aclDenyCh))
require.ErrorContains(t, getEventError(t, aclDenyCh), "Permission denied")

// Adding ACL permissions will send valid data
aclResolver.SwapAuthorizer(authzAllow)
time.Sleep(dataSource.watchTimeout)
result := getEventResult[*structs.IndexedCheckServiceNodes](t, aclDenyCh)
require.Len(t, result.Nodes, 1)
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)

// Start a stream where sufficient permissions are allowed
aclAllowCh := make(chan proxycfg.UpdateEvent)
aclResolver.SwapAuthorizer(authzAllow)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
Connect: true,
Datacenter: datacenter,
ServiceName: serviceName,
}, "", aclAllowCh))
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
require.Len(t, result.Nodes, 1)
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)

// Removing ACL permissions will send empty data
aclResolver.SwapAuthorizer(authzDeny)
time.Sleep(dataSource.watchTimeout)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
require.Len(t, result.Nodes, 0)

// Adding ACL permissions will send valid data
aclResolver.SwapAuthorizer(authzAllow)
time.Sleep(dataSource.watchTimeout)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
require.Len(t, result.Nodes, 1)
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)
})
})
}