Skip to content

Commit

Permalink
Merge pull request #2538 from murgatroid99/grpc-js_lb_policy_config_r…
Browse files Browse the repository at this point in the history
…efactor

grpc-js: Return LB policy configs from resolvers in JSON form
  • Loading branch information
murgatroid99 committed Aug 8, 2023
2 parents 14b11f6 + 08bcbfc commit 4d288de
Show file tree
Hide file tree
Showing 21 changed files with 421 additions and 350 deletions.
33 changes: 33 additions & 0 deletions packages/grpc-js-xds/src/duration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { experimental } from '@grpc/grpc-js';
import { Duration__Output } from './generated/google/protobuf/Duration';
import Duration = experimental.Duration;

/**
* Convert a Duration protobuf message object to a Duration object as used in
* the ServiceConfig definition. The difference is that the protobuf message
* defines seconds as a long, which is represented as a string in JavaScript,
* and the one used in the service config defines it as a number.
* @param duration
*/
export function protoDurationToDuration(duration: Duration__Output): Duration {
return {
seconds: Number.parseInt(duration.seconds),
nanos: duration.nanos
};
}
83 changes: 34 additions & 49 deletions packages/grpc-js-xds/src/load-balancer-cds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

import { connectivityState, status, Metadata, logVerbosity, experimental } from '@grpc/grpc-js';
import { connectivityState, status, Metadata, logVerbosity, experimental, LoadBalancingConfig } from '@grpc/grpc-js';
import { getSingletonXdsClient, Watcher, XdsClient } from './xds-client';
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
import SubchannelAddress = experimental.SubchannelAddress;
Expand All @@ -24,17 +24,18 @@ import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import LoadBalancingConfig = experimental.LoadBalancingConfig;
import OutlierDetectionLoadBalancingConfig = experimental.OutlierDetectionLoadBalancingConfig;
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import SuccessRateEjectionConfig = experimental.SuccessRateEjectionConfig;
import FailurePercentageEjectionConfig = experimental.FailurePercentageEjectionConfig;
import QueuePicker = experimental.QueuePicker;
import OutlierDetectionRawConfig = experimental.OutlierDetectionRawConfig;
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
import { OutlierDetection__Output } from './generated/envoy/config/cluster/v3/OutlierDetection';
import { Duration__Output } from './generated/google/protobuf/Duration';
import { EXPERIMENTAL_OUTLIER_DETECTION } from './environment';
import { DiscoveryMechanism, XdsClusterResolverChildPolicyHandler, XdsClusterResolverLoadBalancingConfig } from './load-balancer-xds-cluster-resolver';
import { DiscoveryMechanism, XdsClusterResolverChildPolicyHandler } from './load-balancer-xds-cluster-resolver';
import { CLUSTER_CONFIG_TYPE_URL, decodeSingleResource } from './resources';
import { CdsUpdate, ClusterResourceType, OutlierDetectionUpdate } from './xds-resource-type/cluster-resource-type';
import { CdsUpdate, ClusterResourceType } from './xds-resource-type/cluster-resource-type';

const TRACER_NAME = 'cds_balancer';

