Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(impl):[TRI-1224] Implement batch order timeout and job timeout #259

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.eclipse.tractusx.irs.services.events.BatchOrderProcessingFinishedEvent;
import org.eclipse.tractusx.irs.services.events.BatchOrderRegisteredEvent;
import org.eclipse.tractusx.irs.services.events.BatchProcessingFinishedEvent;
import org.eclipse.tractusx.irs.services.timeouts.TimeoutSchedulerBatchProcessingService;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
Expand All @@ -57,6 +58,7 @@ public class BatchOrderEventListener {
private final BatchStore batchStore;
private final IrsItemGraphQueryService irsItemGraphQueryService;
private final ApplicationEventPublisher applicationEventPublisher;
private final TimeoutSchedulerBatchProcessingService timeoutScheduler;

@Async
@EventListener
Expand Down Expand Up @@ -113,6 +115,8 @@ private void startBatch(final BatchOrder batchOrder, final Batch batch) {
batch.setJobProgressList(createdJobIds);
batch.setStartedOn(ZonedDateTime.now(ZoneOffset.UTC));
batchStore.save(batch.getBatchId(), batch);
timeoutScheduler.registerBatchTimeout(batch.getBatchId(), batchOrder.getTimeout());
timeoutScheduler.registerJobsTimeout(createdJobIds.stream().map(JobProgress::getJobId).toList(), batchOrder.getJobTimeout());
}

private JobProgress createJobProgress(final JobHandle jobHandle, final String globalAssetId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.tractusx.irs.aaswrapper.job.AASTransferProcess;
import org.eclipse.tractusx.irs.aaswrapper.job.ItemContainer;
import org.eclipse.tractusx.irs.aaswrapper.job.ItemDataRequest;
import org.eclipse.tractusx.irs.aaswrapper.job.JobProcessingFinishedEvent;
import org.eclipse.tractusx.irs.aaswrapper.job.RequestMetric;
import org.eclipse.tractusx.irs.component.AsyncFetchedItems;
import org.eclipse.tractusx.irs.component.Bpn;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.eclipse.tractusx.irs.util.JsonUtil;
import org.springframework.beans.support.MutableSortDefinition;
import org.springframework.beans.support.PagedListHolder;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.scheduling.annotation.Scheduled;
Expand All @@ -99,6 +101,8 @@ public class IrsItemGraphQueryService implements IIrsItemGraphQueryService {

private final SemanticsHubFacade semanticsHubFacade;

private final ApplicationEventPublisher applicationEventPublisher;

@Override
public PageResult getJobsByState(final @NonNull List<JobState> states, final Pageable pageable) {
final List<MultiTransferJob> jobs = states.isEmpty() ? jobStore.findAll() : jobStore.findByStates(states);
Expand Down Expand Up @@ -212,6 +216,9 @@ public Job cancelJobById(final @NonNull UUID jobId) {
final String idAsString = String.valueOf(jobId);

final Optional<MultiTransferJob> canceled = this.jobStore.cancelJob(idAsString);
canceled.ifPresent(cancelledJob -> applicationEventPublisher.publishEvent(
new JobProcessingFinishedEvent(cancelledJob.getJobIdString(), cancelledJob.getJob().getState(),
cancelledJob.getJobParameter().getCallbackUrl(), cancelledJob.getBatchId())));
return canceled.orElseThrow(() -> new EntityNotFoundException("No job exists with id " + jobId)).getJob();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/********************************************************************************
* Copyright (c) 2021,2022,2023
* 2022: ZF Friedrichshafen AG
* 2022: ISTOS GmbH
* 2022,2023: Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* 2022,2023: BOSCH AG
* Copyright (c) 2021,2022,2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0. *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
package org.eclipse.tractusx.irs.services.timeouts;

import java.util.List;
import java.util.UUID;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.tractusx.irs.component.enums.JobState;
import org.eclipse.tractusx.irs.component.enums.ProcessingState;
import org.eclipse.tractusx.irs.connector.batch.BatchStore;
import org.eclipse.tractusx.irs.connector.batch.JobProgress;
import org.eclipse.tractusx.irs.services.IrsItemGraphQueryService;
import org.springframework.stereotype.Service;

/**
* Execute proper cancel jobs that are taking to long in Batch Process.
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class CancelBatchProcessingService {

private final IrsItemGraphQueryService irsItemGraphQueryService;
private final BatchStore batchStore;

public void cancelNotFinishedJobs(final List<UUID> jobIds) {
log.info("Start scheduled timeout process for jobIds: {}", jobIds.toString());
jobIds.forEach(jobId -> {
final JobState jobState = irsItemGraphQueryService.getJobForJobId(jobId, false).getJob().getState();
if (isNotCompleted(jobState)) {
log.info("Not completed job detected. Canceling job with jobId: {}", jobId);
irsItemGraphQueryService.cancelJobById(jobId);
}
});
}

public void cancelNotFinishedJobsInBatch(final UUID batchId) {
log.info("Start scheduled timeout process for batchId: {}", batchId.toString());
batchStore.find(batchId).ifPresent(batch -> {
if (isBatchNotCompleted(batch.getBatchState())) {
cancelNotFinishedJobs(batch.getJobProgressList().stream().map(JobProgress::getJobId).toList());
}
});
}

private boolean isNotCompleted(final JobState jobState) {
return JobState.RUNNING.equals(jobState) || JobState.INITIAL.equals(jobState) || JobState.UNSAVED.equals(jobState);
}

private boolean isBatchNotCompleted(final ProcessingState processingState) {
return ProcessingState.PROCESSING.equals(processingState) || ProcessingState.INITIALIZED.equals(processingState);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/********************************************************************************
* Copyright (c) 2021,2022,2023
* 2022: ZF Friedrichshafen AG
* 2022: ISTOS GmbH
* 2022,2023: Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* 2022,2023: BOSCH AG
* Copyright (c) 2021,2022,2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0. *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
package org.eclipse.tractusx.irs.services.timeouts;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
* Schedule timeouts in Batch Process.
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class TimeoutSchedulerBatchProcessingService {

private final ScheduledExecutorService scheduler;
private final CancelBatchProcessingService cancelBatchProcessingService;

public void registerJobsTimeout(final List<UUID> jobIds, final Integer timeoutInSeconds) {
log.info("Register job timeout {} seconds for jobIds: {}", timeoutInSeconds, jobIds);
scheduler.schedule(() -> cancelBatchProcessingService.cancelNotFinishedJobs(jobIds), timeoutInSeconds, TimeUnit.SECONDS);
}

public void registerBatchTimeout(final UUID batchId, final Integer timeoutInSeconds) {
log.info("Register batch timeout {} seconds for batchId: {}", timeoutInSeconds, batchId);
scheduler.schedule(() -> cancelBatchProcessingService.cancelNotFinishedJobsInBatch(batchId), timeoutInSeconds, TimeUnit.SECONDS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.eclipse.tractusx.irs.connector.batch.InMemoryBatchStore;
import org.eclipse.tractusx.irs.connector.batch.JobProgress;
import org.eclipse.tractusx.irs.services.events.BatchOrderRegisteredEvent;
import org.eclipse.tractusx.irs.services.timeouts.TimeoutSchedulerBatchProcessingService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.context.ApplicationEventPublisher;
Expand All @@ -57,14 +58,16 @@ class BatchOrderEventListenerTest {
private BatchStore batchStore;
private final IrsItemGraphQueryService irsItemGraphQueryService = mock(IrsItemGraphQueryService.class);
private final ApplicationEventPublisher applicationEventPublisher = mock(ApplicationEventPublisher.class);
private final TimeoutSchedulerBatchProcessingService timeoutScheduler = mock(TimeoutSchedulerBatchProcessingService.class);

private BatchOrderEventListener eventListener;

@BeforeEach
void beforeEach() {
batchOrderStore = new InMemoryBatchOrderStore();
batchStore = new InMemoryBatchStore();
eventListener = new BatchOrderEventListener(batchOrderStore, batchStore, irsItemGraphQueryService, applicationEventPublisher);
eventListener = new BatchOrderEventListener(batchOrderStore, batchStore, irsItemGraphQueryService,
applicationEventPublisher, timeoutScheduler);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class CreationBatchServiceTest {
void beforeEach() {
batchOrderStore = new InMemoryBatchOrderStore();
batchStore = new InMemoryBatchStore();
service = new CreationBatchService(batchOrderStore, batchStore, applicationEventPublisher, jobEventLinkedQueueListener, irsConfiguration);
service = new CreationBatchService(batchOrderStore, batchStore, applicationEventPublisher,
jobEventLinkedQueueListener, irsConfiguration);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.eclipse.tractusx.irs.component.Job;
import org.eclipse.tractusx.irs.component.JobErrorDetails;
import org.eclipse.tractusx.irs.component.JobHandle;
import org.eclipse.tractusx.irs.component.JobParameter;
import org.eclipse.tractusx.irs.component.RegisterJob;
import org.eclipse.tractusx.irs.component.enums.AspectType;
import org.eclipse.tractusx.irs.component.enums.BomLifecycle;
Expand Down Expand Up @@ -201,6 +202,10 @@ void cancelJobById() {
.job(Job.builder()
.id(UUID.fromString(idAsString))
.state(JobState.UNSAVED)
.parameter(JobParameter.builder()
.callbackUrl(
"example.com")
.build())
.exception(JobErrorDetails.builder()
.errorDetail(
"Job should be canceled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.domain.Pageable;

@ExtendWith(MockitoExtension.class)
Expand All @@ -74,6 +75,9 @@ class IrsItemGraphQueryServiceTest {
@Mock
private BlobPersistence blobStore;

@Mock
private ApplicationEventPublisher applicationEventPublisher;

@InjectMocks
private IrsItemGraphQueryService testee;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/********************************************************************************
* Copyright (c) 2021,2022,2023
* 2022: ZF Friedrichshafen AG
* 2022: ISTOS GmbH
* 2022,2023: Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
* 2022,2023: BOSCH AG
* Copyright (c) 2021,2022,2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0. *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
package org.eclipse.tractusx.irs.services.timeouts;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.List;
import java.util.UUID;

import org.eclipse.tractusx.irs.component.Job;
import org.eclipse.tractusx.irs.component.Jobs;
import org.eclipse.tractusx.irs.component.enums.JobState;
import org.eclipse.tractusx.irs.component.enums.ProcessingState;
import org.eclipse.tractusx.irs.connector.batch.Batch;
import org.eclipse.tractusx.irs.connector.batch.BatchStore;
import org.eclipse.tractusx.irs.connector.batch.InMemoryBatchStore;
import org.eclipse.tractusx.irs.connector.batch.JobProgress;
import org.eclipse.tractusx.irs.services.IrsItemGraphQueryService;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

class CancelBatchProcessingServiceTest {

private final IrsItemGraphQueryService irsItemGraphQueryService = mock(IrsItemGraphQueryService.class);
private final BatchStore batchStore = new InMemoryBatchStore();
private final CancelBatchProcessingService cancelBatchProcessingService = new CancelBatchProcessingService(irsItemGraphQueryService, batchStore);

@Test
void shouldCancelOnlyNotCompletedJob() {
// given
UUID firstJobId = UUID.randomUUID();
UUID secondJobId = UUID.randomUUID();
UUID runningJobId = UUID.randomUUID();

ArgumentCaptor<UUID> captor = ArgumentCaptor.forClass(UUID.class);

given(irsItemGraphQueryService.getJobForJobId(firstJobId, false)).willReturn(
jobInState(JobState.COMPLETED)
);
given(irsItemGraphQueryService.getJobForJobId(secondJobId, false)).willReturn(
jobInState(JobState.ERROR)
);
given(irsItemGraphQueryService.getJobForJobId(runningJobId, false)).willReturn(
jobInState(JobState.RUNNING)
);

// when
cancelBatchProcessingService.cancelNotFinishedJobs(List.of(firstJobId, secondJobId, runningJobId));

// then
verify(irsItemGraphQueryService).cancelJobById(captor.capture());
assertThat(captor.getValue()).isEqualTo(runningJobId);
}

@Test
void shouldCancelOnlyNotCompletedJobInBatch() {
// given
UUID firstJobId = UUID.randomUUID();
UUID secondJobId = UUID.randomUUID();
UUID runningJobId = UUID.randomUUID();
UUID batchId = UUID.randomUUID();

ArgumentCaptor<UUID> captor = ArgumentCaptor.forClass(UUID.class);

given(irsItemGraphQueryService.getJobForJobId(firstJobId, false)).willReturn(
jobInState(JobState.COMPLETED)
);
given(irsItemGraphQueryService.getJobForJobId(secondJobId, false)).willReturn(
jobInState(JobState.ERROR)
);
given(irsItemGraphQueryService.getJobForJobId(runningJobId, false)).willReturn(
jobInState(JobState.RUNNING)
);

batchStore.save(batchId, createBatch(batchId, ProcessingState.PROCESSING,
List.of(firstJobId, secondJobId, runningJobId)));

// when
cancelBatchProcessingService.cancelNotFinishedJobsInBatch(batchId);

// then
verify(irsItemGraphQueryService).cancelJobById(captor.capture());
assertThat(captor.getValue()).isEqualTo(runningJobId);
}

private static Jobs jobInState(final JobState completed) {
return Jobs.builder().job(Job.builder().state(completed).build()).build();
}

private Batch createBatch(final UUID batchId, final ProcessingState state, final List<UUID> jobIds) {
return Batch.builder()
.batchId(batchId)
.batchState(state)
.jobProgressList(
jobIds.stream().map(uuid -> JobProgress.builder().jobId(uuid).build()).toList()
).build();
}

}
Loading