Skip to content

Commit

Permalink
Merge pull request #862 from eclipse-tractusx/fix/755-duplicate-callb…
Browse files Browse the repository at this point in the history
…acks

fix(irs-api):[#755] fix job callbacks called multiple times after com…
  • Loading branch information
ds-jhartmann authored Aug 2, 2024
2 parents 866d50f + 072a0f1 commit 513bc56
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 87 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ _**For better traceability add the corresponding GitHub issue number in each cha
- Change policy to include full namespace `https://w3id.org/catenax/policy/` instead of `cx-policy:`
in some remaining code places (in context of #794).
- Fixed flaky test `InMemoryJobStoreTest.checkLastModifiedOnAfterCreation()` (PR#857).
- Fixed occasion where completed Job callbacks are called multiple times. #755

### Changed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import static org.eclipse.tractusx.irs.configuration.RestTemplateConfig.NO_ERROR_REST_TEMPLATE;

import java.net.URI;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -60,87 +63,120 @@ class CallbackResponderEventListener {
public static final String INVALID_CALLBACK_URL = "Invalid callback url '{}'.";
private final UrlValidator urlValidator;
private final RestTemplate restTemplate;
private final Map<String, Long> completedCallbacks;

/* package */ CallbackResponderEventListener(
@Qualifier(NO_ERROR_REST_TEMPLATE) final RestTemplate noErrorRestTemplate) {
this.urlValidator = new UrlValidator(UrlValidator.ALLOW_LOCAL_URLS);
this.restTemplate = noErrorRestTemplate;
this.completedCallbacks = new HashMap<>();
}

@Async
@EventListener
public void handleJobProcessingFinishedEvent(final JobProcessingFinishedEvent jobProcessingFinishedEvent) {
if (thereIsCallbackUrlRegistered(jobProcessingFinishedEvent.callbackUrl())) {
log.info("Processing of job has finished - attempting to notify job requestor");

final URI callbackUri = buildCallbackUri(jobProcessingFinishedEvent.callbackUrl(),
jobProcessingFinishedEvent.jobId(), JobState.valueOf(jobProcessingFinishedEvent.jobState()));
if (urlValidator.isValid(callbackUri.toString())) {
log.info("Got callback url {} for jobId {} with state {}", callbackUri,
jobProcessingFinishedEvent.jobId(), jobProcessingFinishedEvent.jobState());

try {
final ResponseEntity<Void> callbackResponse = restTemplate.getForEntity(callbackUri, Void.class);
log.info("Callback url pinged, received http status: {}, jobId {}", callbackResponse.getStatusCode(), jobProcessingFinishedEvent.jobId());
} catch (final ResourceAccessException resourceAccessException) {
log.warn("Callback url is not reachable - connection timed out, jobId {}", jobProcessingFinishedEvent.jobId());
}
} else {
log.warn(INVALID_CALLBACK_URL, callbackUri);
}
if (StringUtils.isBlank(jobProcessingFinishedEvent.callbackUrl())) {
return;
}
log.info("Processing of job has finished - attempting to notify job requestor");

final URI callbackUri = buildCallbackUri(jobProcessingFinishedEvent.callbackUrl(),
jobProcessingFinishedEvent.jobId(), JobState.valueOf(jobProcessingFinishedEvent.jobState()));

if (!urlValidator.isValid(callbackUri.toString())) {
log.warn(INVALID_CALLBACK_URL, callbackUri);
return;
}

log.info("Got callback url '{}' for jobId '{}' with state '{}'", callbackUri, jobProcessingFinishedEvent.jobId(),
jobProcessingFinishedEvent.jobState());
sendCallback(callbackUri, jobProcessingFinishedEvent.jobId());
}

@Async
@EventListener
public void handleBatchProcessingFinishedEvent(final BatchProcessingFinishedEvent batchProcessingFinishedEvent) {
if (thereIsCallbackUrlRegistered(batchProcessingFinishedEvent.callbackUrl())) {
log.info("Processing of Batch has finished - attempting to notify requestor");

final URI callbackUri = buildCallbackUri(batchProcessingFinishedEvent.callbackUrl(), batchProcessingFinishedEvent.batchOrderId(),
batchProcessingFinishedEvent.batchId(), batchProcessingFinishedEvent.batchOrderState(), batchProcessingFinishedEvent.batchState());
if (urlValidator.isValid(callbackUri.toString())) {
log.info("Got callback url {} for orderId {} with orderState {} and batchId {} with batchState {}", callbackUri,
batchProcessingFinishedEvent.batchOrderId(), batchProcessingFinishedEvent.batchOrderState(), batchProcessingFinishedEvent.batchId(), batchProcessingFinishedEvent.batchState());

try {
final ResponseEntity<Void> callbackResponse = restTemplate.getForEntity(callbackUri, Void.class);
log.info("Callback url pinged, received http status: {}, orderId {} batchId {}", callbackResponse.getStatusCode(), batchProcessingFinishedEvent.batchOrderId(), batchProcessingFinishedEvent.batchId());
} catch (final ResourceAccessException resourceAccessException) {
log.warn("Callback url is not reachable - connection timed out, orderId {} batchId {}", batchProcessingFinishedEvent.batchOrderId(), batchProcessingFinishedEvent.batchId());
}
} else {
log.warn(INVALID_CALLBACK_URL, callbackUri);
}
if (StringUtils.isBlank(batchProcessingFinishedEvent.callbackUrl())) {
return;
}
log.info("Processing of Batch has finished - attempting to notify requestor");

final URI callbackUri = buildCallbackUri(batchProcessingFinishedEvent.callbackUrl(),
batchProcessingFinishedEvent.batchOrderId(), batchProcessingFinishedEvent.batchId(),
batchProcessingFinishedEvent.batchOrderState(), batchProcessingFinishedEvent.batchState());

if (!urlValidator.isValid(callbackUri.toString())) {
log.warn(INVALID_CALLBACK_URL, callbackUri);
return;
}

log.info("Got callback url '{}' for orderId '{}' with orderState '{}' and batchId '{}' with batchState '{}'", callbackUri,
batchProcessingFinishedEvent.batchOrderId(), batchProcessingFinishedEvent.batchOrderState(),
batchProcessingFinishedEvent.batchId(), batchProcessingFinishedEvent.batchState());
sendCallback(callbackUri, batchProcessingFinishedEvent.batchId().toString());
}

@Async
@EventListener
public void handleBatchOrderProcessingFinishedEvent(final BatchOrderProcessingFinishedEvent batchOrderProcessingFinishedEvent) {
if (thereIsCallbackUrlRegistered(batchOrderProcessingFinishedEvent.callbackUrl())) {
log.info("Processing of Batch Order has finished - attempting to notify requestor");

final URI callbackUri = buildCallbackUri(batchOrderProcessingFinishedEvent.callbackUrl(), batchOrderProcessingFinishedEvent.batchOrderId(),
null, batchOrderProcessingFinishedEvent.batchOrderState(), null);
if (urlValidator.isValid(callbackUri.toString())) {
log.info("Got callback url {} for orderId {} with orderState {}", callbackUri,
batchOrderProcessingFinishedEvent.batchOrderId(), batchOrderProcessingFinishedEvent.batchOrderState());

try {
final ResponseEntity<Void> callbackResponse = restTemplate.getForEntity(callbackUri, Void.class);
log.info("Callback url pinged, received http status: {}, orderId {}", callbackResponse.getStatusCode(), batchOrderProcessingFinishedEvent.batchOrderId());
} catch (final ResourceAccessException resourceAccessException) {
log.warn("Callback url is not reachable - connection timed out, jobId {}", batchOrderProcessingFinishedEvent.batchOrderId());
}
} else {
log.warn(INVALID_CALLBACK_URL, callbackUri);
public void handleBatchOrderProcessingFinishedEvent(
final BatchOrderProcessingFinishedEvent batchOrderProcessingFinishedEvent) {
if (StringUtils.isBlank(batchOrderProcessingFinishedEvent.callbackUrl())) {
return;
}
log.info("Processing of Batch Order has finished - attempting to notify requestor");

final URI callbackUri = buildCallbackUri(batchOrderProcessingFinishedEvent.callbackUrl(),
batchOrderProcessingFinishedEvent.batchOrderId(), null,
batchOrderProcessingFinishedEvent.batchOrderState(), null);
if (!urlValidator.isValid(callbackUri.toString())) {
log.warn(INVALID_CALLBACK_URL, callbackUri);
return;
}

log.info("Got callback url '{}' for orderId '{}' with orderState '{}'", callbackUri,
batchOrderProcessingFinishedEvent.batchOrderId(), batchOrderProcessingFinishedEvent.batchOrderState());
sendCallback(callbackUri, batchOrderProcessingFinishedEvent.batchOrderId().toString());

}

private void sendCallback(final URI callbackUri, final String key) {
if (callbackNotSentYet(key)) {
addJobToSentCallbacks(key);
cleanupValuesOlderThan(Duration.ofHours(1));
try {
final ResponseEntity<Void> callbackResponse = restTemplate.getForEntity(callbackUri, Void.class);
log.info("Callback url '{}' pinged, received http status: '{}'", callbackUri,
callbackResponse.getStatusCode());
} catch (final ResourceAccessException resourceAccessException) {
log.warn("Callback url '{}' is not reachable - connection timed out.", callbackUri);
}
} else {
log.info("Callback for url '{}' is already sent.", callbackUri);
}
}

private void addJobToSentCallbacks(final String key) {
final LocalDateTime currentTime = LocalDateTime.now();
synchronized (completedCallbacks) {
completedCallbacks.put(key, currentTime.toEpochSecond(ZoneOffset.UTC));
}

}

private void cleanupValuesOlderThan(final Duration otherDuration) {
final LocalDateTime currentTime = LocalDateTime.now();
synchronized (completedCallbacks) {
completedCallbacks.entrySet()
.removeIf(entry ->
Duration.between(LocalDateTime.ofEpochSecond(entry.getValue(), 0, ZoneOffset.UTC),
currentTime).compareTo(otherDuration) > 0);
}
}

private boolean thereIsCallbackUrlRegistered(final String callbackUrl) {
return StringUtils.isNotBlank(callbackUrl);
private boolean callbackNotSentYet(final String key) {
synchronized (completedCallbacks) {
return !completedCallbacks.containsKey(key);
}
}

@SuppressWarnings("PMD.UseConcurrentHashMap")
Expand All @@ -155,7 +191,8 @@ private URI buildCallbackUri(final String callbackUrl, final String jobId, final
}

@SuppressWarnings("PMD.UseConcurrentHashMap")
private URI buildCallbackUri(final String callbackUrl, final UUID orderId, final UUID batchId, final ProcessingState orderState, final ProcessingState batchState) {
private URI buildCallbackUri(final String callbackUrl, final UUID orderId, final UUID batchId,
final ProcessingState orderState, final ProcessingState batchState) {
final Map<String, Object> uriVariables = new HashMap<>();
uriVariables.put("orderId", orderId);
uriVariables.put("batchId", batchId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -250,19 +249,22 @@ private void markJobInError(final MultiTransferJob job, final Throwable exceptio
}

private void publishJobProcessingFinishedEventIfFinished(final String jobId) {
jobStore.find(jobId).ifPresent(job -> {
if (job.getJob().getState().equals(JobState.COMPLETED) || job.getJob()
.getState()
.equals(JobState.ERROR)) {
jobStore.find(jobId).ifPresentOrElse(job -> {
if (JobState.COMPLETED.equals(job.getJob().getState()) || JobState.ERROR.equals(job.getJob().getState())) {
log.info("Publishing JobProcessingFinishedEvent for job '{}' with status '{}'.", job.getJobIdString(),
job.getJob().getState());
applicationEventPublisher.publishEvent(
new JobProcessingFinishedEvent(job.getJobIdString(), job.getJob().getState().name(),
job.getJobParameter().getCallbackUrl(), job.getBatchId()));
} else {
log.warn("Could not publish JobProcessingFinishedEvent. Job '{}' not in state COMPLETED or ERROR.",
jobId);
}
});
}, () -> log.warn("Could not publish JobProcessingFinishedEvent. Job '{}' not present.", jobId));
}

private long startTransfers(final MultiTransferJob job, final Stream<T> dataRequests) /* throws JobErrorDetails */ {
return dataRequests.map(r -> startTransfer(job, r)).collect(Collectors.counting());
return dataRequests.map(r -> startTransfer(job, r)).toList().size();
}

private TransferInitiateResponse startTransfer(final MultiTransferJob job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.eclipse.tractusx.irs.testing.wiremock.SubmodelFacadeWiremockSupport.PATH_TRANSFER;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -189,6 +190,45 @@ void shouldStopJobAfterDepthIsReached() {
assertThat(jobForJobId.getTombstones()).isEmpty();
}

@Test
void shouldSendOneCallbackAfterJobCompletion() {
// Arrange
final String globalAssetIdLevel1 = "globalAssetId";
final String globalAssetIdLevel2 = "urn:uuid:7e4541ea-bb0f-464c-8cb3-021abccbfaf5";

WiremockSupport.successfulSemanticModelRequest();
WiremockSupport.successfulSemanticHubRequests();
WiremockSupport.successfulDiscovery();
WiremockSupport.successfulCallbackRequest();

successfulRegistryAndDataRequest(globalAssetIdLevel1, "Cathode", TEST_BPN, "integrationtesting/batch-1.json",
"integrationtesting/singleLevelBomAsBuilt-1.json");
successfulRegistryAndDataRequest(globalAssetIdLevel2, "Polyamid", TEST_BPN, "integrationtesting/batch-2.json",
"integrationtesting/singleLevelBomAsBuilt-2.json");

final RegisterJob request = WiremockSupport.jobRequest(globalAssetIdLevel1, TEST_BPN, 1, WiremockSupport.CALLBACK_URL);

// Act
final List<JobHandle> startedJobs = new ArrayList<>();

// Start 50 jobs in parallel. The bug #755 occurred when multiple (>10 Jobs) were started at the same time.
// To definitely provoke the cases where callbacks were triggered multiple times, we start 50 jobs.
final int numberOfParallelJobs = 50;
for (int i = 0; i < numberOfParallelJobs; i++) {
startedJobs.add(irsService.registerItemJob(request));
}

for (JobHandle jobHandle : startedJobs) {
assertThat(jobHandle.getId()).isNotNull();
waitForCompletion(jobHandle);
}

// Assert
for (JobHandle jobHandle : startedJobs) {
WiremockSupport.verifyCallbackCall(jobHandle.getId().toString(), JobState.COMPLETED, 1);
}
}

@Test
void shouldCreateTombstoneWhenDiscoveryServiceNotAvailable() {
// Arrange
Expand Down Expand Up @@ -423,7 +463,7 @@ private void failedNegotiation() {
private void waitForCompletion(final JobHandle jobHandle) {
Awaitility.await()
.timeout(Duration.ofSeconds(35))
.pollInterval(Duration.ofSeconds(1))
.pollInterval(Duration.ofMillis(500))
.until(() -> irsService.getJobForJobId(jobHandle.getId(), false)
.getJob()
.getState()
Expand Down
Loading

0 comments on commit 513bc56

Please sign in to comment.