Expand All @@ -44,7 +45,7 @@ function trace(text: string): void {

const TYPE_NAME = 'cds';

export class CdsLoadBalancingConfig implements LoadBalancingConfig {
class CdsLoadBalancingConfig implements TypedLoadBalancingConfig {
getLoadBalancerName(): string {
return TYPE_NAME;
}
Expand Down Expand Up @@ -72,29 +73,6 @@ export class CdsLoadBalancingConfig implements LoadBalancingConfig {
}
}

function durationToMs(duration: Duration__Output): number {
return (Number(duration.seconds) * 1_000 + duration.nanos / 1_000_000) | 0;
}

function translateOutlierDetectionConfig(outlierDetection: OutlierDetectionUpdate | undefined): OutlierDetectionLoadBalancingConfig | undefined {
if (!EXPERIMENTAL_OUTLIER_DETECTION) {
return undefined;
}
if (!outlierDetection) {
/* No-op outlier detection config, with all fields unset. */
return new OutlierDetectionLoadBalancingConfig(null, null, null, null, null, null, []);
}
return new OutlierDetectionLoadBalancingConfig(
outlierDetection.intervalMs,
outlierDetection.baseEjectionTimeMs,
outlierDetection.maxEjectionTimeMs,
outlierDetection.maxEjectionPercent,
outlierDetection.successRateConfig,
outlierDetection.failurePercentageConfig,
[]
);
}

interface ClusterEntry {
watcher: Watcher<CdsUpdate>;
latestUpdate?: CdsUpdate;
Expand Down Expand Up @@ -133,16 +111,16 @@ function generateDiscoverymechanismForCdsUpdate(config: CdsUpdate): DiscoveryMec
type: config.type,
eds_service_name: config.edsServiceName,
dns_hostname: config.dnsHostname,
outlier_detection: translateOutlierDetectionConfig(config.outlierDetectionUpdate)
outlier_detection: config.outlierDetectionUpdate
};
}

const RECURSION_DEPTH_LIMIT = 15;

/**
* Prerequisite: isClusterTreeFullyUpdated(tree, root)
* @param tree
* @param root
* @param tree
* @param root
*/
function getDiscoveryMechanismList(tree: ClusterTree, root: string): DiscoveryMechanism[] {
const visited = new Set<string>();
Expand Down Expand Up @@ -189,6 +167,11 @@ export class CdsLoadBalancer implements LoadBalancer {
this.childBalancer = new XdsClusterResolverChildPolicyHandler(channelControlHelper);
}

private reportError(errorMessage: string) {
trace('CDS cluster reporting error ' + errorMessage);
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage, metadata: new Metadata()}));
}

