Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support horizontal edc scaling in cp adapter extension #678

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions edc-extensions/control-plane-adapter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,36 @@ The goal of this extension is to simplify the process of retrieving data out of

Additional requirements, that affects the architecture of the extension:
- can return data both in SYNC and ASYNC mode (currently only SYNC endpoint available)
- can be persistent, so that process can be restored from the point where it was before application was stopped (not implemented yet)
- prepared to scale horizontally (not yet implemented)
- can be persistent, so that process can be restored from the point where it was before application was stopped
- scaling horizontally (when persistence is added to configuration)
- can retry failed part of the process (no need to start the process from the beginning)

<b>Configuration:</b>
## Configuration:

| Key | Description | Mandatory | Default |
|:-------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------|---------|
| edc.cp.adapter.default.message.retry.number | Number of retries of a message, in case of an error, within the internal process of retrieving DataReference | no | 3 |
| edc.cp.adapter.default.sync.request.timeout | Timeout for synchronous request (in seconds), after witch 'timeout' error will be returned to the requesting client | no | 20 |
| edc.cp.adapter.messagebus.inmemory.thread.number | Number of threads running within the in-memory implementation of MessageBus _ _ | no | 10 |
| edc.cp.adapter.reuse.contract.agreement | Turn on/off reusing of existing contract agreements for the specific asset. Once the contract is agreed, the second request for the same asset will reuse the agreement. Value 1 = on, 0 = off. | no | 1 |
| edc.cp.adapter.cache.catalog.expire.after | Number of seconds, after witch prevoiusly requested catalog will not be reused, and will be removed from catalog cache | no | 300 |
| Key | Description | Mandatory | Default |
|:-------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------|---------|
| edc.cp.adapter.default.message.retry.number | Number of retries of a message, in case of an error, within the internal process of retrieving DataReference | no | 3 |
| edc.cp.adapter.default.sync.request.timeout | Timeout for synchronous request (in seconds), after witch 'timeout' error will be returned to the requesting client | no | 20 |
| edc.cp.adapter.messagebus.inmemory.thread.number | Number of threads running within the in-memory implementation of MessageBus _ _ | no | 10 |
| edc.cp.adapter.reuse.contract.agreement | Turn on/off reusing of existing contract agreements for the specific asset. Once the contract is agreed, the second request for the same asset will reuse the agreement (if exists) pulled from the EDC. | no | true |
| edc.cp.adapter.cache.catalog.expire.after | Number of seconds, after witch prevoiusly requested catalog will not be reused, and will be removed from catalog cache | no | 300 |
| edc.cp.adapter.catalog.request.limit | Maximum number of items taken from Catalog within single request. Requests are repeated until all offers of the query are retrieved | no | 100 |

By default, the extension works in "IN MEMORY" mode. This setup has some limitations:
+ It can work only within single EDC instance. If CP-adapter requests are handled by more than one EDC, data flow may be broken.
+ If the EDC instance is restarted, all running processes are lost.

<b>How to use it:</b>
To run CP-Adapter in "PERSISTENT" mode, You need to create a proper tables with [this](docs/schema.sql) script, and add the following configuration values to Your control-plane EDC properties file:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DominikPinsel can we handle this initial db initialisation somehow with the migration extension?

I think we should avoid, that the operator has to do something manually in the DB (expect creating DBs)!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


| Key | Description |
|-----------------------------------|-------------|
| edc.datasource.cpadapter.name | data source name |
| edc.datasource.cpadapter.url | data source url |
| edc.datasource.cpadapter.user | data source user |
| edc.datasource.cpadapter.password | data source password |


## How to use it:
1. Client sends a GET request with two parameters: assetId and the url of the provider control-plane:

```
Expand All @@ -33,13 +46,14 @@ Additional requirements, that affects the architecture of the extension:
http://localhost:9193/api/v1/data/adapter/asset/sync/123?providerUrl=http://localhost:8182/api/v1/ids/data
```

Optional request parameters:

| Name | Description |
|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---|
| contractAgreementId | Defines the ID of existing contract agreement, that should be reused for retrieving the asset. If parameter is specified, but contract is not found, 404 error will be returned. |
| contractAgreementReuse | Similar to <i>edc.cp.adapter.reuse.contract.agreement</i> option allows to turn off reusing of existing contracts, but on a request level. Set the parameter value to '0' and new contract agrement will be negotiated. |
Optional request parameters, that overwrite the settings for a single request:

| Name | Description |
|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--- |
| contractAgreementId | Defines the ID of existing contract agreement, that should be reused for retrieving the asset. If parameter is specified, but contract is not found, 404 error will be returned. |
| contractAgreementReuse | Similar to <i>edc.cp.adapter.reuse.contract.agreement</i> option allows to turn off reusing of existing contracts, but on a request level. Set the parameter value to 'false' and new contract agrement will be negotiated. |
| timeout | Similar to <i>edc.cp.adapter.default.sync.request.timeout</i>, defines the maximum time of the request. If data is not ready, time out error will be returned. |

The controller is registered under the context alias of DataManagement API. The authentication depends on the DataManagement configuration.
To find out more please visit:

