From 75fc3e18bfd37762918670b95089df5a62b7e0c9 Mon Sep 17 00:00:00 2001 From: Wan Kai Date: Tue, 2 Jul 2024 13:15:46 +0800 Subject: [PATCH] Fix BanyanDB metrics query: used the wrong `Downsampling` type to find the schema (#12399) --- docs/en/changes/changes.md | 1 + .../banyandb/BanyanDBAggregationQueryDAO.java | 6 +-- .../banyandb/BanyanDBZipkinQueryDAO.java | 10 +++-- ...BanyanDBEBPFProfilingScheduleQueryDAO.java | 9 ++-- .../measure/BanyanDBEventQueryDAO.java | 8 +++- .../measure/BanyanDBHierarchyQueryDAO.java | 10 ++--- .../measure/BanyanDBMetadataQueryDAO.java | 44 ++++++++++--------- .../banyandb/measure/BanyanDBMetricsDAO.java | 2 +- .../BanyanDBNetworkAddressAliasDAO.java | 2 +- .../measure/BanyanDBServiceLabelDAO.java | 5 ++- .../BanyanDBTagAutocompleteQueryDAO.java | 8 +++- .../measure/BanyanDBTopologyQueryDAO.java | 17 +++---- .../banyandb/stream/AbstractBanyanDBDAO.java | 40 ++++++----------- 13 files changed, 84 insertions(+), 78 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 16b9c6c89068..97dc783e79ef 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -21,6 +21,7 @@ `persistence_timer_bulk_execute_latency` and `persistence_timer_bulk_all_latency` metrics in PersistenceTimer. * [Break Change] Update Nacos version to 2.3.2. Nacos 1.x server can't serve as cluster coordinator and configuration server. * Support tracing trace query(SkyWalking and Zipkin) for debugging. +* Fix BanyanDB metrics query: used the wrong `Downsampling` type to find the schema. #### UI diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java index 6aa21c533296..6c21401d5d2e 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java @@ -75,7 +75,7 @@ public List sortMetrics(TopNCondition condition, String valueCol } } - return directMetricsTopN(condition, valueColumnName, spec, timestampRange, additionalConditions); + return directMetricsTopN(condition, schema, valueColumnName, spec, timestampRange, additionalConditions); } List serverSideTopN(TopNCondition condition, MetadataRegistry.Schema schema, MetadataRegistry.ColumnSpec valueColumnSpec, @@ -103,9 +103,9 @@ List serverSideTopN(TopNCondition condition, MetadataRegistry.Sc return topNList; } - List directMetricsTopN(TopNCondition condition, String valueColumnName, MetadataRegistry.ColumnSpec valueColumnSpec, + List directMetricsTopN(TopNCondition condition, MetadataRegistry.Schema schema, String valueColumnName, MetadataRegistry.ColumnSpec valueColumnSpec, TimestampRange timestampRange, List additionalConditions) throws IOException { - MeasureQueryResponse resp = queryDebuggable(condition.getName(), TAGS, Collections.singleton(valueColumnName), + MeasureQueryResponse resp = queryDebuggable(schema, TAGS, Collections.singleton(valueColumnName), timestampRange, new QueryBuilder() { @Override protected void apply(MeasureQuery query) { diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBZipkinQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBZipkinQueryDAO.java index 2f2b35e4b1ac..f11ccb92fde8 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBZipkinQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBZipkinQueryDAO.java @@ -36,6 +36,7 @@ import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.banyandb.v1.client.TimestampRange; +import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.query.input.Duration; import org.apache.skywalking.oap.server.core.storage.query.IZipkinQueryDAO; import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceRelationTraffic; @@ -84,8 +85,9 @@ public BanyanDBZipkinQueryDAO(BanyanDBStorageClient client) { @Override public List getServiceNames() throws IOException { + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ZipkinServiceTraffic.INDEX_NAME, DownSampling.Minute); MeasureQueryResponse resp = - query(ZipkinServiceTraffic.INDEX_NAME, + query(schema, SERVICE_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { @@ -104,8 +106,9 @@ protected void apply(MeasureQuery query) { @Override public List getRemoteServiceNames(final String serviceName) throws IOException { + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ZipkinServiceRelationTraffic.INDEX_NAME, DownSampling.Minute); MeasureQueryResponse resp = - query(ZipkinServiceRelationTraffic.INDEX_NAME, + query(schema, REMOTE_SERVICE_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { @@ -127,8 +130,9 @@ protected void apply(MeasureQuery query) { @Override public List getSpanNames(final String serviceName) throws IOException { + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ZipkinServiceSpanTraffic.INDEX_NAME, DownSampling.Minute); MeasureQueryResponse resp = - query(ZipkinServiceSpanTraffic.INDEX_NAME, + query(schema, SPAN_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java index 7105d60009d5..9b68a1d1fef0 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java @@ -24,11 +24,13 @@ import org.apache.skywalking.banyandb.v1.client.DataPoint; import org.apache.skywalking.banyandb.v1.client.MeasureQuery; import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse; -import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingScheduleRecord; + import org.apache.skywalking.oap.server.core.analysis.DownSampling; + import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingScheduleRecord; import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingSchedule; import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; + import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; + import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; import java.io.IOException; import java.util.Collections; @@ -50,7 +52,8 @@ public BanyanDBEBPFProfilingScheduleQueryDAO(BanyanDBStorageClient client) { @Override public List querySchedules(String taskId) throws IOException { - MeasureQueryResponse resp = query(EBPFProfilingScheduleRecord.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EBPFProfilingScheduleRecord.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, TAGS, Collections.emptySet(), new QueryBuilder() { @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java index 567126239bbe..5bb76b90f6e4 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java @@ -29,6 +29,7 @@ import org.apache.skywalking.banyandb.v1.client.MeasureQuery; import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse; import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.Layer; import org.apache.skywalking.oap.server.core.query.PaginationUtils; import org.apache.skywalking.oap.server.core.query.enumeration.Order; @@ -40,6 +41,7 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Event; import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; import static com.google.common.base.Strings.isNullOrEmpty; @@ -56,7 +58,8 @@ public BanyanDBEventQueryDAO(final BanyanDBStorageClient client) { @Override public Events queryEvents(EventQueryCondition condition) throws Exception { - MeasureQueryResponse resp = query(Event.INDEX_NAME, TAGS, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(Event.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, TAGS, Collections.emptySet(), buildQuery(Collections.singletonList(condition))); Events events = new Events(); if (resp.size() == 0) { @@ -70,7 +73,8 @@ public Events queryEvents(EventQueryCondition condition) throws Exception { @Override public Events queryEvents(List conditionList) throws Exception { - MeasureQueryResponse resp = query(Event.INDEX_NAME, TAGS, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(Event.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, TAGS, Collections.emptySet(), buildQuery(conditionList)); Events events = new Events(); if (resp.size() == 0) { diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBHierarchyQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBHierarchyQueryDAO.java index c8fc1891ea35..c2be98910c79 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBHierarchyQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBHierarchyQueryDAO.java @@ -59,7 +59,8 @@ public BanyanDBHierarchyQueryDAO(final BanyanDBStorageClient client) { @Override public List readAllServiceHierarchyRelations() throws Exception { - MeasureQueryResponse resp = query(ServiceHierarchyRelationTraffic.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ServiceHierarchyRelationTraffic.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, SERVICE_HIERARCHY_RELATION_TAGS, Collections.emptySet(), new QueryBuilder<>() { @Override @@ -69,8 +70,6 @@ protected void apply(MeasureQuery query) { ); final List relations = new ArrayList<>(); - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata( - ServiceHierarchyRelationTraffic.INDEX_NAME, DownSampling.Minute); for (final DataPoint dataPoint : resp.getDataPoints()) { relations.add(new ServiceHierarchyRelationTraffic.Builder().storage2Entity( @@ -83,14 +82,13 @@ protected void apply(MeasureQuery query) { @Override public List readInstanceHierarchyRelations(final String instanceId, final String layer) throws Exception { - MeasureQueryResponse resp = query(InstanceHierarchyRelationTraffic.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ServiceHierarchyRelationTraffic.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, INSTANCE_HIERARCHY_RELATION_TAGS, Collections.emptySet(), buildInstanceRelationsQuery(instanceId, layer) ); List relations = new ArrayList<>(); - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata( - InstanceHierarchyRelationTraffic.INDEX_NAME, DownSampling.Minute); for (final DataPoint dataPoint : resp.getDataPoints()) { relations.add(new InstanceHierarchyRelationTraffic.Builder().storage2Entity( new BanyanDBConverter.StorageToMeasure(schema, dataPoint))); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java index 07333538d12f..015933256d29 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java @@ -86,7 +86,9 @@ public BanyanDBMetadataQueryDAO(BanyanDBStorageClient client) { @Override public List listServices() throws IOException { - MeasureQueryResponse resp = query(ServiceTraffic.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ServiceTraffic.INDEX_NAME, DownSampling.Minute); + + MeasureQueryResponse resp = query(schema, SERVICE_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { @Override @@ -95,8 +97,6 @@ protected void apply(MeasureQuery query) { }); final List services = new ArrayList<>(); - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ServiceTraffic.INDEX_NAME, DownSampling.Minute); - for (final DataPoint dataPoint : resp.getDataPoints()) { services.add(buildService(dataPoint, schema)); } @@ -110,7 +110,8 @@ public List listInstances(Duration duration, String serviceId) if (duration != null) { timestampRange = new TimestampRange(0, duration.getEndTimestamp()); } - MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, INSTANCE_TRAFFIC_TAGS, Collections.emptySet(), timestampRange, @@ -126,7 +127,6 @@ protected void apply(MeasureQuery query) { }); final List instances = new ArrayList<>(); - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute); for (final DataPoint dataPoint : resp.getDataPoints()) { instances.add(buildInstance(dataPoint, schema)); } @@ -137,7 +137,8 @@ protected void apply(MeasureQuery query) { @Override public ServiceInstance getInstance(String instanceId) throws IOException { IDManager.ServiceInstanceID.InstanceIDDefinition id = IDManager.ServiceInstanceID.analysisId(instanceId); - MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, INSTANCE_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { @@ -147,13 +148,13 @@ protected void apply(MeasureQuery query) { .and(eq(InstanceTraffic.NAME, id.getName())); } }); - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute); return resp.size() > 0 ? buildInstance(resp.getDataPoints().get(0), schema) : null; } @Override public List getInstances(List instanceIds) throws IOException { - MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, INSTANCE_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { @@ -169,13 +170,13 @@ protected void apply(MeasureQuery query) { query.criteria(or(instanceRelationsQueryConditions)); } }); - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute); return resp.getDataPoints().stream().map(e -> buildInstance(e, schema)).collect(Collectors.toList()); } @Override public List findEndpoint(String keyword, String serviceId, int limit) throws IOException { - MeasureQueryResponse resp = query(EndpointTraffic.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EndpointTraffic.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, ENDPOINT_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { @@ -188,7 +189,6 @@ protected void apply(MeasureQuery query) { }); final List endpoints = new ArrayList<>(); - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EndpointTraffic.INDEX_NAME, DownSampling.Minute); for (final DataPoint dataPoint : resp.getDataPoints()) { endpoints.add(buildEndpoint(dataPoint, schema)); } @@ -201,7 +201,8 @@ protected void apply(MeasureQuery query) { @Override public List listProcesses(String serviceId, ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { - MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, PROCESS_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { @@ -222,7 +223,6 @@ protected void apply(MeasureQuery query) { }); final List processes = new ArrayList<>(); - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); for (final DataPoint dataPoint : resp.getDataPoints()) { processes.add(buildProcess(dataPoint, schema)); } @@ -232,8 +232,9 @@ protected void apply(MeasureQuery query) { @Override public List listProcesses(String serviceInstanceId, Duration duration, boolean includeVirtual) throws IOException { + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); long lastPingStartTimeBucket = duration.getStartTimeBucket(); - MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME, + MeasureQueryResponse resp = query(schema, PROCESS_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { @@ -248,7 +249,6 @@ protected void apply(MeasureQuery query) { }); final List processes = new ArrayList<>(); - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); for (final DataPoint dataPoint : resp.getDataPoints()) { processes.add(buildProcess(dataPoint, schema)); } @@ -258,7 +258,8 @@ protected void apply(MeasureQuery query) { @Override public List listProcesses(String agentId) throws IOException { - MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, PROCESS_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { @@ -270,7 +271,6 @@ protected void apply(MeasureQuery query) { }); final List processes = new ArrayList<>(); - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); for (final DataPoint dataPoint : resp.getDataPoints()) { processes.add(buildProcess(dataPoint, schema)); } @@ -280,7 +280,8 @@ protected void apply(MeasureQuery query) { @Override public long getProcessCount(String serviceId, ProfilingSupportStatus profilingSupportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { - MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, PROCESS_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { @@ -301,7 +302,8 @@ protected void apply(MeasureQuery query) { @Override public long getProcessCount(String instanceId) throws IOException { - MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, PROCESS_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { @@ -320,7 +322,8 @@ protected void apply(MeasureQuery query) { @Override public Process getProcess(String processId) throws IOException { - MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); + MeasureQueryResponse resp = query(schema, PROCESS_TRAFFIC_TAGS, Collections.emptySet(), new QueryBuilder() { @@ -331,7 +334,6 @@ protected void apply(MeasureQuery query) { } } }); - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); return resp.size() > 0 ? buildProcess(resp.getDataPoints().get(0), schema) : null; } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java index fd519c8a1b90..f736fc5ece3f 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java @@ -125,7 +125,7 @@ public List multiGet(Model model, List metrics) throws IOExcep } List metricsInStorage = new ArrayList<>(metrics.size()); - MeasureQueryResponse resp = query(model.getName(), schema.getTags(), schema.getFields(), timestampRange, new QueryBuilder() { + MeasureQueryResponse resp = query(schema, schema.getTags(), schema.getFields(), timestampRange, new QueryBuilder() { @Override protected void apply(MeasureQuery query) { seriesIDColumns.entrySet().forEach(entry -> { diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java index b5598ef12bae..44d44822279e 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java @@ -62,7 +62,7 @@ private MetadataRegistry.Schema getSchema() { public List loadLastUpdate(long timeBucket) { try { MeasureQueryResponse resp = query( - NetworkAddressAlias.INDEX_NAME, + getSchema(), TAGS, Collections.emptySet(), new QueryBuilder() { diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java index 6d6905f89800..734edb7ef79c 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java @@ -27,9 +27,11 @@ import java.util.stream.Collectors; import org.apache.skywalking.banyandb.v1.client.MeasureQuery; +import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.manual.process.ServiceLabelRecord; import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; public class BanyanDBServiceLabelDAO extends AbstractBanyanDBDAO implements IServiceLabelDAO { @@ -41,7 +43,8 @@ public BanyanDBServiceLabelDAO(final BanyanDBStorageClient client) { @Override public List queryAllLabels(String serviceId) throws IOException { - return query(ServiceLabelRecord.INDEX_NAME, TAGS, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ServiceLabelRecord.INDEX_NAME, DownSampling.Minute); + return query(schema, TAGS, Collections.emptySet(), new QueryBuilder() { @Override protected void apply(final MeasureQuery query) { diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTagAutocompleteQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTagAutocompleteQueryDAO.java index fda01008302c..9b8e2fa0e7d5 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTagAutocompleteQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTagAutocompleteQueryDAO.java @@ -23,12 +23,14 @@ import org.apache.skywalking.banyandb.v1.client.MeasureQuery; import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse; import org.apache.skywalking.banyandb.v1.client.TimestampRange; +import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType; import org.apache.skywalking.oap.server.core.query.input.Duration; import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; import java.io.IOException; @@ -51,6 +53,7 @@ public BanyanDBTagAutocompleteQueryDAO(BanyanDBStorageClient client) { @Override public Set queryTagAutocompleteKeys(TagType tagType, int limit, Duration duration) throws IOException { + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(TagAutocompleteData.INDEX_NAME, DownSampling.Minute); long startTB = 0; long endTB = 0; if (nonNull(duration)) { @@ -61,7 +64,7 @@ public Set queryTagAutocompleteKeys(TagType tagType, int limit, Duration if (startTB > 0 && endTB > 0) { range = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB)); } - MeasureQueryResponse resp = query(TagAutocompleteData.INDEX_NAME, + MeasureQueryResponse resp = query(schema, TAGS_KEY, Collections.emptySet(), range, new QueryBuilder() { @@ -87,6 +90,7 @@ protected void apply(MeasureQuery query) { @Override public Set queryTagAutocompleteValues(TagType tagType, String tagKey, int limit, Duration duration) throws IOException { + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(TagAutocompleteData.INDEX_NAME, DownSampling.Minute); long startSecondTB = 0; long endSecondTB = 0; if (nonNull(duration)) { @@ -101,7 +105,7 @@ public Set queryTagAutocompleteValues(TagType tagType, String tagKey, in if (startTB > 0 && endTB > 0) { range = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB)); } - MeasureQueryResponse resp = query(TagAutocompleteData.INDEX_NAME, + MeasureQueryResponse resp = query(schema, TAGS_KV, Collections.emptySet(), range, new QueryBuilder() { diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java index c754b5883bef..e1cd297ee378 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java @@ -50,6 +50,7 @@ import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; import static java.util.Objects.nonNull; @@ -120,8 +121,8 @@ List queryServiceRelation(Duration duration, } final String modelName = detectPoint == DetectPoint.SERVER ? ServiceRelationServerSideMetrics.INDEX_NAME : ServiceRelationClientSideMetrics.INDEX_NAME; - - MeasureQueryResponse resp = query(modelName, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep()); + MeasureQueryResponse resp = queryDebuggable(schema, ImmutableSet.of( ServiceRelationClientSideMetrics.COMPONENT_IDS, Metrics.ENTITY_ID @@ -203,8 +204,8 @@ List queryInstanceRelation(Duration duration, } final String modelName = detectPoint == DetectPoint.SERVER ? ServiceInstanceRelationServerSideMetrics.INDEX_NAME : ServiceInstanceRelationClientSideMetrics.INDEX_NAME; - - MeasureQueryResponse resp = query(modelName, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep()); + MeasureQueryResponse resp = query(schema, ImmutableSet.of( Metrics.ENTITY_ID ), @@ -267,8 +268,8 @@ List queryEndpointRelation(Duration duration, if (startTB > 0 && endTB > 0) { timestampRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB)); } - - MeasureQueryResponse resp = query(EndpointRelationServerSideMetrics.INDEX_NAME, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EndpointRelationServerSideMetrics.INDEX_NAME, duration.getStep()); + MeasureQueryResponse resp = query(schema, ImmutableSet.of( Metrics.ENTITY_ID ), @@ -302,8 +303,8 @@ List queryProcessRelation(Duration duration, } final String modelName = detectPoint == DetectPoint.SERVER ? ProcessRelationServerSideMetrics.INDEX_NAME : ProcessRelationClientSideMetrics.INDEX_NAME; - - MeasureQueryResponse resp = query(modelName, + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep()); + MeasureQueryResponse resp = query(schema, ImmutableSet.of(Metrics.ENTITY_ID, ProcessRelationClientSideMetrics.COMPONENT_ID), Collections.emptySet(), timestampRange, new QueryBuilder() { @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java index 31e5ca45b2c9..466077dd1d33 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java @@ -31,7 +31,6 @@ import org.apache.skywalking.banyandb.v1.client.TimestampRange; import org.apache.skywalking.banyandb.v1.client.TopNQuery; import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse; -import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.query.type.KeyValue; import org.apache.skywalking.oap.server.core.query.type.debugging.DebuggingSpan; import org.apache.skywalking.oap.server.core.query.type.debugging.DebuggingTraceContext; @@ -99,7 +98,7 @@ protected StreamQueryResponse queryDebuggable(String modelName, Set tags } StreamQueryResponse response = query(modelName, tags, timestampRange, queryBuilder); if (traceContext != null && traceContext.isDumpStorageRsp()) { - builder.append("\n").append(" Response: ").append(new Gson().toJson(response)); + builder.append("\n").append(" Response: ").append(new Gson().toJson(response.getElements())); span.setMsg(builder.toString()); } return response; @@ -110,11 +109,6 @@ protected StreamQueryResponse queryDebuggable(String modelName, Set tags } } - protected MeasureQueryResponse query(String measureModelName, Set tags, Set fields, - QueryBuilder builder) throws IOException { - return this.query(measureModelName, tags, fields, null, builder); - } - protected TopNQueryResponse topN(MetadataRegistry.Schema schema, TimestampRange timestampRange, int number, List additionalConditions) throws IOException { return topNQuery(schema, timestampRange, number, AbstractQuery.Sort.DESC, additionalConditions); @@ -148,7 +142,7 @@ protected TopNQueryResponse topNQueryDebuggable(MetadataRegistry.Schema schema, } TopNQueryResponse response = topNQuery(schema, timestampRange, number, sort, additionalConditions); if (traceContext != null && traceContext.isDumpStorageRsp()) { - builder.append("\n").append(" Response: ").append(new Gson().toJson(response)); + builder.append("\n").append(" Response: ").append(new Gson().toJson(response.getTopNLists())); span.setMsg(builder.toString()); } return response; @@ -175,24 +169,6 @@ private TopNQueryResponse topNQuery(MetadataRegistry.Schema schema, TimestampRan return getClient().query(q); } - protected MeasureQueryResponse query(String measureModelName, Set tags, Set fields, - TimestampRange timestampRange, QueryBuilder builder) throws IOException { - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(measureModelName, DownSampling.Minute); - if (schema == null) { - throw new IllegalArgumentException("measure is not registered"); - } - return this.query(schema, tags, fields, timestampRange, builder); - } - - protected MeasureQueryResponse queryDebuggable(String measureModelName, Set tags, Set fields, - TimestampRange timestampRange, QueryBuilder builder) throws IOException { - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(measureModelName, DownSampling.Minute); - if (schema == null) { - throw new IllegalArgumentException("measure is not registered"); - } - return queryDebuggable(schema, tags, fields, timestampRange, builder); - } - protected MeasureQueryResponse queryDebuggable(MetadataRegistry.Schema schema, Set tags, Set fields, @@ -217,7 +193,7 @@ protected MeasureQueryResponse queryDebuggable(MetadataRegistry.Schema schema, } MeasureQueryResponse response = query(schema, tags, fields, timestampRange, queryBuilder); if (traceContext != null && traceContext.isDumpStorageRsp()) { - builder.append("\n").append(" Response: ").append(new Gson().toJson(response)); + builder.append("\n").append(" Response: ").append(new Gson().toJson(response.getDataPoints())); span.setMsg(builder.toString()); } return response; @@ -228,8 +204,18 @@ protected MeasureQueryResponse queryDebuggable(MetadataRegistry.Schema schema, } } + protected MeasureQueryResponse query(MetadataRegistry.Schema schema, + Set tags, + Set fields, + QueryBuilder builder) throws IOException { + return query(schema, tags, fields, null, builder); + } + protected MeasureQueryResponse query(MetadataRegistry.Schema schema, Set tags, Set fields, TimestampRange timestampRange, QueryBuilder builder) throws IOException { + if (schema == null) { + throw new IllegalArgumentException("measure is not registered"); + } final MeasureQuery query; if (timestampRange == null) { query = new MeasureQuery(List.of(schema.getMetadata().getGroup()), schema.getMetadata().name(), LARGEST_TIME_RANGE, tags, fields);