Skip to content

Commit

Permalink
Feature/2692245: Add Corelated activity id using "x-ms-cosmos-correla…
Browse files Browse the repository at this point in the history
…ted-activityid" header (Azure#28022)

### Packages impacted by this PR
@azure/cosmos

### Issues associated with this PR
Azure#27970

### Describe the problem that is addressed by this PR
This PR will add a correlated activity Id in the request header for all
types of queries on Items. This will help in troubleshooting by
correlating the queries that involves multiple roundtrips to server and
cross partition queries.

After the query is executed, the CorrelatedActivityId column of
SqlQueryExecMetrics table in Kusto can be used to verify.

![kusto](https://github.com/Azure/azure-sdk-for-js/assets/141712869/51881b07-4373-44ab-9bfe-2e8c948613e1)

Refer to attached doc for more details:

https://microsoftapc-my.sharepoint.com/:w:/g/personal/adlnu_microsoft_com/EXPrCMA9rsVMgnH3FMVIBnsB3N3QkRrbxF5PcGQTZNkRtQ?e=HO7wmF

### What are the possible designs available to address the problem? If
there are more than one possible design, why was the one in this PR
chosen?



### Are there test cases added in this PR? _(If not, why?)_
yes

### Provide a list of related PRs _(if any)_


### Command used to generate this PR:**_(Applicable only to SDK release
request PRs)_

### Checklists
- [x] Added impacted package name to the issue description

