Skip to content

Commit

Permalink
Merge pull request #3677 from telepresenceio/thallgren/serviceless-in…
Browse files Browse the repository at this point in the history
…tercepts

Enable intercepts of workloads that have no service.
  • Loading branch information
thallgren committed Aug 30, 2024
2 parents 7884cec + c09b284 commit 7cd6eb4
Show file tree
Hide file tree
Showing 21 changed files with 1,298 additions and 1,095 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ items:
- version: 2.20.0
date: (TBD)
notes:
- type: feature
title: Enable intercepts of workloads that have no service.
body: >-
Telepresence is now capable of intercepting workloads that have no associated service. The intercept will then target container port
instead of a service port. The new behavior is enabled by adding a <code>telepresence.getambassador.io/inject-container-ports</code>
annotation where the value is a comma separated list of port identifiers consisting of either the name or the port number of a container
port, optionally suffixed with <code>/TCP</code> or <code>/UDP</code>.
- type: feature
title: Publish the OSS version of the telepresence Helm chart
body: >-
Expand Down
72 changes: 39 additions & 33 deletions cmd/traffic/cmd/agent/intercepttarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
"github.com/telepresenceio/telepresence/v2/pkg/ioutil"
)

// InterceptTarget describes the mapping between service ports and one container port. All entries
// must be guaranteed to all have the same Protocol, ContainerPort, and AgentPort. The slice must
// be considered immutable once created using NewInterceptTarget.
// InterceptTarget describes the mapping between service ports and one container port, or if no service
// is used, just the container port.
// All entries must be guaranteed to all have the same Protocol, ContainerPort, and AgentPort.
// The slice must be considered immutable once created using NewInterceptTarget.
type InterceptTarget []*agentconfig.Intercept

