diff --git a/.licenserc.yaml b/.licenserc.yaml
index 445d5362c1d4..e9f54c08041d 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -60,6 +60,7 @@ header:
- '**/src/main/proto/jaeger/**'
- '**/src/main/proto/mixer/**'
- '**/src/main/proto/policy/**'
+ - '**/src/main/proto/cilium/**'
- '**/src/main/proto/prometheus/client_model/metrics.proto'
- '**/src/main/proto/protoc-gen-swagger/**'
- '**/src/main/proto/validate/validate.proto'
diff --git a/dist-material/release-docs/LICENSE b/dist-material/release-docs/LICENSE
index efce54ed310b..7cbf202b2713 100644
--- a/dist-material/release-docs/LICENSE
+++ b/dist-material/release-docs/LICENSE
@@ -241,6 +241,8 @@ The text of each license is the standard Apache 2.0 license.
https://mvnrepository.com/artifact/com.linecorp.armeria/armeria/1.27.3 Apache-2.0
https://mvnrepository.com/artifact/com.linecorp.armeria/armeria-graphql/1.27.3 Apache-2.0
https://mvnrepository.com/artifact/com.linecorp.armeria/armeria-graphql-protocol/1.27.3 Apache-2.0
+ https://mvnrepository.com/artifact/com.linecorp.armeria/armeria-grpc/1.27.3 Apache-2.0
+ https://mvnrepository.com/artifact/com.linecorp.armeria/armeria-grpc-protocol/1.27.3 Apache-2.0
https://mvnrepository.com/artifact/com.linecorp.armeria/armeria-protobuf/1.27.3 Apache-2.0
https://mvnrepository.com/artifact/com.orbitz.consul/consul-client/1.5.3 Apache-2.0
https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp/3.14.9 Apache-2.0
@@ -295,6 +297,7 @@ The text of each license is the standard Apache 2.0 license.
https://mvnrepository.com/artifact/io.grpc/grpc-netty/1.63.0 Apache-2.0
https://mvnrepository.com/artifact/io.grpc/grpc-protobuf/1.63.0 Apache-2.0
https://mvnrepository.com/artifact/io.grpc/grpc-protobuf-lite/1.63.0 Apache-2.0
+ https://mvnrepository.com/artifact/io.grpc/grpc-services/1.61.0 Apache-2.0
https://mvnrepository.com/artifact/io.grpc/grpc-stub/1.63.0 Apache-2.0
https://mvnrepository.com/artifact/io.grpc/grpc-util/1.63.0 Apache-2.0
https://mvnrepository.com/artifact/io.micrometer/micrometer-commons/1.12.2 Apache-2.0
@@ -371,6 +374,7 @@ The text of each license is the standard Apache 2.0 license.
https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java/1.1.8.4 Apache-2.0
https://mvnrepository.com/artifact/org.yaml/snakeyaml/2.0 Apache-2.0
https://npmjs.com/package/typescript/v/4.7.4 4.7.4 Apache-2.0
+ https://github.com/cilium/cilium/tree/v1.15.6/api/v1 Apache-2.0
========================================================================
BSD-2-Clause licenses
@@ -544,6 +548,7 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
https://npmjs.com/package/nanoid/v/3.3.7 3.3.7 MIT
https://mvnrepository.com/artifact/org.checkerframework/checker-qual/3.33.0 MIT
https://mvnrepository.com/artifact/org.codehaus.mojo/animal-sniffer-annotations/1.23 MIT
+ https://mvnrepository.com/artifact/org.curioswitch.curiostack/protobuf-jackson/2.2.0 MIT
https://npmjs.com/package/pinia/v/2.0.28 2.0.28 MIT
https://npmjs.com/package/pinia/node_modules/vue-demi/v/0.13.11 0.13.11 MIT
https://npmjs.com/package/postcss/v/8.4.33 8.4.33 MIT
diff --git a/dist-material/release-docs/LICENSE.tpl b/dist-material/release-docs/LICENSE.tpl
index 5479740ec7ba..738f07ec2370 100644
--- a/dist-material/release-docs/LICENSE.tpl
+++ b/dist-material/release-docs/LICENSE.tpl
@@ -30,6 +30,9 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
https://npmjs.com/package/{{ .Name }}/v/{{ .Version }} {{ .Version }} {{ .LicenseID }}
{{- end }}
{{- end }}
+ {{- if eq .LicenseID "Apache-2.0" }}
+ https://github.com/cilium/cilium/tree/v1.15.6/api/v1 Apache-2.0
+ {{- end }}
{{ end }}
=======================================================================
The zipkin-lens.jar dependency has more front-end dependencies in it and the front-end dependencies' licenses
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 97dc783e79ef..000dd37544c1 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -22,9 +22,14 @@
* [Break Change] Update Nacos version to 2.3.2. Nacos 1.x server can't serve as cluster coordinator and configuration server.
* Support tracing trace query(SkyWalking and Zipkin) for debugging.
* Fix BanyanDB metrics query: used the wrong `Downsampling` type to find the schema.
+* Support fetch cilium flow to monitoring network traffic between cilium services.
+* Support `labelCount` function in the OAL engine.
#### UI
-
+* Highlight search log keywords.
+* Add Error URL in the browser log.
+* Add a SolonMVC icon.
+* Adding cilium icon and i18n for menu.
#### Documentation
diff --git a/docs/en/concepts-and-designs/oal.md b/docs/en/concepts-and-designs/oal.md
index 1f88901b90e8..34e55074ef6f 100644
--- a/docs/en/concepts-and-designs/oal.md
+++ b/docs/en/concepts-and-designs/oal.md
@@ -102,6 +102,11 @@ In this case, see `p99`, `p95`, `p90`, `p75`, and `p50` of all incoming requests
In this case, the p99 value of all incoming requests. The parameter is precise to a latency at p99, such as in the above case, and 120ms and 124ms are considered to produce the same response time.
+- `labelCount`. The count of the label value.
+> drop_reason_count = from(CiliumService.*).filter(verdict == "dropped").labelCount(dropReason);
+
+In this case, the count of the drop reason of each Cilium service.
+
## Metrics name
The metrics name for storage implementor, alarm and query modules. The type inference is supported by core.
diff --git a/docs/en/concepts-and-designs/scope-definitions.md b/docs/en/concepts-and-designs/scope-definitions.md
index 1047c6383960..1d2e5ab53cfb 100644
--- a/docs/en/concepts-and-designs/scope-definitions.md
+++ b/docs/en/concepts-and-designs/scope-definitions.md
@@ -566,4 +566,106 @@ For `K8SEndpoint` and `K8SEndpointRelation`, they only have the following **prot
| componentId | The ID of component used in this call. | | string |
| destServiceName | The dest service name in kubernetes. | | string |
| destServiceName | The layer in kubernetes dest service. | | string |
-| destEndpointName | The endpoint name detect in kubernetes dest service. | | string |
\ No newline at end of file
+| destEndpointName | The endpoint name detect in kubernetes dest service. | | string |
+
+### SCOPES with `Cilium` Prefix
+
+All metrics starting with `Cilium` are derived from Cilium monitoring by Cilium Hubble.
+
+#### Service, Service Instance and relations
+
+For all `CiliumService`, `CiliumServiceInstance`, `CiliumServiceRelation` and `CiliumServiceInstanceRelation`, they all have the
+following **L4**/**L7** metric contents.
+
+| Name | Remarks | Group Key | Type |
+|-----------------------|-----------------------------------------------------------------------------|-----------|--------|
+| verdict | The metrics verdict from Flow. The value may be `forwarded` and `dropped`. | | string |
+| type | The metrics type from Flow. The value may be `tcp`, `http`, `dns`, `kakfa`. | | string |
+| direction | The metrics direction from Flow. The value may be `ingress` and `egress`. | | string |
+| dropReason | When the verdict is `dropped`, the drop reason would be recorded. | | string |
+| http.url | The URL of the HTTP request. | | string |
+| http.code | The Response code of the HTTP response. | | int |
+| http.protocol | The protocol of the HTTP request. | | string |
+| http.method | The method of the HTTP request. | | string |
+| kafka.errorCode | The error code of the Kafka request. | | int |
+| kafka.errorCodeString | The error code explaination of the Kafka request. | | string |
+| kafka.apiVersion | The API version of the Kafka request. | | string |
+| kafka.apiKey | The API key of the Kafka request. | | string |
+| kafka.correlationId | The correlation ID of the Kafka request. | | string |
+| kafka.topic | The topic of the Kafka request. | | string |
+| dns.domain | The domain of the DNS request. | | string |
+| dns.queryType | The query type of the DNS request. | | string |
+| dns.rcode | The response code of the DNS request. | | int |
+| dns.recodeString | The response code explaination of the DNS request. | | string |
+| dns.ttl | The TTL of the DNS request. | | int |
+| dns.ipCount | The count of the IP addresses of the DNS responsed. | | int |
+| duration | The duration(millisecond) of the L7 response. | | long |
+| success | Is the response success of the L7 response. | | bool |
+
+##### SCOPE `CiliumService`
+
+| Name | Remarks | Group Key | Type |
+|-------------|--------------------------------------------------------------------|-----------|--------|
+| name | The service name in Cilium. | | string |
+| layer | The layer in Cilium service. | | string |
+| detectPoint | Where the relation is detected. The value may be client or server. | | enum |
+
+##### SCOPE `CiliumServiceInstance`
+
+| Name | Remarks | Group Key | Type |
+|---------------------|--------------------------------------------------------------------|-----------|--------|
+| serviceName | The service name in Cilium. | | string |
+| serviceInstanceName | The pod name in Cilium. | | string |
+| layer | The layer of Cilium service. | | string |
+| detectPoint | Where the relation is detected. The value may be client or server. | | enum |
+
+##### SCOPE `CiliumServiceRelation`
+
+| Name | Remarks | Group Key | Type |
+|-------------------|--------------------------------------------------------------------|-----------|--------|
+| sourceServiceName | The source service name in Cilium. | | string |
+| sourceLayer | The source layer service in Cilium. | | string |
+| detectPoint | Where the relation is detected. The value may be client or server. | | enum |
+| componentId | The ID of component used in this call. | | int |
+| destServiceName | The dest service name in Cilium. | | string |
+| destLayer | The dest layer service in Cilium. | | string |
+
+##### SCOPE `CiliumServiceInstanceRelation`
+
+| Name | Remarks | Group Key | Type |
+|---------------------------|--------------------------------------------------------------------|-----------|--------|
+| sourceServiceName | The source service name in Cilium. | | string |
+| sourceServiceInstanceName | The source pod name in Cilium. | | string |
+| sourceLayer | The source layer service in Cilium. | | string |
+| detectPoint | Where the relation is detected. The value may be client or server. | | enum |
+| componentId | The ID of component used in this call. | | int |
+| destServiceName | The dest service name in Cilium. | | string |
+| destServiceInstanceName | The dest pod name in Cilium. | | string |
+| destLayer | The dest layer service in Cilium. | | string |
+
+#### Endpoint and Endpoint Relation
+
+For `CiliumEndpoint` and `CiliumEndpointRelation`, they have all the fields of **L4**/**L7** metric contents, but the `type` only would be `http`, `dns` or `kafka`.
+
+##### SCOPE `CiliumEndpoint`
+
+| Name | Remarks | Group Key | Type |
+|--------------|---------------------------------------------------------|-----------|--------|
+| serviceName | The service name in Cilium. | | string |
+| layer | The layer in Cilium service. | | string |
+| endpointName | The endpoint name detect in Cilium service. | | string |
+
+##### SCOPE `CiliumEndpointRelation`
+
+| Name | Remarks | Group Key | Type |
+|--------------------|--------------------------------------------------------------------|-----------|--------|
+| sourceServiceName | The source service name in Cilium. | | string |
+| sourceLayer | The layer in Cilium source service. | | enum |
+| sourceEndpointName | The endpoint name detect in Cilium source service. | | string |
+| detectPoint | Where the relation is detected. The value may be client or server. | | enum |
+| componentId | The ID of component used in this call. | | int |
+| destServiceName | The dest service name in Cilium. | | string |
+| destLayer | The layer in Cilium dest service. | | enum |
+| destEndpointName | The endpoint name detect in Cilium dest service. | | string |
+
+
diff --git a/docs/en/guides/How-to-build.md b/docs/en/guides/How-to-build.md
index bb3862f90761..ed2200193f54 100644
--- a/docs/en/guides/How-to-build.md
+++ b/docs/en/guides/How-to-build.md
@@ -84,4 +84,5 @@ Refer to [Build docker image](../../../docker) for more details.
* `grpc-java` and `java` folders in **oap-server/exporter/target/generated-sources/protobuf**
* `grpc-java` and `java` folders in **oap-server/server-configuration/grpc-configuration-sync/target/generated-sources/protobuf**
* `grpc-java` and `java` folders in **oap-server/server-alarm-plugin/target/generated-sources/protobuf**
+ * `grpc-java` and `java` folders in **oap-server/server-fetcher-plugin/fetcher-proto/target/generated-sources/protobuf**
* `antlr4` folder in **oap-server/oal-grammar/target/generated-sources**
diff --git a/docs/en/setup/backend/backend-k8s-monitoring-cilium.md b/docs/en/setup/backend/backend-k8s-monitoring-cilium.md
new file mode 100644
index 000000000000..4dcfd7634189
--- /dev/null
+++ b/docs/en/setup/backend/backend-k8s-monitoring-cilium.md
@@ -0,0 +1,108 @@
+# Kubernetes (K8s) monitoring from Rover
+
+SkyWalking uses the Cilium Fetcher to gather traffic data between services from Cilium Hubble via the Observe API. It then leverages the [OAL System](./../../concepts-and-designs/oal.md) for metrics and entity analysis.
+
+## Data flow
+
+SkyWalking fetches Cilium Node and Observability Data from gRPC API, analysis to generate entity and using [OAL](./../../concepts-and-designs/oal.md) to generating metrics.
+
+## API Requirements
+
+1. [Peers API](https://github.com/cilium/cilium/blob/main/api/v1/peer/peer_grpc.pb.go#L33-L39): Listen the hubble node in the cluster, OAP would communicate with Hubble node to obtain Observe data.
+2. [Observe API](https://github.com/cilium/cilium/blob/main/api/v1/observer/observer_grpc.pb.go#L41): Fetch the Flow data from Hubble node.
+
+## Setup
+1. Please following the [Setup Hubble Observability documentation](https://docs.cilium.io/en/stable/gettingstarted/hubble_setup/) to setting the Hubble for provided API.
+2. To activate Cilium receiver module, set `selector=default` in the YAML or `set SW_CILIUM_FETCHER=default` through the system environment variable.
+```yaml
+cilium-fetcher:
+ selector: ${SW_CILIUM_FETCHER:default}
+ default:
+ # Host name and port of Hubble peer component
+ peerHost: ${SW_CILIUM_FETCHER_PEER_HOST:hubble-peer.kube-system.svc.cluster.local}
+ peerPort: ${SW_CILIUM_FETCHER_PEER_PORT:80}
+ fetchFailureRetrySecond: ${SW_CILIUM_FETCHER_FETCH_FAILURE_RETRY_SECOND:10}
+ sslConnection: ${SW_CILIUM_FETCHER_SSL_CONNECTION:false}
+ sslPrivateKeyFile: ${SW_CILIUM_FETCHER_PRIVATE_KEY_FILE_PATH:}
+ sslCertChainFile: ${SW_CILIUM_FETCHER_CERT_CHAIN_FILE_PATH:}
+ sslCaFile: ${SW_CILIUM_FETCHER_CA_FILE_PATH:}
+ convertClientAsServerTraffic: ${SW_CILIUM_FETCHER_CONVERT_CLIENT_AS_SERVER_TRAFFIC:true}
+```
+3. If enabled the [TLS certificate within the Hubble](https://docs.cilium.io/en/stable/gettingstarted/hubble-configuration/#tls-certificates), please update these few configurations.
+ 1. `peerPort`: usually should be updated to the `443`.
+ 2. `sslConnection`: should be set to `true`.
+ 3. `sslPrivateKeyFile`: the path of the private key file.
+ 4. `sslCertChainFile`: the path of the certificate chain file.
+ 5. `sslCaFile`: the path of the CA file.
+
+## Generated Entities
+
+SkyWalking fetch the flow from Cilium, analyzes the source and destination endpoint to parse out the following corresponding entities:
+1. Service
+2. Service Instance
+3. Service Endpoint
+4. Service Relation
+5. Service Instance Relation
+6. Service Endpoint Relation
+
+## Generate Metrics
+
+For each of the above-mentioned entities, metrics such as L4 and L7 protocols can be analyzed.
+
+### L4 Metrics
+
+Record the relevant metrics for every service read/write packages with other services.
+
+| Name | Unit | Description |
+|---------------------------|---------------|---------------------------------------------------------------------------|
+| Read Package CPM | Count | Total Read Package from other Service counts per minutes. |
+| Write Package CPM | Count | Total Write Package from other Service counts per minutes. |
+| Drop Package CPM | Count | Total Drop Package from other Service counts per minutes. |
+| Drop Package Reason Count | Labeled Count | Total Read Package reason(labeled) from other Service counts per minutes. |
+
+### Protocol
+
+Based on each transfer data analysis, extract the information of the 7-layer network protocol.
+
+NOTE: By default, Cilium only reports L4 metrics. If you need L7 metrics,
+they must be explicitly specified in each service's CiliumNetworkPolicy. For details please [refer to this document](https://docs.cilium.io/en/latest/security/).
+
+#### HTTP
+
+| Name | Unit | Description |
+|--------------------|-------------|---------------------------------------------------------|
+| CPM | Count | HTTP Request calls per minutes. |
+| Duration | Nanoseconds | Total HTTP Response use duration. |
+| Success CPM | Count | Total HTTP Response success(status < 500) count. |
+| Status 1/2/3/4/5xx | Count | HTTP Response status code group by 1xx/2xx/3xx/4xx/5xx. |
+
+#### DNS
+
+| Name | Unit | Description |
+|-------------|-------------|--------------------------------------------------------|
+| CPM | Count | DNS Request calls per minutes. |
+| Duration | Nanoseconds | Total DNS Response use duration. |
+| Success CPM | Count | Total DNS Response success(code == 0) count. |
+| Error Count | Label Count | DNS Response error count with error description label. |
+
+#### Kafka
+
+| Name | Unit | Description |
+|-------------|-------------|----------------------------------------------------------|
+| CPM | Count | Kafka Request calls per minutes. |
+| Duration | Nanoseconds | Total Kafka Response use duration. |
+| Success CPM | Count | Total Kafka Response success(errorCode == 0) count. |
+| Error Count | Label Count | Kafka Response error count with error description label. |
+
+## Load Balance for Cilium Fetcher with OAP cluster
+
+The Cilium Fetcher module relies on the Cluster module, when the Cilium Fetcher module starts up,
+it obtains information about all Cilium nodes and node information in the OAP cluster through Peers API on each OAP node.
+
+Additionally, it averagely distributes collected Cilium nodes to every OAP node.
+Moreover, it ensures that a single Cilium node is not monitored by multiple OAP nodes.
+
+## Customizations
+You can customize your own metrics/dashboard panel.
+The metrics definition and expression rules are found in `/config/oal/cilium.oal`, please refer the [Scope Declaration Documentation](../../concepts-and-designs/scope-definitions.md#scopes-with-cilium-prefix).
+The Cilium dashboard panel configurations are found in `/config/ui-initialized-templates/cilium_service`.
diff --git a/docs/en/setup/backend/backend-k8s-monitoring.md b/docs/en/setup/backend/backend-k8s-monitoring.md
index b483ba006fbd..3c94a445a4f1 100644
--- a/docs/en/setup/backend/backend-k8s-monitoring.md
+++ b/docs/en/setup/backend/backend-k8s-monitoring.md
@@ -13,6 +13,9 @@ ways to monitor deployments on Kubernetes.
2. Rover is a SkyWalking native eBPF agent to collect network Access Logs to support topology-aware and metrics
analysis. Meanwhile, due to the power of eBPF, it could profile running services written by C++, Rust, Golang, etc.
Read [Rover setup guide](./backend-k8s-monitoring-rover.md) for more details.
+3. If Cilium is installed in Kubernetes, use Cilium Fetcher to collect network traffic data of services through Cilium Hubble APIs.
+ This data can be used to create topology maps and to provide L4 and L7 layer metrics.
+ Read [Cilium Fetcher setup guide](./backend-k8s-monitoring-cilium.md) for more details.
SkyWalking deeply integrates with Kubernetes to help users understand the status of their applications on Kubernetes.
Cillium with Hubble is in our v10 plan.
\ No newline at end of file
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index e721922c5260..8f89debe52a2 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -241,6 +241,15 @@ The Configuration Vocabulary lists all available configurations provided by `app
| - | - | topicNameOfManagements | Kafka topic name for service instance reporting and registration. | - | skywalking-managements |
| - | - | topicNameOfLogs | Kafka topic name for native proto log data. | - | skywalking-logs |
| - | - | topicNameOfJsonLogs | Kafka topic name for native json log data. | - | skywalking-logs-json |
+| cilium-fetcher | default | Read Cilium Observe protocol data to collect Cilium Service status. | - | - | |
+| - | - | peerHost | The Cilium Peer Service Host. | SW_CILIUM_FETCHER_PEER_HOST | hubble-peer.kube-system.svc.cluster.local |
+| - | - | peerPort | The Cilium Peer Service Port. | SW_CILIUM_FETCHER_PEER_PORT | 80 |
+| - | - | fetchFailureRetrySecond | The Cilium fetch observe data failure retry interval(second). | SW_CILIUM_FETCHER_FETCH_FAILURE_RETRY_SECOND | 10 |
+| - | - | sslConnection | The Cilium fetch protocol is TLS enabled or not. | eSW_CILIUM_FETCHER_SSL_CONNECTION | false |
+| - | - | sslPrivateKeyFile | The Cilium TLS fetch private key file path. | SW_CILIUM_FETCHER_PRIVATE_KEY_FILE_PATH | "" |
+| - | - | sslCertChainFile | The Cilium TLS fetch cert chain file path. | SW_CILIUM_FETCHER_CERT_CHAIN_FILE_PATH | "" |
+| - | - | sslCaFile | The Cilium TLS fetch rot CA Certification file path. | SW_CILIUM_FETCHER_CA_FILE_PATH | "" |
+| - | - | convertClientAsServerTraffic | The Cilium flow data should convert client to the server side not. If convert, then the server side flow would be ignored. | SW_CILIUM_FETCHER_CONVERT_CLIENT_AS_SERVER_TRAFFIC | true |
| receiver-browser | default | gRPC services that accept browser performance data and error log. | - | - | - |
| - | - | sampleRate | Sampling rate for receiving trace. Precise to 1/10000. 10000 means sampling rate of 100% by default. | SW_RECEIVER_BROWSER_SAMPLE_RATE | 10000 |
| query | graphql | - | GraphQL query implementation. | - | |
diff --git a/docs/menu.yml b/docs/menu.yml
index 005d5fdd25cf..7e9a4ccbcada 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -82,6 +82,8 @@ catalog:
path: "/en/setup/backend/backend-k8s-monitoring-rover"
- name: "Profile Pod's Network"
path: "/en/setup/backend/backend-k8s-network-monitoring"
+ - name: "Cilium"
+ path: "/en/setup/backend/backend-k8s-monitoring-cilium"
- name: "Infrastructure Monitoring"
catalog:
- name: "Linux Monitoring"
diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index 4076469305dd..7a8d5571e2f4 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -552,6 +552,11 @@
+
+ com.linecorp.armeria
+ armeria-grpc
+ ${armeria.version}
+ org.apache.httpcomponentshttpcore
diff --git a/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALLexer.g4 b/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALLexer.g4
index 6139cd181d3d..ea6eb5612c7a 100644
--- a/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALLexer.g4
+++ b/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALLexer.g4
@@ -58,6 +58,12 @@ SRC_K8S_ENDPOINT: 'K8SEndpoint';
SRC_K8S_SERVICE_RELATION: 'K8SServiceRelation';
SRC_K8S_SERVICE_INSTANCE_RELATION: 'K8SServiceInstanceRelation';
SRC_K8S_ENDPOINT_RELATION: 'K8SEndpointRelation';
+SRC_CILIUM_SERVICE: 'CiliumService';
+SRC_CILIUM_SERVICE_INSTANCE: 'CiliumServiceInstance';
+SRC_CILIUM_ENDPOINT: 'CiliumEndpoint';
+SRC_CILIUM_SERVICE_RELATION: 'CiliumServiceRelation';
+SRC_CILIUM_SERVICE_INSTANCE_RELATION: 'CiliumServiceInstanceRelation';
+SRC_CILIUM_ENDPOINT_RELATION: 'CiliumEndpointRelation';
// Browser keywords
diff --git a/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALParser.g4 b/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALParser.g4
index fdab8d4ff0cd..153d0c1f8462 100644
--- a/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALParser.g4
+++ b/oap-server/oal-grammar/src/main/antlr4/org/apache/skywalking/oal/rt/grammar/OALParser.g4
@@ -58,7 +58,8 @@ source
SRC_BROWSER_APP_PERF | SRC_BROWSER_APP_PAGE_PERF | SRC_BROWSER_APP_SINGLE_VERSION_PERF |
SRC_BROWSER_APP_TRAFFIC | SRC_BROWSER_APP_PAGE_TRAFFIC | SRC_BROWSER_APP_SINGLE_VERSION_TRAFFIC |
SRC_EVENT | SRC_MQ_ACCESS | SRC_MQ_ENDPOINT_ACCESS |
- SRC_K8S_SERVICE | SRC_K8S_SERVICE_INSTANCE | SRC_K8S_ENDPOINT | SRC_K8S_SERVICE_RELATION | SRC_K8S_SERVICE_INSTANCE_RELATION | SRC_K8S_ENDPOINT_RELATION
+ SRC_K8S_SERVICE | SRC_K8S_SERVICE_INSTANCE | SRC_K8S_ENDPOINT | SRC_K8S_SERVICE_RELATION | SRC_K8S_SERVICE_INSTANCE_RELATION | SRC_K8S_ENDPOINT_RELATION |
+ SRC_CILIUM_SERVICE | SRC_CILIUM_SERVICE_INSTANCE | SRC_CILIUM_ENDPOINT | SRC_CILIUM_SERVICE_RELATION | SRC_CILIUM_SERVICE_INSTANCE_RELATION | SRC_CILIUM_ENDPOINT_RELATION
;
disableSource
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java
index fd77ac782f74..0f9b6b8f80ec 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java
@@ -228,7 +228,13 @@ public enum Layer {
/**
* ActiveMQ is a popular open source, multi-protocol, Java-based message broker.
*/
- ACTIVEMQ(37, true);
+ ACTIVEMQ(37, true),
+
+ /**
+ * Cilium is open source software for providing and transparently securing network connectivity and load balancing
+ * between application workloads such as application containers or processes.
+ */
+ CILIUM_SERVICE(38, true);
private final int value;
/**
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpoint/HubbleEndpointDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpoint/HubbleEndpointDispatcher.java
new file mode 100644
index 000000000000..d7abf0898b3a
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpoint/HubbleEndpointDispatcher.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.endpoint;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.CiliumEndpoint;
+
+public class HubbleEndpointDispatcher implements SourceDispatcher {
+ @Override
+ public void dispatch(CiliumEndpoint source) {
+ final EndpointTraffic traffic = new EndpointTraffic();
+ traffic.setTimeBucket(source.getTimeBucket());
+ traffic.setName(source.getEndpointName());
+ traffic.setServiceId(source.getServiceId());
+ MetricsStreamProcessor.getInstance().in(traffic);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/instance/HubbleInstanceTrafficDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/instance/HubbleInstanceTrafficDispatcher.java
new file mode 100644
index 000000000000..5273f68b33fa
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/instance/HubbleInstanceTrafficDispatcher.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.instance;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.CiliumServiceInstance;
+
+public class HubbleInstanceTrafficDispatcher implements SourceDispatcher {
+ @Override
+ public void dispatch(CiliumServiceInstance source) {
+ final InstanceTraffic traffic = new InstanceTraffic();
+ traffic.setTimeBucket(source.getTimeBucket());
+ traffic.setName(source.getServiceInstanceName());
+ traffic.setServiceId(source.getServiceId());
+ traffic.setLastPingTimestamp(source.getTimeBucket());
+ MetricsStreamProcessor.getInstance().in(traffic);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/endpoint/HubbleEndpointCallRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/endpoint/HubbleEndpointCallRelationDispatcher.java
new file mode 100644
index 000000000000..99a693184362
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/endpoint/HubbleEndpointCallRelationDispatcher.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.CiliumEndpointRelation;
+
+public class HubbleEndpointCallRelationDispatcher implements SourceDispatcher {
+ @Override
+ public void dispatch(CiliumEndpointRelation source) {
+ switch (source.getDetectPoint()) {
+ case SERVER:
+ serverSide(source);
+ break;
+ default:
+ }
+ }
+
+ private void serverSide(CiliumEndpointRelation source) {
+ EndpointRelationServerSideMetrics metrics = new EndpointRelationServerSideMetrics();
+ metrics.setTimeBucket(source.getTimeBucket());
+ metrics.setSourceEndpoint(source.getSourceEndpointId());
+ metrics.setDestEndpoint(source.getDestEndpointId());
+ metrics.setComponentId(source.getComponentId());
+ metrics.setEntityId(source.getEntityId());
+ MetricsStreamProcessor.getInstance().in(metrics);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/instance/HubbleServiceInstanceCallRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/instance/HubbleServiceInstanceCallRelationDispatcher.java
new file mode 100644
index 000000000000..853065966185
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/instance/HubbleServiceInstanceCallRelationDispatcher.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.relation.instance;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.CiliumServiceInstanceRelation;
+
+public class HubbleServiceInstanceCallRelationDispatcher implements SourceDispatcher {
+ @Override
+ public void dispatch(CiliumServiceInstanceRelation source) {
+ switch (source.getDetectPoint()) {
+ case SERVER:
+ serverSide(source);
+ break;
+ case CLIENT:
+ clientSide(source);
+ break;
+ }
+ }
+
+ private void serverSide(CiliumServiceInstanceRelation source) {
+ ServiceInstanceRelationServerSideMetrics metrics = new ServiceInstanceRelationServerSideMetrics();
+ metrics.setTimeBucket(source.getTimeBucket());
+ metrics.setSourceServiceId(source.getSourceServiceId());
+ metrics.setSourceServiceInstanceId(source.getSourceServiceInstanceId());
+ metrics.setDestServiceId(source.getDestServiceId());
+ metrics.setDestServiceInstanceId(source.getDestServiceInstanceId());
+ metrics.setEntityId(source.getEntityId());
+ MetricsStreamProcessor.getInstance().in(metrics);
+ }
+
+ private void clientSide(CiliumServiceInstanceRelation source) {
+ ServiceInstanceRelationClientSideMetrics metrics = new ServiceInstanceRelationClientSideMetrics();
+ metrics.setTimeBucket(source.getTimeBucket());
+ metrics.setSourceServiceId(source.getSourceServiceId());
+ metrics.setSourceServiceInstanceId(source.getSourceServiceInstanceId());
+ metrics.setDestServiceId(source.getDestServiceId());
+ metrics.setDestServiceInstanceId(source.getDestServiceInstanceId());
+ metrics.setEntityId(source.getEntityId());
+ MetricsStreamProcessor.getInstance().in(metrics);
+ }
+
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/HubbleServiceCallRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/HubbleServiceCallRelationDispatcher.java
new file mode 100644
index 000000000000..4be0b2a379c6
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/HubbleServiceCallRelationDispatcher.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.relation.service;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.CiliumServiceRelation;
+
+public class HubbleServiceCallRelationDispatcher implements SourceDispatcher {
+ @Override
+ public void dispatch(CiliumServiceRelation source) {
+ switch (source.getDetectPoint()) {
+ case SERVER:
+ serverSide(source);
+ break;
+ case CLIENT:
+ clientSide(source);
+ break;
+ }
+ }
+
+ private void serverSide(CiliumServiceRelation source) {
+ final ServiceRelationServerSideMetrics metrics = new ServiceRelationServerSideMetrics();
+ metrics.setTimeBucket(source.getTimeBucket());
+ metrics.setSourceServiceId(source.getSourceServiceId());
+ metrics.setDestServiceId(source.getDestServiceId());
+ if (source.getComponentId() != 0) {
+ final IntList componentIds = metrics.getComponentIds();
+ componentIds.add(source.getComponentId());
+ }
+ metrics.setEntityId(source.getEntityId());
+ MetricsStreamProcessor.getInstance().in(metrics);
+ }
+
+ private void clientSide(CiliumServiceRelation source) {
+ ServiceRelationClientSideMetrics metrics = new ServiceRelationClientSideMetrics();
+ metrics.setTimeBucket(source.getTimeBucket());
+ metrics.setSourceServiceId(source.getSourceServiceId());
+ metrics.setDestServiceId(source.getDestServiceId());
+ if (source.getComponentId() != 0) {
+ final IntList componentIds = metrics.getComponentIds();
+ componentIds.add(source.getComponentId());
+ }
+ metrics.setEntityId(source.getEntityId());
+ MetricsStreamProcessor.getInstance().in(metrics);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/HubbleServiceTrafficDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/HubbleServiceTrafficDispatcher.java
new file mode 100644
index 000000000000..57f17780c2ed
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/HubbleServiceTrafficDispatcher.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.service;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.CiliumService;
+
+public class HubbleServiceTrafficDispatcher implements SourceDispatcher {
+ @Override
+ public void dispatch(CiliumService source) {
+ final ServiceTraffic traffic = new ServiceTraffic();
+ traffic.setTimeBucket(source.getTimeBucket());
+ traffic.setName(source.getServiceName());
+ traffic.setLayer(source.getLayer());
+ MetricsStreamProcessor.getInstance().in(traffic);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/LabelCountMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/LabelCountMetrics.java
new file mode 100644
index 000000000000..d433d9a14ea3
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/LabelCountMetrics.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.metrics;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Arg;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.ConstOne;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Entrance;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsFunction;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
+
+@MetricsFunction(functionName = "labelCount")
+public abstract class LabelCountMetrics extends Metrics implements LabeledValueHolder {
+ protected static final String DATASET = "dataset";
+ protected static final String VALUE = "datatable_value";
+
+ protected static final String LABEL_NAME = "n";
+
+ @Getter
+ @Setter
+ @Column(name = DATASET, storageOnly = true)
+ @BanyanDB.MeasureField
+ private DataTable dataset;
+
+ @Getter
+ @Setter
+ @Column(name = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
+ @ElasticSearch.Column(legacyName = "value")
+ @BanyanDB.MeasureField
+ private DataTable value;
+
+ private boolean isCalculated;
+
+ public LabelCountMetrics() {
+ this.dataset = new DataTable(30);
+ this.value = new DataTable(30);
+ }
+
+ @Entrance
+ public final void combine(@Arg String label, @ConstOne long count) {
+ this.isCalculated = false;
+ this.dataset.valueAccumulation(label, count);
+ }
+
+ @Override
+ public boolean combine(Metrics metrics) {
+ this.isCalculated = false;
+ final LabelCountMetrics labelCountMetrics = (LabelCountMetrics) metrics;
+ this.dataset.append(labelCountMetrics.dataset);
+ return true;
+ }
+
+ @Override
+ public void calculate() {
+ if (isCalculated) {
+ return;
+ }
+
+ // convert dataset to labeled value
+ for (String key : this.dataset.keys()) {
+ final DataLabel label = new DataLabel();
+ label.put(LABEL_NAME, key);
+ this.value.put(label, this.dataset.get(key));
+ }
+ }
+
+ @Override
+ public DataTable getValue() {
+ return this.value;
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/management/ui/template/UITemplateInitializer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/management/ui/template/UITemplateInitializer.java
index b37c5828e9df..7c11cee66a6b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/management/ui/template/UITemplateInitializer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/management/ui/template/UITemplateInitializer.java
@@ -75,6 +75,7 @@ public class UITemplateInitializer {
Layer.ROCKETMQ.name(),
Layer.CLICKHOUSE.name(),
Layer.ACTIVEMQ.name(),
+ Layer.CILIUM_SERVICE.name(),
"custom"
};
private final UITemplateManagementService uiTemplateManagementService;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumEndpoint.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumEndpoint.java
new file mode 100644
index 000000000000..2b2b9762caf1
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumEndpoint.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.source;
+
+import lombok.Data;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ENDPOINT_CATALOG_NAME;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.CILIUM_ENDPOINT;
+
+@Data
+@ScopeDeclaration(id = CILIUM_ENDPOINT, name = "CiliumEndpoint", catalog = ENDPOINT_CATALOG_NAME)
+@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class)
+public class CiliumEndpoint extends CiliumMetrics {
+ private volatile String entityId;
+
+ private String serviceId;
+ private String serviceName;
+ private String endpointName;
+ public Layer layer;
+
+ @Override
+ public int scope() {
+ return CILIUM_ENDPOINT;
+ }
+
+ @Override
+ public void prepare() {
+ serviceId = IDManager.ServiceID.buildId(serviceName, layer.isNormal());
+ entityId = IDManager.EndpointID.buildId(serviceId, endpointName);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumEndpointRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumEndpointRelation.java
new file mode 100644
index 000000000000..62c5814671a1
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumEndpointRelation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.source;
+
+import lombok.Data;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ENDPOINT_RELATION_CATALOG_NAME;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.CILIUM_ENDPOINT_REALATION;
+
+@Data
+@ScopeDeclaration(id = CILIUM_ENDPOINT_REALATION, name = "CiliumEndpointRelation", catalog = ENDPOINT_RELATION_CATALOG_NAME)
+@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class)
+public class CiliumEndpointRelation extends CiliumMetrics {
+ private volatile String entityId;
+
+ private String sourceServiceId;
+ private String sourceServiceName;
+ private String sourceEndpointId;
+ private String sourceEndpointName;
+ private Layer sourceLayer;
+
+ private DetectPoint detectPoint;
+ private int componentId;
+
+ private String destServiceId;
+ private String destServiceName;
+ private String destEndpointId;
+ private String destEndpointName;
+ private Layer destLayer;
+
+ private boolean success;
+ private long duration;
+
+ @Override
+ public int scope() {
+ return CILIUM_ENDPOINT_REALATION;
+ }
+
+ @Override
+ public void prepare() {
+ sourceServiceId = IDManager.ServiceID.buildId(sourceServiceName, sourceLayer.isNormal());
+ sourceEndpointId = IDManager.EndpointID.buildId(sourceServiceId, sourceEndpointName);
+ destServiceId = IDManager.ServiceID.buildId(destServiceName, destLayer.isNormal());
+ destEndpointId = IDManager.EndpointID.buildId(destServiceId, destEndpointName);
+
+ entityId = IDManager.EndpointID.buildRelationId(new IDManager.EndpointID.EndpointRelationDefine(
+ sourceServiceId, sourceEndpointName, destServiceId, destEndpointName
+ ));
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumMetrics.java
new file mode 100644
index 000000000000..8f2368dbefb2
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumMetrics.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.source;
+
+import lombok.Data;
+
+@Data
+public abstract class CiliumMetrics extends Source {
+ public static final String VERDICT_FORWARDED = "forwarded";
+ public static final String VERDICT_DROPPED = "dropped";
+
+ public static final String TYPE_TCP = "tcp";
+ public static final String TYPE_HTTP = "http";
+ public static final String TYPE_DNS = "dns";
+ public static final String TYPE_KAFKA = "kafka";
+
+ public static final String DIRECTION_INGRESS = "ingress";
+ public static final String DIRECTION_EGRESS = "egress";
+
+ // Basic information
+ private String verdict;
+ private String type;
+ private String direction;
+
+ // For Dropped Package Reason
+ private String dropReason;
+
+ // For L7 metrics
+ private HTTPMetrics http;
+ private KafkaMetrics kafka;
+ private DNSMetrics dns;
+ private long duration;
+ private boolean success;
+
+ @Data
+ public static class HTTPMetrics {
+ private String url;
+ private int code;
+ private String protocol;
+ private String method;
+ }
+
+ @Data
+ public static class KafkaMetrics {
+ private int errorCode;
+ private String errorCodeString;
+ private int apiVersion;
+ private String apiKey;
+ private int correlationId;
+ private String topic;
+ }
+
+ @Data
+ public static class DNSMetrics {
+ private String domain;
+ private String queryType;
+ private int rcode;
+ private String rcodeString;
+ private int ttl;
+ private int ipCount;
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumService.java
new file mode 100644
index 000000000000..7518694142f4
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumService.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.source;
+
+import lombok.Data;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.CILIUM_SERVICE;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_CATALOG_NAME;
+
+@Data
+@ScopeDeclaration(id = CILIUM_SERVICE, name = "CiliumService", catalog = SERVICE_CATALOG_NAME)
+@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class)
+public class CiliumService extends CiliumMetrics {
+ private volatile String entityId;
+
+ private String serviceName;
+ public Layer layer;
+
+ private DetectPoint detectPoint;
+
+ @Override
+ public int scope() {
+ return CILIUM_SERVICE;
+ }
+
+ @Override
+ public void prepare() {
+ entityId = IDManager.ServiceID.buildId(serviceName, layer.isNormal());
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumServiceInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumServiceInstance.java
new file mode 100644
index 000000000000..eb59938cbf98
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumServiceInstance.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.source;
+
+import lombok.Data;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.CILIUM_SERVICE_INSTANCE;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INSTANCE_CATALOG_NAME;
+
+@Data
+@ScopeDeclaration(id = CILIUM_SERVICE_INSTANCE, name = "CiliumServiceInstance", catalog = SERVICE_INSTANCE_CATALOG_NAME)
+@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class)
+public class CiliumServiceInstance extends CiliumMetrics {
+ private volatile String entityId;
+
+ private String serviceId;
+ private String serviceName;
+ private String serviceInstanceName;
+ public Layer layer;
+
+ private DetectPoint detectPoint;
+
+ @Override
+ public int scope() {
+ return CILIUM_SERVICE_INSTANCE;
+ }
+
+ @Override
+ public void prepare() {
+ serviceId = IDManager.ServiceID.buildId(serviceName, layer.isNormal());
+ entityId = IDManager.ServiceInstanceID.buildId(serviceId, serviceInstanceName);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumServiceInstanceRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumServiceInstanceRelation.java
new file mode 100644
index 000000000000..ebfc66a3d58b
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumServiceInstanceRelation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.source;
+
+import lombok.Data;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.CILIUM_SERVICE_INSTANCE_RELATION;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INSTANCE_RELATION_CATALOG_NAME;
+
+@Data
+@ScopeDeclaration(id = CILIUM_SERVICE_INSTANCE_RELATION, name = "CiliumServiceInstanceRelation", catalog = SERVICE_INSTANCE_RELATION_CATALOG_NAME)
+@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class)
+public class CiliumServiceInstanceRelation extends CiliumMetrics {
+ private volatile String entityId;
+
+ private String sourceServiceId;
+ private String sourceServiceName;
+ private String sourceServiceInstanceId;
+ private String sourceServiceInstanceName;
+ private Layer sourceLayer;
+
+ private DetectPoint detectPoint;
+ private int componentId;
+
+ private String destServiceId;
+ private String destServiceName;
+ private String destServiceInstanceId;
+ private String destServiceInstanceName;
+ private Layer destLayer;
+
+ @Override
+ public int scope() {
+ return CILIUM_SERVICE_INSTANCE_RELATION;
+ }
+
+ @Override
+ public void prepare() {
+ sourceServiceId = IDManager.ServiceID.buildId(sourceServiceName, sourceLayer.isNormal());
+ sourceServiceInstanceId = IDManager.ServiceInstanceID.buildId(sourceServiceId, sourceServiceInstanceName);
+ destServiceId = IDManager.ServiceID.buildId(destServiceName, destLayer.isNormal());
+ destServiceInstanceId = IDManager.ServiceInstanceID.buildId(destServiceId, destServiceInstanceName);
+
+ entityId = IDManager.ServiceInstanceID.buildRelationId(
+ new IDManager.ServiceInstanceID.ServiceInstanceRelationDefine(
+ sourceServiceInstanceId,
+ destServiceInstanceId
+ )
+ );
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumServiceRelation.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumServiceRelation.java
new file mode 100644
index 000000000000..a6d02b306955
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CiliumServiceRelation.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.source;
+
+import lombok.Data;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.CILIUM_SERVICE_RELATION;
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_RELATION_CATALOG_NAME;
+
+@Data
+@ScopeDeclaration(id = CILIUM_SERVICE_RELATION, name = "CiliumServiceRelation", catalog = SERVICE_RELATION_CATALOG_NAME)
+@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class)
+public class CiliumServiceRelation extends CiliumMetrics {
+ private volatile String entityId;
+
+ private String sourceServiceId;
+ private String sourceServiceName;
+ private Layer sourceLayer;
+
+ private DetectPoint detectPoint;
+ private int componentId;
+
+ private String destServiceId;
+ private String destServiceName;
+ private Layer destLayer;
+
+ @Override
+ public int scope() {
+ return CILIUM_SERVICE_RELATION;
+ }
+
+ @Override
+ public void prepare() {
+ sourceServiceId = IDManager.ServiceID.buildId(sourceServiceName, sourceLayer.isNormal());
+ destServiceId = IDManager.ServiceID.buildId(destServiceName, destLayer.isNormal());
+
+ entityId = IDManager.ServiceID.buildRelationId(
+ new IDManager.ServiceID.ServiceRelationDefine(
+ sourceServiceId,
+ destServiceId
+ )
+ );
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index 9529354ca615..b7ea73c6ee8e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -137,6 +137,13 @@ public class DefaultScopeDefine {
public static final int K8S_ENDPOINT = 76;
public static final int K8S_ENDPOINT_REALATION = 77;
+ public static final int CILIUM_SERVICE = 78;
+ public static final int CILIUM_SERVICE_INSTANCE = 79;
+ public static final int CILIUM_SERVICE_RELATION = 80;
+ public static final int CILIUM_SERVICE_INSTANCE_RELATION = 81;
+ public static final int CILIUM_ENDPOINT = 82;
+ public static final int CILIUM_ENDPOINT_REALATION = 83;
+
/**
* Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt.
*/
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/pom.xml b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/pom.xml
new file mode 100644
index 000000000000..ed3d60cb9a21
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/pom.xml
@@ -0,0 +1,53 @@
+
+
+
+
+
+ org.apache.skywalking
+ server-fetcher-plugin
+ 10.1.0-SNAPSHOT
+
+ 4.0.0
+
+ cilium-fetcher-plugin
+ jar
+
+
+
+ org.apache.skywalking
+ fetcher-proto
+ ${project.version}
+
+
+ org.apache.skywalking
+ skywalking-sharing-server-plugin
+ ${project.version}
+
+
+ org.apache.skywalking
+ library-module
+ ${project.version}
+
+
+
+ com.linecorp.armeria
+ armeria-grpc
+
+
+
\ No newline at end of file
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/CiliumFetcherConfig.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/CiliumFetcherConfig.java
new file mode 100644
index 000000000000..bce193600fbb
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/CiliumFetcherConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium;
+
+import lombok.Getter;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+@Getter
+public class CiliumFetcherConfig extends ModuleConfig {
+ private String peerHost;
+ private int peerPort;
+ private int fetchFailureRetrySecond;
+ private boolean sslConnection;
+ private String sslPrivateKeyFile;
+ private String sslCertChainFile;
+ private String sslCaFile;
+ private boolean convertClientAsServerTraffic;
+}
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/CiliumFetcherModule.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/CiliumFetcherModule.java
new file mode 100644
index 000000000000..fc20782468d1
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/CiliumFetcherModule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium;
+
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+
+public class CiliumFetcherModule extends ModuleDefine {
+ public static final String NAME = "cilium-fetcher";
+
+ public CiliumFetcherModule() {
+ super(NAME);
+ }
+
+ @Override
+ public Class[] services() {
+ return new Class[0];
+ }
+}
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/CiliumFetcherProvider.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/CiliumFetcherProvider.java
new file mode 100644
index 000000000000..98ef2f9e82a7
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/CiliumFetcherProvider.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
+import org.apache.skywalking.oap.server.fetcher.cilium.handler.CiliumFlowListener;
+import org.apache.skywalking.oap.server.fetcher.cilium.handler.ServiceMetadata;
+import org.apache.skywalking.oap.server.fetcher.cilium.nodes.CiliumNodeManager;
+import org.apache.skywalking.oap.server.fetcher.cilium.nodes.GrpcStubBuilder;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.library.util.FieldsHelper;
+
+@Slf4j
+public class CiliumFetcherProvider extends ModuleProvider {
+ private CiliumFetcherConfig config;
+
+ protected String fieldMappingFile = "metadata-service-mapping.yaml";
+
+ @Override
+ public String name() {
+ return "default";
+ }
+
+ @Override
+ public Class extends ModuleDefine> module() {
+ return CiliumFetcherModule.class;
+ }
+
+ @Override
+ public ConfigCreator newConfigCreator() {
+ return new ConfigCreator() {
+ @Override
+ public Class type() {
+ return CiliumFetcherConfig.class;
+ }
+
+ @Override
+ public void onInitialized(CiliumFetcherConfig initialized) {
+ config = initialized;
+ }
+ };
+ }
+
+ @Override
+ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+ }
+
+ @Override
+ public void start() throws ServiceNotProvidedException, ModuleStartException {
+ // load official analysis
+ getManager().find(CoreModule.NAME)
+ .provider()
+ .getService(OALEngineLoaderService.class)
+ .load(CiliumOALDefine.INSTANCE);
+ try {
+ FieldsHelper.forClass(ServiceMetadata.class).init(fieldMappingFile);
+ } catch (Exception e) {
+ throw new ModuleStartException(e.getMessage(), e);
+ }
+
+ final CiliumNodeManager ciliumNodeManager = new CiliumNodeManager(getManager(), new GrpcStubBuilder(config), config);
+ ciliumNodeManager.addListener(new CiliumFlowListener(getManager(), config));
+ ciliumNodeManager.start();
+ }
+
+ @Override
+ public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
+
+ }
+
+ @Override
+ public String[] requiredModules() {
+ return new String[]{
+ CoreModule.NAME, ClusterModule.NAME
+ };
+ }
+}
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/CiliumOALDefine.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/CiliumOALDefine.java
new file mode 100644
index 000000000000..1ccdec63f2c6
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/CiliumOALDefine.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium;
+
+import org.apache.skywalking.oap.server.core.oal.rt.OALDefine;
+
+public class CiliumOALDefine extends OALDefine {
+
+ public static final CiliumOALDefine INSTANCE = new CiliumOALDefine();
+
+ private CiliumOALDefine() {
+ super(
+ "oal/cilium.oal",
+ "org.apache.skywalking.oap.server.core.source"
+ );
+ }
+}
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/handler/CiliumFlowListener.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/handler/CiliumFlowListener.java
new file mode 100644
index 000000000000..c5d9b2f2b3cd
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/handler/CiliumFlowListener.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium.handler;
+
+import com.google.protobuf.util.Timestamps;
+import io.cilium.api.flow.DNS;
+import io.cilium.api.flow.Endpoint;
+import io.cilium.api.flow.Flow;
+import io.cilium.api.flow.HTTP;
+import io.cilium.api.flow.Kafka;
+import io.cilium.api.flow.L7FlowType;
+import io.cilium.api.flow.TrafficDirection;
+import io.cilium.api.flow.Verdict;
+import io.cilium.api.observer.GetFlowsRequest;
+import io.cilium.api.observer.GetFlowsResponse;
+import io.cilium.api.observer.ObserverGrpc;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.source.DetectPoint;
+import org.apache.skywalking.oap.server.core.source.CiliumEndpointRelation;
+import org.apache.skywalking.oap.server.core.source.CiliumEndpoint;
+import org.apache.skywalking.oap.server.core.source.CiliumMetrics;
+import org.apache.skywalking.oap.server.core.source.CiliumServiceInstanceRelation;
+import org.apache.skywalking.oap.server.core.source.CiliumServiceInstance;
+import org.apache.skywalking.oap.server.core.source.CiliumService;
+import org.apache.skywalking.oap.server.core.source.CiliumServiceRelation;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.fetcher.cilium.CiliumFetcherConfig;
+import org.apache.skywalking.oap.server.fetcher.cilium.nodes.CiliumNode;
+import org.apache.skywalking.oap.server.fetcher.cilium.nodes.CiliumNodeUpdateListener;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class CiliumFlowListener implements CiliumNodeUpdateListener {
+ private static final Executor EXECUTOR = Executors.newCachedThreadPool();
+ private final SourceReceiver sourceReceiver;
+ private final Integer retrySecond;
+ private final boolean convertClientAsServerTraffic;
+
+ public static final Layer SERVICE_LAYER = Layer.CILIUM_SERVICE;
+
+ public CiliumFlowListener(ModuleManager moduleManager, CiliumFetcherConfig config) {
+ this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
+ this.retrySecond = config.getFetchFailureRetrySecond();
+ this.convertClientAsServerTraffic = config.isConvertClientAsServerTraffic();
+ }
+
+ @Override
+ public void onNodeAdded(CiliumNode node) {
+ final String address = node.getAddress();
+ EXECUTOR.execute(new RunnableWithExceptionProtection(() -> {
+ final ObserverGrpc.ObserverBlockingStub stub = node.getObserverStub();
+ if (stub == null) {
+ return;
+ }
+ final Iterator flows = stub.getFlows(
+ GetFlowsRequest.newBuilder().setSince(Timestamps.now()).setFollow(true).build());
+ final Thread thread = Thread.currentThread();
+ node.addingCloseable(thread::interrupt);
+ flows.forEachRemaining(flow -> {
+ switch (flow.getResponseTypesCase()) {
+ case FLOW:
+ log.debug("Detect flow data: address: {}, flow: {}", address, flow.getFlow());
+ handleFlow(node, flow.getFlow());
+ break;
+ case LOST_EVENTS:
+ log.warn("Detected lost events, address: {}, events: {}", address, flow.getLostEvents());
+ break;
+ case NODE_STATUS:
+ log.debug("Detected node status, address: {}, status: {}", address, flow.getNodeStatus());
+ break;
+ }
+ });
+ }, t -> {
+ if (t instanceof InterruptedException || (t.getCause() != null && t.getCause() instanceof InterruptedException)) {
+ log.debug("detected the node have been closed: {}, stopping to get flows", node.getAddress());
+ return;
+ }
+ log.error("Failed to fetch flows from Cilium node: {}, will retry after {} seconds.", node.getAddress(), this.retrySecond, t);
+ try {
+ TimeUnit.SECONDS.sleep(this.retrySecond);
+ } catch (InterruptedException e) {
+ log.error("Failed to sleep for {} seconds.", this.retrySecond, e);
+ return;
+ }
+
+ onNodeAdded(node);
+ }));
+ }
+
+ @Override
+ public void onNodeDelete(CiliumNode node) {
+ }
+
+ protected void handleFlow(CiliumNode node, Flow flow) {
+ // if no source or no destination, then ignore this flow
+ if (shouldIgnoreFlow(node, flow)) {
+ return;
+ }
+
+ flow = convertTraffic(node, flow);
+
+ final ServiceMetadata sourceMetadata = new ServiceMetadata(flow.getSource());
+ final ServiceMetadata destMetadata = new ServiceMetadata(flow.getDestination());
+ DetectPoint detectPoint = parseDetectPoint(flow);
+ if (convertClientAsServerTraffic) {
+ // if the client traffic is converted as server traffic, then the detect point should be only be server side
+ detectPoint = DetectPoint.SERVER;
+ }
+ log.debug("ready to building cilium traffic from {}{} -> {}{}, flow: {}, type: {}",
+ detectPoint.equals(DetectPoint.CLIENT) ? "*" : "",
+ sourceMetadata.getServiceName(),
+ detectPoint.equals(DetectPoint.SERVER) ? "*" : "",
+ destMetadata.getServiceName(), parseDirectionString(flow),
+ flow.getType());
+
+ switch (flow.getType()) {
+ case L3_L4:
+ buildL34Metrics(node, flow, sourceMetadata, destMetadata, detectPoint);
+ break;
+ case L7:
+ buildL7Metrics(node, flow, sourceMetadata, destMetadata, detectPoint);
+ break;
+ }
+ }
+
+ protected Flow convertTraffic(CiliumNode node, Flow flow) {
+ final Flow.Builder builder = flow.toBuilder();
+ // if the flow is reply traffic
+ if (flow.getIsReply().getValue()) {
+ // need to convert the traffic direction
+ // the reply flow traffic is opposite to the original flow
+ builder.setTrafficDirection(convertDirection(flow.getTrafficDirection()));
+
+ // correct the source and destination
+ // when the flow is reply, the source and destination should be exchanged to the client -> server
+ final Endpoint source = flow.getSource();
+ final Endpoint dest = flow.getDestination();
+ builder.setSource(dest);
+ builder.setDestination(source);
+ }
+
+ if (convertClientAsServerTraffic) {
+ // convert the traffic direction
+ builder.setTrafficDirection(convertDirection(builder.getTrafficDirection()));
+ }
+
+ return builder.build();
+ }
+
+ protected TrafficDirection convertDirection(TrafficDirection direction) {
+ switch (direction) {
+ case INGRESS:
+ return TrafficDirection.EGRESS;
+ case EGRESS:
+ return TrafficDirection.INGRESS;
+ }
+ return direction;
+ }
+
+ private void buildL34Metrics(CiliumNode node, Flow flow, ServiceMetadata sourceMetadata, ServiceMetadata destMetadata, DetectPoint detectPoint) {
+ ServiceMetadata currentService = detectPoint.equals(DetectPoint.CLIENT) ? sourceMetadata : destMetadata;
+ List extends CiliumMetrics> metrics = Arrays.asList(
+ buildService(node, flow, currentService, detectPoint), buildServiceRelation(node, flow, sourceMetadata, destMetadata, detectPoint),
+ buildServiceInstance(node, flow, currentService, detectPoint), buildServiceInstanceRelation(node, flow, sourceMetadata, destMetadata, detectPoint));
+
+ metrics.forEach(metric -> {
+ setBasicInfo(metric, flow, CiliumMetrics.TYPE_TCP);
+
+ sourceReceiver.receive(metric);
+ });
+ }
+
+ private void buildL7Metrics(CiliumNode node, Flow flow, ServiceMetadata sourceMetadata, ServiceMetadata destMetadata, DetectPoint detectPoint) {
+ switch (flow.getL7().getRecordCase()) {
+ case HTTP:
+ buildHttpMetrics(node, flow, sourceMetadata, destMetadata, detectPoint, flow.getL7().getHttp());
+ break;
+ case DNS:
+ buildDnsMetrics(node, flow, sourceMetadata, destMetadata, detectPoint, flow.getL7().getDns());
+ break;
+ case KAFKA:
+ buildKafkaMetrics(node, flow, sourceMetadata, destMetadata, detectPoint, flow.getL7().getKafka());
+ break;
+ }
+ }
+
+ private void buildKafkaMetrics(CiliumNode node, Flow flow, ServiceMetadata sourceMetadata, ServiceMetadata destMetadata, DetectPoint detectPoint, Kafka kafka) {
+ // only acknowledge the response flow
+ if (flow.getL7().getType() != L7FlowType.RESPONSE) {
+ return;
+ }
+ boolean success = kafka.getErrorCode() == 0;
+ String endpoint = "Kafka/" + kafka.getTopic() + "/" + kafka.getApiKey();
+ List metrics = buildingL7Metrics(node, flow, sourceMetadata, destMetadata, detectPoint, endpoint);
+
+ metrics.stream().filter(Objects::nonNull).forEach(metric -> {
+ setBasicInfo(metric, flow, CiliumMetrics.TYPE_KAFKA);
+ metric.setSuccess(success);
+ metric.setDuration(flow.getL7().getLatencyNs());
+
+ metric.setKafka(new CiliumMetrics.KafkaMetrics());
+ metric.getKafka().setErrorCode(kafka.getErrorCode());
+ metric.getKafka().setErrorCodeString(KafkaCodes.ERROR_CODES.getOrDefault(kafka.getErrorCode(), "UNKNOWN"));
+ metric.getKafka().setApiVersion(kafka.getApiVersion());
+ metric.getKafka().setApiKey(kafka.getApiKey());
+ metric.getKafka().setCorrelationId(kafka.getCorrelationId());
+ metric.getKafka().setTopic(kafka.getTopic());
+
+ sourceReceiver.receive(metric);
+ });
+ }
+
+ private void buildDnsMetrics(CiliumNode node, Flow flow, ServiceMetadata sourceMetadata, ServiceMetadata destMetadata, DetectPoint detectPoint, DNS dns) {
+ // only acknowledge the response flow
+ if (flow.getL7().getType() != L7FlowType.RESPONSE) {
+ return;
+ }
+ boolean success = dns.getRcode() == 0;
+ String endpoint = "DNS/" + (dns.getQtypesCount() > 0 ? dns.getQtypesList().get(0) : "UNKNOWN");
+ List metrics = buildingL7Metrics(node, flow, sourceMetadata, destMetadata, detectPoint, endpoint);
+
+ metrics.stream().filter(Objects::nonNull).forEach(metric -> {
+ setBasicInfo(metric, flow, CiliumMetrics.TYPE_DNS);
+ metric.setSuccess(success);
+ metric.setDuration(flow.getL7().getLatencyNs());
+
+ metric.setDns(new CiliumMetrics.DNSMetrics());
+ metric.getDns().setDomain(dns.getQuery());
+ metric.getDns().setQueryType(dns.getQtypesCount() > 0 ? dns.getQtypesList().get(0) : "UNKNOWN");
+ metric.getDns().setRcode(dns.getRcode());
+ metric.getDns().setRcodeString(DNSCodes.RETURN_CODES.getOrDefault(dns.getRcode(), "UNKNOWN"));
+ metric.getDns().setTtl(dns.getTtl());
+ metric.getDns().setIpCount(dns.getIpsCount());
+
+ sourceReceiver.receive(metric);
+ });
+ }
+
+ private void buildHttpMetrics(CiliumNode node, Flow flow, ServiceMetadata sourceMetadata, ServiceMetadata destMetadata, DetectPoint detectPoint, HTTP http) {
+ // if the http code is 0, then ignore this flow, it should be request
+ if (http.getCode() == 0) {
+ return;
+ }
+ final URL url;
+ try {
+ url = new URL(http.getUrl());
+ } catch (MalformedURLException e) {
+ log.warn("Failed to parse the URL: {} from {} -> {}", http.getUrl(),
+ sourceMetadata.getServiceInstanceName(), destMetadata.getServiceInstanceName(), e);
+ return;
+ }
+ String endpointName = http.getMethod() + ":" + url.getPath();
+ final boolean httpSuccess = parseHTTPSuccess(flow, http);
+ List metrics = buildingL7Metrics(node, flow, sourceMetadata, destMetadata, detectPoint, endpointName);
+
+ metrics.stream().filter(Objects::nonNull).forEach(metric -> {
+ setBasicInfo(metric, flow, CiliumMetrics.TYPE_HTTP);
+ metric.setSuccess(httpSuccess);
+ metric.setDuration(flow.getL7().getLatencyNs());
+
+ metric.setHttp(new CiliumMetrics.HTTPMetrics());
+ metric.getHttp().setUrl(http.getUrl());
+ metric.getHttp().setCode(http.getCode());
+ metric.getHttp().setProtocol(http.getProtocol());
+ metric.getHttp().setMethod(http.getMethod());
+
+ sourceReceiver.receive(metric);
+ });
+ }
+
+ private List buildingL7Metrics(CiliumNode node, Flow flow, ServiceMetadata sourceMetadata, ServiceMetadata destMetadata, DetectPoint detectPoint, String endpointName) {
+ ServiceMetadata currentService = detectPoint.equals(DetectPoint.CLIENT) ? sourceMetadata : destMetadata;
+ return Arrays.asList(
+ buildService(node, flow, currentService, detectPoint), buildServiceRelation(node, flow, sourceMetadata, destMetadata, detectPoint),
+ buildServiceInstance(node, flow, currentService, detectPoint), buildServiceInstanceRelation(node, flow, sourceMetadata, destMetadata, detectPoint),
+ buildEndpoint(node, flow, currentService, endpointName, detectPoint), buildEndpointRelation(node, flow, sourceMetadata, destMetadata, detectPoint, endpointName)
+ );
+ }
+
+ private void setBasicInfo(CiliumMetrics metric, Flow flow, String type) {
+ metric.setVerdict(parseVerdictString(flow));
+ metric.setType(type);
+ metric.setDirection(parseDirectionString(flow));
+ metric.setTimeBucket(TimeBucket.getMinuteTimeBucket(flow.getTime().getSeconds() * 1000));
+ if (Verdict.DROPPED.equals(flow.getVerdict())) {
+ metric.setDropReason(flow.getDropReasonDesc().toString());
+ }
+ }
+
+ protected boolean shouldIgnoreEndpoint(Endpoint endpoint) {
+ if (endpoint.getID() != 0) {
+ return false;
+ }
+
+ return StringUtil.isEmpty(endpoint.getPodName()) || StringUtil.isEmpty(endpoint.getNamespace());
+ }
+
+ protected boolean shouldIgnoreFlow(CiliumNode node, Flow flow) {
+ // must have source and destination
+ if (!flow.hasSource() || !flow.hasDestination()) {
+ return true;
+ }
+ // if the source and destination or not set, then ignore this flow
+ if (shouldIgnoreEndpoint(flow.getSource()) || shouldIgnoreEndpoint(flow.getDestination())) {
+ return true;
+ }
+ // only acknowledge the flows is forwarded or dropped
+ switch (flow.getVerdict()) {
+ case FORWARDED:
+ case DROPPED:
+ break;
+ default:
+ return true;
+ }
+ // traffic direction must be set
+ if (flow.getTrafficDirection() == TrafficDirection.TRAFFIC_DIRECTION_UNKNOWN) {
+ return true;
+ }
+ // flow type is only support for L3, L4 and L7
+ switch (flow.getType()) {
+ case L3_L4:
+ case L7: break;
+ default: return true;
+ }
+ // ignore the client traffic if we convert the client as server traffic
+ if (this.convertClientAsServerTraffic && DetectPoint.SERVER.equals(parseDetectPoint(flow))) {
+ return true;
+ }
+ return false;
+ }
+
+ private String parseVerdictString(Flow flow) {
+ switch (flow.getVerdict()) {
+ case FORWARDED:
+ return CiliumMetrics.VERDICT_FORWARDED;
+ case DROPPED:
+ return CiliumMetrics.VERDICT_DROPPED;
+ }
+ return "";
+ }
+
+ private String parseDirectionString(Flow flow) {
+ switch (flow.getTrafficDirection()) {
+ case INGRESS:
+ return CiliumMetrics.DIRECTION_INGRESS;
+ case EGRESS:
+ return CiliumMetrics.DIRECTION_EGRESS;
+ }
+ return "";
+ }
+
+ protected CiliumMetrics buildService(CiliumNode node, Flow flow, ServiceMetadata metadata, DetectPoint detectPoint) {
+ final CiliumService service = new CiliumService();
+ service.setServiceName(metadata.getServiceName());
+ service.setLayer(SERVICE_LAYER);
+ service.setDetectPoint(detectPoint);
+ return service;
+ }
+
+ protected CiliumMetrics buildServiceRelation(CiliumNode node, Flow flow, ServiceMetadata source, ServiceMetadata dest, DetectPoint detectPoint) {
+ final CiliumServiceRelation serviceRelation = new CiliumServiceRelation();
+ serviceRelation.setSourceServiceName(source.getServiceName());
+ serviceRelation.setSourceLayer(SERVICE_LAYER);
+ serviceRelation.setDestServiceName(dest.getServiceName());
+ serviceRelation.setDestLayer(SERVICE_LAYER);
+ serviceRelation.setDetectPoint(detectPoint);
+ serviceRelation.setComponentId(parseComponentId(flow));
+ return serviceRelation;
+ }
+
+ protected CiliumMetrics buildServiceInstance(CiliumNode node, Flow flow, ServiceMetadata metadata, DetectPoint detectPoint) {
+ final CiliumServiceInstance serviceInstance = new CiliumServiceInstance();
+ serviceInstance.setServiceName(metadata.getServiceName());
+ serviceInstance.setServiceInstanceName(metadata.getServiceInstanceName());
+ serviceInstance.setLayer(SERVICE_LAYER);
+ serviceInstance.setDetectPoint(detectPoint);
+ return serviceInstance;
+ }
+
+ protected CiliumMetrics buildServiceInstanceRelation(CiliumNode node, Flow flow, ServiceMetadata source, ServiceMetadata dest, DetectPoint detectPoint) {
+ final CiliumServiceInstanceRelation serviceInstanceRelation = new CiliumServiceInstanceRelation();
+ serviceInstanceRelation.setSourceServiceName(source.getServiceName());
+ serviceInstanceRelation.setSourceServiceInstanceName(source.getServiceInstanceName());
+ serviceInstanceRelation.setSourceLayer(SERVICE_LAYER);
+ serviceInstanceRelation.setDestServiceName(dest.getServiceName());
+ serviceInstanceRelation.setDestServiceInstanceName(dest.getServiceInstanceName());
+ serviceInstanceRelation.setDestLayer(SERVICE_LAYER);
+ serviceInstanceRelation.setDetectPoint(detectPoint);
+ serviceInstanceRelation.setComponentId(parseComponentId(flow));
+ return serviceInstanceRelation;
+ }
+
+ protected CiliumMetrics buildEndpoint(CiliumNode node, Flow flow, ServiceMetadata source, String endpointName, DetectPoint detectPoint) {
+ if (DetectPoint.CLIENT.equals(detectPoint)) {
+ return null;
+ }
+ final CiliumEndpoint endpoint = new CiliumEndpoint();
+ endpoint.setServiceName(source.getServiceName());
+ endpoint.setEndpointName(endpointName);
+ endpoint.setLayer(SERVICE_LAYER);
+ return endpoint;
+ }
+
+ protected CiliumMetrics buildEndpointRelation(CiliumNode node, Flow flow, ServiceMetadata source, ServiceMetadata dest,
+ DetectPoint detectPoint, String endpointName) {
+ final CiliumEndpointRelation endpointRelation = new CiliumEndpointRelation();
+ endpointRelation.setSourceServiceName(source.getServiceName());
+ endpointRelation.setSourceEndpointName(endpointName);
+ endpointRelation.setSourceLayer(SERVICE_LAYER);
+ endpointRelation.setDestServiceName(dest.getServiceName());
+ endpointRelation.setDestEndpointName(endpointName);
+ endpointRelation.setDestLayer(SERVICE_LAYER);
+ endpointRelation.setDetectPoint(detectPoint);
+ return endpointRelation;
+ }
+
+ protected boolean parseHTTPSuccess(Flow flow, HTTP http) {
+ return http.getCode() < 500;
+ }
+
+ private DetectPoint parseDetectPoint(Flow flow) {
+ boolean isReply = flow.getIsReply().getValue();
+ if (!isReply) {
+ return flow.getSource().getID() != 0 ? DetectPoint.CLIENT : DetectPoint.SERVER;
+ }
+ return flow.getDestination().getID() != 0 ? DetectPoint.CLIENT : DetectPoint.SERVER;
+ }
+
+ private int parseComponentId(Flow flow) {
+ switch (flow.getType()) {
+ case L3_L4:
+ return 110; // For the L3/L4 flow, just use the TCP component
+ case L7:
+ switch (flow.getL7().getRecordCase()) {
+ case HTTP: return 49; // HTTP
+ case DNS: return 159; // DNS
+ case KAFKA: return 27; // Kafka
+ }
+ }
+ return 0;
+ }
+
+}
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/handler/DNSCodes.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/handler/DNSCodes.java
new file mode 100644
index 000000000000..bf8a9955823e
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/handler/DNSCodes.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium.handler;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+public class DNSCodes {
+
+ // Follow https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#dns-parameters-6
+ public static final Map RETURN_CODES = ImmutableMap.builder()
+ .put(0, "NoError")
+ .put(1, "FormErr")
+ .put(2, "ServFail")
+ .put(3, "NXDomain")
+ .put(4, "NotImp")
+ .put(5, "Refused")
+ .put(6, "YXDomain")
+ .put(7, "YXRRSet")
+ .put(8, "NXRRSet")
+ .put(9, "NotAuth")
+ .put(10, "NotZone")
+ .put(11, "DSOTYPENI")
+ .put(16, "BADVERS|BADSIG")
+ .put(17, "BADKEY")
+ .put(18, "BADTIME")
+ .put(19, "BADMODE")
+ .put(20, "BADNAME")
+ .put(21, "BADALG")
+ .put(22, "BADTRUNC")
+ .put(23, "BADCOOKIE")
+ .build();
+}
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/handler/KafkaCodes.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/handler/KafkaCodes.java
new file mode 100644
index 000000000000..39e779b421bf
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/handler/KafkaCodes.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium.handler;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+public class KafkaCodes {
+
+ // Follow https://kafka.apache.org/protocol#protocol_error_codes
+ public static final Map ERROR_CODES = ImmutableMap.builder()
+ .put(-1, "UNKNOWN_SERVER_ERROR")
+ .put(0, "NONE")
+ .put(1, "OFFSET_OUT_OF_RANGE")
+ .put(2, "CORRUPT_MESSAGE")
+ .put(3, "UNKNOWN_TOPIC_OR_PARTITION")
+ .put(4, "INVALID_FETCH_SIZE")
+ .put(5, "LEADER_NOT_AVAILABLE")
+ .put(6, "NOT_LEADER_OR_FOLLOWER")
+ .put(7, "REQUEST_TIMED_OUT")
+ .put(8, "BROKER_NOT_AVAILABLE")
+ .put(9, "REPLICA_NOT_AVAILABLE")
+ .put(10, "MESSAGE_TOO_LARGE")
+ .put(11, "STALE_CONTROLLER_EPOCH")
+ .put(12, "OFFSET_METADATA_TOO_LARGE")
+ .put(13, "NETWORK_EXCEPTION")
+ .put(14, "COORDINATOR_LOAD_IN_PROGRESS")
+ .put(15, "COORDINATOR_NOT_AVAILABLE")
+ .put(16, "NOT_COORDINATOR")
+ .put(17, "INVALID_TOPIC_EXCEPTION")
+ .put(18, "RECORD_LIST_TOO_LARGE")
+ .put(19, "NOT_ENOUGH_REPLICAS")
+ .put(20, "NOT_ENOUGH_REPLICAS_AFTER_APPEND")
+ .put(21, "INVALID_REQUIRED_ACKS")
+ .put(22, "ILLEGAL_GENERATION")
+ .put(23, "INCONSISTENT_GROUP_PROTOCOL")
+ .put(24, "INVALID_GROUP_ID")
+ .put(25, "UNKNOWN_MEMBER_ID")
+ .put(26, "INVALID_SESSION_TIMEOUT")
+ .put(27, "REBALANCE_IN_PROGRESS")
+ .put(28, "INVALID_COMMIT_OFFSET_SIZE")
+ .put(29, "TOPIC_AUTHORIZATION_FAILED")
+ .put(30, "GROUP_AUTHORIZATION_FAILED")
+ .put(31, "CLUSTER_AUTHORIZATION_FAILED")
+ .put(32, "INVALID_TIMESTAMP")
+ .put(33, "UNSUPPORTED_SASL_MECHANISM")
+ .put(34, "ILLEGAL_SASL_STATE")
+ .put(35, "UNSUPPORTED_VERSION")
+ .put(36, "TOPIC_ALREADY_EXISTS")
+ .put(37, "INVALID_PARTITIONS")
+ .put(38, "INVALID_REPLICATION_FACTOR")
+ .put(39, "INVALID_REPLICA_ASSIGNMENT")
+ .put(40, "INVALID_CONFIG")
+ .put(41, "NOT_CONTROLLER")
+ .put(42, "INVALID_REQUEST")
+ .put(43, "UNSUPPORTED_FOR_MESSAGE_FORMAT")
+ .put(44, "POLICY_VIOLATION")
+ .put(45, "OUT_OF_ORDER_SEQUENCE_NUMBER")
+ .put(46, "DUPLICATE_SEQUENCE_NUMBER")
+ .put(47, "INVALID_PRODUCER_EPOCH")
+ .put(48, "INVALID_TXN_STATE")
+ .put(49, "INVALID_PRODUCER_ID_MAPPING")
+ .put(50, "INVALID_TRANSACTION_TIMEOUT")
+ .put(51, "CONCURRENT_TRANSACTIONS")
+ .put(52, "TRANSACTION_COORDINATOR_FENCED")
+ .put(53, "TRANSACTIONAL_ID_AUTHORIZATION_FAILED")
+ .put(54, "SECURITY_DISABLED")
+ .put(55, "OPERATION_NOT_ATTEMPTED")
+ .put(56, "KAFKA_STORAGE_ERROR")
+ .put(57, "LOG_DIR_NOT_FOUND")
+ .put(58, "SASL_AUTHENTICATION_FAILED")
+ .put(59, "UNKNOWN_PRODUCER_ID")
+ .put(60, "REASSIGNMENT_IN_PROGRESS")
+ .put(61, "DELEGATION_TOKEN_AUTH_DISABLED")
+ .put(62, "DELEGATION_TOKEN_NOT_FOUND")
+ .put(63, "DELEGATION_TOKEN_OWNER_MISMATCH")
+ .put(64, "DELEGATION_TOKEN_REQUEST_NOT_ALLOWED")
+ .put(65, "DELEGATION_TOKEN_AUTHORIZATION_FAILED")
+ .put(66, "DELEGATION_TOKEN_EXPIRED")
+ .put(67, "INVALID_PRINCIPAL_TYPE")
+ .put(68, "NON_EMPTY_GROUP")
+ .put(69, "GROUP_ID_NOT_FOUND")
+ .put(70, "FETCH_SESSION_ID_NOT_FOUND")
+ .put(71, "INVALID_FETCH_SESSION_EPOCH")
+ .put(72, "LISTENER_NOT_FOUND")
+ .put(73, "TOPIC_DELETION_DISABLED")
+ .put(74, "FENCED_LEADER_EPOCH")
+ .put(75, "UNKNOWN_LEADER_EPOCH")
+ .put(76, "UNSUPPORTED_COMPRESSION_TYPE")
+ .put(77, "STALE_BROKER_EPOCH")
+ .put(78, "OFFSET_NOT_AVAILABLE")
+ .put(79, "MEMBER_ID_REQUIRED")
+ .put(80, "PREFERRED_LEADER_NOT_AVAILABLE")
+ .put(81, "GROUP_MAX_SIZE_REACHED")
+ .put(82, "FENCED_INSTANCE_ID")
+ .put(83, "ELIGIBLE_LEADERS_NOT_AVAILABLE")
+ .put(84, "ELECTION_NOT_NEEDED")
+ .put(85, "NO_REASSIGNMENT_IN_PROGRESS")
+ .put(86, "GROUP_SUBSCRIBED_TO_TOPIC")
+ .put(87, "INVALID_RECORD")
+ .put(88, "UNSTABLE_OFFSET_COMMIT")
+ .put(89, "THROTTLING_QUOTA_EXCEEDED")
+ .put(90, "PRODUCER_FENCED")
+ .put(91, "RESOURCE_NOT_FOUND")
+ .put(92, "DUPLICATE_RESOURCE")
+ .put(93, "UNACCEPTABLE_CREDENTIAL")
+ .put(94, "INCONSISTENT_VOTER_SET")
+ .put(95, "INVALID_UPDATE_VERSION")
+ .put(96, "FEATURE_UPDATE_FAILED")
+ .put(97, "PRINCIPAL_DESERIALIZATION_FAILURE")
+ .put(98, "SNAPSHOT_NOT_FOUND")
+ .put(99, "POSITION_OUT_OF_RANGE")
+ .put(100, "UNKNOWN_TOPIC_ID")
+ .put(101, "DUPLICATE_BROKER_REGISTRATION")
+ .put(102, "BROKER_ID_NOT_REGISTERED")
+ .put(103, "INCONSISTENT_TOPIC_ID")
+ .put(104, "INCONSISTENT_CLUSTER_ID")
+ .put(105, "TRANSACTIONAL_ID_NOT_FOUND")
+ .put(106, "FETCH_SESSION_TOPIC_ID_ERROR")
+ .put(107, "INELIGIBLE_REPLICA")
+ .put(108, "NEW_LEADER_ELECTED")
+ .put(109, "OFFSET_MOVED_TO_TIERED_STORAGE")
+ .put(110, "FENCED_MEMBER_EPOCH")
+ .put(111, "UNRELEASED_INSTANCE_ID")
+ .put(112, "UNSUPPORTED_ASSIGNOR")
+ .put(113, "STALE_MEMBER_EPOCH")
+ .put(114, "MISMATCHED_ENDPOINT_TYPE")
+ .put(115, "UNSUPPORTED_ENDPOINT_TYPE")
+ .put(116, "UNKNOWN_CONTROLLER_ID")
+ .put(117, "UNKNOWN_SUBSCRIPTION_ID")
+ .put(118, "TELEMETRY_TOO_LARGE")
+ .put(119, "INVALID_REGISTRATION")
+ .build();
+}
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/handler/ServiceMetadata.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/handler/ServiceMetadata.java
new file mode 100644
index 000000000000..d7548578488f
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/handler/ServiceMetadata.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium.handler;
+
+import com.google.protobuf.Struct;
+import com.google.protobuf.Value;
+import io.cilium.api.flow.Endpoint;
+import io.vavr.Tuple;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.library.util.FieldsHelper;
+
+@Getter
+@Setter
+@ToString
+@NoArgsConstructor
+@EqualsAndHashCode(onlyExplicitlyIncluded = true)
+public class ServiceMetadata {
+ @EqualsAndHashCode.Include
+ private String serviceName;
+ @EqualsAndHashCode.Include
+ private String serviceInstanceName;
+
+ public ServiceMetadata(Endpoint endpoint) {
+ FieldsHelper.forClass(this.getClass()).inflate(parseEndpointToStruct(endpoint), this);
+ }
+
+ private Struct parseEndpointToStruct(Endpoint endpoint) {
+ final Struct.Builder builder = Struct.newBuilder();
+
+ // Convert Labels
+ final Struct.Builder labelsStruct = Struct.newBuilder();
+ endpoint.getLabelsList().stream()
+ .map(label -> label.split("=", 2))
+ .forEach(split -> {
+ if (split.length == 1) {
+ addingLabel(labelsStruct, split[0], "");
+ return;
+ }
+ addingLabel(labelsStruct, split[0], split[1]);
+ });
+ builder.putFields("LABELS", Value.newBuilder().setStructValue(labelsStruct.build()).build());
+
+ // Convert Workloads
+ final Struct.Builder workloadsStruct = Struct.newBuilder();
+ endpoint.getWorkloadsList().stream()
+ .map(workload -> Tuple.of(workload.getKind(), workload.getName()))
+ .forEach(split -> {
+ workloadsStruct.putFields(split._1, Value.newBuilder().setStringValue(split._2).build());
+ });
+ builder.putFields("WORKLOADS", Value.newBuilder().setStructValue(workloadsStruct.build()).build());
+
+ // Adding other metadata
+ builder.putFields("NAMESPACE", Value.newBuilder().setStringValue(endpoint.getNamespace()).build());
+ builder.putFields("NAME", Value.newBuilder().setStringValue(endpoint.getPodName()).build());
+
+ return builder.build();
+ }
+
+ private void addingLabel(Struct.Builder builder, String key, String value) {
+ final Value val = Value.newBuilder().setStringValue(value).build();
+ builder.putFields(key, val);
+ // remove the prefix "k8s:" from the key
+ if (key.indexOf("k8s:") == 0) {
+ builder.putFields(key.substring(4), val);
+ }
+ }
+}
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/CiliumNode.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/CiliumNode.java
new file mode 100644
index 000000000000..7a79f2e775cf
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/CiliumNode.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium.nodes;
+
+import io.cilium.api.observer.ObserverGrpc;
+import io.vavr.Tuple2;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * CiliumNode represents a node in the Cilium cluster.
+ * Usually it's aware by the Cilium Peer Service.
+ */
+@RequiredArgsConstructor
+@Getter
+@Slf4j
+@EqualsAndHashCode(of = {
+ "address"
+})
+@ToString(of = {
+ "address",
+ "connected"
+})
+public class CiliumNode {
+ private final String address;
+ private final ClientBuilder clientBuilder;
+
+ private volatile ObserverGrpc.ObserverBlockingStub observerStub;
+ private volatile boolean closed;
+ private final CopyOnWriteArrayList closeables = new CopyOnWriteArrayList<>();
+
+ public ObserverGrpc.ObserverBlockingStub getObserverStub() {
+ if (closed) {
+ return null;
+ }
+ if (Objects.nonNull(observerStub)) {
+ return observerStub;
+ }
+
+ synchronized (this) {
+ if (Objects.isNull(observerStub)) {
+ final Tuple2 addressTuple = parseAddress();
+ observerStub = clientBuilder.buildClient(addressTuple._1, addressTuple._2, ObserverGrpc.ObserverBlockingStub.class);
+ }
+ }
+ return observerStub;
+ }
+
+ public void addingCloseable(Closeable closeable) {
+ closeables.add(closeable);
+ }
+
+ public void close() {
+ this.closed = true;
+ closeables.forEach(c -> {
+ try {
+ c.close();
+ } catch (Exception e) {
+ log.warn("Failed to close the cilium node", e);
+ }
+ });
+ }
+
+ private Tuple2 parseAddress() {
+ String[] parts = address.split(":");
+ if (parts.length != 2) {
+ return new Tuple2<>(address, 4244);
+ }
+ return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
+ }
+}
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/CiliumNodeManager.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/CiliumNodeManager.java
new file mode 100644
index 000000000000..e5068fd2bbb2
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/CiliumNodeManager.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium.nodes;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import io.cilium.api.peer.NotifyRequest;
+import io.cilium.api.peer.PeerGrpc;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.ClusterWatcher;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.fetcher.cilium.CiliumFetcherConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class CiliumNodeManager implements ClusterWatcher {
+ private static final Executor EXECUTOR = Executors.newCachedThreadPool();
+
+ private final PeerGrpc.PeerBlockingStub peerStub;
+ private final ClientBuilder clientBuilder;
+ private final ModuleManager moduleManager;
+ private final int retrySecond;
+ private volatile List remoteInstances;
+ private List listeners;
+ private ClusterNodesQuery clusterNodesQuery;
+
+ // This is a list of all cilium nodes
+ private volatile List allNodes;
+ // This is a list of cilium nodes that are should be used in current OAP node
+ private volatile List usingNodes;
+
+ public CiliumNodeManager(ModuleManager moduleManager, ClientBuilder clientBuilder, CiliumFetcherConfig config) {
+ this.moduleManager = moduleManager;
+ this.clientBuilder = clientBuilder;
+ this.peerStub = this.clientBuilder.buildClient(config.getPeerHost(), config.getPeerPort(), PeerGrpc.PeerBlockingStub.class);
+ this.allNodes = new ArrayList<>();
+ this.usingNodes = new ArrayList<>();
+ this.listeners = new ArrayList<>();
+ this.retrySecond = config.getFetchFailureRetrySecond();
+ }
+
+ public void start() {
+ ClusterCoordinator coordinator = this.moduleManager
+ .find(ClusterModule.NAME)
+ .provider()
+ .getService(ClusterCoordinator.class);
+ coordinator.registerWatcher(this);
+ // init the remote instances
+ this.remoteInstances = ImmutableList.copyOf(coordinator.queryRemoteNodes());
+ startWatchNodeUpdates();
+ startRefreshRemoteNodes();
+ }
+
+ public void addListener(CiliumNodeUpdateListener listener) {
+ listeners.add(listener);
+ }
+
+ private void listenNotified() {
+ peerStub.notify(NotifyRequest.newBuilder().build())
+ .forEachRemaining(changeNotification -> {
+ log.debug("Receive cilium node change notification, name: {}, address: {}, type: {}", changeNotification.getName(),
+ changeNotification.getAddress(), changeNotification.getType());
+ switch (changeNotification.getType()) {
+ case PEER_ADDED:
+ case PEER_UPDATED:
+ this.addOrUpdateNode(new CiliumNode(changeNotification.getAddress(), clientBuilder));
+ break;
+ case PEER_DELETED:
+ this.removeNode(new CiliumNode(changeNotification.getAddress(), clientBuilder));
+ break;
+ default:
+ log.error("Unknown cilium node change notification type: {}", changeNotification);
+ break;
+ }
+ });
+ }
+
+ private void startWatchNodeUpdates() {
+ EXECUTOR.execute(new RunnableWithExceptionProtection(this::listenNotified, t -> {
+ LogLog.error("Cilium node manager listen notified failure.", t);
+ try {
+ TimeUnit.SECONDS.sleep(this.retrySecond);
+ } catch (InterruptedException e) {
+ log.error("Failed to sleep for {} seconds.", this.retrySecond, e);
+ return;
+ }
+
+ startWatchNodeUpdates();
+ }));
+ }
+
+ private void startRefreshRemoteNodes() {
+ Executors.newSingleThreadScheduledExecutor()
+ .scheduleWithFixedDelay(new RunnableWithExceptionProtection(this::refreshRemoteNodes, t -> log.error(
+ "Scheduled refresh Remote Clients failure.", t)), 1, 10, TimeUnit.SECONDS);
+ }
+
+ private void refreshRemoteNodes() {
+ if (Objects.isNull(clusterNodesQuery)) {
+ this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME)
+ .provider()
+ .getService(ClusterNodesQuery.class);
+ }
+
+ this.onClusterNodesChanged(clusterNodesQuery.queryRemoteNodes());
+ }
+
+ private void addOrUpdateNode(CiliumNode node) {
+ if (allNodes.contains(node)) {
+ allNodes.remove(node);
+ }
+ allNodes.add(node);
+ this.refreshUsingNodes();
+ }
+
+ private void removeNode(CiliumNode node) {
+ allNodes.remove(node);
+ this.refreshUsingNodes();
+ }
+
+ void refreshUsingNodes() {
+ final List shouldUsingNodes = buildShouldUsingNodes();
+ log.debug("Trying to rebuilding using cilium nodes, current using nodes count: {}, new using nodes count: {}",
+ usingNodes.size(), shouldUsingNodes.size());
+
+ if (log.isDebugEnabled()) {
+ shouldUsingNodes.forEach(node -> log.debug("Ready to using cilium node, wait notify: {}", node.getAddress()));
+ }
+
+ if (!compare(shouldUsingNodes)) {
+ log.info("Rebuilding using cilium nodes, old using nodes count: {}, new using nodes count: {}",
+ usingNodes.size(), shouldUsingNodes.size());
+ this.reBuildUsingNodes(shouldUsingNodes);
+ } else {
+ log.debug("No need to rebuild using cilium nodes, old using nodes count: {}, new using nodes count: {}",
+ usingNodes.size(), shouldUsingNodes.size());
+ }
+ }
+
+ private void reBuildUsingNodes(List shouldUsingNodes) {
+ final Map remoteClientCollection =
+ this.usingNodes.stream()
+ .collect(Collectors.toMap(
+ CiliumNode::getAddress,
+ node -> new NodeWithAction(
+ node, Action.Close)
+ ));
+
+ final Map latestRemoteClients =
+ shouldUsingNodes.stream()
+ .collect(Collectors.toMap(
+ CiliumNode::getAddress,
+ remote -> new NodeWithAction(
+ remote, Action.Create)
+ ));
+
+ final Set unChangeAddresses = Sets.intersection(
+ remoteClientCollection.keySet(), latestRemoteClients.keySet());
+
+ unChangeAddresses.stream()
+ .filter(remoteClientCollection::containsKey)
+ .forEach(unChangeAddress -> remoteClientCollection.get(unChangeAddress)
+ .setAction(Action.Unchanged));
+
+ // make the latestRemoteClients including the new clients only
+ unChangeAddresses.forEach(latestRemoteClients::remove);
+ remoteClientCollection.putAll(latestRemoteClients);
+
+ final List newNodes = new LinkedList<>();
+ remoteClientCollection.forEach((address, clientAction) -> {
+ switch (clientAction.getAction()) {
+ case Unchanged:
+ newNodes.add(clientAction.getNode());
+ break;
+ case Create:
+ newNodes.add(clientAction.getNode());
+ notifyListeners(clientAction.getNode(), Action.Create);
+ break;
+ case Close:
+ notifyListeners(clientAction.getNode(), Action.Close);
+ clientAction.getNode().close();
+ break;
+ }
+ });
+
+ this.usingNodes = ImmutableList.copyOf(newNodes);
+ }
+
+ private void notifyListeners(CiliumNode node, Action action) {
+ listeners.forEach(listener -> {
+ if (action == Action.Create) {
+ listener.onNodeAdded(node);
+ } else if (action == Action.Close) {
+ listener.onNodeDelete(node);
+ }
+ });
+ }
+
+ private void printUsingNodesList() {
+ if (!log.isDebugEnabled()) {
+ return;
+ }
+ final String addresses = Joiner.on(", ").join(usingNodes.stream().map(CiliumNode::getAddress).collect(Collectors.toList()));
+ log.debug("Current using cilium nodes: {}", addresses);
+ }
+
+ private List buildShouldUsingNodes() {
+ if (CollectionUtils.isEmpty(allNodes) || CollectionUtils.isEmpty(remoteInstances)) {
+ log.debug("Found no cilium or backend nodes, skip all nodes, cilium nodes: {}, backend clients: {}",
+ allNodes, remoteInstances);
+ return ImmutableList.of();
+ }
+ allNodes.sort(Comparator.comparing(CiliumNode::getAddress));
+ final List totalBackendClients = remoteInstances
+ .stream().sorted(Comparator.comparing(RemoteInstance::getAddress)).collect(Collectors.toList());
+ final int currentNodeIndex = totalBackendClients.indexOf(totalBackendClients.stream()
+ .filter(t -> t.getAddress().isSelf()).findFirst().get());
+ // if the backend count bigger than cilium node count, we need to
+ if (totalBackendClients.size() > allNodes.size()) {
+ if (currentNodeIndex >= allNodes.size()) {
+ log.debug("Found no cilium nodes for current OAP node, skip all nodes, total cilium nodes: {}, total backend clients: {}, " +
+ "current node index: {}", allNodes.size(), totalBackendClients.size(), currentNodeIndex);
+ return ImmutableList.of();
+ }
+ log.debug("Total cilium nodes: {}, total backend clients: {}, current node index: {}, using cilium node: {}",
+ allNodes.size(), totalBackendClients.size(), currentNodeIndex, allNodes.get(currentNodeIndex));
+ return ImmutableList.of(allNodes.get(currentNodeIndex));
+ }
+
+ final int partNodesCount = allNodes.size() / totalBackendClients.size();
+ if (partNodesCount == 0 && currentNodeIndex >= allNodes.size()) {
+ log.debug("Found no cilium nodes for current OAP node, skip all nodes, total cilium nodes: {}, total backend clients: {}, " +
+ "current node index: {}", allNodes.size(), totalBackendClients.size(), currentNodeIndex);
+ return ImmutableList.of();
+ }
+ final int startIndex = currentNodeIndex * partNodesCount;
+ final int endIndex = currentNodeIndex == totalBackendClients.size() - 1 ? allNodes.size() : (currentNodeIndex + 1) * partNodesCount;
+ log.debug("Total cilium nodes: {}, part nodes count: {}, current node index: {}, using nodes part: {} - {}",
+ allNodes.size(), partNodesCount, currentNodeIndex, startIndex, endIndex);
+ return ImmutableList.copyOf(allNodes.subList(startIndex, endIndex));
+ }
+
+ private boolean compare(List remoteInstances) {
+ if (usingNodes.size() == remoteInstances.size()) {
+ for (int i = 0; i < usingNodes.size(); i++) {
+ if (!usingNodes.get(i).getAddress().equals(remoteInstances.get(i).getAddress())) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void onClusterNodesChanged(List remoteInstances) {
+ this.remoteInstances = ImmutableList.copyOf(remoteInstances);
+ refreshUsingNodes();
+ }
+
+ enum Action {
+ Close, Unchanged, Create
+ }
+
+ @AllArgsConstructor
+ @Getter
+ private static class NodeWithAction {
+ private final CiliumNode node;
+ @Setter
+ private Action action;
+ }
+}
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/CiliumNodeUpdateListener.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/CiliumNodeUpdateListener.java
new file mode 100644
index 000000000000..64cd9f808d27
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/CiliumNodeUpdateListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium.nodes;
+
+public interface CiliumNodeUpdateListener {
+
+ /**
+ * Callback when a new node is added.
+ *
+ * @param node the new node
+ */
+ void onNodeAdded(CiliumNode node);
+
+ /**
+ * Callback when a node is deleted.
+ *
+ * @param node the deleted node
+ */
+ void onNodeDelete(CiliumNode node);
+}
+
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/ClientBuilder.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/ClientBuilder.java
new file mode 100644
index 000000000000..fe88902a5359
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/ClientBuilder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium.nodes;
+
+public interface ClientBuilder {
+ /**
+ * Building client by address
+ */
+ T buildClient(String host, int port, Class stubClass);
+}
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/GrpcStubBuilder.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/GrpcStubBuilder.java
new file mode 100644
index 000000000000..dbdcbcbb53a3
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/GrpcStubBuilder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium.nodes;
+
+import com.linecorp.armeria.client.ClientBuilderParams;
+import com.linecorp.armeria.client.ClientFactory;
+import com.linecorp.armeria.client.ClientFactoryBuilder;
+import com.linecorp.armeria.client.ClientOptions;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.fetcher.cilium.CiliumFetcherConfig;
+
+import java.io.File;
+import java.net.URI;
+
+/**
+ * For Building all the gRPC stubs for the cilium fetcher.
+ */
+@Slf4j
+public class GrpcStubBuilder implements ClientBuilder {
+ private final CiliumFetcherConfig config;
+ private final ClientFactory clientFactory;
+
+ public GrpcStubBuilder(CiliumFetcherConfig config) {
+ final ClientFactoryBuilder builder = ClientFactory.builder();
+ if (config.isSslConnection()) {
+ builder
+ .tlsNoVerify() // skip the verification of the server's certificate(for the host checker)
+ .useHttp2WithoutAlpn(true) // use HTTP/2 without ALPN(cilium not support for now)
+ .tlsCustomizer(ctx -> {
+ ctx.keyManager(new File(config.getSslCertChainFile()), new File(config.getSslPrivateKeyFile()));
+ ctx.trustManager(new File(config.getSslCaFile()));
+ });
+ }
+
+ this.config = config;
+ this.clientFactory = builder.build();
+ }
+
+ @Override
+ @SneakyThrows
+ public T buildClient(String host, int port, Class stubClass) {
+ String proto = "http";
+ if (config.isSslConnection()) {
+ proto = "https";
+ }
+
+ final URI url = new URI("gproto+" + proto, null, host, port, "/", null, null);
+ return (T) clientFactory.newClient(ClientBuilderParams.of(url, stubClass, ClientOptions.of(
+ ClientOptions.RESPONSE_TIMEOUT_MILLIS.newValue(Long.MAX_VALUE), // For the cilium fetcher, we need to wait for the response(all the data from streaming)
+ ClientOptions.MAX_RESPONSE_LENGTH.newValue(0L) // For the cilium streaming fetcher, we should ignore the response length limit
+ )));
+ }
+}
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
new file mode 100644
index 000000000000..f67193309983
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.fetcher.cilium.CiliumFetcherModule
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 000000000000..04855b33418f
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.fetcher.cilium.CiliumFetcherProvider
diff --git a/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/CiliumNodeManagerTest.java b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/CiliumNodeManagerTest.java
new file mode 100644
index 000000000000..acec03ff1aa2
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/cilium-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/fetcher/cilium/nodes/CiliumNodeManagerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.cilium.nodes;
+
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.fetcher.cilium.CiliumFetcherConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.powermock.reflect.Whitebox;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(MockitoExtension.class)
+public class CiliumNodeManagerTest {
+
+ @Mock
+ private ModuleManager moduleManager;
+ private CiliumNodeManager ciliumNodeManager;
+ private NodeUpdateListener nodeUpdateListener;
+
+ public static Collection
+
+ org.apache.skywalking
+ server-core
+ ${project.version}
+
+
+ org.apache.skywalking
+ apm-network
+ ${project.version}
+
+
+ org.apache.skywalking
+ library-module
+ ${project.version}
+
+
+ org.apache.skywalking
+ library-util
+ ${project.version}
+
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/ssl/DynamicSslContext.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/ssl/DynamicSslContext.java
index 4d9ff3c1023b..bdfd98a5ce70 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/ssl/DynamicSslContext.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/ssl/DynamicSslContext.java
@@ -74,7 +74,7 @@ protected void updateContext(final String privateKeyFile, final String certChain
if (StringUtil.isNotEmpty(trustedCAsFile)) {
builder.trustManager(Paths.get(trustedCAsFile).toFile())
- .clientAuth(ClientAuth.REQUIRE);
+ .clientAuth(ClientAuth.REQUIRE);
}
setCtx(builder.build());
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FieldsHelper.java
similarity index 66%
rename from oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java
rename to oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FieldsHelper.java
index 29b227ddacf3..9e3306ee7279 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FieldsHelper.java
@@ -16,7 +16,17 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.envoy.als.mx;
+package org.apache.skywalking.oap.server.library.util;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Value;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Delegate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.yaml.snakeyaml.Yaml;
import java.io.InputStream;
import java.lang.invoke.CallSite;
@@ -28,51 +38,54 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.util.ResourceUtils;
-import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
-import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo.KeyValue;
-import org.yaml.snakeyaml.Yaml;
-
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.protobuf.Struct;
-import com.google.protobuf.Value;
+@Slf4j
+public class FieldsHelper {
-import lombok.RequiredArgsConstructor;
-import lombok.experimental.Delegate;
-import lombok.extern.slf4j.Slf4j;
+ /**
+ * The difference Class have different {@link FieldsHelper} instance for their own mappings.
+ */
+ private static final Map, FieldsHelper> HELPER_MAP = new ConcurrentHashMap<>();
-@Slf4j
-public enum FieldsHelper {
- SINGLETON;
+ /**
+ * The target class to be inflated.
+ */
+ private final Class> targetClass;
+ /**
+ * Whether the {@link FieldsHelper} has been initialized.
+ */
private boolean initialized = false;
/**
- * The mappings from the field name of {@link ServiceMetaInfo} to the field name of {@code flatbuffers}.
+ * The mappings from the field name to the field name of {@code flatbuffers}.
*/
- private Map fieldNameMapping;
+ private Map fieldNameMapping;
/**
- * The mappings from the field name of {@link ServiceMetaInfo} to its {@code setter}.
+ * The mappings from the field name to its {@code setter}.
*/
- private Map> fieldSetterMapping;
+ private Map> fieldSetterMapping;
- public void init(final String file,
- final Class extends ServiceMetaInfo> serviceInfoClass) throws Exception {
- init(ResourceUtils.readToStream(file), serviceInfoClass);
+ public static FieldsHelper forClass(final Class> targetClass) {
+ return HELPER_MAP.computeIfAbsent(targetClass, FieldsHelper::new);
+ }
+
+ private FieldsHelper(Class> targetClass) {
+ this.targetClass = targetClass;
+ }
+
+ public void init(final String file) throws Exception {
+ init(ResourceUtils.readToStream(file));
}
@SuppressWarnings("unchecked")
- public void init(final InputStream inputStream,
- final Class extends ServiceMetaInfo> serviceInfoClass) throws ModuleStartException {
+ public void init(final InputStream inputStream) {
if (initialized) {
return;
}
@@ -103,13 +116,14 @@ public void init(final InputStream inputStream,
tokenBuffer.append(token);
} else if (tokenBuffer.length() > 0) {
tokenBuffer.append(".").append(token);
- if (token.endsWith("\"")) {
- candidateFields.add(tokenBuffer.toString().replaceAll("\"", ""));
- tokenBuffer.setLength(0);
- }
} else {
candidateFields.add(token);
}
+
+ if (tokenBuffer.length() > 0 && token.endsWith("\"")) {
+ candidateFields.add(tokenBuffer.toString().replaceAll("\"", ""));
+ tokenBuffer.setLength(0);
+ }
}
return new Field(candidateFields);
}).collect(Collectors.toList());
@@ -119,7 +133,7 @@ public void init(final InputStream inputStream,
fieldNameMapping.put(
serviceMetaInfoFieldName,
- new ServiceNameFormat(serviceNamePattern.toString(), flatBuffersFieldNames)
+ new FieldFormat(serviceNamePattern.toString(), flatBuffersFieldNames)
);
try {
@@ -130,35 +144,35 @@ public void init(final InputStream inputStream,
lookup, "accept",
MethodType.methodType(BiConsumer.class),
MethodType.methodType(void.class, Object.class, Object.class),
- lookup.findVirtual(serviceInfoClass, setter,
+ lookup.findVirtual(targetClass, setter,
MethodType.methodType(void.class, parameterType)),
- MethodType.methodType(void.class, serviceInfoClass, parameterType));
+ MethodType.methodType(void.class, targetClass, parameterType));
final MethodHandle factory = site.getTarget();
- final BiConsumer super ServiceMetaInfo, String> method =
- (BiConsumer super ServiceMetaInfo, String>) factory.invoke();
+ final BiConsumer