Observation and tracing per emitted element on flux publisher #3424
To my understanding, with only metrics this was a simpler task and it was achievable. Observation is a more abstract concept and has no functionality that would allow instrumenting individual items of a Flux. As you noted:
This is the proper way to handle it. I think we'd need more concrete ideas and expected API examples. Keeping in mind that an Observation is used to extract MDC information in micrometer-tracing, and that it's used for tracing and also metrics, can you provide some examples that directly access the relevant Observation in the individual items' processing steps, to depict how it should work?
If we are to assume that metrics and observations will by design produce different sets of metrics, then this should be documented in metrics.adoc. Right now the assumption is that you can replace Micrometer.metrics with Micrometer.observation, which isn't really the case. Metrics will generate a different set of metrics (and only metrics), while observation will potentially generate a single metric for the whole publisher plus a span associated with it (both created by observation handlers registered to the observation registry). This means that if we want full metrics + tracing on a flux, we can't use either solution on just the flux; we would need to write something like this with Micrometer.observation:

```java
someFluxSource.flatMap(element -> Mono.just(element)
    .name("processing_element")
    .tag("foo", "bar")
    .flatMap(imDoingSomeProcessing)
    // ... some other operators ...
    .tap(Micrometer.observation(registry))
)
```

Or manually add tracing, though I'm not sure whether metrics exemplars will then work properly:

```java
someFluxSource.tap(Micrometer.metrics(registry))
    .flatMap(element -> {
        var span = tracer.spanBuilder()
            // some setup of the span
            .name()
            .tag();
        return Mono.just(element)
            .doFirst(startSpan)
            // ... some other operators ...
            .doOnCancel(cancelSpan)
            .doOnError(errorSpan)
            .doOnSuccess(endSpan)
            .contextWrite(addTheSpanToContextSoItsAvailableDownstreamForSomeOtherOperators);
    })
```
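The start/stop discipline that the doFirst/doOnError/doOnSuccess hooks above emulate can be sketched in plain Java. Everything here (FakeSpan, PerElementTracing) is a made-up stand-in for a real tracer API; it only illustrates the per-element span lifecycle, not an actual Micrometer or tracer integration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Stand-in span that records its lifecycle events; a real tracer's Span
// (with its builder and tags) would replace this in practice.
final class FakeSpan {
    final String name;
    final List<String> events = new ArrayList<>();

    FakeSpan(String name) { this.name = name; }

    void start() { events.add("start"); }
    void error(Throwable t) { events.add("error:" + t.getClass().getSimpleName()); }
    void end() { events.add("end"); }
}

final class PerElementTracing {
    // Wraps one element's processing step so that element gets its own span,
    // mirroring the doFirst / doOnError / doOnSuccess arrangement above.
    static <T, R> R traced(String spanName, T element,
                           Function<? super T, ? extends R> step,
                           List<FakeSpan> sink) {
        FakeSpan span = new FakeSpan(spanName + ":" + element);
        sink.add(span);
        span.start();
        try {
            R out = step.apply(element);
            span.end();
            return out;
        } catch (RuntimeException e) {
            span.error(e);
            span.end();
            throw e;
        }
    }
}
```

In a Flux this wrapping has to happen per emitted element, which is exactly what the per-publisher Observation cannot do today.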
Upvoting this. We have a similar use case, where we have an "infinite flux" coming from Kafka (using Reactor Kafka). For each message received, we apply transformation logic based on an in-memory description of the payload that is in Kafka. We would therefore like to count the number of transformations performed, as well as the duration of each transformation: the first transformation takes 2ms, the second takes 5ms, the third takes 4ms, and the Nth takes 6ms.
Our initial logic was:
Which we refactored to
in order to get the expected result. Having to convert to flatMap, then creating a Mono.just in order to get the correct observation, is a bit strange, and the code is a bit less elegant. It would be great if the Reactor team could help with an improvement on this business use case for observability. Thank you in advance.
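Stripped of Reactor and Micrometer, what such a refactoring is after is one recorded duration per transformed element. A minimal plain-Java sketch (PerElementTimer is a hypothetical stand-in; a real Micrometer Timer would take its place):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Records one duration per transformed element - the behavior the
// per-element observation refactoring tries to obtain.
final class PerElementTimer {
    final List<Duration> samples = new ArrayList<>();

    // Times a single transformation and records its duration,
    // even when the transformation throws.
    <T, R> R time(T input, Function<? super T, ? extends R> transform) {
        long start = System.nanoTime();
        try {
            return transform.apply(input);
        } finally {
            samples.add(Duration.ofNanos(System.nanoTime() - start));
        }
    }
}
```

The count of transformations is then simply samples.size(), and the per-element durations feed whatever histogram or timer the metrics backend provides.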
Let me just paste my thoughts from the Micrometer Slack: I wonder if there is any middle ground here. Perhaps the Observation API would be able to record metrics more granularly between onNext signals, while contributing these to the single Observation for the entire stream. Let's ask the Micrometer team, perhaps they have some recommendations! @marcingrzejszczak @shakuzen @jonatan-ivanov 👋
As far as I understand the problem after more digging, it's currently impossible to create tracing on a per-element basis for a Flux, the way it happened with Spring Boot 2 (the special MonoSpan and supporting subscriber classes, etc.). The closest we could get is to create a Flux<Mono> that requires subscribers to subscribe to each inner publisher separately; then we could have a separate context per element. Otherwise it's impossible to trace the context of a single T element in a Flux publisher.
@Gromcio I wrote a three-part blog post series about this subject if you're keen to understand more. In a nutshell, the approach Spring Cloud Sleuth took is not ideal: it doesn't work in all cases and can result in thread-local state leaks. The new approach is based around the Reactor Context, which is bound to a subscription rather than a thread.
I totally understand the problems with ThreadLocal :) The biggest problem I have with the current implementation is that it works only if all elements of a publisher share a context. For a Mono that's pretty much ok, but if we want to deal with a collection or an unbounded stream of records, it sometimes does and sometimes doesn't make sense. Bear with me, as it might be a bit lengthy :)

Example 1

We've got a db query (in Spring Framework, for example) returning a user's list of books, somehow enriched:

```java
requestMono.flatMap(request -> booksRepo.getUsersBooks(request.getUserId()))
    .tap(Micrometer.observation(registry)) // ... maybe some additional params magic to customize spans
    .flatMap(book -> enrichment.apply(book));
```

This would create a trace with basically 2 spans:
We could always put tap inside the enrichment.apply publisher to generate separate spans, but that's not the point here. We've got a fully functional trace of what we needed to know about our process, and that's ok. All requirements for our tracing are fulfilled.

Aside

For a request stream this works because we're not handling an endless stream of records ourselves; we're attaching our operator (handler) to an existing pipeline that produces the endless stream of requests:

```java
fluxOfRequests.flatMap(request -> Mono.just(request)
    .flatMap(r -> yourHandler(r)
        .contextWrite(createTracingContext)
    )
);

// And your handler would in that case be some implementation of Function<Request, Publisher<T>>
Mono<SomeHttpResponse> yourHandler(Request request) {
    return Mono.just(SomeHttpResponse.ok())
        .flatMap(methodThatHasContextBecauseRequestIsPartOfUpstreamMono);
}
```

Basically we're giving them this, but hidden behind the facade of the handler/operator that they define for the request processing:

```java
return fluxOfRequests.map(request -> Mono.just(request).contextWrite(createTracingContext));
// And they use it like this
requestsFlux.flatMap(requestMono -> requestMono.tap(thisWillHaveParentTraceTheSameWay)
    .flatMap(doSomethingWithContextIveGot)
);
```

Example 2

This unbounded stream can be anything:
```java
// It could be a kafka messages stream returned as an unbounded flux,
// an unbounded stream publishing scheduled tasks (Flux.create(generateEventsFromCron) or a Flux created with a sink emitter),
// or events received from a filesystem changes watcher.
Flux<SomeRecord> unboundedStream = getUnboundedStream();

// Let's say the source of the events should somehow attach a tracing context to elements, to later connect all possible operations.
// For kafka we would likely want to read message headers to acquire the tracing context if there is any
// (the company wants to know how some process got handled).
// For a task system, maybe those tasks are generated with a tracing context and put in the stream with some headers in mind.
// All of that means the SomeRecord already has the propagated context included; it's just not part of observation handling,
// and that's why we want to auto-instrument it, so that clients of this api/flux don't have to worry about deserialization,
// what to do when SomeRecord is mapped to an int and then to a UserDto, etc.

// Assumption: there's no context extraction and injection into the reactor context.
unboundedStream
    // getSomethingByMagic would need to extract the trace context from the record and duplicate it,
    // while the handling of all other stuff would still need to take it into account as well
    .flatMap(record -> getSomethingByMagic(record))
    .flatMap(user -> something...)

// Assuming extraction happens and it extracts the record's trace context, we basically can't do it seamlessly:
Flux<SomeRecord> tracedFluxNotWorking = unboundedStream
    .transform(extractToMonoWithTraceContextFlattened);
// and users end up with
tracedFluxNotWorking.flatMap(hereWeDontHaveContextAsItsNotPropagatedDownstream)

// The only way to do it is either creating a facade, as earlier with requests:
unboundedStream
    .flatMap(record -> Mono.just(record)
        .flatMapOrTransform(usersDelegateHandlerMethod)
        .contextWrite(getTracingContext(record))
    )

// Or to change the contract, which means it's no longer seamless to use (we can't simply create spans etc.):
Flux<Mono<SomeRecord>> tracedFluxOfMonos = unboundedStream
    .transform(extractToMonoWithTraceContext)
    .flatMap(recordMono -> recordMono.flatMap(contextIsHereAndICanUseIt))
```

All of the above is practically not that much of a problem when we're working on our own project and can do whatever we want. But if we want to create an instrumentation, or simply enable observations/tracing/metrics for an existing implementation, we would need to change the contracts/implementations in the code. As a good example we can point at the kafka instrumentation, where we've got the KafkaReceiver interface with a defined and logical contract, but if we wanted to enable tracing we would have to change that contract in the current state of things.

It feels like maybe the stream should wrap records in some kind of PublisherItem that is not visible to default operators like flatMap or map, but with a method similar to deferContextual one could access the PublisherItem interface to acquire the context of a given record, and that could then be leveraged by instrumentation. Otherwise I find it pretty much impossible to do the above seamlessly for flux publishers.
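The PublisherItem idea can be sketched outside Reactor as a plain envelope type. All names here (Contextual, etc.) are hypothetical; the point is only that a per-element metadata map survives the value transformations that ordinary operators would perform:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical envelope: carries per-element metadata through transformations.
// Instrumentation would read/write the context; business code sees only the value.
final class Contextual<T> {
    private final T value;
    private final Map<String, Object> context;

    private Contextual(T value, Map<String, Object> context) {
        this.value = value;
        this.context = context;
    }

    static <T> Contextual<T> of(T value) {
        return new Contextual<>(value, new HashMap<>());
    }

    // Transform the value; the same metadata map travels along with the result.
    <R> Contextual<R> map(Function<? super T, ? extends R> mapper) {
        return new Contextual<>(mapper.apply(value), context);
    }

    T value() { return value; }
    Map<String, Object> context() { return context; }
}
```

The hard part, as discussed above, is making such an envelope invisible to default operators while still exposing it to instrumentation, which is what a deferContextual-style accessor would have to provide.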
Hi all 👋 I've been lurking on this thread for a while, and I think you may find a project I have been working on interesting. The project is called Atleon, and its goal is to make it more seamless to build scalable infinite processing pipelines. It's built on top of Reactor, and while the main goal of making it easier to wire together disparate message broker infrastructures (Kafka, SQS, RabbitMQ, etc.) isn't the focus of this thread, the way it abstracts away message acknowledgement is. In order to propagate "acknowledgement" with each item in the pipeline, Atleon wraps each element. Since each item you're streaming is wrapped in its own transparent "context", you can add more to that context, like tracing and metrics. This has been really beneficial in my day job, since we can get end-to-end traces out of our stream processing exported to our APM. As an added bonus, Atleon also takes care of fan-in. Here's an example showing the tracing functionality in action. The project is still very much a work in progress, but I'd be interested in your thoughts if you check it out!
I really love where this discussion is heading! ❤️ Thanks for the examples, consideration and ideas! I’ll bring it to the team’s attention and we will consider what we can do! 🚀
Any progress on this? I see this as a big issue when using Reactor for endless streams. The problem is not strictly limited to metrics, although that is important, but applies in general to any kind of metadata for emitted elements. For example, in an event-driven architecture you have an endless stream of events flowing through the system, and also through single applications within the system. Reactor fits this problem well in all other ways, but it doesn't provide a reasonable way to collect per-event metrics at the application level. One cannot do this at the subscriber level, since those subscribers live as long as the application lives. But in addition to this, one would want other kinds of metadata too, to empower logging (event ID, user ID and whatnot) and other functionality. Sure, you can create an envelope class that wraps your actual element object, but that gets very tedious very quickly, and it pollutes the business logic in all the places where you are not interested in this metadata, which is most of them. The suggested solution of wrapping everything into separate Monos has the same drawbacks. We are early in our project and we would like to keep using Reactor, but unless this is coming soon(ish) we may have to change to alternative approaches. I don't want to do that if this per-element metadata handling is coming within the next six months or so.
Upvoting this. Please help with this feature.
Thanks for the input. I do agree this would help in certain scenarios. However, I don't have an idea for a solution at this time. If you have ideas, let's explore them. @Sage-Pierce created a project which involves an abstraction that looks difficult to incorporate. It would also be quite heavy to maintain: a combination of a wrapper over each message with a wrapper on top of each publisher. I'm open to ideas, please comment if you have something in mind.
Throwing a quick idea here, not necessarily a smart one, but it popped into my mind: provide overloads of operators like map and flatMap that accept a BiFunction receiving both the element and a per-element context. Any downsides to this approach?
I absolutely love this conversation, and am super interested in where it might lead 😄 I want to voice my agreement with @chemicL's sentiment that the mechanisms I implemented in Atleon should not be incorporated into Reactor (or Reactor Core, at least), especially in the "wrapping" implementation currently in use. My reasoning is that, IMO, these mechanisms are highly targeted at the very specific subset of use cases concerning infinite processing pipelines, and I think that makes this functionality misaligned with the generic applicability of Reactor.

Side note, regarding the non-transparent handling of metadata: this is sort of by design. My goal has been to make metadata handling/propagation maximally abstracted away, such that, in the vast majority of use cases, users don't have to care about it at all and can just focus on handling the data. This keeps the handling of metadata from polluting your business logic, and seems to be in line with @empperi's thinking. In the case of tracing, this maintains the "automatic" management/activation of spans that application developers are used to when implementing request-response paths (i.e. they don't usually have to mess with the span at all). Behind the scenes, there is automatic decoration available for metrics and tracing, implemented by decorating the underlying publisher.

@empperi I would be interested in seeing some pseudo-code illustrating your idea. It isn't clear to me how adding an optional per-element context parameter would work in practice. In the spirit of ideation, I can only think of two ways to accomplish this propagation of per-item metadata/context. The first is the "wrapping" method already discussed. The second is some form of external per-item metadata association, likely based on object identity. I can't say I've thought this all the way through, but such an implementation would likely require non-trivial dedicated infrastructure in Reactor Core. This infrastructure would have to handle creating and storing an item's context, enabling access to that context, and somehow tying the lifecycle of that context to the item being processed (in order to avoid memory leaks). This gets even more non-trivial when you consider "fan-in" operations.

Looking forward to continuing this conversation!
I don't believe we can even get the metrics right with an unbounded flux and the tap operator in the current state, as they depend on onFirst/onComplete "events". Even if we tried to listen to onFirst (aka just before subscribe)/onNext/onComplete, it would still lead to problems like:

Additionally, there's the topic of publishing onward to different schedulers, which would be completely omitted from the metrics calculation: only the publishing operation would be recorded, since the onNext call happens just after it, while the whole further processing executes on a separate scheduler (or I'm wrong on this part) that is not part of the "previous" one.

The current Flux, in terms of available context, looks mostly suited to processing a collection of elements that make up a single resource (for example, the list of addresses a customer has) rather than a collection of resources (for example, a list of customers). You can have a customer publisher that calls delayUntil/flatMap with the customer's list of addresses and processes them inside that context, but you can't have, say, a Flux of customers where each one has its own context, since context is bound to the whole publisher.

I doubt external per-item metadata would work, as we would then need to make sure that 2 elements generating the same unique key are actually kept separate so the "meta" doesn't leak.

But if we want to process all customer queries endlessly (let's say their search queries as a stream of web requests) and have correct metrics, then we would need to register metrics like:

```java
webRequestsStreamAsFlux.flatMap(requestAndResponse -> {
        // Create and resolve the observation here
        return userHandleMethod(requestAndResponse.getRequest())
            .flatMap(writeReturnFromHandleToResponse(requestAndResponse));
    })
    .subscribe();
```

All other implementations that want to handle this correctly would need to do something similar, whether it's a kafka consumer records stream, a web requests stream, an mq events stream, a db events listener stream, and so on. I believe Spring, for example, is doing just that.

Because of this, I believe an unbounded Flux shouldn't be used with tracing/metrics in mind; instead, something similar to a delegate-handling Mono publisher would be needed, which creates a weird pattern. Registering a publisher for a mono would be:

```java
Mono<Customer> getCustomer() {
    return Mono.just(customer);
}

void something() {
    getCustomer().flatMap(doSomething).subscribe();
}
```

while for a flux it would look like this, if we want to keep our per-record context working:

```java
Flux<Customer> getCustomers(Function<Mono<Customer>, Mono<Customer>> handle) {
    return Flux.just(customer)
        .flatMap(record -> handle(
            Mono.just(record).contextWrite(createRecordContext(record))
        ));
}

void something() {
    getCustomers(doSomething).subscribe();
}
```
Adding my thoughts to all of this. Disclaimer: I am not intimately familiar with Reactor, so everything I'm going to say may horribly violate Reactor's internals. That being said, I am quite familiar with stream processing, async programming and so on. Let's assume we have a simple unbounded/infinite Flux:

```java
getInfiniteFluxOfStrings()
    .map(s -> s.toUpperCase())
    .delayElement(Duration.ofSeconds(1))
    .map(MySomeEnum::valueOf);
```

Naturally nothing would happen without a subscription, but let's ignore that for now. What if we could write this:

```java
getInfiniteFluxOfStrings()
    .map((s, context) -> {
        context.put("startTime", System.currentTimeMillis());
        return s.toUpperCase();
    })
    .delayElement(Duration.ofSeconds(1))
    .map((s, context) -> {
        long elementProcessingStart = context.get("startTime");
        // bear with me, this is just a simplistic example and yes, this is horrible
        System.out.println(System.currentTimeMillis() - elementProcessingStart);
        return MySomeEnum.valueOf(s);
    });
```

In this case Reactor would create the context under the hood for each element flowing through the system, and you could get at it by providing a BiFunction instead of a Function. Now, naturally, there are a few things to think about if it were to work like this:
To me it sounds like this should be kept separate and simple. In my mind this is, however, overthinking all of this. The implementation should focus on providing an element-level context, a metadata capability, not strictly a metrics capability. But if we do think a bit more about metrics and the parking problem, then I think this would go to the sink level. We could, in theory, add the concept of element-level context into a specific type of sink. Then one could add tap-like functionality into the sink itself, which could add the necessary metrics bootstrapping. This way the context would already be there even if the element is parked before there are subscribers for the data. In all cases, handling the final metrics calculation is the easiest part: just grab it at the very end and do your thing. Oh, and one more thing: @Sage-Pierce was contemplating using object identities for all of this, and I say it won't work at all. Elements are transformed and changed in the flow, and since most people use immutable data these days, object identities won't stay fixed in the chain. I think it's easiest to just keep passing the context to the different methods via the BiFunction approach. Whatever I'm saying might be totally off the mark, but that's what I'm thinking of this.
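The BiFunction idea can be prototyped outside Reactor as a tiny pipeline over a List. All names here (MiniPipeline, etc.) are hypothetical, and a List stands in for a Flux purely to illustrate the proposed operator shape:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Minimal sketch of the proposed per-element-context overload.
// Each element travels with its own mutable context map.
final class MiniPipeline<T> {
    private final List<SimpleEntry<T, Map<String, Object>>> items;

    private MiniPipeline(List<SimpleEntry<T, Map<String, Object>>> items) {
        this.items = items;
    }

    static <T> MiniPipeline<T> from(List<T> source) {
        List<SimpleEntry<T, Map<String, Object>>> wrapped = new ArrayList<>();
        for (T t : source) wrapped.add(new SimpleEntry<>(t, new HashMap<>()));
        return new MiniPipeline<>(wrapped);
    }

    // The proposed overload: the mapper sees both the element and its context.
    <R> MiniPipeline<R> map(BiFunction<? super T, Map<String, Object>, ? extends R> mapper) {
        List<SimpleEntry<R, Map<String, Object>>> out = new ArrayList<>();
        for (SimpleEntry<T, Map<String, Object>> e : items) {
            out.add(new SimpleEntry<>(mapper.apply(e.getKey(), e.getValue()), e.getValue()));
        }
        return new MiniPipeline<>(out);
    }

    List<T> values() {
        List<T> out = new ArrayList<>();
        for (SimpleEntry<T, Map<String, Object>> e : items) out.add(e.getKey());
        return out;
    }
}
```

A usage mirroring the proposal: metadata written in one step is visible in a later one, while each element keeps its own map.

```java
List<String> result = MiniPipeline.from(List.of("a", "bb"))
    .map((s, ctx) -> { ctx.put("len", s.length()); return s.toUpperCase(); })
    .map((s, ctx) -> s + ":" + ctx.get("len"))
    .values();
```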
Great points, @maciej-gromul and @empperi. I have some thoughts on each of your comments...
I agree with you on the logic here. The current Reactor Core infrastructure doesn't have the functionality to support the per-item metric association I think we're aiming for. In the case of infinite processing pipelines, what we care about (at minimum) is being able to track how long it takes from the point of receiving a message (from polling or channel reception) to the point in time when processing of the element (and its downstream transformations) has completed. In that regard, tracking latency across scheduler boundaries isn't really an issue, because we probably want to include that time in the latency being measured.

This is a great point, and as @empperi mentions in his latest comment, after thinking this through more with you all, I agree it probably can't work, if only because you could have cases where the emitted items aren't unique by object identity, due to possible interning and/or memoization.

In my view, tracing does allow for this, and somewhat encourages it. You can have a single span carrying references to the spans of the fanned-in elements.

I will say from my own experience that it's been incredibly helpful to support tracing in infinite reactive stream processes, and much native work has been done on infrastructures to support tracing on async messages (Kafka headers, RabbitMQ message properties, SQS message system properties, etc.). I think it would be a huge win for Reactor to officially support it alongside the existing infrastructure integrations. I agree the pattern(s) you showed are inorganic, but I will say again that I do not think any "wrapping" mechanism makes sense in Reactor Core...
I'll do you one better, and mention that I implemented exactly this, just a tad bit simpler, removing the need to change any mapping functions:

```java
private static final AloContext.Key<Instant> CUSTOM_KEY = AloContext.Key.min("custom");

receiver.receiveAloRecords(topic)
    .map(record -> {
        AloContext.active().set(CUSTOM_KEY, Instant.now());
        return record.value().toUpperCase();
    })
    .delayElement(Duration.ofSeconds(1))
    .map(s -> {
        AloContext.active().get(CUSTOM_KEY)
            .map(it -> Duration.between(it, Instant.now()))
            .ifPresent(it -> System.out.println("Custom Duration: " + it));
        return MySomeEnum.valueOf(s);
    });
```
Agreed, and this adds another reason why it probably won't make sense in Reactor Core.

Spot on. When I was considering this for Atleon, I had to bake the considerations of fan-in into the context "key" definitions (note how the key in the example above uses a min reduction).

I get the feeling that adding a new context-aware variant of the operators would be a significant change to the API surface.

Right, I think we're all in agreement that supporting a per-item metadata capability simply isn't possible with the existing Reactor Core infrastructure.

Agreed. One nuance that we have only slightly touched on, but that I think is critically important, is that when considering supporting tracing and metrics as metadata, there needs to be some convention for signaling "processing of this data has completed". Otherwise, spans or default metrics won't ever be recorded. Any solution would need to somehow implement this signal, assuming simple metadata isn't the only thing we want to solve for.
Totally agree, if the point is to collect the latency of the other scheduler's publisher as well.

Your example is based on OpenTracing (deprecated now in favour of OpenTelemetry). In OpenTelemetry there's no such thing as multiple span references of either child-of or follows-from types. There's one parent, and something called links, but according to the docs they are not the same as in OpenTracing. I remember trying to set it up in Jaeger to show OpenTelemetry span links but couldn't get it done in any way.

Totally agree, I would love to see some way to instrument observability without modifying the code in large quantities, like it mostly works with "standard" code, where I can either add annotations, call static helper methods or do some simple dependency injection to call the tracer api and make sure I'm actually doing metrics/tracing of what I actually need 👍
The problem with this AloContext is that it will work properly only if a given "pipeline" is simple and everything executes on a single thread, practically synchronously. If, let's say, I've got a flatMap method which does a db query or calls some kind of api, and the task is offloaded since IO shouldn't block, the next element would be picked up and processed, potentially breaking the "loop" by changing the value of CUSTOM_KEY under that thread; and in the case of parallel or "publishOn" magic, something similar would happen. This is why I believe current Reactor created the publisher context that is kept with the publisher instead of just a ThreadLocal, and does that context-propagation, since it is mostly used for backward compatibility with log4j/logback/MDC and similar tooling. I still find problems with that, since by default retry won't clone the context (at least I've had problems with Spring Boot 2 + Reactor), and I had to write a custom onRetry handler that attached the original context to the retry publisher to keep my tracing after retries (for example on REST API calls to our legacy systems).

While reading through the Reactor source code (I'm trying to understand what's happening behind the scenes there :P), I'm struggling to find a way to do that properly, since, for example: we've got doFirst, which is handled by creating FluxDoFirst or FluxDoFirstFuseable, which generally calls the onFirst delegate just before returning the passed-in core subscriber (aka before the subscribe action). Then we've got doOnNext, which goes through doOnEach for FluxPeek with an onNext handler (similar stuff happens for flatMap, but more complicated). In general we will know that an element was published to the method, and then published downstream X times until the pipeline ends, but we don't know when all of the 10 following operators finished processing it.

We only know when the next onNext is called, which depending on the scenario might create false metrics data, since onNext will publish something only if the parent actually returned something from the request. I was thinking whether we could add wrapping around onRequest, since requests should happen when the subscriber can actually process stuff because it has more or less finished with the previous element, but since there are operators like buffer, I'm not even sure how it should behave then :( Additionally, there's parallelism and non-blocking IO which, if I understand Reactor concurrency correctly, would "offload" the element's processing, load another one, process it to some degree and go further (maybe even back to the previous one), thus creating the false belief that something took 2s because onNext had been called, while the real value could be different.

If we had this, then there would be ways to generate observability data for it, since we could safely store the record's meta information together with it, without worrying that it's being moved between threads, etc. It would kind of work like a ThreadLocal for reactive streams. We could then detect when an element is first returned by the first publisher; listeners could create whatever is needed for it, and upon any transformation that data could be propagated further. The only remaining questions concern collect/buffer methods, which don't simply change the element type from N to M, or produce multiple M from a single N (like flatMap could), but "merge" elements into a single context such as a standard List, upon which the per-element context would be lost.

Additionally, there's no way of knowing that an element reached the end of its processing; we would need to somehow register an event in the "last" subscriber's consumer method, so that we know it went through everything expected. This is problematic with fan-out, since we would need to track how many elements were spawned in order to handle them later in that theoretical handler. I'm not fully sure how those hooks/optimizations/transform methods work, but they most likely could break that "theoretical handler". This is one thing I like about Micrometer observations, since the scope is closed to the current publisher (onFirst -> onFinished); it's way easier to define the "processing scope of a given publisher", but it's rather tough to use seamlessly with most of my use cases :( A lot of stuff to rewrite for that.
Good point! My usage of tracing is mostly based on using the DataDog Java Agent, which didn't fully support OpenTelemetry last I checked, so OpenTracing was my only option when I implemented this a year or so ago. To your point, an implementation with OpenTelemetry would probably be creating a new span with no parent, and providing links to the fanned-in span. It would probably take some support tickets in order to get this working the way it currently works (mostly correctly) with the DataDog Agent.
I can see how you could come to that conclusion by taking a look at the AloContext source code, but this is fortunately not true. Only access to the context is implemented with a ThreadLocal, which is an intentional design decision to mirror the idioms you might be familiar with from MDC or gRPC (Context), as well as somewhat from tracing (it also avoids needing to add overloaded methods that take a context parameter).
Ha, man, these are challenges that I'm happy I'm no longer having to reason about on my own 😅 It took me quite a while to figure out how to solve this in a performant/non-blocking way. In case you're interested in seeing code that does so, here it is. Basically, you just need to keep count of how many elements a fan-out operation produces, and also keep count of how many of those elements have completed (idempotently). Once the fan-out operation has finished and the last emitted element has completed, you know the original element is complete. Now, this again all hinges on some form of "wrapping" elements in the pipeline, as well as having some mechanism in place for signaling "complete". I think I am becoming more convinced that implementing either of these functionalities would be a no-go for Reactor Core.
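The counting scheme described above can be sketched in plain Java. FanOutTracker is a hypothetical name, and per-child idempotency is assumed to be enforced by the caller; a parent element is complete only once emission has finished and every emitted child has reported completion:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of parent-completion tracking for a fan-out operation.
// The parent completes only after (a) emission has finished and
// (b) every emitted child has reported completion (each exactly once,
// which the caller is assumed to guarantee).
final class FanOutTracker {
    private final AtomicInteger emitted = new AtomicInteger();
    private final AtomicInteger completed = new AtomicInteger();
    private final AtomicBoolean emissionDone = new AtomicBoolean();

    void onChildEmitted() {
        emitted.incrementAndGet();
    }

    // Returns true exactly when this completion makes the parent complete.
    boolean onChildCompleted() {
        int done = completed.incrementAndGet();
        return emissionDone.get() && done == emitted.get();
    }

    // Returns true if emission finishes after all children already completed
    // (possible when children finish faster than the fan-out emits).
    boolean onEmissionFinished() {
        emissionDone.set(true);
        return completed.get() == emitted.get();
    }
}
```

In the real thing, both signals have to be wired into the wrapped elements, which is exactly the "wrapping" mechanism being debated.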
Ok I've missed the ConcurrentHashMap later down the file. Still it means that "CUSTOM_KEY" needs to be generated for each element where we would have our meta stored. This creates issue like with streams that might have non unique entries or entries with same identifier. Hash+Equals like with hashmaps wouldn't work because we want to keep meta about each queue element separate, no matter if it's a duplicate or something similar. For that I believe only wrapping around the elements at their source would only work since in that case we don't care whether it's A,B,C or A,A,A stream. Of course as long the context is created anew for each one of them. I can see a lot of performance issues with wrapping elements and most likely with implementing it in current code base. Did you test these scenarios ? Having stream of A,A,A with heavy load happening or parallel that could lead to these being executed in the same time range which could then result in above scenario ? I like the idea with ack (Alo) interface if it would be somehow handled internally, but if it has to be manual this again changes the contract to support just fan out scenario. Someone needs to ack these manually, or we need another set of wrappers that would be executed, so that "standard" operators still see only element T. Maybe a hook to FluxFlatMap operator ? I'm not sure how to handle that properly internally to make it "seamless" for api clients. Maybe some magic around assembly and Hooks.onLastOperator ? 
But I'm not too familiar with these internals :( In general I agree that it feels like a no-go for this library, as it's too big a change. On the other hand, it limits observability instrumentation to properly handling the few Mono/Flux scenarios where all Flux elements belong to a single context, while the others, with fan-out/fan-in or unbounded publishers, have to be handled manually (which makes it weird that for some code you just do publisher.tap(MagicHandler) while other code needs a lot of refactoring). |
Right, these are all valid concerns. You don't need a "new" CUSTOM_KEY generated for each element; it just serves as a key into the metadata store, and each element has its own metadata store. Handling the non-unique entries is why it's required to specify a reduction function. The reduction to use is highly dependent on what you are doing with the metadata.
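To make the key/reducer mechanics under discussion concrete, here is a hedged plain-Java sketch. The names `MetaKey` and `ElementContext` are illustrative stand-ins, not Atleon's actual API; the point is that each in-flight element carries its own store, and the reducer only comes into play when two stores holding the same key are merged, e.g. on fan-in:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BinaryOperator;

// A key is just a name plus a reduction function used to resolve
// collisions when two contexts carrying the same key are merged.
final class MetaKey<T> {
    final String name;
    final BinaryOperator<T> reducer;

    MetaKey(String name, BinaryOperator<T> reducer) {
        this.name = name;
        this.reducer = reducer;
    }
}

// Each in-flight element gets its own context instance, so duplicate
// payloads (A, A, A) still have fully independent metadata.
final class ElementContext {
    private final Map<MetaKey<?>, Object> store = new ConcurrentHashMap<>();

    <T> void put(MetaKey<T> key, T value) {
        store.put(key, value);
    }

    @SuppressWarnings("unchecked")
    <T> T get(MetaKey<T> key) {
        return (T) store.get(key);
    }

    // Fan-in: fold another element's context into this one, applying
    // each key's reducer where both sides carry a value.
    @SuppressWarnings("unchecked")
    void mergeFrom(ElementContext other) {
        other.store.forEach((key, value) ->
                store.merge(key, value,
                        (a, b) -> ((BinaryOperator<Object>) key.reducer).apply(a, b)));
    }
}
```

A key like `new MetaKey<>("elapsedMillis", Long::sum)` would then accumulate timings when contexts merge, while a "keep first" or "keep max" reducer suits other kinds of metadata.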
Yep, I was concerned about this as well, and did some empirical testing that I've never gotten around to formalizing into a codified stress test. My conclusions were a bit murky, because of the nature of use cases you have when you care about adding finer-grained metadata, tracing, and/or metrics. These use cases typically imply you are doing some non-trivial enrichment or transformation based on I/O. Otherwise, it's hard to contrive why you might care about finer granularity. In these use cases, the I/O overhead far outweighs the added overhead of wrapping elements in the stream, so the computational and GC overhead from wrapping is barely noticeable, if at all.
Spot on. There has to be some mechanism somewhere for indicating "item processing completion" (either successfully or exceptionally). It's even more tricky than just considering
Agreed. The only thing I would again qualify this with is that we're specifically addressing infinite processing pipelines, which I think is a significantly specific subset usage of Reactor, which on its own, might be justification for not implementing it in Core. Though I would say it would make (or would have made) my life easier if it was natively supported 😄 |
I think I misunderstand something here. CUSTOM_KEY is a type of Key that generally just has a name string and a reducer function. If we get a Flux of 50 elements and we want each of the elements to have its own AloContext, then the key for each one of them would need to be separate and unique. Otherwise, with key = single("test"), every element trying to access the context under that key would get the same information, since all the code there does is ask a ThreadLocal for access to the "active" context (which is effectively shared state), and the get method is actually called on a ConcurrentHashMap shared by all requests. Additionally, the key needs to be hardcoded in code, since the object might change and the key might be impossible to recompute in later steps.
In our case, Reactor is mostly used for defining reactive Kafka stream processors. Some cases are 1-to-1 input/output topics, while in several cases it's many-to-many (working somewhat like a router with enrichment). The most common services that come to mind are API services (REST/gRPC/GraphQL/whatever else) and message processors; both are unbounded streams of events, one endlessly listening for "requests" on a certain port, the other endlessly listening for messages on a topic/queue. If these two common scenarios should be handled by Flux, since that's the only publisher that can emit more than one element, then the capability we're discussing should be available and reachable with just the core libraries.

We want to add observability and metrics because logs from any concurrent/parallel code easily get disconnected, especially across multiple services. Additionally, we want metrics around some of our pipeline stages to narrow down problems or do some tweaking: verifying DB I/O times for specific queries, narrowing down time lost awaiting an API response, measuring how long processing a Kafka record took, etc., and on top of these we're building alerting. Of course we have some REST endpoints with reactive code, but there are few of them, and in general they don't pose much of a problem, because Spring WebFlux wraps each request as a Mono publisher with the trace/metrics context already mostly handled, and if needed we can get those traces/metrics mostly correct with Mono.

To make this long story short :D I believe unbounded streams are at the root of a good chunk of situations, and since observability is important these days, it would be nice to be able to achieve it easily with either the core library or a simple add-on library. It shouldn't require significant code base changes to go from a normal pipeline to one with observability enabled. |
I think the confusion is here. Each item has its own context (backed by a ConcurrentHashMap).
Nice, I do this same thing a lot
This is interesting... For all of the server side REST, gRPC, GQL usages I have had experience with, the existing Reactor Core infrastructure works just fine, because the request itself has a "context" available that it makes intuitive sense to be active during the entirety of processing the request (and in the case of the DataDog Agent, works just fine). The only exception I could think of for this that would be applicable to what we're discussing would be client-streaming or bi-directional gRPC RPCs. I myself have never implemented such RPCs, but I can see how you might run into issues here with wanting separate contexts per client-sent message.
Absolutely agree, these abilities are invaluable. I leverage this ability with Atleon quite a bit to tweak/optimize stream processes.
Agreed, that end result would be ideal, and I think the challenge to Reactor Core maintainers is to question whether this "chunk" is significant enough to warrant the added complexity of supporting per-item contexts in infinite processing pipelines.
As noted in the above discussion (quite a lengthy read, yet worthwhile!) it doesn't seem like a reasonable goal for reactor-core to implement transparent per-item observations.

I just created a PR to provide insight into the difference between Micrometer.metrics() and Micrometer.observation() in the reference documentation.

I do believe that hiding the complexity of the observability aspect from the user should be the responsibility of a particular use-case-dedicated library, e.g. like WebFlux and WebClient do in Spring Framework. For libraries like reactor-kafka, it would require adopting an action-based API which would allow the user to declare the processing, while the library injects the observability on top of it, just as suggested by @maciej-gromul.

With the above, I'm closing the issue. Thanks everyone for the discussion.
Motivation
Right now the tap operator with Micrometer::observation will only register an observation and span for the whole publisher. In the case of a Flux, which can have multiple elements, this means only a single span and a single metric are registered for the whole Flux. In the case of unbounded Flux publishers (for example, input from a Kafka topic via reactor-kafka) we would never get a metric/span, since no completion/cancellation/error is triggered in the default usage scenario. To solve that, we need to manually turn each element of the Flux into a separate Mono publisher with a flatMap, and the same has to happen for every Flux that should have separate telemetry per element.
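The per-element workaround described above looks roughly like this. This is a sketch assuming reactor-core, reactor-core-micrometer, and micrometer-observation on the classpath; the helper name `observePerElement` and the `"processing_element"` name are illustrative:

```java
import java.util.function.Function;

import io.micrometer.observation.ObservationRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PerElementObservation {

    // Wraps each emitted element in its own Mono so that tap() observes one
    // element's processing rather than the whole (possibly unbounded) Flux.
    // Each inner Mono terminates, so each element produces its own
    // observation (and hence its own span/metrics, given suitable handlers).
    static <T, R> Flux<R> observePerElement(Flux<T> source,
                                            ObservationRegistry registry,
                                            Function<? super T, ? extends Mono<R>> processing) {
        return source.flatMap(element ->
                processing.apply(element)
                        .name("processing_element")   // becomes the observation name
                        .tap(Micrometer.observation(registry)));
    }
}
```

With an empty `ObservationRegistry` this is a pass-through; registering tracing/metrics handlers on the registry is what turns each inner Mono's lifecycle into a span and timing.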
Desired solution
A way to register, like previously with .metrics(), but with .tap(Micrometer.observation(registry)), that would handle not only the publisher as a whole but each element as well. That way we can get spans and metrics out of the observations, and using these we could obtain information like per-element processing time and how many elements are processed per second; each element could have its own span for its processing, forming its own trace.

Considered alternatives
We could have a special function for the transform operator that does a flatMap and registers an observation. The downside is that it won't propagate the span context properly, and it requires a callable to be passed in and injected into the transformation as part of the pipeline in order to continue the code processing.
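A hedged sketch of that alternative is below. The helper name `observeEach` is illustrative and not an existing Reactor API; `ObservationThreadLocalAccessor.KEY` comes from micrometer-observation and is the context key under which Micrometer integrations look up the current observation. Note the caveat from the paragraph above: the processing function has to be passed into the helper rather than composed with ordinary downstream operators.

```java
import java.util.function.Function;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ObserveEach {

    // transform()-compatible helper: opens an Observation per element, runs
    // the supplied processing "inside" it, and closes it when the inner Mono
    // terminates (successfully, with an error, or by cancellation).
    static <T, R> Function<Flux<T>, Flux<R>> observeEach(String name,
                                                         ObservationRegistry registry,
                                                         Function<? super T, ? extends Mono<R>> processing) {
        return flux -> flux.flatMap(element -> {
            Observation observation = Observation.start(name, registry);
            return processing.apply(element)
                    .doOnError(observation::error)
                    .doFinally(signal -> observation.stop())
                    // expose the observation to the operators above via the Reactor context
                    .contextWrite(ctx -> ctx.put(ObservationThreadLocalAccessor.KEY, observation));
        });
    }
}
```

Usage would then be something like `flux.transform(ObserveEach.observeEach("processing_element", registry, this::process))`.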