- [ ] Does this PR needs any fixes in the SDK Generator?** _(If so,
create an Issue in the
[Autorest/typescript](https://github.com/Azure/autorest.typescript)
repository and link it here)_
- [ ] Added a changelog (if necessary)

---------

Co-authored-by: Aditishree <adlnu@microsoft.com>
  • Loading branch information
aditishree1 and Aditishree authored Jan 30, 2024
1 parent 56e164c commit c7301ab
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 15 deletions.
1 change: 1 addition & 0 deletions sdk/cosmosdb/cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 4.0.1 (Unreleased)

### Features Added
- Added Correlated Activity Id feature in JS SDK for better troubleshooting of queries. The correlated activity id is added in the request header for all types of queries on Items.

### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmosdb/cosmos/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
"@azure/core-auth": "^1.3.0",
"@azure/core-rest-pipeline": "^1.2.0",
"@azure/core-tracing": "^1.0.0",
"@azure/core-util": "^1.6.1",
"debug": "^4.1.1",
"fast-json-stable-stringify": "^2.1.0",
"jsbi": "^3.1.3",
Expand Down
6 changes: 4 additions & 2 deletions sdk/cosmosdb/cosmos/review/cosmos.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ export class ClientContext {
getClientConfig(): ClientConfigDiagnostic;
getDatabaseAccount(diagnosticNode: DiagnosticNodeInternal, options?: RequestOptions): Promise<Response_2<DatabaseAccount>>;
// (undocumented)
getQueryPlan(path: string, resourceType: ResourceType, resourceId: string, query: SqlQuerySpec | string, options: FeedOptions, diagnosticNode: DiagnosticNodeInternal): Promise<Response_2<PartitionedQueryExecutionInfo>>;
getQueryPlan(path: string, resourceType: ResourceType, resourceId: string, query: SqlQuerySpec | string, options: FeedOptions, diagnosticNode: DiagnosticNodeInternal, correlatedActivityId?: string): Promise<Response_2<PartitionedQueryExecutionInfo>>;
// (undocumented)
getReadEndpoint(diagnosticNode: DiagnosticNodeInternal): Promise<string>;
// (undocumented)
Expand All @@ -232,7 +232,7 @@ export class ClientContext {
diagnosticNode: DiagnosticNodeInternal;
}): Promise<Response_2<T & Resource>>;
// (undocumented)
queryFeed<T>({ path, resourceType, resourceId, resultFn, query, options, diagnosticNode, partitionKeyRangeId, partitionKey, startEpk, endEpk, }: {
queryFeed<T>({ path, resourceType, resourceId, resultFn, query, options, diagnosticNode, partitionKeyRangeId, partitionKey, startEpk, endEpk, correlatedActivityId, }: {
path: string;
resourceType: ResourceType;
resourceId: string;
Expand All @@ -246,6 +246,7 @@ export class ClientContext {
partitionKey?: PartitionKey;
startEpk?: string | undefined;
endEpk?: string | undefined;
correlatedActivityId?: string;
}): Promise<Response_2<T & Resource>>;
// (undocumented)
queryPartitionKeyRanges(collectionLink: string, query?: string | SqlQuerySpec, options?: FeedOptions): QueryIterator<PartitionKeyRange>;
Expand Down Expand Up @@ -447,6 +448,7 @@ export const Constants: {
PageSize: string;
ItemCount: string;
ActivityId: string;
CorrelatedActivityId: string;
PreTriggerInclude: string;
PreTriggerExclude: string;
PostTriggerInclude: string;
Expand Down
9 changes: 9 additions & 0 deletions sdk/cosmosdb/cosmos/src/ClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ export class ClientContext {
partitionKey,
startEpk,
endEpk,
correlatedActivityId,
}: {
path: string;
resourceType: ResourceType;
Expand All @@ -171,6 +172,7 @@ export class ClientContext {
partitionKey?: PartitionKey;
startEpk?: string | undefined;
endEpk?: string | undefined;
correlatedActivityId?: string;
}): Promise<Response<T & Resource>> {
// Query operations will use ReadEndpoint even though it uses
// GET(for queryFeed) and POST(for regular query operations)
Expand Down Expand Up @@ -209,6 +211,9 @@ export class ClientContext {
}

if (query !== undefined) {
if (correlatedActivityId !== undefined) {
request.headers[HttpHeaders.CorrelatedActivityId] = correlatedActivityId;
}
request.headers[HttpHeaders.IsQuery] = "true";
request.headers[HttpHeaders.ContentType] = QueryJsonContentType;
if (typeof query === "string") {
Expand Down Expand Up @@ -237,6 +242,7 @@ export class ClientContext {
query: SqlQuerySpec | string,
options: FeedOptions = {},
diagnosticNode: DiagnosticNodeInternal,
correlatedActivityId?: string,
): Promise<Response<PartitionedQueryExecutionInfo>> {
const request: RequestContext = {
...this.getContextDerivedPropsForRequestCreation(),
Expand All @@ -258,6 +264,9 @@ export class ClientContext {
request.operationType,
);
request.headers = await this.buildHeaders(request);
if (correlatedActivityId !== undefined) {
request.headers[HttpHeaders.CorrelatedActivityId] = correlatedActivityId;
}
request.headers[HttpHeaders.IsQueryPlan] = "True";
request.headers[HttpHeaders.QueryVersion] = "1.4";
request.headers[HttpHeaders.SupportedQueryFeatures] =
Expand Down
2 changes: 2 additions & 0 deletions sdk/cosmosdb/cosmos/src/client/Item/Items.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ export class Items {
const fetchFunction: FetchFunctionCallback = async (
diagnosticNode: DiagnosticNodeInternal,
innerOptions: FeedOptions,
correlatedActivityId: string,
) => {
const response = await this.clientContext.queryFeed({
path,
Expand All @@ -121,6 +122,7 @@ export class Items {
options: innerOptions,
partitionKey: options.partitionKey,
diagnosticNode,
correlatedActivityId,
});
return response;
};
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmosdb/cosmos/src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export const Constants = {

// Request sender generated. Simply echoed by backend.
ActivityId: "x-ms-activity-id",
CorrelatedActivityId: "x-ms-cosmos-correlated-activityid",
PreTriggerInclude: "x-ms-documentdb-pre-trigger-include",
PreTriggerExclude: "x-ms-documentdb-pre-trigger-exclude",
PostTriggerInclude: "x-ms-documentdb-post-trigger-include",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const logger: AzureLogger = createClientLogger("ClientContext");
export type FetchFunctionCallback = (
diagnosticNode: DiagnosticNodeInternal,
options: FeedOptions,
correlatedActivityId: string,
) => Promise<Response<any>>;

/** @hidden */
Expand All @@ -38,6 +39,7 @@ export class DefaultQueryExecutionContext implements ExecutionContext {
}
private state: STATES;
private nextFetchFunction: Promise<Response<any>>;
private correlatedActivityId: string;
/**
* Provides the basic Query Execution Context.
* This wraps the internal logic query execution using provided fetch functions
Expand All @@ -52,6 +54,7 @@ export class DefaultQueryExecutionContext implements ExecutionContext {
constructor(
options: FeedOptions,
fetchFunctions: FetchFunctionCallback | FetchFunctionCallback[],
correlatedActivityId: string,
) {
this.resources = [];
this.currentIndex = 0;
Expand All @@ -60,6 +63,7 @@ export class DefaultQueryExecutionContext implements ExecutionContext {
this.options = options || {};
this.continuationToken = this.options.continuationToken || this.options.continuation || null;
this.state = DefaultQueryExecutionContext.STATES.start;
this.correlatedActivityId = correlatedActivityId;
}

/**
Expand Down Expand Up @@ -153,7 +157,11 @@ export class DefaultQueryExecutionContext implements ExecutionContext {
this.nextFetchFunction = undefined;
} else {
logger.verbose("using fresh fetch");
p = this.fetchFunctions[this.currentPartitionIndex](childDiagnosticNode, this.options);
p = this.fetchFunctions[this.currentPartitionIndex](
childDiagnosticNode,
this.options,
this.correlatedActivityId,
);
}
const response = await p;
resources = response.result;
Expand All @@ -167,10 +175,14 @@ export class DefaultQueryExecutionContext implements ExecutionContext {
if (this.options && this.options.bufferItems === true) {
const fetchFunction = this.fetchFunctions[this.currentPartitionIndex];
this.nextFetchFunction = fetchFunction
? fetchFunction(childDiagnosticNode, {
...this.options,
continuationToken: this.continuationToken,
})
? fetchFunction(
childDiagnosticNode,
{
...this.options,
continuationToken: this.continuationToken,
},
this.correlatedActivityId,
)
: undefined;
}
} catch (err: any) {
Expand Down
16 changes: 13 additions & 3 deletions sdk/cosmosdb/cosmos/src/queryExecutionContext/documentProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import {
import { DiagnosticNodeInternal } from "../diagnostics/DiagnosticNodeInternal";
import { FeedOptions } from "../request";
import { Response } from "../request";
import { DefaultQueryExecutionContext } from "./defaultQueryExecutionContext";
import {
DefaultQueryExecutionContext,
FetchFunctionCallback,
} from "./defaultQueryExecutionContext";
import { FetchResult, FetchResultType } from "./FetchResult";
import { CosmosHeaders, getInitialHeader, mergeHeaders } from "./headerUtils";
import { SqlQuerySpec } from "./index";
Expand Down Expand Up @@ -46,6 +49,7 @@ export class DocumentProducer {
query: SqlQuerySpec,
targetPartitionKeyRange: PartitionKeyRange,
options: FeedOptions,
correlatedActivityId: string,
) {
// TODO: any options
this.collectionLink = collectionLink;
Expand All @@ -60,7 +64,11 @@ export class DocumentProducer {
this.continuationToken = undefined;
this.respHeaders = getInitialHeader();

this.internalExecutionContext = new DefaultQueryExecutionContext(options, this.fetchFunction);
this.internalExecutionContext = new DefaultQueryExecutionContext(
options,
this.fetchFunction,
correlatedActivityId,
);
}
/**
* Synchronously gives the contiguous buffered results (stops at the first non result) if any
Expand All @@ -86,9 +94,10 @@ export class DocumentProducer {
return bufferedResults;
}

public fetchFunction = async (
public fetchFunction: FetchFunctionCallback = async (
diagnosticNode: DiagnosticNodeInternal,
options: FeedOptions,
correlatedActivityId: string,
): Promise<Response<Resource>> => {
const path = getPathFromLink(this.collectionLink, ResourceType.item);
diagnosticNode.addData({ partitionKeyRangeId: this.targetPartitionKeyRange.id });
Expand All @@ -103,6 +112,7 @@ export class DocumentProducer {
options,
diagnosticNode,
partitionKeyRangeId: this.targetPartitionKeyRange["id"],
correlatedActivityId,
});
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,17 @@ export class OrderByQueryExecutionContext
query: string | SqlQuerySpec,
options: FeedOptions,
partitionedQueryExecutionInfo: PartitionedQueryExecutionInfo,
correlatedActivityId: string,
) {
// Calling on base class constructor
super(clientContext, collectionLink, query, options, partitionedQueryExecutionInfo);
super(
clientContext,
collectionLink,
query,
options,
partitionedQueryExecutionInfo,
correlatedActivityId,
);
this.orderByComparator = new OrderByDocumentProducerComparator(this.sortOrders);
}
// Instance members are inherited
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont
private query: string | SqlQuerySpec,
private options: FeedOptions,
private partitionedQueryExecutionInfo: PartitionedQueryExecutionInfo,
private correlatedActivityId: string,
) {
this.clientContext = clientContext;
this.collectionLink = collectionLink;
this.query = query;
this.options = options;
this.partitionedQueryExecutionInfo = partitionedQueryExecutionInfo;
this.correlatedActivityId = correlatedActivityId;
this.diagnosticNodeWrapper = {
consumed: false,
diagnosticNode: new DiagnosticNodeInternal(
Expand Down Expand Up @@ -535,6 +537,7 @@ export abstract class ParallelQueryExecutionContextBase implements ExecutionCont
sqlQuerySpec,
partitionKeyTargetRange,
options,
this.correlatedActivityId,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export class PipelinedQueryExecutionContext implements ExecutionContext {
private query: string | SqlQuerySpec,
private options: FeedOptions,
private partitionedQueryExecutionInfo: PartitionedQueryExecutionInfo,
correlatedActivityId: string,
) {
this.endpoint = null;
this.pageSize = options["maxItemCount"];
Expand All @@ -49,6 +50,7 @@ export class PipelinedQueryExecutionContext implements ExecutionContext {
this.query,
this.options,
this.partitionedQueryExecutionInfo,
correlatedActivityId,
),
);
} else {
Expand All @@ -58,6 +60,7 @@ export class PipelinedQueryExecutionContext implements ExecutionContext {
this.query,
this.options,
this.partitionedQueryExecutionInfo,
correlatedActivityId,
);
}
if (
Expand Down
7 changes: 6 additions & 1 deletion sdk/cosmosdb/cosmos/src/queryIterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license.

/// <reference lib="esnext.asynciterable" />

import { ClientContext } from "./ClientContext";
import { DiagnosticNodeInternal, DiagnosticNodeType } from "./diagnostics/DiagnosticNodeInternal";
import { getPathFromLink, ResourceType, StatusCodes } from "./common";
Expand All @@ -26,6 +25,7 @@ import {
withMetadataDiagnostics,
} from "./utils/diagnostics";
import { MetadataLookUpType } from "./CosmosDiagnostics";
import { randomUUID } from "@azure/core-util";

/**
* Represents a QueryIterator Object, an implementation of feed or query response that enables
Expand All @@ -38,6 +38,7 @@ export class QueryIterator<T> {
private queryExecutionContext: ExecutionContext;
private queryPlanPromise: Promise<Response<PartitionedQueryExecutionInfo>>;
private isInitialized: boolean;
private correlatedActivityId: string;
/**
* @hidden
*/
Expand Down Expand Up @@ -203,12 +204,14 @@ export class QueryIterator<T> {
* Reset the QueryIterator to the beginning and clear all the resources inside it
*/
public reset(): void {
this.correlatedActivityId = randomUUID();
this.queryPlanPromise = undefined;
this.fetchAllLastResHeaders = getInitialHeader();
this.fetchAllTempResources = [];
this.queryExecutionContext = new DefaultQueryExecutionContext(
this.options,
this.fetchFunctions,
this.correlatedActivityId,
);
}

Expand Down Expand Up @@ -274,6 +277,7 @@ export class QueryIterator<T> {
this.query,
this.options,
queryPlan,
this.correlatedActivityId,
);
}

Expand All @@ -287,6 +291,7 @@ export class QueryIterator<T> {
this.query,
this.options,
diagnosticNode,
this.correlatedActivityId,
)
.catch((error: any) => error); // Without this catch, node reports an unhandled rejection. So we stash the promise as resolved even if it errored.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ describe("defaultQueryExecutionContext", function () {
bufferItems: false,
};

const context = new DefaultQueryExecutionContext(options, fetchFunction);
const correlatedId = "random-id";
const context = new DefaultQueryExecutionContext(options, fetchFunction, correlatedId);

assert.strictEqual(calledCount, 0, "Nothing should be fetched at this point");

Expand Down Expand Up @@ -73,8 +74,8 @@ describe("defaultQueryExecutionContext", function () {
const options: FeedOptions = {
bufferItems: true,
};

const context = new DefaultQueryExecutionContext(options, fetchFunction);
const correlatedId = "random-id";
const context = new DefaultQueryExecutionContext(options, fetchFunction, correlatedId);

assert.strictEqual(calledCount, 0, "Nothing should be fetched at this point");

Expand Down
Loading

0 comments on commit c7301ab

Please sign in to comment.