Skip to content

Commit

Permalink
Fix issue with streaming service health watches. (#17775) (#17781)
Browse files Browse the repository at this point in the history
Fix issue with streaming service health watches.

This commit fixes an issue where the health streams were unaware of service
export changes. Whenever an exported-services config entry is modified, it is
effectively an ACL change.

The bug would be triggered by the following situation:

- no services are exported
- an upstream watch to service X is spawned
- the streaming backend filters out data for service X (due to lack of exports)
- service X is finally exported

In the situation above, the streaming backend does not trigger a refresh of its
data.  This means that any events that were supposed to have been received prior
to the export are NOT backfilled, and the watches never see service X spawning.

We currently have decided to not trigger a stream refresh in this situation due
to the potential for a thundering herd effect (touching exports would cause a
re-fetch of all watches for that partition, potentially).  Therefore, a local
blocking-query approach was added by this commit for agentless.

It's also worth noting that the streaming subscription is currently bypassed
most of the time with agentful, because proxycfg has a `req.Source.Node != ""`
which prevents the `streamingEnabled` check from passing.  This means that while
agents should technically have this same issue, they don't experience it with
mesh health watches.

Note that this is a temporary fix that solves the issue for proxycfg, but not
service-discovery use cases.
  • Loading branch information
hashi-derek authored Jun 15, 2023
1 parent c7d9075 commit 5bf4381
Show file tree
Hide file tree
Showing 5 changed files with 366 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .changelog/17775.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
connect: Fix issue where changes to service exports were not reflected in proxies.
```
6 changes: 5 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4555,7 +4555,11 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
sources.ExportedPeeredServices = proxycfgglue.ServerExportedPeeredServices(deps)
sources.FederationStateListMeshGateways = proxycfgglue.ServerFederationStateListMeshGateways(deps)
sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps)
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
// We do not use this health check currently due to a bug with the way that service exports
// interact with ACLs and the streaming backend. See comments in `proxycfgglue.ServerHealthBlocking`
// for more details.
// sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth), server.FSM().State())
sources.HTTPChecks = proxycfgglue.ServerHTTPChecks(deps, a.config.NodeName, proxycfgglue.CacheHTTPChecks(a.cache), a.State)
sources.Intentions = proxycfgglue.ServerIntentions(deps)
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
Expand Down
14 changes: 11 additions & 3 deletions agent/consul/watch/server_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
)

var (
ErrorNotFound = errors.New("no data found for query")
ErrorNotChanged = errors.New("data did not change for query")
ErrorNotFound = errors.New("no data found for query")
ErrorNotChanged = errors.New("data did not change for query")
ErrorACLResetData = errors.New("an acl update forced a state reset")

errNilContext = errors.New("cannot call ServerLocalNotify with a nil context")
errNilGetStore = errors.New("cannot call ServerLocalNotify without a callback to get a StateStore")
Expand Down Expand Up @@ -320,8 +321,15 @@ func serverLocalNotifyRoutine[ResultType any, StoreType StateStore](
return
}

// An ACL reset error can be raised so that the index greater-than check is
// bypassed. We should not propagate it to the caller.
forceReset := errors.Is(err, ErrorACLResetData)
if forceReset {
err = nil
}

// Check the index to see if we should call notify
if minIndex == 0 || minIndex < index {
if minIndex == 0 || minIndex < index || forceReset {
notify(ctx, correlationID, result, err)
minIndex = index
}
Expand Down
164 changes: 164 additions & 0 deletions agent/proxycfg-glue/health_blocking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package proxycfgglue

import (
"context"
"fmt"
"time"

"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/structs/aclfilter"
)

// ServerHealthBlocking exists due to a bug with the streaming backend and its interaction with ACLs.
// Whenever an exported-services config entry is modified, this is effectively an ACL change.
// Assume the following situation:
// - no services are exported
// - an upstream watch to service X is spawned
// - the streaming backend filters out data for service X (because it's not exported yet)
// - service X is finally exported
//
// In this situation, the streaming backend does not trigger a refresh of its data.
// This means that any events that were supposed to have been received prior to the export are NOT backfilled,
// and the watches never see service X spawning.
//
// We currently have decided to not trigger a stream refresh in this situation due to the potential for a
// thundering herd effect (touching exports would cause a re-fetch of all watches for that partition, potentially).
// Therefore, this local blocking-query approach exists for agentless.
//
// It's also worth noting that the streaming subscription is currently bypassed most of the time with agentful,
// because proxycfg has a `req.Source.Node != ""` which prevents the `streamingEnabled` check from passing.
// This means that while agents should technically have this same issue, they don't experience it with mesh health
// watches.
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health, state *state.Store) *serverHealthBlocking {
return &serverHealthBlocking{deps, remoteSource, state, 5 * time.Minute}
}

type serverHealthBlocking struct {
deps ServerDataSourceDeps
remoteSource proxycfg.Health
state *state.Store
watchTimeout time.Duration
}

// Notify is mostly a copy of the function in `agent/consul/health_endpoint.go` with a few minor tweaks.
// Most notably, some query features unnecessary for mesh have been stripped out.
func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
if args.Datacenter != h.deps.Datacenter {
return h.remoteSource.Notify(ctx, args, correlationID, ch)
}

// Verify the arguments
if args.ServiceName == "" {
return fmt.Errorf("Must provide service name")
}
if args.EnterpriseMeta.PartitionOrDefault() == acl.WildcardName {
return fmt.Errorf("Wildcards are not allowed in the partition field")
}

// Determine the function we'll call
var f func(memdb.WatchSet, *state.Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
switch {
case args.Connect:
f = serviceNodesConnect
case args.Ingress:
f = serviceNodesIngress
default:
f = serviceNodesDefault
}

filter, err := bexpr.CreateFilter(args.Filter, nil, structs.CheckServiceNode{})
if err != nil {
return err
}

var hadResults bool = false
return watch.ServerLocalNotify(ctx, correlationID, h.deps.GetStore,
func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedCheckServiceNodes, error) {
// This is necessary so that service export changes are eventually picked up, since
// they won't trigger the watch themselves.
timeoutCh := make(chan struct{})
time.AfterFunc(h.watchTimeout, func() {
close(timeoutCh)
})
ws.Add(timeoutCh)

authzContext := acl.AuthorizerContext{
Peer: args.PeerName,
}
authz, err := h.deps.ACLResolver.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return 0, nil, err
}
// If we're doing a connect or ingress query, we need read access to the service
// we're trying to find proxies for, so check that.
if args.Connect || args.Ingress {
if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
// If access was somehow revoked (via token deletion or unexporting), then we clear the
// last-known results before triggering an error. This way, the proxies will actually update
// their data, rather than holding onto the last-known list of healthy nodes indefinitely.
if hadResults {
hadResults = false
return 0, &structs.IndexedCheckServiceNodes{}, watch.ErrorACLResetData
}
return 0, nil, acl.ErrPermissionDenied
}
}

var thisReply structs.IndexedCheckServiceNodes
thisReply.Index, thisReply.Nodes, err = f(ws, h.state, args)
if err != nil {
return 0, nil, err
}

raw, err := filter.Execute(thisReply.Nodes)
if err != nil {
return 0, nil, err
}
thisReply.Nodes = raw.(structs.CheckServiceNodes)

// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
// results that would be filtered out even if the user did have permission.
if err := h.filterACL(&authzContext, args.Token, &thisReply); err != nil {
return 0, nil, err
}

hadResults = true
return thisReply.Index, &thisReply, nil
},
dispatchBlockingQueryUpdate[*structs.IndexedCheckServiceNodes](ch),
)
}

func (h *serverHealthBlocking) filterACL(authz *acl.AuthorizerContext, token string, subj *structs.IndexedCheckServiceNodes) error {
// Get the ACL from the token
var entMeta acl.EnterpriseMeta
authorizer, err := h.deps.ACLResolver.ResolveTokenAndDefaultMeta(token, &entMeta, authz)
if err != nil {
return err
}
aclfilter.New(authorizer, h.deps.Logger).Filter(subj)
return nil
}

func serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}

func serviceNodesIngress(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckIngressServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta)
}

func serviceNodesDefault(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}
183 changes: 183 additions & 0 deletions agent/proxycfg-glue/health_blocking_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package proxycfgglue

import (
"context"
"errors"
"testing"
"time"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
)

func TestServerHealthBlocking(t *testing.T) {
t.Run("remote queries are delegated to the remote source", func(t *testing.T) {
var (
ctx = context.Background()
req = &structs.ServiceSpecificRequest{Datacenter: "dc2"}
correlationID = "correlation-id"
ch = make(chan<- proxycfg.UpdateEvent)
result = errors.New("KABOOM")
)

remoteSource := newMockHealth(t)
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)

store := state.NewStateStore(nil)
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource, store)
err := dataSource.Notify(ctx, req, correlationID, ch)
require.Equal(t, result, err)
})

t.Run("services notify correctly", func(t *testing.T) {
const (
datacenter = "dc1"
serviceName = "web"
)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

store := state.NewStateStore(nil)
aclResolver := newStaticResolver(acl.ManageAll())
dataSource := ServerHealthBlocking(ServerDataSourceDeps{
GetStore: func() Store { return store },
Datacenter: datacenter,
ACLResolver: aclResolver,
Logger: testutil.Logger(t),
}, nil, store)
dataSource.watchTimeout = 1 * time.Second

// Watch for all events
eventCh := make(chan proxycfg.UpdateEvent)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
Datacenter: datacenter,
ServiceName: serviceName,
}, "", eventCh))

// Watch for a subset of events
filteredCh := make(chan proxycfg.UpdateEvent)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
Datacenter: datacenter,
ServiceName: serviceName,
QueryOptions: structs.QueryOptions{
Filter: "Service.ID == \"web1\"",
},
}, "", filteredCh))

testutil.RunStep(t, "initial state", func(t *testing.T) {
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
require.Empty(t, result.Nodes)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
require.Empty(t, result.Nodes)
})

testutil.RunStep(t, "register services", func(t *testing.T) {
require.NoError(t, store.EnsureRegistration(10, &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: serviceName + "1",
Service: serviceName,
Port: 80,
},
}))
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
require.Len(t, result.Nodes, 1)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
require.Len(t, result.Nodes, 1)

require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: serviceName + "2",
Service: serviceName,
Port: 81,
},
}))
result = getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
require.Len(t, result.Nodes, 2)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
require.Len(t, result.Nodes, 1)
require.Equal(t, "web1", result.Nodes[0].Service.ID)
})

testutil.RunStep(t, "deregister service", func(t *testing.T) {
require.NoError(t, store.DeleteService(12, "foo", serviceName+"1", nil, ""))
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
require.Len(t, result.Nodes, 1)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
require.Len(t, result.Nodes, 0)
})

testutil.RunStep(t, "acl enforcement", func(t *testing.T) {
require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: serviceName + "-sidecar-proxy",
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: serviceName,
},
},
}))

authzDeny := policyAuthorizer(t, ``)
authzAllow := policyAuthorizer(t, `
node_prefix "" { policy = "read" }
service_prefix "web" { policy = "read" }
`)

// Start a stream where insufficient permissions are denied
aclDenyCh := make(chan proxycfg.UpdateEvent)
aclResolver.SwapAuthorizer(authzDeny)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
Connect: true,
Datacenter: datacenter,
ServiceName: serviceName,
}, "", aclDenyCh))
require.ErrorContains(t, getEventError(t, aclDenyCh), "Permission denied")

// Adding ACL permissions will send valid data
aclResolver.SwapAuthorizer(authzAllow)
time.Sleep(dataSource.watchTimeout)
result := getEventResult[*structs.IndexedCheckServiceNodes](t, aclDenyCh)
require.Len(t, result.Nodes, 1)
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)

// Start a stream where sufficient permissions are allowed
aclAllowCh := make(chan proxycfg.UpdateEvent)
aclResolver.SwapAuthorizer(authzAllow)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
Connect: true,
Datacenter: datacenter,
ServiceName: serviceName,
}, "", aclAllowCh))
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
require.Len(t, result.Nodes, 1)
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)

// Removing ACL permissions will send empty data
aclResolver.SwapAuthorizer(authzDeny)
time.Sleep(dataSource.watchTimeout)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
require.Len(t, result.Nodes, 0)

// Adding ACL permissions will send valid data
aclResolver.SwapAuthorizer(authzAllow)
time.Sleep(dataSource.watchTimeout)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
require.Len(t, result.Nodes, 1)
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)
})
})
}

0 comments on commit 5bf4381

Please sign in to comment.