Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/cp adapter task 355 356 357 #621

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions edc-controlplane/edc-controlplane-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@
<groupId>org.eclipse.tractusx.edc.extensions</groupId>
<artifactId>cx-oauth2</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.tractusx.edc.extensions</groupId>
<artifactId>control-plane-adapter</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.tractusx.edc.extensions</groupId>
<artifactId>custom-jsonld</artifactId>
Expand Down
24 changes: 16 additions & 8 deletions edc-extensions/control-plane-adapter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ Additional requirements, that affects the architecture of the extension:

<b>Configuration:</b>

| Key | Description | Mandatory | Default |
|:-------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------|----|
| edc.cp.adapter.default.message.retry.number | Number of retries of a message, in case of an error, within the internal process of retrieving DataReference | no | 3 |
| edc.cp.adapter.default.sync.request.timeout | Timeout for synchronous request (in seconds), after witch 'timeout' error will be returned to the requesting client | no | 20 |
| edc.cp.adapter.messagebus.inmemory.thread.number | Number of threads running within the in-memory implementation of MessageBus _ _ | no | 10 |
| edc.cp.adapter.cache.contract.agreement | Turn on/off contract agreement cache for the specific asset. Once the contract is agreed, the second request for the same asset will reuse the agreement. Value 1 = on, 0 = off. | no | 1 |
| edc.cp.adapter.cache.catalog.expire.after | Number of seconds, after witch prevoiusly requested catalog will not be reused, and will be removed from catalog cache | no | 3600 |
| Key | Description | Mandatory | Default |
|:-------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------|---------|
| edc.cp.adapter.default.message.retry.number | Number of retries of a message, in case of an error, within the internal process of retrieving DataReference | no | 3 |
| edc.cp.adapter.default.sync.request.timeout | Timeout for synchronous request (in seconds), after witch 'timeout' error will be returned to the requesting client | no | 20 |
| edc.cp.adapter.messagebus.inmemory.thread.number | Number of threads running within the in-memory implementation of MessageBus _ _ | no | 10 |
| edc.cp.adapter.reuse.contract.agreement | Turn on/off reusing of existing contract agreements for the specific asset. Once the contract is agreed, the second request for the same asset will reuse the agreement. Value 1 = on, 0 = off. | no | 1 |
| edc.cp.adapter.cache.catalog.expire.after | Number of seconds, after witch prevoiusly requested catalog will not be reused, and will be removed from catalog cache | no | 300 |
| edc.cp.adapter.catalog.request.limit | Maximum number of items taken from Catalog within single request. Requests are repeated until all offers of the query are retrieved | no | 100 |


<b>How to use it:</b>
Expand All @@ -31,8 +32,15 @@ Additional requirements, that affects the architecture of the extension:
```
http://localhost:9193/api/v1/data/adapter/asset/sync/123?providerUrl=http://localhost:8182/api/v1/ids/data
```

Oprional request parameters:

| Name | Description |
|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---|
| contractAgreementId | Defines the ID of existing contract agreement, that should be reused for retrieving the asset. If parameter is specified, but contract is not found, 404 error will be returned. |
| contractAgreementReuse | Similar to <i>edc.cp.adapter.reuse.contract.agreement</i> option allows to turn off reusing of existing contracts, but on a request level. Set the parameter value to '0' and new contract agrement will be negotiated. |

The controller is registered under the context alias of DataManagment API. The authentication depends on the DataManagement configuration.
The controller is registered under the context alias of DataManagement API. The authentication depends on the DataManagement configuration.
To find out more please visit:

[api-configuration](../../edc/extensions/control-plane/api/data-management/api-configuration/README.md)
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*
*/

package net.catenax.edc.cp.adapter;
package org.eclipse.tractusx.edc.cp.adapter;

import org.eclipse.edc.spi.system.ServiceExtensionContext;

