Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(irs-apiirs-edc-client):[#256] added cache mechanism for edr tokens #679

Merged
merged 17 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
bf1f6b7
feat(irs-apiirs-edc-client):[#256] added cache mechanism for edr tokens
ds-psosnowski Dec 1, 2023
645ee4f
feat(irs-apiirs-edc-client):[#256] fix for potential null pointer exc…
ds-psosnowski Dec 4, 2023
cb2c223
feat(irs-apiirs-edc-client):[#256] removed unnecessary stubbing
ds-psosnowski Dec 4, 2023
1e2fb19
feat(irs-apiirs-edc-client):[#256] sonar finding fix
ds-psosnowski Dec 4, 2023
e8158a6
feat(irs-apiirs-edc-client):[#256] fix findbug findings
ds-psosnowski Dec 4, 2023
54f62bc
feat(irs-apiirs-edc-client):[#256] fix findbug findings
ds-psosnowski Dec 4, 2023
2495ac1
feat(irs-apiirs-edc-client):[#256] address code smells
ds-psosnowski Dec 4, 2023
9654457
feat(irs-apiirs-edc-client):[#256] removed dependency
ds-psosnowski Dec 4, 2023
6b8e0db
feat(irs-api):[#256] spotbug fix
ds-psosnowski Dec 6, 2023
d9d43af
feat(irs-api):[#256] spotbug fix
ds-psosnowski Dec 6, 2023
2f3994f
feat(irs-registry-client):[#256] added cache for send notification
ds-psosnowski Dec 8, 2023
1e321b5
feat(irs-registry-client):[#256] removed unused method
ds-psosnowski Dec 8, 2023
bf3a437
Merge branch 'main' into feat/256-edr-token-caching
ds-psosnowski Dec 8, 2023
667f575
feat(irs-registry-client):[#256] added changelog record
ds-psosnowski Dec 11, 2023
a249df9
feat(irs-registry-client):[#202] merge main
ds-psosnowski Dec 14, 2023
20eff99
feat(irs-registry-client):[#256] corrections after review
ds-psosnowski Dec 20, 2023
e5de031
feat(irs-registry-client):[#256] added test
ds-psosnowski Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
import org.eclipse.tractusx.irs.edc.client.exceptions.UsagePolicyException;
import org.eclipse.tractusx.irs.edc.client.model.CatalogItem;
import org.eclipse.tractusx.irs.edc.client.model.ContractOfferDescription;
import org.eclipse.tractusx.irs.edc.client.model.EDRAuthCode;
import org.eclipse.tractusx.irs.edc.client.model.NegotiationRequest;
import org.eclipse.tractusx.irs.edc.client.model.NegotiationResponse;
import org.eclipse.tractusx.irs.edc.client.model.Response;
import org.eclipse.tractusx.irs.edc.client.model.TransferProcessDataDestination;
import org.eclipse.tractusx.irs.edc.client.model.TransferProcessRequest;
import org.eclipse.tractusx.irs.edc.client.model.TransferProcessResponse;
import org.eclipse.tractusx.irs.edc.client.policy.PolicyCheckerService;
import org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceStatus;
import org.springframework.stereotype.Service;

/**
Expand All @@ -56,45 +58,83 @@ public class ContractNegotiationService {

public static final String EDC_PROTOCOL = "dataspace-protocol-http";
private final EdcControlPlaneClient edcControlPlaneClient;

private final PolicyCheckerService policyCheckerService;

private final EdcConfiguration config;

public NegotiationResponse negotiate(final String providerConnectorUrl, final CatalogItem catalogItem)
throws ContractNegotiationException, UsagePolicyException, TransferProcessException {
if (!policyCheckerService.isValid(catalogItem.getPolicy())) {
log.info("Policy was not allowed, canceling negotiation.");
throw new UsagePolicyException(catalogItem.getItemId());
}

final NegotiationRequest negotiationRequest = createNegotiationRequestFromCatalogItem(providerConnectorUrl,
catalogItem);
return this.negotiate(providerConnectorUrl, catalogItem,
new EndpointDataReferenceStatus(null, EndpointDataReferenceStatus.TokenStatus.REQUIRED_NEW));
}

final Response negotiationId = edcControlPlaneClient.startNegotiations(negotiationRequest);
@SuppressWarnings("PMD.AvoidReassigningParameters")
ds-psosnowski marked this conversation as resolved.
Show resolved Hide resolved
public NegotiationResponse negotiate(final String providerConnectorUrl, final CatalogItem catalogItem,
EndpointDataReferenceStatus endpointDataReferenceStatus)
throws ContractNegotiationException, UsagePolicyException, TransferProcessException {

log.info("Fetch negotiation id: {}", negotiationId.getResponseId());
if (endpointDataReferenceStatus == null) {
log.info(
"Missing information about endpoint data reference from storage, setting token status to REQUIRED_NEW.");
endpointDataReferenceStatus = new EndpointDataReferenceStatus(null,
EndpointDataReferenceStatus.TokenStatus.REQUIRED_NEW);
}

final CompletableFuture<NegotiationResponse> responseFuture = edcControlPlaneClient.getNegotiationResult(
negotiationId);
final NegotiationResponse negotiationResponse = Objects.requireNonNull(getNegotiationResponse(responseFuture));
NegotiationResponse negotiationResponse = null;
ds-psosnowski marked this conversation as resolved.
Show resolved Hide resolved
String contractAgreementId = null;
ds-psosnowski marked this conversation as resolved.
Show resolved Hide resolved

switch (endpointDataReferenceStatus.tokenStatus()) {
case REQUIRED_NEW -> {
final CompletableFuture<NegotiationResponse> responseFuture = startNewNegotiation(providerConnectorUrl,
catalogItem);
negotiationResponse = Objects.requireNonNull(getNegotiationResponse(responseFuture));
contractAgreementId = negotiationResponse.getContractAgreementId();
}
case EXPIRED -> {
contractAgreementId = EDRAuthCode.fromAuthCodeToken(
Objects.requireNonNull(endpointDataReferenceStatus.endpointDataReference().getAuthKey())).getCid();
log.info(
"Cached endpoint data reference has expired token. Refreshing token without new contract negotiation for contractAgreementId: {}",
contractAgreementId);
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show resolved Hide resolved
}
case VALID -> throw new IllegalStateException(
"Token is present and valid. Contract negotiation should not be started.");
default -> throw new IllegalStateException(
"Unknown token status.");
}

final TransferProcessRequest transferProcessRequest = createTransferProcessRequest(providerConnectorUrl,
catalogItem, negotiationResponse);
catalogItem, contractAgreementId);

final Response transferProcessId = edcControlPlaneClient.startTransferProcess(transferProcessRequest);

// can be added to cache after completed
final CompletableFuture<TransferProcessResponse> transferProcessFuture = edcControlPlaneClient.getTransferProcess(
transferProcessId);
final TransferProcessResponse transferProcessResponse = Objects.requireNonNull(
getTransferProcessResponse(transferProcessFuture));
log.info("Transfer process completed for transferProcessId: {}", transferProcessResponse.getResponseId());

return negotiationResponse;
}

private CompletableFuture<NegotiationResponse> startNewNegotiation(final String providerConnectorUrl,
final CatalogItem catalogItem) throws UsagePolicyException {
log.info("Staring new contract negotiation.");

if (!policyCheckerService.isValid(catalogItem.getPolicy())) {
log.info("Policy was not allowed, canceling negotiation.");
throw new UsagePolicyException(catalogItem.getItemId());
}

final NegotiationRequest negotiationRequest = createNegotiationRequestFromCatalogItem(providerConnectorUrl,
catalogItem);
final Response negotiationId = edcControlPlaneClient.startNegotiations(negotiationRequest);
log.info("Fetch negotiation id: {}", negotiationId.getResponseId());

return edcControlPlaneClient.getNegotiationResult(negotiationId);
}

private TransferProcessRequest createTransferProcessRequest(final String providerConnectorUrl,
final CatalogItem catalogItem, final NegotiationResponse response) {
final CatalogItem catalogItem, final String agreementId) {
final var destination = DataAddress.Builder.newInstance()
.type(TransferProcessDataDestination.DEFAULT_TYPE)
.build();
Expand All @@ -105,7 +145,7 @@ private TransferProcessRequest createTransferProcessRequest(final String provide
TransferProcessRequest.DEFAULT_MANAGED_RESOURCES)
.connectorId(catalogItem.getConnectorId())
.connectorAddress(providerConnectorUrl)
.contractId(response.getContractAgreementId())
.contractId(agreementId)
.assetId(catalogItem.getAssetPropId())
.dataDestination(destination);
if (StringUtils.isNotBlank(config.getCallbackUrl())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
package org.eclipse.tractusx.irs.edc.client;

import static org.eclipse.tractusx.irs.edc.client.configuration.JsonLdConfiguration.NAMESPACE_EDC_ID;
import static org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceStatus.TokenStatus;

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryRegistry;
Expand All @@ -45,6 +47,8 @@
import org.eclipse.tractusx.irs.edc.client.model.notification.EdcNotification;
import org.eclipse.tractusx.irs.edc.client.model.notification.EdcNotificationResponse;
import org.eclipse.tractusx.irs.edc.client.model.notification.NotificationContent;
import org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceCacheService;
import org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceStatus;
import org.eclipse.tractusx.irs.edc.client.util.Masker;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
Expand All @@ -64,6 +68,10 @@ CompletableFuture<EdcNotificationResponse> sendNotification(String submodelEndpo

CompletableFuture<EndpointDataReference> getEndpointReferenceForAsset(String endpointAddress, String filterKey,
String filterValue) throws EdcClientException;

CompletableFuture<EndpointDataReference> getEndpointReferenceForAsset(String endpointAddress, String filterKey,
String filterValue, EndpointDataReferenceStatus cachedEndpointDataReference)
throws EdcClientException;
}

/**
Expand Down Expand Up @@ -98,6 +106,13 @@ public CompletableFuture<EdcNotificationResponse> sendNotification(final String
return CompletableFuture.completedFuture(() -> true);
}

@Override
public CompletableFuture<EndpointDataReference> getEndpointReferenceForAsset(final String endpointAddress,
final String filterKey, final String filterValue,
final EndpointDataReferenceStatus cachedEndpointDataReference) throws EdcClientException {
throw new EdcClientException("Not implemented");
}

@Override
public CompletableFuture<EndpointDataReference> getEndpointReferenceForAsset(final String endpointAddress,
final String filterKey, final String filterValue) throws EdcClientException {
Expand All @@ -122,6 +137,7 @@ class EdcSubmodelClientImpl implements EdcSubmodelClient {
private final AsyncPollingService pollingService;
private final RetryRegistry retryRegistry;
private final EDCCatalogFacade catalogFacade;
private final EndpointDataReferenceCacheService endpointDataReferenceCacheService;
private final UrlValidator urlValidator = new UrlValidator(UrlValidator.ALLOW_LOCAL_URLS);

private static void stopWatchOnEdcTask(final StopWatch stopWatch) {
Expand All @@ -131,6 +147,7 @@ private static void stopWatchOnEdcTask(final StopWatch stopWatch) {

private NegotiationResponse fetchNegotiationResponseWithFilter(final String connectorEndpoint, final String assetId)
throws EdcClientException {

final StopWatch stopWatch = new StopWatch();
stopWatch.start("Get EDC Submodel task for shell descriptor, endpoint " + connectorEndpoint);

Expand All @@ -141,6 +158,7 @@ private NegotiationResponse fetchNegotiationResponseWithFilter(final String conn
.findFirst()
.orElseThrow(() -> new ItemNotFoundInCatalogException(connectorEndpoint,
assetId));

return contractNegotiationService.negotiate(connectorEndpoint, catalogItem);
}

Expand All @@ -156,24 +174,19 @@ private CompletableFuture<EdcNotificationResponse> sendNotificationAsync(final S

}

private Optional<String> retrieveSubmodelData(final String submodelDataplaneUrl, final String contractAgreementId,
final StopWatch stopWatch) {
final Optional<EndpointDataReference> dataReference = retrieveEndpointDataReference(contractAgreementId);
private Optional<String> retrieveSubmodelData(final String submodelDataplaneUrl, final StopWatch stopWatch,
final EndpointDataReference endpointDataReference) {
log.info("Retrieving data from EDC data plane for dataReference with id {}", endpointDataReference.getId());
final String data = edcDataPlaneClient.getData(endpointDataReference, submodelDataplaneUrl);
stopWatchOnEdcTask(stopWatch);

if (dataReference.isPresent()) {
final EndpointDataReference ref = dataReference.get();
log.info("Retrieving data from EDC data plane for dataReference with id {}", ref.getId());
final String data = edcDataPlaneClient.getData(ref, submodelDataplaneUrl);
stopWatchOnEdcTask(stopWatch);

return Optional.of(data);
}
return Optional.empty();
return Optional.of(data);
}

private Optional<EndpointDataReference> retrieveEndpointReference(final String contractAgreementId,
final StopWatch stopWatch) {
final Optional<EndpointDataReference> dataReference = retrieveEndpointDataReference(contractAgreementId);
final Optional<EndpointDataReference> dataReference = retrieveEndpointDataReferenceByContractAgreementId(
ds-psosnowski marked this conversation as resolved.
Show resolved Hide resolved
contractAgreementId);

if (dataReference.isPresent()) {
final EndpointDataReference ref = dataReference.get();
Expand All @@ -187,7 +200,8 @@ private Optional<EndpointDataReference> retrieveEndpointReference(final String c

private Optional<EdcNotificationResponse> sendSubmodelNotification(final String contractAgreementId,
final EdcNotification<NotificationContent> notification, final StopWatch stopWatch) {
final Optional<EndpointDataReference> dataReference = retrieveEndpointDataReference(contractAgreementId);
final Optional<EndpointDataReference> dataReference = retrieveEndpointDataReferenceByContractAgreementId(
contractAgreementId);

if (dataReference.isPresent()) {
final EndpointDataReference ref = dataReference.get();
Expand All @@ -208,21 +222,49 @@ public CompletableFuture<String> getSubmodelRawPayload(final String connectorEnd
final StopWatch stopWatch = new StopWatch();
stopWatch.start("Get EDC Submodel task for raw payload, endpoint " + connectorEndpoint);

final var negotiationEndpoint = appendSuffix(connectorEndpoint,
config.getControlplane().getProviderSuffix());
log.debug("Starting negotiation with EDC endpoint: '{}'", negotiationEndpoint);
final NegotiationResponse negotiationResponse = fetchNegotiationResponseWithFilter(negotiationEndpoint,
assetId);
final EndpointDataReference endpointDataReference = getEndpointDataReference(connectorEndpoint, assetId);

return pollingService.<String>createJob()
.action(() -> retrieveSubmodelData(submodelDataplaneUrl,
negotiationResponse.getContractAgreementId(), stopWatch))
.action(() -> retrieveSubmodelData(submodelDataplaneUrl, stopWatch,
endpointDataReference))
.timeToLive(config.getSubmodel().getRequestTtl())
.description("waiting for submodel retrieval")
.build()
.schedule();
});
}

@SuppressWarnings("PMD.ConfusingTernary")
ds-psosnowski marked this conversation as resolved.
Show resolved Hide resolved
private EndpointDataReference getEndpointDataReference(final String connectorEndpoint, final String assetId)
throws EdcClientException {
final EndpointDataReferenceStatus cachedEndpointDataReference = endpointDataReferenceCacheService.getEndpointDataReference(
assetId);
EndpointDataReference endpointDataReference;

if (cachedEndpointDataReference.tokenStatus() != TokenStatus.VALID) {
endpointDataReference = getEndpointDataReferenceAndAddToStorage(connectorEndpoint, assetId,
cachedEndpointDataReference);
} else {
endpointDataReference = cachedEndpointDataReference.endpointDataReference();
}

return endpointDataReference;
}

private EndpointDataReference getEndpointDataReferenceAndAddToStorage(final String connectorEndpoint,
final String assetId, final EndpointDataReferenceStatus cachedEndpointDataReference)
throws EdcClientException {
try {
final EndpointDataReference endpointDataReference = getEndpointReferenceForAsset(connectorEndpoint,
NAMESPACE_EDC_ID, assetId, cachedEndpointDataReference).get();
endpointDataReferenceStorage.put(assetId, endpointDataReference);

return endpointDataReference;
} catch (InterruptedException | ExecutionException e) {
throw new EdcClientException(e);
}
}

@Override
public CompletableFuture<EdcNotificationResponse> sendNotification(final String connectorEndpoint,
final String assetId, final EdcNotification<NotificationContent> notification) throws EdcClientException {
Expand All @@ -241,8 +283,17 @@ public CompletableFuture<EdcNotificationResponse> sendNotification(final String
@Override
public CompletableFuture<EndpointDataReference> getEndpointReferenceForAsset(final String endpointAddress,
final String filterKey, final String filterValue) throws EdcClientException {
return getEndpointReferenceForAsset(endpointAddress, filterKey, filterValue,
new EndpointDataReferenceStatus(null, TokenStatus.REQUIRED_NEW));
}

@Override
public CompletableFuture<EndpointDataReference> getEndpointReferenceForAsset(final String endpointAddress,
final String filterKey, final String filterValue,
final EndpointDataReferenceStatus endpointDataReferenceStatus) throws EdcClientException {
return execute(endpointAddress, () -> {
final StopWatch stopWatch = new StopWatch();

stopWatch.start("Get EDC Submodel task for shell descriptor, endpoint " + endpointAddress);
final String providerWithSuffix = appendSuffix(endpointAddress,
config.getControlplane().getProviderSuffix());
Expand All @@ -251,7 +302,7 @@ public CompletableFuture<EndpointDataReference> getEndpointReferenceForAsset(fin
filterValue);

final NegotiationResponse response = contractNegotiationService.negotiate(providerWithSuffix,
items.stream().findFirst().orElseThrow());
items.stream().findFirst().orElseThrow(), endpointDataReferenceStatus);

return pollingService.<EndpointDataReference>createJob()
.action(() -> retrieveEndpointReference(response.getContractAgreementId(), stopWatch))
Expand All @@ -275,7 +326,8 @@ private String appendSuffix(final String endpointAddress, final String providerS
return addressWithSuffix;
}

private Optional<EndpointDataReference> retrieveEndpointDataReference(final String contractAgreementId) {
private Optional<EndpointDataReference> retrieveEndpointDataReferenceByContractAgreementId(
final String contractAgreementId) {
log.info("Retrieving dataReference from storage for contractAgreementId {}", Masker.mask(contractAgreementId));
return endpointDataReferenceStorage.remove(contractAgreementId);
ds-psosnowski marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down
Loading