Expand Down Expand Up @@ -70,7 +84,7 @@ Additional requirements, that affects the architecture of the extension:
header: Authorization:eyJhbGciOiJSUzI1NiJ9.eyJkYWQiOi... {authKey:authCode}
```

<b>Internal design of the extension:</b>
### Internal design of the extension:

![diagram](src/main/resources/control-plane-adapter.jpg)

Expand Down
54 changes: 54 additions & 0 deletions edc-extensions/control-plane-adapter/docs/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
--
-- Copyright (c) 2022 ZF Friedrichshafen AG
--
-- This program and the accompanying materials are made available under the
-- terms of the Apache License, Version 2.0 which is available at
-- https://www.apache.org/licenses/LICENSE-2.0
--
-- SPDX-License-Identifier: Apache-2.0
--
-- Contributors:
-- ZF Friedrichshafen AG - Initial SQL Query
--

-- Statements are designed for and tested with Postgres only!


CREATE TABLE IF NOT EXISTS edc_lease
(
leased_by VARCHAR NOT NULL,
leased_at BIGINT,
lease_duration INTEGER NOT NULL,
lease_id VARCHAR NOT NULL
CONSTRAINT lease_pk
PRIMARY KEY
);

CREATE TABLE IF NOT EXISTS edc_cpadapter_queue
(
id VARCHAR NOT NULL,
created_at BIGINT NOT NULL,
channel VARCHAR,
message JSON,
invoke_after BIGINT NOT NULL,
lease_id VARCHAR
CONSTRAINT cpadapter_queue_lease_lease_id_fk
REFERENCES edc_lease
ON DELETE SET NULL,
PRIMARY KEY (id)
);

CREATE UNIQUE INDEX IF NOT EXISTS edc_cpadapter_queue_id_uindex
ON edc_cpadapter_queue (id);

CREATE TABLE IF NOT EXISTS edc_cpadapter_object_store
(
id VARCHAR NOT NULL,
created_at BIGINT NOT NULL,
type VARCHAR,
object JSON,
PRIMARY KEY (id)
);



22 changes: 22 additions & 0 deletions edc-extensions/control-plane-adapter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,28 @@
<artifactId>aggregate-service-spi</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.edc</groupId>
<artifactId>sql-core</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.edc</groupId>
<artifactId>sql-lease</artifactId>
<version>${org.eclipse.edc.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.edc</groupId>
<artifactId>sql-pool-apache-commons</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.edc</groupId>
<artifactId>transaction-datasource-spi</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>

<!-- External Dependencies -->
<dependency>
<groupId>org.projectlombok</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,24 @@ public class ApiAdapterConfig {
"edc.cp.adapter.default.message.retry.number";
private static final String DEFAULT_SYNC_REQUEST_TIMEOUT =
"edc.cp.adapter.default.sync.request.timeout";
private static final String IN_MEMORY_MESSAGE_BUS_THREAD_NUMBER =
"edc.cp.adapter.messagebus.inmemory.thread.number";
private static final String CATALOG_EXPIRE_AFTER_TIME =
"edc.cp.adapter.cache.catalog.expire.after";
private static final String CATALOG_REQUEST_LIMIT = "edc.cp.adapter.catalog.request.limit";
private static final String REUSE_CONTRACT_AGREEMENT = "edc.cp.adapter.reuse.contract.agreement";
private static final String CATALOG_REQUEST_LIMIT = "edc.cp.adapter.catalog.request.limit";

private static final String DATASOURCE_NAME = "edc.datasource.cpadapter.name";
private static final String DATASOURCE_URL = "edc.datasource.cpadapter.url";
private static final String DATASOURCE_USER = "edc.datasource.cpadapter.user";
private static final String DATASOURCE_PASS = "edc.datasource.cpadapter.password";

private static final String IN_MEMORY_MESSAGE_BUS_THREAD_NUMBER =
"edc.cp.adapter.messagebus.inmemory.thread.number";
private static final String SQL_MESSAGE_BUS_THREAD_NUMBER =
"edc.cp.adapter.messagebus.sql.thread.number";
private static final String SQL_MESSAGE_BUS_MAX_DELIVERY =
"edc.cp.adapter.messagebus.sql.max.delivery";
private static final String SQL_MESSAGE_BUS_DELIVERY_INTERVAL =
"edc.cp.adapter.messagebus.sql.max.delivery";

private final ServiceExtensionContext context;

Expand All @@ -47,14 +59,42 @@ public int getInMemoryMessageBusThreadNumber() {
}

public boolean isContractAgreementReuseOn() {
return context.getSetting(REUSE_CONTRACT_AGREEMENT, 1) != 0;
return context.getSetting(REUSE_CONTRACT_AGREEMENT, true);
}

public int getCatalogExpireAfterTime() {
return context.getSetting(CATALOG_EXPIRE_AFTER_TIME, 3600);
return context.getSetting(CATALOG_EXPIRE_AFTER_TIME, 180);
}

public int getCatalogRequestLimit() {
return context.getSetting(CATALOG_REQUEST_LIMIT, 100);
}

public String getDataSourceName() {
return context.getSetting(DATASOURCE_NAME, "cpadapter");
}

public String getDataSourceUrl() {
return context.getSetting(DATASOURCE_URL, null);
}

public String getDataSourceUser() {
return context.getSetting(DATASOURCE_USER, null);
}

public String getDataSourcePass() {
return context.getSetting(DATASOURCE_PASS, null);
}

public int getSqlMessageBusThreadNumber() {
return context.getSetting(SQL_MESSAGE_BUS_THREAD_NUMBER, 10);
}

public int getSqlMessageBusMaxDelivery() {
return context.getSetting(SQL_MESSAGE_BUS_MAX_DELIVERY, 10);
}

public int getSqlMessageBusDeliveryInterval() {
return context.getSetting(SQL_MESSAGE_BUS_DELIVERY_INTERVAL, 1);
}
}
Loading