Expand All @@ -25,7 +25,8 @@ public class ApiAdapterConfig {
"edc.cp.adapter.messagebus.inmemory.thread.number";
private static final String CATALOG_EXPIRE_AFTER_TIME =
"edc.cp.adapter.cache.catalog.expire.after";
private static final String CONTRACT_AGREEMENT_CACHE = "edc.cp.adapter.cache.contract.agreement";
private static final String CATALOG_REQUEST_LIMIT = "edc.cp.adapter.catalog.request.limit";
private static final String CACHE_CONTRACT_AGREEMENT = "edc.cp.adapter.cache.contract.agreement";

private final ServiceExtensionContext context;

Expand All @@ -46,10 +47,14 @@ public int getInMemoryMessageBusThreadNumber() {
}

public boolean isContractAgreementCacheOn() {
return context.getSetting(CONTRACT_AGREEMENT_CACHE, 1) != 0;
return context.getSetting(CACHE_CONTRACT_AGREEMENT, 1) != 0;
}

public int getCatalogExpireAfterTime() {
return context.getSetting(CATALOG_EXPIRE_AFTER_TIME, 3600);
}

public int getCatalogRequestLimit() {
return context.getSetting(CATALOG_REQUEST_LIMIT, 100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,52 +12,57 @@
*
*/

package net.catenax.edc.cp.adapter;
package org.eclipse.tractusx.edc.cp.adapter;

import static java.util.Objects.nonNull;

import net.catenax.edc.cp.adapter.messaging.Channel;
import net.catenax.edc.cp.adapter.messaging.InMemoryMessageBus;
import net.catenax.edc.cp.adapter.messaging.ListenerService;
import net.catenax.edc.cp.adapter.process.contractdatastore.ContractDataStore;
import net.catenax.edc.cp.adapter.process.contractdatastore.InMemoryContractDataStore;
import net.catenax.edc.cp.adapter.process.contractnegotiation.ContractNegotiationHandler;
import net.catenax.edc.cp.adapter.process.contractnotification.ContractInMemorySyncService;
import net.catenax.edc.cp.adapter.process.contractnotification.ContractNegotiationListenerImpl;
import net.catenax.edc.cp.adapter.process.contractnotification.ContractNotificationHandler;
import net.catenax.edc.cp.adapter.process.contractnotification.ContractNotificationSyncService;
import net.catenax.edc.cp.adapter.process.contractnotification.DataTransferInitializer;
import net.catenax.edc.cp.adapter.process.datareference.DataRefInMemorySyncService;
import net.catenax.edc.cp.adapter.process.datareference.DataRefNotificationSyncService;
import net.catenax.edc.cp.adapter.process.datareference.DataReferenceHandler;
import net.catenax.edc.cp.adapter.process.datareference.EndpointDataReferenceReceiverImpl;
import net.catenax.edc.cp.adapter.service.ErrorResultService;
import net.catenax.edc.cp.adapter.service.ResultService;
import net.catenax.edc.cp.adapter.util.ExpiringMap;
import net.catenax.edc.cp.adapter.util.LockMap;
import org.eclipse.edc.connector.api.management.configuration.ManagementApiConfiguration;
import org.eclipse.edc.connector.contract.spi.negotiation.observe.ContractNegotiationListener;
import org.eclipse.edc.connector.contract.spi.negotiation.observe.ContractNegotiationObservable;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.spi.catalog.CatalogService;
import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService;
import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.transfer.spi.edr.EndpointDataReferenceReceiver;
import org.eclipse.edc.connector.transfer.spi.edr.EndpointDataReferenceReceiverRegistry;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.edc.web.spi.WebService;
import org.eclipse.tractusx.edc.cp.adapter.messaging.Channel;
import org.eclipse.tractusx.edc.cp.adapter.messaging.InMemoryMessageBus;
import org.eclipse.tractusx.edc.cp.adapter.messaging.ListenerService;
import org.eclipse.tractusx.edc.cp.adapter.process.contractnegotiation.CatalogCachedRetriever;
import org.eclipse.tractusx.edc.cp.adapter.process.contractnegotiation.CatalogRetriever;
import org.eclipse.tractusx.edc.cp.adapter.process.contractnegotiation.ContractAgreementRetriever;
import org.eclipse.tractusx.edc.cp.adapter.process.contractnegotiation.ContractNegotiationHandler;
import org.eclipse.tractusx.edc.cp.adapter.process.contractnotification.*;
import org.eclipse.tractusx.edc.cp.adapter.process.datareference.DataRefInMemorySyncService;
import org.eclipse.tractusx.edc.cp.adapter.process.datareference.DataRefNotificationSyncService;
import org.eclipse.tractusx.edc.cp.adapter.process.datareference.DataReferenceHandler;
import org.eclipse.tractusx.edc.cp.adapter.process.datareference.EndpointDataReferenceReceiverImpl;
import org.eclipse.tractusx.edc.cp.adapter.service.ErrorResultService;
import org.eclipse.tractusx.edc.cp.adapter.service.ResultService;
import org.eclipse.tractusx.edc.cp.adapter.util.ExpiringMap;
import org.eclipse.tractusx.edc.cp.adapter.util.LockMap;

public class ApiAdapterExtension implements ServiceExtension {
@Inject private Monitor monitor;
@Inject private ContractNegotiationObservable negotiationObservable;
@Inject private WebService webService;
@Inject private ContractNegotiationService contractNegotiationService;
@Inject private CatalogService catalogService;
@Inject private RemoteMessageDispatcherRegistry dispatcher;
@Inject private EndpointDataReferenceReceiverRegistry receiverRegistry;
@Inject private ManagementApiConfiguration apiConfig;
@Inject private TransferProcessService transferProcessService;
@Inject private ContractNegotiationStore contractNegotiationStore;
@Inject private TransactionContext transactionContext;
@Inject private CatalogService catalogService;
@Inject private ContractAgreementService agreementService;

@Override
public String name() {
Expand All @@ -76,7 +81,6 @@ public void initialize(ServiceExtensionContext context) {
ErrorResultService errorResultService = new ErrorResultService(monitor, messageBus);
ContractNotificationSyncService contractSyncService =
new ContractInMemorySyncService(new LockMap());
ContractDataStore contractDataStore = new InMemoryContractDataStore();
DataTransferInitializer dataTransferInitializer =
new DataTransferInitializer(monitor, transferProcessService);
ContractNotificationHandler contractNotificationHandler =
Expand All @@ -87,8 +91,7 @@ public void initialize(ServiceExtensionContext context) {
contractNegotiationService,
dataTransferInitializer);
ContractNegotiationHandler contractNegotiationHandler =
getContractNegotiationHandler(
monitor, contractNegotiationService, messageBus, contractDataStore);
getContractNegotiationHandler(monitor, contractNegotiationService, messageBus, config);
DataRefNotificationSyncService dataRefSyncService =
new DataRefInMemorySyncService(new LockMap());
DataReferenceHandler dataReferenceHandler =
Expand All @@ -102,13 +105,8 @@ public void initialize(ServiceExtensionContext context) {

initHttpController(monitor, messageBus, resultService, config);
initContractNegotiationListener(
monitor,
negotiationObservable,
messageBus,
contractSyncService,
contractDataStore,
dataTransferInitializer);
initDataReferenceReciever(monitor, messageBus, dataRefSyncService);
monitor, negotiationObservable, messageBus, contractSyncService, dataTransferInitializer);
initDataReferenceReceiver(monitor, messageBus, dataRefSyncService);
}

private void initHttpController(
Expand All @@ -125,17 +123,18 @@ private ContractNegotiationHandler getContractNegotiationHandler(
Monitor monitor,
ContractNegotiationService contractNegotiationService,
InMemoryMessageBus messageBus,
ContractDataStore contractDataStore) {
ApiAdapterConfig config) {
return new ContractNegotiationHandler(
monitor,
messageBus,
contractNegotiationService,
catalogService,
contractDataStore,
new ExpiringMap<>());
new CatalogCachedRetriever(
new CatalogRetriever(config.getCatalogRequestLimit(), catalogService),
new ExpiringMap<>()),
new ContractAgreementRetriever(monitor, agreementService));
}

private void initDataReferenceReciever(
private void initDataReferenceReceiver(
Monitor monitor,
InMemoryMessageBus messageBus,
DataRefNotificationSyncService dataRefSyncService) {
Expand All @@ -149,11 +148,10 @@ private void initContractNegotiationListener(
ContractNegotiationObservable negotiationObservable,
InMemoryMessageBus messageBus,
ContractNotificationSyncService contractSyncService,
ContractDataStore contractDataStore,
DataTransferInitializer dataTransferInitializer) {
ContractNegotiationListener contractNegotiationListener =
new ContractNegotiationListenerImpl(
monitor, messageBus, contractSyncService, contractDataStore, dataTransferInitializer);
monitor, messageBus, contractSyncService, dataTransferInitializer);
if (nonNull(negotiationObservable)) {
negotiationObservable.registerListener(contractNegotiationListener);
}
Expand Down
Loading