diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ScheduledExecution.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ScheduledExecution.java index 21f2d51a..cb231f45 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ScheduledExecution.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ScheduledExecution.java @@ -14,8 +14,10 @@ package com.github.kagkarlsson.scheduler; import com.github.kagkarlsson.scheduler.exceptions.DataClassMismatchException; +import com.github.kagkarlsson.scheduler.exceptions.MissingRawDataException; import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.TaskInstanceId; + import java.time.Instant; import java.util.Objects; @@ -47,6 +49,19 @@ public DATA_TYPE getData() { throw new DataClassMismatchException(dataClass, data.getClass()); } + public boolean hasRawData() { + Object data = this.execution.taskInstance.getData(); + return data == null || data.getClass().equals(byte[].class); + } + + public byte[] getRawData() { + if (!hasRawData()) { + throw new MissingRawDataException(dataClass); + } + + return (byte[]) this.execution.taskInstance.getData(); + } + public Instant getLastSuccess() { return execution.lastSuccess; } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ScheduledExecutionsFilter.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ScheduledExecutionsFilter.java index 5af32c79..4273fd93 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ScheduledExecutionsFilter.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ScheduledExecutionsFilter.java @@ -18,10 +18,15 @@ public class ScheduledExecutionsFilter { private Boolean pickedValue; + private boolean includeUnresolved = false; private ScheduledExecutionsFilter() {} public static ScheduledExecutionsFilter all() { + return new ScheduledExecutionsFilter().withIncludeUnresolved(true); + } + + public static ScheduledExecutionsFilter onlyResolved() { return new ScheduledExecutionsFilter(); } @@ -30,7 +35,16 @@ public ScheduledExecutionsFilter withPicked(boolean pickedValue) { return this; } + public ScheduledExecutionsFilter withIncludeUnresolved(boolean includeUnresolved) { + this.includeUnresolved = includeUnresolved; + return this; + } + public Optional getPickedValue() { return Optional.ofNullable(pickedValue); } + + public boolean getIncludeUnresolved() { + return includeUnresolved; + } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java index 850bb494..356cd4ee 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java @@ -139,6 +139,17 @@ void fetchScheduledExecutionsForTask( ScheduledExecutionsFilter filter, Consumer> consumer); + /** + * @see #fetchScheduledExecutionsForTask(String, Class, Consumer) + */ + default List> getScheduledExecutionsForTask( + String taskName) { + List> executions = new ArrayList<>(); + fetchScheduledExecutionsForTask(taskName, Object.class, executions::add); + return executions; + } + + /** * @see #fetchScheduledExecutionsForTask(String, Class, Consumer) */ diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskResolver.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskResolver.java index 18dd63b1..e294ef6a 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskResolver.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskResolver.java @@ -52,8 +52,12 @@ public TaskResolver(StatsRegistry statsRegistry, Clock clock, List> know } public Optional resolve(String taskName) { + return resolve(taskName, true); + } + + public Optional resolve(String taskName, boolean addUnresolvedToExclusionFilter) { Task task = taskMap.get(taskName); - if (task == null) { + if (task == null && addUnresolvedToExclusionFilter) { addUnresolved(taskName); statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK); LOG.info( diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/DataClassMismatchException.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/DataClassMismatchException.java index dbda7b51..5279fef0 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/DataClassMismatchException.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/DataClassMismatchException.java @@ -19,6 +19,9 @@ public class DataClassMismatchException extends DbSchedulerException { public DataClassMismatchException(Class expectedClass, Class actualClass) { super( String.format( - "Task data mismatch. Expected class : %s, actual : %s", expectedClass, actualClass)); + "Task data mismatch. If actual data-class is byte[], it might have been fetched without" + + " knowledge of task-data types, and is thus not deserialized." + + " Use getRawData() to get non-deserialized data in that case." + + " Expected class : %s, actual : %s", expectedClass, actualClass)); } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/MissingRawDataException.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/MissingRawDataException.java new file mode 100644 index 00000000..08072a6e --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/exceptions/MissingRawDataException.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) Gustav Karlsson + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.exceptions; + +public class MissingRawDataException extends DbSchedulerException { + private static final long serialVersionUID = 1L; + + public MissingRawDataException(Class dataClass) { + super( + String.format( + "Scheduled execution has typed data, use getData() to read the deserialized object. Data-class : %s", + dataClass)); + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java index da4c01f0..b4954627 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java @@ -13,19 +13,10 @@ */ package com.github.kagkarlsson.scheduler.jdbc; -import static com.github.kagkarlsson.scheduler.StringUtils.truncate; -import static java.util.Optional.ofNullable; -import static java.util.stream.Collectors.joining; -import static java.util.stream.Collectors.toList; - import com.github.kagkarlsson.jdbc.JdbcRunner; import com.github.kagkarlsson.jdbc.ResultSetMapper; import com.github.kagkarlsson.jdbc.SQLRuntimeException; -import com.github.kagkarlsson.scheduler.Clock; -import com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter; -import com.github.kagkarlsson.scheduler.SchedulerName; -import com.github.kagkarlsson.scheduler.TaskRepository; -import com.github.kagkarlsson.scheduler.TaskResolver; +import com.github.kagkarlsson.scheduler.*; import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask; import com.github.kagkarlsson.scheduler.exceptions.ExecutionException; import com.github.kagkarlsson.scheduler.exceptions.TaskInstanceException; @@ -34,6 +25,10 @@ import com.github.kagkarlsson.scheduler.task.SchedulableInstance; import com.github.kagkarlsson.scheduler.task.Task; import com.github.kagkarlsson.scheduler.task.TaskInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -44,9 +39,11 @@ import java.util.Optional; import java.util.function.Consumer; import java.util.function.Supplier; -import javax.sql.DataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static com.github.kagkarlsson.scheduler.StringUtils.truncate; +import static java.util.Optional.ofNullable; +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; @SuppressWarnings("rawtypes") public class JdbcTaskRepository implements TaskRepository { @@ -244,22 +241,31 @@ public void getScheduledExecutions( UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved()); QueryBuilder q = queryForFilter(filter); - if (unresolvedFilter.isActive()) { + if (unresolvedFilter.isActive() && !filter.getIncludeUnresolved()) { q.andCondition(unresolvedFilter); } jdbcRunner.query( - q.getQuery(), q.getPreparedStatementSetter(), new ExecutionResultSetConsumer(consumer)); + q.getQuery(), + q.getPreparedStatementSetter(), + new ExecutionResultSetConsumer(consumer, filter.getIncludeUnresolved(), false)); } @Override public void getScheduledExecutions( ScheduledExecutionsFilter filter, String taskName, Consumer consumer) { + UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved()); + QueryBuilder q = queryForFilter(filter); + if (unresolvedFilter.isActive() && !filter.getIncludeUnresolved()) { + q.andCondition(unresolvedFilter); + } q.andCondition(new TaskCondition(taskName)); jdbcRunner.query( - q.getQuery(), q.getPreparedStatementSetter(), new ExecutionResultSetConsumer(consumer)); + q.getQuery(), + q.getPreparedStatementSetter(), + new ExecutionResultSetConsumer(consumer, filter.getIncludeUnresolved(), false)); } @Override @@ -285,7 +291,7 @@ public List getDue(Instant now, int limit) { p.setMaxRows(limit); } }, - new ExecutionResultSetMapper()); + new ExecutionResultSetMapper(false, true)); } @Override @@ -377,6 +383,7 @@ private boolean rescheduleInternal( jdbcCustomization.setInstant(ps, index++, nextExecutionTime); if (newData != null) { // may cause datbase-specific problems, might have to use setNull instead + // FIXLATER: optionally support bypassing serializer if byte[] already ps.setObject(index++, serializer.serialize(newData.data)); } ps.setString(index++, execution.taskInstance.getTaskName()); @@ -449,7 +456,7 @@ public List getDeadExecutions(Instant olderThan) { jdbcCustomization.setInstant(p, index++, olderThan); unresolvedFilter.setParameters(p, index); }, - new ExecutionResultSetMapper()); + new ExecutionResultSetMapper(false, true)); } @Override @@ -497,7 +504,7 @@ public List getExecutionsFailingLongerThan(Duration interval) { jdbcCustomization.setInstant(p, index++, Instant.now().minus(interval)); unresolvedFilter.setParameters(p, index); }, - new ExecutionResultSetMapper()); + new ExecutionResultSetMapper(false, false)); } public Optional getExecution(TaskInstance taskInstance) { @@ -512,7 +519,7 @@ public Optional getExecution(String taskName, String taskInstanceId) p.setString(1, taskName); p.setString(2, taskInstanceId); }, - new ExecutionResultSetMapper()); + new ExecutionResultSetMapper(true, false)); if (executions.size() > 1) { throw new TaskInstanceException( "Found more than one matching execution for task name/id combination.", @@ -544,7 +551,11 @@ public void checkSupportsLockAndFetch() { private JdbcTaskRepositoryContext getTaskRespositoryContext() { return new JdbcTaskRepositoryContext( - taskResolver, tableName, schedulerSchedulerName, jdbcRunner, ExecutionResultSetMapper::new); + taskResolver, + tableName, + schedulerSchedulerName, + jdbcRunner, + () -> new ExecutionResultSetMapper(false, true)); } private QueryBuilder queryForFilter(ScheduledExecutionsFilter filter) { @@ -567,9 +578,12 @@ private class ExecutionResultSetMapper implements ResultSetMapper(); - this.delegate = new ExecutionResultSetConsumer(executions::add); + this.delegate = + new ExecutionResultSetConsumer( + executions::add, includeUnresolved, addUnresolvedToExclusionFilter); } @Override @@ -583,9 +597,20 @@ public List map(ResultSet resultSet) throws SQLException { private class ExecutionResultSetConsumer implements ResultSetMapper { private final Consumer consumer; + private final boolean includeUnresolved; + private boolean addUnresolvedToExclusionFilter; private ExecutionResultSetConsumer(Consumer consumer) { + this(consumer, false, true); + } + + private ExecutionResultSetConsumer( + Consumer consumer, + boolean includeUnresolved, + boolean addUnresolvedToExclusionFilter) { this.consumer = consumer; + this.includeUnresolved = includeUnresolved; + this.addUnresolvedToExclusionFilter = addUnresolvedToExclusionFilter; } @Override @@ -593,12 +618,17 @@ public Void map(ResultSet rs) throws SQLException { while (rs.next()) { String taskName = rs.getString("task_name"); - Optional task = taskResolver.resolve(taskName); - - if (!task.isPresent()) { - LOG.warn( - "Failed to find implementation for task with name '{}'. Execution will be excluded from due. Either delete the execution from the database, or add an implementation for it. The scheduler may be configured to automatically delete unresolved tasks after a certain period of time.", + Optional task = taskResolver.resolve(taskName, addUnresolvedToExclusionFilter); + + if (!task.isPresent() && !includeUnresolved) { + if (addUnresolvedToExclusionFilter) { + LOG.warn( + "Failed to find implementation for task with name '{}'. Execution will be excluded from due. " + + "Either delete the execution from the database, or add an implementation for it. " + + "The scheduler may be configured to automatically delete unresolved tasks " + + "after a certain period of time.", taskName); + } continue; } @@ -612,24 +642,32 @@ public Void map(ResultSet rs) throws SQLException { Instant lastSuccess = jdbcCustomization.getInstant(rs, "last_success"); Instant lastFailure = jdbcCustomization.getInstant(rs, "last_failure"); int consecutiveFailures = - rs.getInt("consecutive_failures"); // null-value is returned as 0 which is the preferred + rs.getInt("consecutive_failures"); // null-value is returned as 0 which is the preferred // default Instant lastHeartbeat = jdbcCustomization.getInstant(rs, "last_heartbeat"); long version = rs.getLong("version"); Supplier dataSupplier = - memoize(() -> serializer.deserialize(task.get().getDataClass(), data)); + memoize( + () -> { + if (!task.isPresent()) { + // return the data raw if the type is not known + // a case for standalone clients, with no "known tasks" + return data; + } + return serializer.deserialize(task.get().getDataClass(), data); + }); this.consumer.accept( - new Execution( - executionTime, - new TaskInstance(taskName, instanceId, dataSupplier), - picked, - pickedBy, - lastSuccess, - lastFailure, - consecutiveFailures, - lastHeartbeat, - version)); + new Execution( + executionTime, + new TaskInstance(taskName, instanceId, dataSupplier), + picked, + pickedBy, + lastSuccess, + lastFailure, + consecutiveFailures, + lastHeartbeat, + version)); } return null; @@ -638,12 +676,11 @@ public Void map(ResultSet rs) throws SQLException { private static Supplier memoize(Supplier original) { return new Supplier() { - Supplier delegate = this::firstTime; boolean initialized; public T get() { return delegate.get(); - } + } Supplier delegate = this::firstTime; private synchronized T firstTime() { if (!initialized) { @@ -653,6 +690,8 @@ private synchronized T firstTime() { } return delegate.get(); } + + }; } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java index f789139c..81af3520 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java @@ -1,6 +1,7 @@ package com.github.kagkarlsson.scheduler; import static com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter.all; +import static com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter.onlyResolved; import static com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository.DEFAULT_TABLE_NAME; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; @@ -29,6 +30,7 @@ import java.util.Optional; import java.util.Random; import java.util.stream.IntStream; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -402,15 +404,18 @@ public void get_dead_executions_should_not_include_previously_unresolved() { @Test public void get_scheduled_executions_should_work_with_unresolved() { Instant now = TimeHelper.truncatedInstantNow(); + String taskName = "unresolved1"; final OneTimeTask unresolved1 = - TestTasks.oneTime("unresolved1", Void.class, TestTasks.DO_NOTHING); + TestTasks.oneTime(taskName, Void.class, TestTasks.DO_NOTHING); taskRepository.createIfNotExists( new SchedulableTaskInstance<>(unresolved1.instance("id"), now)); assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskResolver.getUnresolved(), hasSize(1)); - taskRepository.getScheduledExecutions(ScheduledExecutionsFilter.all(), e -> {}); - taskRepository.getScheduledExecutions(ScheduledExecutionsFilter.all(), "sometask", e -> {}); + assertThat(getScheduledExecutions(ScheduledExecutionsFilter.onlyResolved()), hasSize(0)); + assertThat(getScheduledExecutions(ScheduledExecutionsFilter.onlyResolved(), taskName), hasSize(0)); + assertThat(getScheduledExecutions(all()), hasSize(1)); + assertThat(getScheduledExecutions(all(), taskName), hasSize(1)); } @Test @@ -459,7 +464,7 @@ private Execution getSingleDueExecution() { private Execution getSingleExecution() { List executions = new ArrayList<>(); - taskRepository.getScheduledExecutions(all().withPicked(false), executions::add); + taskRepository.getScheduledExecutions(onlyResolved().withPicked(false), executions::add); return executions.get(0); } } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ScheduledExecutionTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ScheduledExecutionTest.java index 4cf081bf..4b0796b3 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ScheduledExecutionTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ScheduledExecutionTest.java @@ -1,5 +1,6 @@ package com.github.kagkarlsson.scheduler; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -8,6 +9,8 @@ import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import java.time.Instant; + +import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.Test; public class ScheduledExecutionTest { @@ -59,8 +62,8 @@ public void test_data_class_type_not_equals() { .getData(); // Instantiate with incorrect type }); - assertEquals( - "Task data mismatch. Expected class : class java.lang.String, actual : class java.lang.Integer", - dataClassMismatchException.getMessage()); + assertThat( + dataClassMismatchException.getMessage(), + CoreMatchers.containsString("Task data mismatch")); } } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerClientTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerClientTest.java index c0d477bb..1037011b 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerClientTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerClientTest.java @@ -1,12 +1,8 @@ package com.github.kagkarlsson.scheduler; -import static java.time.Duration.ofSeconds; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; - import co.unruly.matchers.OptionalMatchers; import com.github.kagkarlsson.scheduler.TestTasks.SavingHandler; +import com.github.kagkarlsson.scheduler.serializer.JavaSerializer; import com.github.kagkarlsson.scheduler.task.ExecutionContext; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.github.kagkarlsson.scheduler.task.TaskInstanceId; @@ -15,13 +11,23 @@ import com.github.kagkarlsson.scheduler.testhelper.ManualScheduler; import com.github.kagkarlsson.scheduler.testhelper.SettableClock; import com.github.kagkarlsson.scheduler.testhelper.TestHelper; -import java.time.Instant; -import java.util.concurrent.atomic.AtomicInteger; import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.github.kagkarlsson.scheduler.SchedulerClient.Builder.create; +import static java.time.Duration.ofSeconds; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class SchedulerClientTest { @RegisterExtension public EmbeddedPostgresqlExtension DB = new EmbeddedPostgresqlExtension(); @@ -40,6 +46,8 @@ public class SchedulerClientTest { private SavingHandler savingHandler; private OneTimeTask savingTask; + private OneTimeTask oneTimeTaskC; + private VoidExecutionHandler onetimeTaskHandlerC; @BeforeEach public void setUp() { @@ -51,6 +59,9 @@ public void setUp() { oneTimeTaskB = TestTasks.oneTime("OneTimeB", Void.class, onetimeTaskHandlerB); onetimeTaskHandlerB = new TestTasks.CountingHandler<>(); + oneTimeTaskC = TestTasks.oneTime("OneTimeC", Integer.class, onetimeTaskHandlerC); + onetimeTaskHandlerC = new TestTasks.CountingHandler<>(); + scheduleAnother = new ScheduleAnotherTaskHandler<>( oneTimeTaskA.instance("secondTask"), settableClock.now().plusSeconds(1)); @@ -68,7 +79,7 @@ public void setUp() { @Test public void client_should_be_able_to_schedule_executions() { - SchedulerClient client = SchedulerClient.Builder.create(DB.getDataSource()).build(); + SchedulerClient client = create(DB.getDataSource()).build(); client.schedule(oneTimeTaskA.instance("1"), settableClock.now()); scheduler.runAnyDueExecutions(); @@ -89,8 +100,7 @@ public void should_be_able_to_schedule_other_executions_from_an_executionhandler @Test public void client_should_be_able_to_fetch_executions_for_task() { - SchedulerClient client = - SchedulerClient.Builder.create(DB.getDataSource(), oneTimeTaskA, oneTimeTaskB).build(); + SchedulerClient client = create(DB.getDataSource(), oneTimeTaskA, oneTimeTaskB).build(); client.schedule(oneTimeTaskA.instance("1"), settableClock.now()); client.schedule(oneTimeTaskA.instance("2"), settableClock.now()); client.schedule(oneTimeTaskB.instance("10"), settableClock.now()); @@ -107,8 +117,7 @@ public void client_should_be_able_to_fetch_executions_for_task() { @Test public void client_should_be_able_to_fetch_single_scheduled_execution() { - SchedulerClient client = - SchedulerClient.Builder.create(DB.getDataSource(), oneTimeTaskA).build(); + SchedulerClient client = create(DB.getDataSource(), oneTimeTaskA).build(); client.schedule(oneTimeTaskA.instance("1"), settableClock.now()); assertThat( @@ -142,6 +151,35 @@ public void client_should_be_able_to_reschedule_executions() { assertThat(savingHandler.savedData, CoreMatchers.is(data2)); } + @SuppressWarnings("OptionalGetWithoutIsPresent") + @Test + public void raw_client_should_be_able_to_fetch_executions() { + TaskInstance instance = oneTimeTaskC.instance("1", 5); + + SchedulerClient clientWithTypes = create(DB.getDataSource(), oneTimeTaskC).build(); + clientWithTypes.schedule(instance, settableClock.now()); + + SchedulerClient clientWithoutTypes = create(DB.getDataSource()).build(); + + Optional> e1 = clientWithoutTypes.getScheduledExecution(instance); + assertThat(e1, not(OptionalMatchers.empty())); + assertRawData(e1.get(), 5); + + List> allScheduled = clientWithoutTypes.getScheduledExecutions(); + assertThat(allScheduled, hasSize(1)); + assertRawData(allScheduled.get(0), 5); + + List> scheduledForTask = + clientWithoutTypes.getScheduledExecutionsForTask(instance.getTaskName()); + assertThat(scheduledForTask, hasSize(1)); + assertRawData(scheduledForTask.get(0), 5); + } + + private void assertRawData(ScheduledExecution se, Integer expectedValue) { + assertTrue(se.hasRawData()); + assertEquals(expectedValue, new JavaSerializer().deserialize(Integer.class, se.getRawData())); + } + private int countAllExecutions(SchedulerClient client) { AtomicInteger counter = new AtomicInteger(0); client.fetchScheduledExecutions( @@ -164,9 +202,9 @@ private int countExecutionsForTask( } public static class ScheduleAnotherTaskHandler implements VoidExecutionHandler { - public int timesExecuted = 0; private final TaskInstance secondTask; private final Instant instant; + public int timesExecuted = 0; public ScheduleAnotherTaskHandler(TaskInstance secondTask, Instant instant) { this.secondTask = secondTask; diff --git a/examples/features/pom.xml b/examples/features/pom.xml index 0b2f839a..31abb893 100644 --- a/examples/features/pom.xml +++ b/examples/features/pom.xml @@ -41,6 +41,12 @@ ${jackson.version} runtime + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + runtime + org.postgresql postgresql diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java index d235f794..68c382c8 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java @@ -16,6 +16,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter; import com.github.kagkarlsson.scheduler.SchedulerClient; +import com.github.kagkarlsson.scheduler.serializer.JacksonSerializer; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.time.Instant; @@ -30,30 +31,50 @@ public static void main(String[] args) { @Override public void run(DataSource dataSource) { - final OneTimeTask task = - Tasks.oneTime("task-a") + final OneTimeTask task = + Tasks.oneTime("task-a", Integer.class) .execute( (taskInstance, executionContext) -> { System.out.println("Task a executed"); }); - final SchedulerClient client = SchedulerClient.Builder.create(dataSource, task).build(); + final SchedulerClient clientWithTypeInformation = SchedulerClient.Builder + .create(dataSource, task) + .serializer(new JacksonSerializer()) + .build(); final Instant now = Instant.now(); for (int i = 0; i < 5; i++) { - client.schedule(task.instance("id" + i), now.plusSeconds(i)); + clientWithTypeInformation.schedule(task.instance("id" + i, i), now.plusSeconds(i)); } System.out.println("Listing scheduled executions"); - client + clientWithTypeInformation .getScheduledExecutions(ScheduledExecutionsFilter.all()) .forEach( execution -> { System.out.printf( - "Scheduled execution: taskName=%s, instance=%s, executionTime=%s%n", + "Scheduled execution: taskName=%s, instance=%s, executionTime=%s, data=%s%n", execution.getTaskInstance().getTaskName(), execution.getTaskInstance().getId(), - execution.getExecutionTime()); + execution.getExecutionTime(), + execution.getData() + ); }); + + final SchedulerClient rawClient = SchedulerClient.Builder.create(dataSource).build(); + System.out.println("Listing scheduled executions for client with no known tasks (data-classes and implementations)"); + rawClient + .getScheduledExecutions(ScheduledExecutionsFilter.all()) + .forEach( + execution -> { + System.out.printf( + "Scheduled execution: taskName=%s, instance=%s, executionTime=%s, data=%s%n", + execution.getTaskInstance().getTaskName(), + execution.getTaskInstance().getId(), + execution.getExecutionTime(), + new String((byte[]) execution.getData()) + ); + }); } }