
Servicemap POC #42120

Closed
wants to merge 1 commit into from

Conversation

@roncohen (Contributor) commented Jul 29, 2019

This is a POC on a service map. It works by looking at a carefully sampled set of traces and generating connection documents from those traces. It's meant as a way to get us started in the land of the service map and not as the complete solution to everything we could ever want from a service map.

[Screenshot: service map (service-map-800x610)]

I've added the service map as a global tab, next to the services and traces lists. I've also added a service map tab in the service specific pages, next to the errors tab. This tab will show services that are downstream from the selected service. I've used react-cytoscapejs to draw the services/connections. The visualization part obviously still needs a bunch of work.

You'll notice that opbeans-go isn't running, so it doesn't find the connection to that service. Instead it shows a destination.address node (opbeans-go:3000) that it knows from the calling service. There's also currently a problem with the Node agent in which an external span can have somewhat arbitrary child spans: elastic/apm-agent-nodejs#1239.

Interestingly, opbeans-python will create spans both to postgresql and postgreql (missing an s) (fixed: #42120 (comment))

opbeans-ruby shows up twice, once with the environment and once without. That's because outgoing spans don't have the environment set yet, while transactions do, and opbeans-ruby is the only opbeans app using envs by default. Will be fixed by elastic/apm-server#2471

Assumptions

  1. We cannot use anything other than the trace id to link services together
    Agents don't support baggage across service boundaries and it's not on the roadmap. It also adds overhead to every request, although that might not be significant. In scenarios where our agents aren't being used (opencensus?), we cannot rely on additional context being propagated.
  2. Random sampling will capture a trace of every interesting connection in the system.
    People with a very low sampling rate in combination with very rare connections run the risk of missing connections. This proposal does not attempt to solve tail based sampling.
  3. Service A will only talk to one other service B per destination.address. If this is not the case it's not a dealbreaker, but there are complications. Along the same lines, we assume Service A sending messages on a specific queue will be consumed by only one service B. The pathological case here is an event bus situation where many services listen to the same queue/topic. Again, not a dealbreaker though. More in the discussion One service per destination address/queue below.
  4. This solution needs to work with cross cluster search. That ruled out things like using the routing key to ensure all spans of the same trace go to the same shard (see discussion below).

How it works

In its current state there's a Kibana job that runs every minute. It executes a composite aggregation on spans to go through every combination of service and destination.address found since the job last ran. For each of these combinations, it selects X spans and gets their trace ids, using a diversified sampler sub-aggregation. At the moment it tries to get a diverse set of spans based on their duration, but we could include other criteria as well (see the discussion on what happens if a service is down).

It then takes the trace.id of all the spans returned and executes a scripted metric aggregation, filtering for the trace.ids found in the previous step and grouping by trace.id. The scripted metric aggregation assembles the full trace from the individual spans and generates a document per trace with a list of connections between services extracted from that trace. The job then goes through those results and creates the connection documents in Elasticsearch.

I initially wrote it as a scripted metric agg because I was trying to do it all as a data frame, but it didn't work out. We should consider whether it still makes sense to have Elasticsearch doing the assembly, or whether we should instead pull the traces out and do it in Node.js.
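
To make the first step concrete, here is a rough sketch of the kind of query the job runs. The index pattern, time range, sizes and the diversification field are illustrative assumptions, not necessarily what's in this branch:

GET apm-*-span-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        { "exists": { "field": "destination.address" } },
        { "range": { "@timestamp": { "gte": "now-1m" } } }
      ]
    }
  },
  "aggs": {
    "connections": {
      "composite": {
        "size": 1000,
        "sources": [
          { "service": { "terms": { "field": "service.name" } } },
          { "destination": { "terms": { "field": "destination.address" } } }
        ]
      },
      "aggs": {
        "sample": {
          "diversified_sampler": {
            "shard_size": 20,
            "field": "span.duration.us"
          },
          "aggs": {
            "trace_ids": {
              "terms": { "field": "trace.id", "size": 20 }
            }
          }
        }
      }
    }
  }
}

The trace ids collected under each service/destination bucket then feed the second query described above.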

We'll create a connection document even for outgoing spans that don't have a corresponding receiving side transaction. This means that if we set destination.address on DB/cache/etc spans, they will automatically show up in the service map as well. Those connection documents will not have a callee property.
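
For illustration, a connection document for a database span with no receiving transaction might look roughly like this (the field names are my guesses based on the scripts below, not necessarily the exact shape the job writes):

{
  "@timestamp": "2019-07-29T12:00:00.000Z",
  "caller": {
    "service_name": "opbeans-python",
    "environment": "production",
    "span_type": "db",
    "span_subtype": "postgresql",
    "destination": "postgresql"
  },
  "upstream": ["opbeans-python/production"],
  "upstream_joined": "opbeans-python/production"
}

There is no callee object because no transaction was found on the receiving side; the UI can render such documents as the gray destination.address nodes mentioned in the discussion below.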

A note on security: Some customers will use index level security to restrict who can see what. I've assumed users are allowed to see the outgoing connections from all the services they already have access to. That is, you can see all the services that the services you have access to talk to, but you cannot necessarily see all the services that talk to the services you have access to. To achieve this, connection documents are written to the same index as where the outgoing call was found. So I've added a configuration option that allows users to decide where to write the connection documents if they don't like this.

Try it out

There are a couple of things that you need to do to try it out.

  1. Install this pipeline:
extract_destination pipeline
PUT /_ingest/pipeline/extract_destination
{
  "description": "sets destination on ext spans based on their name",
  "processors": [
    {
        "set": {
          "if": "ctx.span != null && ctx.span.type == 'ext'",
          "field": "span.type",
          "value": "external"
        }
    },
    {
        "script": """
        if(ctx['span'] != null) {
          if (ctx['span']['type'] == 'external') {
            def spanName = ctx['span']['name'];
            if (spanName.indexOf('/') > -1) {
              spanName = spanName.substring(0, spanName.indexOf('/'));
            }
            
            if (spanName.indexOf(' ') > -1) {
              spanName = spanName.substring(spanName.indexOf(' ')+1, spanName.length());
            }
            ctx['destination.address']=spanName;
          }
          
          if (ctx['span']['type'] == 'resource') {
            def spanName = ctx['span']['name'];
            
            if (spanName.indexOf('://') > -1) {
              spanName = spanName.substring(spanName.indexOf('://')+3, spanName.length());
            }
            if (spanName.indexOf('/') > -1) {
              spanName = spanName.substring(0, spanName.indexOf('/'));
            }
            
            ctx['destination.address']=spanName;
          }
          
          if (ctx['span']['type'] == 'db') {
            def dest = ctx['span']['subtype'];
            ctx['destination.address']=dest;
          }
          
          if (ctx['span']['type'] == 'cache') {
            def dest = ctx['span']['subtype'];
            ctx['destination.address']=dest;
          }
        }
        """
      }
  ]
}

Then register it in the default apm pipeline:

PUT _ingest/pipeline/apm
{
    "description" : "Default enrichment for APM events",
    "processors" : [
      {
        "pipeline" : {
          "name" : "apm_user_agent"
        }
      },
      {
        "pipeline" : {
          "name" : "apm_user_geo"
        }
      },
      {
        "pipeline": {
          "name": "extract_destination"
        }
      }
      
    ]
  }

This is only needed until #117 and #115 are implemented across agents.

  2. Install the painless scripts that assemble a trace from spans/transactions in a scripted metric aggregation (a sketch of how they might be wired into a query follows after this list):
Trace assembly scripts
POST _scripts/map-service-conns
{
  "script": {
    "lang": "painless",
    "source": """
              def s = new HashMap();

              if (!doc['span.id'].empty) {
                s.id = doc['span.id'].value
              } else {
                s.id = doc['transaction.id'].value;
                s.transaction = true;
              }
              if (!doc['parent.id'].empty) {
                s.parent = doc['parent.id'].value;
              }
              if (!doc['service.environment'].empty) {
                s.environment = doc['service.environment'].value;
              }
             
              if (!doc['destination.address'].empty) {
                s.destination = doc['destination.address'].value;
              }
              
              if (!doc['_index'].empty) {
                s._index = doc['_index'].value;
              }
              
              if (!doc['span.type'].empty) {
                s.span_type = doc['span.type'].value;
              }
              
              if (!doc['span.subtype'].empty) {
                s.span_subtype = doc['span.subtype'].value;
              }
              
              s.timestamp = doc['@timestamp'].value;
              s.service_name = doc['service.name'].value;
              if(!state.spans.containsKey(s.parent)) {
                state.spans.put(s.parent, new ArrayList())
              }
              
              // parent == id will lead to a stack overflow later on
              if (s.parent != s.id) {
                state.spans[s.parent].add(s)
              }
            """
  }
}

POST _scripts/reduce-service-conns
{
  "script": {
    "lang": "painless",
    "source": """
              void extractChildren(def caller, def spans, def upstream, def conns, def count) {
                // todo: simplify these conditionals
                if (spans.containsKey(caller.id)) {
                  for(s in spans[caller.id]) {
                      if (caller.span_type=='external') {
                        upstream.add(caller.service_name+"/"+caller.environment);
  
                        def conn = new HashMap();
                        conn.caller = caller;
                        conn.callee = s;
                        conn.upstream = new ArrayList(upstream);
                        conns.add(conn);
                        
                        extractChildren(s, spans, upstream, conns, count);
                        upstream.remove(upstream.size()-1);
                      } else {
                        extractChildren(s, spans, upstream, conns, count);
                      }
                  }
                } else {
                  // no connection found, do not set 'callee'
                  def conn = new HashMap();
                  conn.caller = caller;
                  conn.upstream = new ArrayList(upstream);
                  conn.upstream.add(caller.service_name+"/"+caller.environment);
                  conns.add(conn);
                }
              }
              def conns = new HashSet();
              def spans = new HashMap();
              
              // merge results from shards
              for(state in states) {
                for(s in state.entrySet()) {
                  def v = s.getValue();
                  def k = s.getKey();
                  if(!spans.containsKey(k)) {
                    spans[k] = v;
                  } else {
                    for (p in v) {
                      spans[k].add(p);
                    }
                  }
                }
              }
              
              if (spans.containsKey(null) && spans[null].size() > 0) {
                def node = spans[null][0];
                def upstream = new ArrayList();
              
                extractChildren(node, spans, upstream, conns, 0);

                return new ArrayList(conns)
              }
              return [];
            """
  }
}

POST _scripts/combine-service-conns
{
  "script": {
    "lang": "painless",
    "source": "return state.spans"
  }
}
  3. Pull this branch of Kibana, run it against a master Elasticsearch, and wait for a bit
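
For reference, this is roughly how the stored scripts above could be wired into the second query the job runs, grouping the sampled spans by trace.id. The init_script, index pattern and trace ids here are placeholders I've assumed; the POC's actual query may differ:

GET apm-*/_search
{
  "size": 0,
  "query": {
    "terms": { "trace.id": ["traceid-1", "traceid-2"] }
  },
  "aggs": {
    "traces": {
      "terms": { "field": "trace.id", "size": 20 },
      "aggs": {
        "connections": {
          "scripted_metric": {
            "init_script": "state.spans = new HashMap();",
            "map_script": { "id": "map-service-conns" },
            "combine_script": { "id": "combine-service-conns" },
            "reduce_script": { "id": "reduce-service-conns" }
          }
        }
      }
    }
  }
}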

Discussion

What happens if calls to a service fail for some time

If service B is down or otherwise inaccessible, there will be no discoverable traces connecting it to other services for that period. Like opbeans-go above, it will show up alongside db/cache connections in gray, and its label will be the destination.address that other services are trying to reach it at. If our sampling hits upon a trace that actually went through to the opbeans-go service, a connection document will be created that has the 'callee' property set. As alluded to above, we should include the result of the outgoing span in the criteria for choosing diverse spans to ensure we get calls that succeeded.

Connection documents have timestamps that are taken from their outgoing spans. The aggregation we perform in the UI ensures that if there's even a single connection with callee set in the given timeframe, the resolved service will show up in the UI instead of the gray node labeled with the destination.address. It also means that if the user selects a timeframe small enough that no connection with callee set was generated within it, they will only see the gray node.
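
A sketch of what that UI aggregation could look like, with illustrative field names for the connection documents (caller.destination, callee.service_name):

GET apm-*/_search
{
  "size": 0,
  "query": {
    "range": { "@timestamp": { "gte": "now-24h" } }
  },
  "aggs": {
    "by_destination": {
      "terms": { "field": "caller.destination", "size": 1000 },
      "aggs": {
        "resolved": {
          "filter": { "exists": { "field": "callee.service_name" } }
        }
      }
    }
  }
}

If resolved.doc_count is greater than zero for a destination bucket, the UI can draw the resolved service; otherwise it falls back to the gray destination.address node.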

Time to glass

The job currently runs every 60s. That means new connections will take up to 60s to show up in addition to our regular time to glass, which is something like 10s, but usually much faster. The job only executes two queries so I'd say it's reasonable to decrease the period, for example to 10s.

The trade-offs are that more queries will be executed and, if the number of sample traces examined stays constant, more connection documents will be generated. We can reduce the number of selected sample traces per run to make up for it, but I don't expect these connection documents to be a big problem.

One service per destination address/queue

For each service, the job looks at the full trace for a certain number of outgoing spans per destination. That means that as long as there's only one service on the receiving end of each service/destination combination, we will discover the connection (assuming it was even sampled in the agent, see (2) above). If there are more services on the receiving end, for example if you have an uninstrumented proxy that sometimes routes calls for a specific destination.address to service C instead of B, the job might need to go through many traces until it finds one in which service A connects to service C on that particular address. It is set up to look at 20 traces per service per destination per minute at the moment; this is configurable.

In the case of a proxy like istio/envoy, the destination.address usually defines the destination service, so it's not a problem. With custom routing, though, for example when you're creating a new version of a service with a new name "B2" and using envoy to route some of the traffic there in a way that is opaque to the calling service, we'll need to wait until the job happens upon a trace that hit service B2. If the share of calls going there is small enough relative to the total number of calls from that service to that address, and the number of selected samples is small enough (20 by default), it might not show up. If we integrate with envoy and collect spans from it, this should go away.

This whole discussion applies more or less directly to messaging systems if you think about topic or queue instead of destination.address. Imagine we introduce a new field that is a combination of the fields we think are relevant to determining the receiving service: for HTTP it would be destination.address, for jobs it would be the job queue, or a combination of broker cluster and job queue, etc.
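
A sketch of how such a combined field could be derived in an ingest processor; destination.key, messaging.broker and messaging.queue are made-up names for illustration, not existing APM fields:

PUT /_ingest/pipeline/set_destination_key
{
  "description": "derive a single key identifying the receiving side of a span",
  "processors": [
    {
      "script": {
        "source": """
        if (ctx['destination.address'] != null) {
          // http/db/cache spans: the address already identifies the receiver
          ctx['destination.key'] = ctx['destination.address'];
        } else if (ctx['messaging.broker'] != null && ctx['messaging.queue'] != null) {
          // messaging spans: combine broker cluster and queue name
          ctx['destination.key'] = ctx['messaging.broker'] + '/' + ctx['messaging.queue'];
        }
        """
      }
    }
  ]
}

The job would then aggregate on destination.key instead of destination.address.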

As mentioned above, in the event-bus case, where arbitrary services might listen to a specific topic/queue, this could be a problem. I think it's totally fine to ship a service map that doesn't take messaging into account for starters (it's typically not in the latency path), if messaging turns out to be problematic.

Cross cluster search

This solution needs to work with cross cluster search. Some customers have each team running their own cluster for their own APM/logging/metrics needs, but then they want to use CCS to see what the other teams' services are doing. This solution could accommodate that by using CCS to query across clusters and using the configuration described above to make sure connections are written to the local cluster.

Extensions

I don't want us to spend a bunch of time discussing these right now, just want to get us thinking:

requests/errors per minute overlay on connections

Assuming (3) holds, agents should start to collect metrics that describe the number of requests each service makes to a specific destination.address. It's somewhat similar to the breakdown graphs, but instead of using span.type we use destination.address. In order to show errors per minute we could also include the result as a dimension in the metric. Even if (3) doesn't always hold, we can show an outgoing arrow per destination.address, put the rpm/epm metric on that, and then have it split into the multiple services that we've discovered for that destination.address.
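
To make that concrete, a per-destination metric document reported by an agent could look roughly like this (the field names are illustrative only, nothing here is an agreed-upon spec):

{
  "@timestamp": "2019-07-29T12:00:00.000Z",
  "processor": { "event": "metric" },
  "service": { "name": "opbeans-python", "environment": "production" },
  "destination": { "address": "opbeans-go:3000" },
  "requests": { "count": 118 },
  "errors": { "count": 2 },
  "duration": { "sum": { "us": 1850000 } }
}

Requests and errors per minute for the first hop can then be derived from the counts over the metricset's collection interval.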

better connections through baggage

When agents support baggage, we could send something through in the baggage that uniquely identifies the service. For example, a hash of the service name. We'd make sure the hash is attached to the transaction documents that are generated on the receiving end. Then in the Kibana job, we flip the composite aggregation around so that it will look at all combinations of incoming service/hash. Flipping the composite agg will mean no longer getting external dependencies such as db/cache automatically, so some thought will have to go into this.
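
A rough sketch of the flipped aggregation, where caller.service_hash stands in for whatever hashed identifier we'd propagate in the baggage (both the field name and the idea are hypothetical at this point):

GET apm-*-transaction-*/_search
{
  "size": 0,
  "aggs": {
    "incoming_connections": {
      "composite": {
        "size": 1000,
        "sources": [
          { "service": { "terms": { "field": "service.name" } } },
          { "caller": { "terms": { "field": "caller.service_hash" } } }
        ]
      }
    }
  }
}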

@beniwohli

This is awesome!

Interestingly, opbeans-python will create spans both to postgresql and postgreql (missing an s).

A fix for this has been recently merged and will be released in tandem with the 7.3 release.

@roncohen added the Team:APM (All issues that need APM UI Team support) label on Jul 29, 2019
@elasticmachine
Contributor

Pinging @elastic/apm-ui

@elasticmachine
Contributor

💔 Build Failed



function interestingTransactions(since?: string, afterKey?: any) {
roncohen (Contributor Author)

this should rather be called "interestingTraces"

@felixbarny
Member

First of all: I love it!

Some comments/questions:

  • When using both agent/head-based sampling plus a diversified sampler aggregation, how could we calculate approximate throughput and error rates between services?
  • Could we use the result documents used to identify interesting/unusual traces to facilitate tail-based sampling?
  • I guess the worst-case for this setup would be a non-instrumented proxy which routes to a wide variety of different services, based on the path like http://proxy.example.com/$service/path/within/service, right? But we might have problems with that setup anyway if the proxy doesn't forward the headers.

@axw
Member

axw commented Aug 6, 2019

Could we use the result documents used to identify interesting/unusual traces to facilitate tail-based sampling?

@felixbarny I've been thinking the same thing. I think we would want to compute the aggregated trace graph though, not the service graph. The service graph is a transformation of the transaction graph: collapse transaction nodes for the same service name; so we could in theory store this as the primitive.

We might want to consider storing trace paths rather than connections, enabling identification of critical paths, e.g. by visualising as a flame graph, or highlighting critical paths on the graph. This would be more expensive to store vs. pairwise connections, but I'm not sure how much more; worth investigation I think.

@eyalkoren
Contributor

This is really awesome!! Completely different from the approach I had in mind (doing the aggregation in the agents and sending sub-graphs from each agent), and with some great advantages over it, as long as the overhead of the backend job is acceptable and the accuracy of the data we show is not too affected by aggressive sampling.

I also had similar questions to what @felixbarny raised:

When using both agent/head-based sampling plus a diversified sampler aggregation, how could we calculate approximate throughput and error rates between services?

I assume that as long as head-based sampling is applied randomly by the agents, we can extrapolate and get meaningful approximations, with the understanding that the fewer samples we collect, the less accurate the results will be. So I think the greater concern is the sampling done by the aggregations, rather than by the agents. The question is how well this will perform at very high throughput.

Regarding the diversified sampler: it seems it does not support calculating hit rates and throughput at all. It seems efficient for discovering connections, but not for quantification.

I think the bottom line is: if we eventually want to show hit rates and some other aggregations (like error rates and average/max durations of connections), can this approach lead us there?

@roncohen
Contributor Author

roncohen commented Aug 12, 2019

Thanks for your comments!

When using both agent/head-based sampling plus a diversified sampler aggregation, how could we calculate approximate throughput and error rates between services?

When (3) holds, it means we can rely on the fact that service A talks only to service B on address B'. When that is the case, we can create a metric in the agents that tracks throughput (number of calls per period, number of failed calls, etc.) to B'. Still, we won't have accurate metrics on connections 2+ hops away from a given service. For example, let's say we have this topology:

A ---> B ---> D
C ----/

And then we want to show the user only the down streams from A:

A ---> B ---> D

Here, we cannot tell you how many calls to D originate in A. Putting the total throughput from B to D on the graph here would be misleading because it would make people attribute more calls B->D to service A than what is true.

A way to work around this in the UI could be to show the sum of the "other" incoming requests to B alongside the connection A->B:

     A --10 rpm--> B --22 rpm--> D
others --12 rpm---/

E.g. include "others" talking to B here, with the aggregated metrics for them. Not ideal, but perhaps a good start. Another approach, which some of you hinted at, would be to extrapolate from the traces we did look at, e.g. how often do calls to D originate in A. That's certainly something we could try, but it could be problematic.

For showing latency specifically, there's another option. We could decide to only indicate which services are contributing significant latency instead of showing hard numbers. For each connection document, we include the percentage of the full trace time that was spent in "service self time", i.e. time that we could not attribute to calls to other services. Then when showing the diagram we highlight the service(s) that contributed significant latency according to some definition of "significant latency" we decide on:

A ---> B ---> [D]

It would still be a sampling approach, but I think it would be good enough seeing as the diversified sampler should ensure we get a diverse set of latency buckets, plus it wouldn't be a hard number but rather an indicator that D is the service that is slowing down A.

For situations where (3) doesn't hold, meaning A talks to both B and C using address B' and the sampling discovers this, we can have the UI show an outgoing arrow that splits into two:

    8 rpm    /------ B
A -----------
             \------ C

all the considerations around >1 hop connections still apply here.

Could we use the result documents used to identify interesting/unusual traces to facilitate tail-based sampling?

Not sure exactly what you have in mind?

I guess the worst-case for this setup would be a non-instrumented proxy which routes to a wide variety of different services, based on the path like http://proxy.example.com/$service/path/within/service, right? But we might have problems with that setup anyway if the proxy doesn't forward the headers.

Yes exactly. Adding the path to the diversified sampler here would help ensure we detect the connections, but might also backfire, for example if they have IDs in urls. In any case (3) would no longer hold, which means overlaying throughput etc. as mentioned above wouldn't be as nice.

We might want to consider storing trace paths rather than connections, enabling identification of critical paths, e.g. by visualising as a flame graph, or highlighting critical paths on the graph. This would be more expensive to store vs. pairwise connections, but I'm not sure how much more; worth investigation I think.

The POC is already storing the full path with each connection as a list. It also stores the upstream services as a list and as a joined keyword. You can then "group by" the upstream keyword to get each service in each position in the path, or you can use the list to filter for downstream connections.

Regarding the diversified sampler- it seems it does not support the ability to calculate hit rates and throughput at all. It seems efficient for discovery of connections but not for quantification.

I think the bottom line of this is: if we eventually want to show hit rates and some other aggregations (like error rates and average/max durations of connections)- can this approach lead us there?

You're exactly right. As long as (3) holds, we can "overlay" error rates, throughput, average/max durations of the immediate connections (e.g. one hop) etc. on top of the discovered connections using a metric in the agent that is tracking each address. If (3) doesn't hold, things will not be as nice.

@eyalkoren
Contributor

Thanks for explaining.

we can "overlay" error rates, throughput, average/max durations of the immediate connections (e.g. one hop) etc. on top of the discovered connections using a metric in the agent that is tracking each address

If we can transfer baggage, can we rely on these metrics alone to provide all the information we want about all connections? Or will there still be data we won't be able to get without looking at specific traces?

@roncohen
Contributor Author

Yes! As far as I can tell, with baggage that we "own", i.e. we set it in our agents, we should be able to produce the >1 hop metrics we'd want without needing to do approximations, etc.

@hmdhk
Contributor

hmdhk commented Aug 15, 2019

@roncohen, it seems that the usage of baggage here is very similar to what the tracestate header provides; have you considered this?

tracestate is considered opaque to other vendors and has the added benefit that trace-context implementations are required to propagate this if the connection goes across vendor boundaries!

@roncohen
Contributor Author

@jahtalab

Yes, good point. We'd probably use that instead of "baggage" as such.

@axw
Member

axw commented Aug 19, 2019

We might want to consider storing trace paths rather than connections, enabling identification of critical paths, e.g. by visualising as a flame graph, or highlighting critical paths on the graph. This would be more expensive to store vs. pairwise connections, but I'm not sure how much more; worth investigation I think.

The POC is already storing the full path with each connection as a list. It also stores the upstream services as a list and as a joined keyword. You can then "group by" the upstream keyword to get each service in each position in the path, or you can use the list to filter for downstream connections.

Thanks for the pointer. Now I understand that it's basically what I had in mind, except that this is storing the service-level graph and not transaction-level. The latter would be a more broadly useful primitive, but we could also evolve towards that later.

I ended up pulling down and running the code locally. In case anyone else finds it useful, I wrote a little program to simulate a distributed trace with defined node names and paths: https://gist.github.com/axw/841bf72943a0bf348c0bc1ae29a9d2b3

},
{
path: '/servicemap',
title: i18n.translate('xpack.apm.home.tracesTabLabel', {

Should this be a new key instead of tracesTabLabel?

}

// interface Props {
// connections: any;

FYI cytoscape has very good DefinitelyTyped definitions which we can use to our advantage later, since you can deal with some pretty complex objects.

Not sure where to ask this, so I'll ask it here: Are there reasons we're not using the components that make up the Graph capabilities already in x-pack?

@sorenlouv (Member) commented Aug 29, 2019

Are there reasons we're not using the components that make up the Graph capabilities already in x-pack?

Without having looked into it much I don't think the Graph plugin is easily embeddable in APM UI. Afaict Graph is based on Angular while APM UI uses React. Mixing those frameworks is possible but something we should think twice about doing.

Secondly, I think our use case requires a lot of custom UI work and we would probably spend more time fighting the capabilities and visual appearance of Graph instead of building it from scratch. We already had a meeting with other teams who also need graph visualizations and we found common ground in using the same lib (cytoscape) but for now nothing more than that.

@ogupte ogupte closed this May 5, 2020
Labels
discuss, Team:APM (All issues that need APM UI Team support)