private addCluster(cluster: string) {
if (cluster in this.clusterTree) {
return;
Expand All @@ -208,19 +191,28 @@ export class CdsLoadBalancer implements LoadBalancer {
try {
discoveryMechanismList = getDiscoveryMechanismList(this.clusterTree, this.latestConfig!.getCluster());
} catch (e) {
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: e.message, metadata: new Metadata()}));
this.reportError((e as Error).message);
return;
}
const clusterResolverConfig: LoadBalancingConfig = {
xds_cluster_resolver: {
discovery_mechanisms: discoveryMechanismList,
locality_picking_policy: [],
endpoint_picking_policy: []
}
};
let parsedClusterResolverConfig: TypedLoadBalancingConfig;
try {
parsedClusterResolverConfig = parseLoadBalancingConfig(clusterResolverConfig);
} catch (e) {
this.reportError(`CDS cluster ${this.latestConfig?.getCluster()} child config parsing failed with error ${(e as Error).message}`);
return;
}
const clusterResolverConfig = new XdsClusterResolverLoadBalancingConfig(
discoveryMechanismList,
[],
[]
);
trace('Child update config: ' + JSON.stringify(clusterResolverConfig));
this.updatedChild = true;
this.childBalancer.updateAddressList(
[],
clusterResolverConfig,
parsedClusterResolverConfig,
this.latestAttributes
);
}
Expand All @@ -231,20 +223,13 @@ export class CdsLoadBalancer implements LoadBalancer {
this.clusterTree[cluster].latestUpdate = undefined;
this.clusterTree[cluster].children = [];
}
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `CDS resource ${cluster} does not exist`, metadata: new Metadata()}));
this.reportError(`CDS resource ${cluster} does not exist`);
this.childBalancer.destroy();
},
onError: (statusObj) => {
if (!this.updatedChild) {
trace('Transitioning to transient failure due to onError update for cluster' + cluster);
this.channelControlHelper.updateState(
connectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({
code: status.UNAVAILABLE,
details: `xDS request failed with error ${statusObj.details}`,
metadata: new Metadata(),
})
);
this.reportError(`xDS request failed with error ${statusObj.details}`);
}
}
});
Expand Down Expand Up @@ -275,7 +260,7 @@ export class CdsLoadBalancer implements LoadBalancer {

updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
if (!(lbConfig instanceof CdsLoadBalancingConfig)) {
Expand Down
25 changes: 12 additions & 13 deletions packages/grpc-js-xds/src/load-balancer-lrs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import { XdsClusterLocalityStats, XdsClient, getSingletonXdsClient } from './xds
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import getFirstUsableConfig = experimental.getFirstUsableConfig;
import SubchannelAddress = experimental.SubchannelAddress;
import LoadBalancingConfig = experimental.LoadBalancingConfig;
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import Picker = experimental.Picker;
import PickArgs = experimental.PickArgs;
Expand All @@ -34,11 +32,12 @@ import Filter = experimental.Filter;
import BaseFilter = experimental.BaseFilter;
import FilterFactory = experimental.FilterFactory;
import Call = experimental.CallStream;
import validateLoadBalancingConfig = experimental.validateLoadBalancingConfig
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import selectLbConfigFromList = experimental.selectLbConfigFromList;

const TYPE_NAME = 'lrs';

export class LrsLoadBalancingConfig implements LoadBalancingConfig {
class LrsLoadBalancingConfig implements TypedLoadBalancingConfig {
getLoadBalancerName(): string {
return TYPE_NAME;
}
Expand All @@ -49,12 +48,12 @@ export class LrsLoadBalancingConfig implements LoadBalancingConfig {
eds_service_name: this.edsServiceName,
lrs_load_reporting_server_name: this.lrsLoadReportingServer,
locality: this.locality,
child_policy: this.childPolicy.map(policy => policy.toJsonObject())
child_policy: [this.childPolicy.toJsonObject()]
}
}
}

constructor(private clusterName: string, private edsServiceName: string, private lrsLoadReportingServer: XdsServerConfig, private locality: Locality__Output, private childPolicy: LoadBalancingConfig[]) {}
constructor(private clusterName: string, private edsServiceName: string, private lrsLoadReportingServer: XdsServerConfig, private locality: Locality__Output, private childPolicy: TypedLoadBalancingConfig) {}

getClusterName() {
return this.clusterName;
Expand Down Expand Up @@ -98,11 +97,15 @@ export class LrsLoadBalancingConfig implements LoadBalancingConfig {
if (!('child_policy' in obj && Array.isArray(obj.child_policy))) {
throw new Error('lrs config must have a child_policy array');
}
const childConfig = selectLbConfigFromList(obj.config);
if (!childConfig) {
throw new Error('lrs config child_policy parsing failed');
}
return new LrsLoadBalancingConfig(obj.cluster_name, obj.eds_service_name, validateXdsServerConfig(obj.lrs_load_reporting_server), {
region: obj.locality.region ?? '',
zone: obj.locality.zone ?? '',
sub_zone: obj.locality.sub_zone ?? ''
}, obj.child_policy.map(validateLoadBalancingConfig));
}, childConfig);
}
}

Expand Down Expand Up @@ -161,7 +164,7 @@ export class LrsLoadBalancer implements LoadBalancer {

updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
if (!(lbConfig instanceof LrsLoadBalancingConfig)) {
Expand All @@ -173,11 +176,7 @@ export class LrsLoadBalancer implements LoadBalancer {
lbConfig.getEdsServiceName(),
lbConfig.getLocality()
);
const childPolicy: LoadBalancingConfig = getFirstUsableConfig(
lbConfig.getChildPolicy(),
true
);
this.childBalancer.updateAddressList(addressList, childPolicy, attributes);
this.childBalancer.updateAddressList(addressList, lbConfig.getChildPolicy(), attributes);
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand Down
Loading

0 comments on commit 4d288de

Please sign in to comment.