func NewInterceptTarget(ics []*agentconfig.Intercept) InterceptTarget {
Expand All @@ -26,10 +27,10 @@ func NewInterceptTarget(ics []*agentconfig.Intercept) InterceptTarget {
panic("attempt to add intercept create an InterceptTarget with no Intercepts")
}
if ni > 1 {
sv := ics[0]
icZero := ics[0]
for i := 1; i < ni; i++ {
ic := ics[i]
if sv.AgentPort != ic.AgentPort || sv.ContainerPort != ic.ContainerPort || sv.Protocol != ic.Protocol {
if icZero.AgentPort != ic.AgentPort || icZero.ContainerPort != ic.ContainerPort || icZero.Protocol != ic.Protocol {
panic("attempt to add intercept to an InterceptTarget with different AgentPort or ContainerPort")
}
}
Expand All @@ -38,8 +39,8 @@ func NewInterceptTarget(ics []*agentconfig.Intercept) InterceptTarget {
}

func (cp InterceptTarget) MatchForSpec(spec *manager.InterceptSpec) bool {
for _, sv := range cp {
if agentconfig.SpecMatchesIntercept(spec, sv) {
for _, ic := range cp {
if agentconfig.SpecMatchesIntercept(spec, ic) {
return true
}
}
Expand All @@ -62,26 +63,35 @@ func (cp InterceptTarget) Protocol() v1.Protocol {
return cp[0].Protocol
}

func portString(ic *agentconfig.Intercept) (s string) {
if ic.ServiceUID != "" {
p := ic.ServicePortName
if p == "" {
p = strconv.Itoa(int(ic.ServicePort))
}
return fmt.Sprintf("service port %s:%s", ic.ServiceName, p)
}
p := ic.ContainerPortName
if p == "" {
p = strconv.Itoa(int(ic.ContainerPort))
}
return fmt.Sprintf("container port %s", p)
}

func (cp InterceptTarget) AppProtocol(ctx context.Context) (proto string) {
var foundSv *agentconfig.Intercept
for _, sv := range cp {
if sv.AppProtocol == "" {
var foundIc *agentconfig.Intercept
for _, ic := range cp {
if ic.AppProtocol == "" {
continue
}
if foundSv == nil {
foundSv = sv
proto = foundSv.AppProtocol
} else if foundSv.AppProtocol != sv.AppProtocol {
svcPort := func(s *agentconfig.Intercept) string {
if s.ServicePortName != "" {
return fmt.Sprintf("%s:%s", s.ServiceName, s.ServicePortName)
}
return fmt.Sprintf("%s:%d", s.ServiceName, s.ServicePort)
}
dlog.Warningf(ctx, "port %s appProtocol %s differs from port %s appProtocol %s. %s will be used for container port %d",
svcPort(foundSv), proto,
svcPort(sv), sv.AppProtocol,
proto, sv.ContainerPort)
if foundIc == nil {
foundIc = ic
proto = foundIc.AppProtocol
} else if foundIc.AppProtocol != ic.AppProtocol {
dlog.Warningf(ctx, "%s appProtocol %s differs from %s appProtocol %s. %s will be used for %s",
portString(foundIc), proto,
portString(ic), ic.AppProtocol,
proto, portString(ic))
}
}
return proto
Expand Down Expand Up @@ -111,7 +121,7 @@ func (cp InterceptTarget) String() string {
if l > 1 {
sb.WriteByte('[')
}
for i, sv := range cp {
for i, ic := range cp {
if i > 0 {
switch l {
case 2:
Expand All @@ -122,17 +132,13 @@ func (cp InterceptTarget) String() string {
sb.WriteString(", ")
}
}
sb.WriteString(sv.ServiceName)
sb.WriteByte(':')
if sv.ServicePortName != "" {
sb.WriteString(sv.ServicePortName)
} else {
sb.WriteString(strconv.Itoa(int(sv.ServicePort)))
}
sb.WriteString(portString(ic))
}
if l > 1 {
sb.WriteByte(']')
}
ioutil.Printf(&sb, " => container port %d/%s", cp.ContainerPort(), cp.Protocol())
if l > 1 || cp[0].ServiceName != "" {
ioutil.Printf(&sb, " => container port %d/%s", cp.ContainerPort(), cp.Protocol())
}
return sb.String()
}
4 changes: 2 additions & 2 deletions cmd/traffic/cmd/agent/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func (s *state) HandleIntercepts(ctx context.Context, iis []*manager.InterceptIn
for _, ii := range iis {
ic := ist.Target()
if ic.MatchForSpec(ii.Spec) {
dlog.Debugf(ctx, "intercept id %s svc=%q, svcPortId=%q matches target protocol=%s, agentPort=%d, containerPort=%d",
ii.Id, ii.Spec.ServiceName, ii.Spec.ServicePortIdentifier, ic.Protocol(), ic.AgentPort(), ic.ContainerPort())
dlog.Debugf(ctx, "intercept id %s svc=%q, portId=%q matches target protocol=%s, agentPort=%d, containerPort=%d",
ii.Id, ii.Spec.ServiceName, ii.Spec.PortIdentifier, ic.Protocol(), ic.AgentPort(), ic.ContainerPort())
ms = append(ms, ii)
}
}
Expand Down
32 changes: 16 additions & 16 deletions cmd/traffic/cmd/agent/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,27 @@ func TestState_HandleIntercepts(t *testing.T) {
cepts = []*rpc.InterceptInfo{
{
Spec: &rpc.InterceptSpec{
Name: "cept1Name",
Client: "user@host1",
Agent: "agentName",
Mechanism: "tcp",
Namespace: namespace,
ServiceName: serviceName,
ServicePortIdentifier: "http",
TargetPort: 8080,
Name: "cept1Name",
Client: "user@host1",
Agent: "agentName",
Mechanism: "tcp",
Namespace: namespace,
ServiceName: serviceName,
PortIdentifier: "http",
TargetPort: 8080,
},
Id: "intercept-01",
},
{
Spec: &rpc.InterceptSpec{
Name: "cept2Name",
Client: "user@host2",
Agent: "agentName",
Mechanism: "tcp",
Namespace: namespace,
ServiceName: serviceName,
ServicePortIdentifier: "http",
TargetPort: 8080,
Name: "cept2Name",
Client: "user@host2",
Agent: "agentName",
Mechanism: "tcp",
Namespace: namespace,
ServiceName: serviceName,
PortIdentifier: "http",
TargetPort: 8080,
},
Id: "intercept-02",
},
Expand Down
49 changes: 28 additions & 21 deletions cmd/traffic/cmd/manager/state/intercept.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,15 @@ import (

// PrepareIntercept ensures that the given request can be matched against the intercept configuration of
// the workload that it references. It returns a PreparedIntercept where all intercepted ports have been
// qualified with a service name and a service port name.
// qualified with a container port and if applicable, with service name and a service port name.
//
// The first step is to find the requested Workload and the agent config for that workload. This step will
// create the initial ConfigMap for the namespace if it doesn't exist yet, and also generate the actual
// intercept config if it doesn't exist.
//
// The second step matches all ServicePortIdentifiers in the request to the intercepts of the agent config
// and creates a resulting PreparedIntercept with a services array that has the same size and positions as
// the ServicePortIdentifiers in the request.
// The second step matches all PortIdentifiers in the request to the intercepts of the agent config.
//
// It's expected that the client that makes the call will update any unqualified service port identifiers
// It's expected that the client that makes the call will update any unqualified port identifiers
// with the ones in the returned PreparedIntercept.
func (s *state) PrepareIntercept(
ctx context.Context,
Expand Down Expand Up @@ -83,7 +81,7 @@ func (s *state) PrepareIntercept(
if err != nil {
return interceptError(err)
}
_, ic, err := findIntercept(ac, spec)
cn, ic, err := findIntercept(ac, spec)
if err != nil {
return interceptError(err)
}
Expand All @@ -92,6 +90,9 @@ func (s *state) PrepareIntercept(
ServiceUid: string(ic.ServiceUID),
ServiceName: ic.ServiceName,
ServicePortName: ic.ServicePortName,
ContainerName: cn.Name,
Protocol: string(ic.Protocol),
ContainerPort: int32(ic.ContainerPort),
ServicePort: int32(ic.ServicePort),
AgentImage: ac.AgentImage,
WorkloadKind: ac.WorkloadKind,
Expand Down Expand Up @@ -517,16 +518,22 @@ func unmarshalConfigMapEntry(y string, name, namespace string) (agentconfig.Side
return scx, nil
}

// findIntercept finds the intercept configuration that matches the given InterceptSpec's service/service port.
// findIntercept finds the intercept configuration that matches the given InterceptSpec's service/service port or container port.
func findIntercept(ac *agentconfig.Sidecar, spec *managerrpc.InterceptSpec) (foundCN *agentconfig.Container, foundIC *agentconfig.Intercept, err error) {
spi := agentconfig.PortIdentifier(spec.ServicePortIdentifier)
pi := agentconfig.PortIdentifier(spec.PortIdentifier)
for _, cn := range ac.Containers {
for _, ic := range cn.Intercepts {
if !(spec.ServiceName == "" || spec.ServiceName == ic.ServiceName) {
continue
}
if !(spi == "" || agentconfig.IsInterceptFor(spi, ic)) {
continue
if pi != "" {
if ic.ServiceUID != "" {
if !agentconfig.IsInterceptForService(pi, ic) {
continue
}
} else if !agentconfig.IsInterceptForContainer(pi, ic) {
continue
}
}
if foundIC == nil {
foundCN = cn
Expand All @@ -535,22 +542,22 @@ func findIntercept(ac *agentconfig.Sidecar, spec *managerrpc.InterceptSpec) (fou
}
var msg string
switch {
case spec.ServiceName == "" && spi == "":
msg = fmt.Sprintf("%s %s.%s has multiple interceptable service ports.\n"+
"Please specify the service and/or service port you want to intercept "+
"by passing the --service=<svc> and/or --port=<local:svcPortName> flag.",
case spec.ServiceName == "" && pi == "":
msg = fmt.Sprintf("%s %s.%s has multiple interceptable ports.\n"+
"Please specify the service and/or port you want to intercept "+
"by passing the --service=<svc> and/or --port=<local:portName/portNumber> flag.",
ac.WorkloadKind, ac.WorkloadName, ac.Namespace)
case spec.ServiceName == "":
msg = fmt.Sprintf("%s %s.%s has multiple interceptable services with port %s.\n"+
"Please specify the service you want to intercept by passing the --service=<svc> flag.",
ac.WorkloadKind, ac.WorkloadName, ac.Namespace, spi)
case spi == "":
ac.WorkloadKind, ac.WorkloadName, ac.Namespace, pi)
case pi == "":
msg = fmt.Sprintf("%s %s.%s has multiple interceptable ports in service %s.\n"+
"Please specify the port you want to intercept by passing the --port=<local:svcPortName> flag.",
ac.WorkloadKind, ac.WorkloadName, ac.Namespace, spec.ServiceName)
default:
msg = fmt.Sprintf("%s %s.%s intercept config is broken. Service %s, port %s is declared more than once\n",
ac.WorkloadKind, ac.WorkloadName, ac.Namespace, spec.ServiceName, spi)
ac.WorkloadKind, ac.WorkloadName, ac.Namespace, spec.ServiceName, pi)
}
return nil, nil, errcat.User.New(msg)
}
Expand All @@ -561,13 +568,13 @@ func findIntercept(ac *agentconfig.Sidecar, spec *managerrpc.InterceptSpec) (fou

ss := ""
if spec.ServiceName != "" {
if spi != "" {
ss = fmt.Sprintf(" matching service %s, port %s", spec.ServiceName, spi)
if pi != "" {
ss = fmt.Sprintf(" matching service %s, port %s", spec.ServiceName, pi)
} else {
ss = fmt.Sprintf(" matching service %s", spec.ServiceName)
}
} else if spi != "" {
ss = fmt.Sprintf(" matching port %s", spi)
} else if pi != "" {
ss = fmt.Sprintf(" matching port %s", pi)
}
return nil, nil, errcat.User.Newf("%s %s.%s has no interceptable port%s", ac.WorkloadKind, ac.WorkloadName, ac.Namespace, ss)
}
Expand Down
32 changes: 32 additions & 0 deletions integration_test/testdata/k8s/echo-no-svc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: echo-no-svc
labels:
app: echo-no-svc
spec:
replicas: 1
selector:
matchLabels:
app: echo-no-svc
template:
metadata:
labels:
app: echo-no-svc
annotations:
telepresence.getambassador.io/inject-container-ports: http
spec:
automountServiceAccountToken: false
containers:
- name: echo-server
image: ghcr.io/telepresenceio/echo-server:latest
ports:
- name: http
containerPort: 8080
env:
- name: PORT
value: "8080"
resources:
limits:
cpu: 50m
memory: 8Mi
Loading

0 comments on commit 7cd6eb4

Please sign in to comment.