Skip to content

Commit

Permalink
Fix BanyanDB metrics query: used the wrong Downsampling type to fin…
Browse files Browse the repository at this point in the history
…d the schema (#12399)
  • Loading branch information
wankai123 committed Jul 2, 2024
1 parent 05373d6 commit 75fc3e1
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 78 deletions.
1 change: 1 addition & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
`persistence_timer_bulk_execute_latency` and `persistence_timer_bulk_all_latency` metrics in PersistenceTimer.
* [Break Change] Update Nacos version to 2.3.2. Nacos 1.x server can't serve as cluster coordinator and configuration server.
* Support tracing trace query(SkyWalking and Zipkin) for debugging.
* Fix BanyanDB metrics query: used the wrong `Downsampling` type to find the schema.

#### UI

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public List<SelectedRecord> sortMetrics(TopNCondition condition, String valueCol
}
}

return directMetricsTopN(condition, valueColumnName, spec, timestampRange, additionalConditions);
return directMetricsTopN(condition, schema, valueColumnName, spec, timestampRange, additionalConditions);
}

List<SelectedRecord> serverSideTopN(TopNCondition condition, MetadataRegistry.Schema schema, MetadataRegistry.ColumnSpec valueColumnSpec,
Expand Down Expand Up @@ -103,9 +103,9 @@ List<SelectedRecord> serverSideTopN(TopNCondition condition, MetadataRegistry.Sc
return topNList;
}

List<SelectedRecord> directMetricsTopN(TopNCondition condition, String valueColumnName, MetadataRegistry.ColumnSpec valueColumnSpec,
List<SelectedRecord> directMetricsTopN(TopNCondition condition, MetadataRegistry.Schema schema, String valueColumnName, MetadataRegistry.ColumnSpec valueColumnSpec,
TimestampRange timestampRange, List<KeyValue> additionalConditions) throws IOException {
MeasureQueryResponse resp = queryDebuggable(condition.getName(), TAGS, Collections.singleton(valueColumnName),
MeasureQueryResponse resp = queryDebuggable(schema, TAGS, Collections.singleton(valueColumnName),
timestampRange, new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.storage.query.IZipkinQueryDAO;
import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceRelationTraffic;
Expand Down Expand Up @@ -84,8 +85,9 @@ public BanyanDBZipkinQueryDAO(BanyanDBStorageClient client) {

@Override
public List<String> getServiceNames() throws IOException {
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ZipkinServiceTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp =
query(ZipkinServiceTraffic.INDEX_NAME,
query(schema,
SERVICE_TRAFFIC_TAGS,
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {

Expand All @@ -104,8 +106,9 @@ protected void apply(MeasureQuery query) {

@Override
public List<String> getRemoteServiceNames(final String serviceName) throws IOException {
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ZipkinServiceRelationTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp =
query(ZipkinServiceRelationTraffic.INDEX_NAME,
query(schema,
REMOTE_SERVICE_TRAFFIC_TAGS,
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {

Expand All @@ -127,8 +130,9 @@ protected void apply(MeasureQuery query) {

@Override
public List<String> getSpanNames(final String serviceName) throws IOException {
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ZipkinServiceSpanTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp =
query(ZipkinServiceSpanTraffic.INDEX_NAME,
query(schema,
SPAN_TRAFFIC_TAGS,
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.apache.skywalking.banyandb.v1.client.DataPoint;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingScheduleRecord;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingScheduleRecord;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingSchedule;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -50,7 +52,8 @@ public BanyanDBEBPFProfilingScheduleQueryDAO(BanyanDBStorageClient client) {

@Override
public List<EBPFProfilingSchedule> querySchedules(String taskId) throws IOException {
MeasureQueryResponse resp = query(EBPFProfilingScheduleRecord.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EBPFProfilingScheduleRecord.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
TAGS,
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.query.PaginationUtils;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
Expand All @@ -40,6 +41,7 @@
import org.apache.skywalking.oap.server.core.analysis.metrics.Event;
import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;

import static com.google.common.base.Strings.isNullOrEmpty;
Expand All @@ -56,7 +58,8 @@ public BanyanDBEventQueryDAO(final BanyanDBStorageClient client) {

@Override
public Events queryEvents(EventQueryCondition condition) throws Exception {
MeasureQueryResponse resp = query(Event.INDEX_NAME, TAGS,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(Event.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema, TAGS,
Collections.emptySet(), buildQuery(Collections.singletonList(condition)));
Events events = new Events();
if (resp.size() == 0) {
Expand All @@ -70,7 +73,8 @@ public Events queryEvents(EventQueryCondition condition) throws Exception {

@Override
public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
MeasureQueryResponse resp = query(Event.INDEX_NAME, TAGS,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(Event.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema, TAGS,
Collections.emptySet(), buildQuery(conditionList));
Events events = new Events();
if (resp.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public BanyanDBHierarchyQueryDAO(final BanyanDBStorageClient client) {

@Override
public List<ServiceHierarchyRelationTraffic> readAllServiceHierarchyRelations() throws Exception {
MeasureQueryResponse resp = query(ServiceHierarchyRelationTraffic.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ServiceHierarchyRelationTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
SERVICE_HIERARCHY_RELATION_TAGS,
Collections.emptySet(), new QueryBuilder<>() {
@Override
Expand All @@ -69,8 +70,6 @@ protected void apply(MeasureQuery query) {
);

final List<ServiceHierarchyRelationTraffic> relations = new ArrayList<>();
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(
ServiceHierarchyRelationTraffic.INDEX_NAME, DownSampling.Minute);

for (final DataPoint dataPoint : resp.getDataPoints()) {
relations.add(new ServiceHierarchyRelationTraffic.Builder().storage2Entity(
Expand All @@ -83,14 +82,13 @@ protected void apply(MeasureQuery query) {
@Override
public List<InstanceHierarchyRelationTraffic> readInstanceHierarchyRelations(final String instanceId,
final String layer) throws Exception {
MeasureQueryResponse resp = query(InstanceHierarchyRelationTraffic.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ServiceHierarchyRelationTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
INSTANCE_HIERARCHY_RELATION_TAGS,
Collections.emptySet(), buildInstanceRelationsQuery(instanceId, layer)
);

List<InstanceHierarchyRelationTraffic> relations = new ArrayList<>();
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(
InstanceHierarchyRelationTraffic.INDEX_NAME, DownSampling.Minute);
for (final DataPoint dataPoint : resp.getDataPoints()) {
relations.add(new InstanceHierarchyRelationTraffic.Builder().storage2Entity(
new BanyanDBConverter.StorageToMeasure(schema, dataPoint)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ public BanyanDBMetadataQueryDAO(BanyanDBStorageClient client) {

@Override
public List<Service> listServices() throws IOException {
MeasureQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ServiceTraffic.INDEX_NAME, DownSampling.Minute);

MeasureQueryResponse resp = query(schema,
SERVICE_TRAFFIC_TAGS,
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
@Override
Expand All @@ -95,8 +97,6 @@ protected void apply(MeasureQuery query) {
});

final List<Service> services = new ArrayList<>();
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ServiceTraffic.INDEX_NAME, DownSampling.Minute);

for (final DataPoint dataPoint : resp.getDataPoints()) {
services.add(buildService(dataPoint, schema));
}
Expand All @@ -110,7 +110,8 @@ public List<ServiceInstance> listInstances(Duration duration, String serviceId)
if (duration != null) {
timestampRange = new TimestampRange(0, duration.getEndTimestamp());
}
MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
INSTANCE_TRAFFIC_TAGS,
Collections.emptySet(),
timestampRange,
Expand All @@ -126,7 +127,6 @@ protected void apply(MeasureQuery query) {
});

final List<ServiceInstance> instances = new ArrayList<>();
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute);
for (final DataPoint dataPoint : resp.getDataPoints()) {
instances.add(buildInstance(dataPoint, schema));
}
Expand All @@ -137,7 +137,8 @@ protected void apply(MeasureQuery query) {
@Override
public ServiceInstance getInstance(String instanceId) throws IOException {
IDManager.ServiceInstanceID.InstanceIDDefinition id = IDManager.ServiceInstanceID.analysisId(instanceId);
MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
INSTANCE_TRAFFIC_TAGS,
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
Expand All @@ -147,13 +148,13 @@ protected void apply(MeasureQuery query) {
.and(eq(InstanceTraffic.NAME, id.getName()));
}
});
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute);
return resp.size() > 0 ? buildInstance(resp.getDataPoints().get(0), schema) : null;
}

@Override
public List<ServiceInstance> getInstances(List<String> instanceIds) throws IOException {
MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
INSTANCE_TRAFFIC_TAGS,
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
Expand All @@ -169,13 +170,13 @@ protected void apply(MeasureQuery query) {
query.criteria(or(instanceRelationsQueryConditions));
}
});
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute);
return resp.getDataPoints().stream().map(e -> buildInstance(e, schema)).collect(Collectors.toList());
}

@Override
public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit) throws IOException {
MeasureQueryResponse resp = query(EndpointTraffic.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EndpointTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
ENDPOINT_TRAFFIC_TAGS,
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
Expand All @@ -188,7 +189,6 @@ protected void apply(MeasureQuery query) {
});

final List<Endpoint> endpoints = new ArrayList<>();
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EndpointTraffic.INDEX_NAME, DownSampling.Minute);
for (final DataPoint dataPoint : resp.getDataPoints()) {
endpoints.add(buildEndpoint(dataPoint, schema));
}
Expand All @@ -201,7 +201,8 @@ protected void apply(MeasureQuery query) {

@Override
public List<Process> listProcesses(String serviceId, ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
PROCESS_TRAFFIC_TAGS,
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
Expand All @@ -222,7 +223,6 @@ protected void apply(MeasureQuery query) {
});

final List<Process> processes = new ArrayList<>();
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);
for (final DataPoint dataPoint : resp.getDataPoints()) {
processes.add(buildProcess(dataPoint, schema));
}
Expand All @@ -232,8 +232,9 @@ protected void apply(MeasureQuery query) {

@Override
public List<Process> listProcesses(String serviceInstanceId, Duration duration, boolean includeVirtual) throws IOException {
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);
long lastPingStartTimeBucket = duration.getStartTimeBucket();
MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
MeasureQueryResponse resp = query(schema,
PROCESS_TRAFFIC_TAGS,
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
Expand All @@ -248,7 +249,6 @@ protected void apply(MeasureQuery query) {
});

final List<Process> processes = new ArrayList<>();
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);
for (final DataPoint dataPoint : resp.getDataPoints()) {
processes.add(buildProcess(dataPoint, schema));
}
Expand All @@ -258,7 +258,8 @@ protected void apply(MeasureQuery query) {

@Override
public List<Process> listProcesses(String agentId) throws IOException {
MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
PROCESS_TRAFFIC_TAGS,
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
Expand All @@ -270,7 +271,6 @@ protected void apply(MeasureQuery query) {
});

final List<Process> processes = new ArrayList<>();
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);
for (final DataPoint dataPoint : resp.getDataPoints()) {
processes.add(buildProcess(dataPoint, schema));
}
Expand All @@ -280,7 +280,8 @@ protected void apply(MeasureQuery query) {

@Override
public long getProcessCount(String serviceId, ProfilingSupportStatus profilingSupportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
PROCESS_TRAFFIC_TAGS,
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
Expand All @@ -301,7 +302,8 @@ protected void apply(MeasureQuery query) {

@Override
public long getProcessCount(String instanceId) throws IOException {
MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
PROCESS_TRAFFIC_TAGS,
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
Expand All @@ -320,7 +322,8 @@ protected void apply(MeasureQuery query) {

@Override
public Process getProcess(String processId) throws IOException {
MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);
MeasureQueryResponse resp = query(schema,
PROCESS_TRAFFIC_TAGS,
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
Expand All @@ -331,7 +334,6 @@ protected void apply(MeasureQuery query) {
}
}
});
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);

return resp.size() > 0 ? buildProcess(resp.getDataPoints().get(0), schema) : null;
}
Expand Down
Loading

0 comments on commit 75fc3e1

Please sign in to comment.