Skip to content

Commit

Permalink
fix: getScheduledExecutions(...) should be able to return executions …
Browse files Browse the repository at this point in the history
…with unresolved tasks.
  • Loading branch information
kagkarlsson committed Aug 25, 2023
1 parent 597e09e commit f93afa0
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.exceptions.DataClassMismatchException;
import com.github.kagkarlsson.scheduler.exceptions.MissingRawDataException;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;

import java.time.Instant;
import java.util.Objects;

Expand Down Expand Up @@ -47,6 +49,19 @@ public DATA_TYPE getData() {
throw new DataClassMismatchException(dataClass, data.getClass());
}

public boolean hasRawData() {
Object data = this.execution.taskInstance.getData();
return data == null || data.getClass().equals(byte[].class);
}

public byte[] getRawData() {
if (!hasRawData()) {
throw new MissingRawDataException(dataClass);
}

return (byte[]) this.execution.taskInstance.getData();
}

public Instant getLastSuccess() {
return execution.lastSuccess;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
public class ScheduledExecutionsFilter {

private Boolean pickedValue;
private boolean includeUnresolved = false;

private ScheduledExecutionsFilter() {}

public static ScheduledExecutionsFilter all() {
return new ScheduledExecutionsFilter().withIncludeUnresolved(true);
}

public static ScheduledExecutionsFilter onlyResolved() {
return new ScheduledExecutionsFilter();
}

Expand All @@ -30,7 +35,16 @@ public ScheduledExecutionsFilter withPicked(boolean pickedValue) {
return this;
}

public ScheduledExecutionsFilter withIncludeUnresolved(boolean includeUnresolved) {
this.includeUnresolved = includeUnresolved;
return this;
}

public Optional<Boolean> getPickedValue() {
return Optional.ofNullable(pickedValue);
}

public boolean getIncludeUnresolved() {
return includeUnresolved;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,17 @@ <T> void fetchScheduledExecutionsForTask(
ScheduledExecutionsFilter filter,
Consumer<ScheduledExecution<T>> consumer);

/**
* @see #fetchScheduledExecutionsForTask(String, Class, Consumer)
*/
default <T> List<ScheduledExecution<Object>> getScheduledExecutionsForTask(
String taskName) {
List<ScheduledExecution<Object>> executions = new ArrayList<>();
fetchScheduledExecutionsForTask(taskName, Object.class, executions::add);
return executions;
}


/**
* @see #fetchScheduledExecutionsForTask(String, Class, Consumer)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ public TaskResolver(StatsRegistry statsRegistry, Clock clock, List<Task<?>> know
}

public Optional<Task> resolve(String taskName) {
return resolve(taskName, true);
}

public Optional<Task> resolve(String taskName, boolean addUnresolvedToExclusionFilter) {
Task task = taskMap.get(taskName);
if (task == null) {
if (task == null && addUnresolvedToExclusionFilter) {
addUnresolved(taskName);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK);
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ public class DataClassMismatchException extends DbSchedulerException {
public DataClassMismatchException(Class expectedClass, Class actualClass) {
super(
String.format(
"Task data mismatch. Expected class : %s, actual : %s", expectedClass, actualClass));
"Task data mismatch. If actual data-class is byte[], it might have been fetched without" +
" knowledge of task-data types, and is thus not deserialized." +
" Use getRawData() to get non-deserialized data in that case." +
" Expected class : %s, actual : %s", expectedClass, actualClass));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler.exceptions;

public class MissingRawDataException extends DbSchedulerException {
private static final long serialVersionUID = 1L;

public MissingRawDataException(Class<?> dataClass) {
super(
String.format(
"Scheduled execution has typed data, use getData() to read the deserialized object. Data-class : %s",
dataClass));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,10 @@
*/
package com.github.kagkarlsson.scheduler.jdbc;

import static com.github.kagkarlsson.scheduler.StringUtils.truncate;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;

import com.github.kagkarlsson.jdbc.JdbcRunner;
import com.github.kagkarlsson.jdbc.ResultSetMapper;
import com.github.kagkarlsson.jdbc.SQLRuntimeException;
import com.github.kagkarlsson.scheduler.Clock;
import com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter;
import com.github.kagkarlsson.scheduler.SchedulerName;
import com.github.kagkarlsson.scheduler.TaskRepository;
import com.github.kagkarlsson.scheduler.TaskResolver;
import com.github.kagkarlsson.scheduler.*;
import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask;
import com.github.kagkarlsson.scheduler.exceptions.ExecutionException;
import com.github.kagkarlsson.scheduler.exceptions.TaskInstanceException;
Expand All @@ -34,6 +25,10 @@
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -44,9 +39,11 @@
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.github.kagkarlsson.scheduler.StringUtils.truncate;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;

@SuppressWarnings("rawtypes")
public class JdbcTaskRepository implements TaskRepository {
Expand Down Expand Up @@ -244,22 +241,31 @@ public void getScheduledExecutions(
UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved());

QueryBuilder q = queryForFilter(filter);
if (unresolvedFilter.isActive()) {
if (unresolvedFilter.isActive() && !filter.getIncludeUnresolved()) {
q.andCondition(unresolvedFilter);
}

jdbcRunner.query(
q.getQuery(), q.getPreparedStatementSetter(), new ExecutionResultSetConsumer(consumer));
q.getQuery(),
q.getPreparedStatementSetter(),
new ExecutionResultSetConsumer(consumer, filter.getIncludeUnresolved(), false));
}

@Override
public void getScheduledExecutions(
ScheduledExecutionsFilter filter, String taskName, Consumer<Execution> consumer) {
UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved());

QueryBuilder q = queryForFilter(filter);
if (unresolvedFilter.isActive() && !filter.getIncludeUnresolved()) {
q.andCondition(unresolvedFilter);
}
q.andCondition(new TaskCondition(taskName));

jdbcRunner.query(
q.getQuery(), q.getPreparedStatementSetter(), new ExecutionResultSetConsumer(consumer));
q.getQuery(),
q.getPreparedStatementSetter(),
new ExecutionResultSetConsumer(consumer, filter.getIncludeUnresolved(), false));
}

@Override
Expand All @@ -285,7 +291,7 @@ public List<Execution> getDue(Instant now, int limit) {
p.setMaxRows(limit);
}
},
new ExecutionResultSetMapper());
new ExecutionResultSetMapper(false, true));
}

@Override
Expand Down Expand Up @@ -377,6 +383,7 @@ private boolean rescheduleInternal(
jdbcCustomization.setInstant(ps, index++, nextExecutionTime);
if (newData != null) {
// may cause datbase-specific problems, might have to use setNull instead
// FIXLATER: optionally support bypassing serializer if byte[] already
ps.setObject(index++, serializer.serialize(newData.data));
}
ps.setString(index++, execution.taskInstance.getTaskName());
Expand Down Expand Up @@ -449,7 +456,7 @@ public List<Execution> getDeadExecutions(Instant olderThan) {
jdbcCustomization.setInstant(p, index++, olderThan);
unresolvedFilter.setParameters(p, index);
},
new ExecutionResultSetMapper());
new ExecutionResultSetMapper(false, true));
}

@Override
Expand Down Expand Up @@ -497,7 +504,7 @@ public List<Execution> getExecutionsFailingLongerThan(Duration interval) {
jdbcCustomization.setInstant(p, index++, Instant.now().minus(interval));
unresolvedFilter.setParameters(p, index);
},
new ExecutionResultSetMapper());
new ExecutionResultSetMapper(false, false));
}

public Optional<Execution> getExecution(TaskInstance taskInstance) {
Expand All @@ -512,7 +519,7 @@ public Optional<Execution> getExecution(String taskName, String taskInstanceId)
p.setString(1, taskName);
p.setString(2, taskInstanceId);
},
new ExecutionResultSetMapper());
new ExecutionResultSetMapper(true, false));
if (executions.size() > 1) {
throw new TaskInstanceException(
"Found more than one matching execution for task name/id combination.",
Expand Down Expand Up @@ -544,7 +551,11 @@ public void checkSupportsLockAndFetch() {

private JdbcTaskRepositoryContext getTaskRespositoryContext() {
return new JdbcTaskRepositoryContext(
taskResolver, tableName, schedulerSchedulerName, jdbcRunner, ExecutionResultSetMapper::new);
taskResolver,
tableName,
schedulerSchedulerName,
jdbcRunner,
() -> new ExecutionResultSetMapper(false, true));
}

private QueryBuilder queryForFilter(ScheduledExecutionsFilter filter) {
Expand All @@ -567,9 +578,12 @@ private class ExecutionResultSetMapper implements ResultSetMapper<List<Execution

private final ExecutionResultSetConsumer delegate;

private ExecutionResultSetMapper() {
private ExecutionResultSetMapper(
boolean includeUnresolved, boolean addUnresolvedToExclusionFilter) {
this.executions = new ArrayList<>();
this.delegate = new ExecutionResultSetConsumer(executions::add);
this.delegate =
new ExecutionResultSetConsumer(
executions::add, includeUnresolved, addUnresolvedToExclusionFilter);
}

@Override
Expand All @@ -583,22 +597,38 @@ public List<Execution> map(ResultSet resultSet) throws SQLException {
private class ExecutionResultSetConsumer implements ResultSetMapper<Void> {

private final Consumer<Execution> consumer;
private final boolean includeUnresolved;
private boolean addUnresolvedToExclusionFilter;

private ExecutionResultSetConsumer(Consumer<Execution> consumer) {
this(consumer, false, true);
}

private ExecutionResultSetConsumer(
Consumer<Execution> consumer,
boolean includeUnresolved,
boolean addUnresolvedToExclusionFilter) {
this.consumer = consumer;
this.includeUnresolved = includeUnresolved;
this.addUnresolvedToExclusionFilter = addUnresolvedToExclusionFilter;
}

@Override
public Void map(ResultSet rs) throws SQLException {

while (rs.next()) {
String taskName = rs.getString("task_name");
Optional<Task> task = taskResolver.resolve(taskName);

if (!task.isPresent()) {
LOG.warn(
"Failed to find implementation for task with name '{}'. Execution will be excluded from due. Either delete the execution from the database, or add an implementation for it. The scheduler may be configured to automatically delete unresolved tasks after a certain period of time.",
Optional<Task> task = taskResolver.resolve(taskName, addUnresolvedToExclusionFilter);

if (!task.isPresent() && !includeUnresolved) {
if (addUnresolvedToExclusionFilter) {
LOG.warn(
"Failed to find implementation for task with name '{}'. Execution will be excluded from due. "
+ "Either delete the execution from the database, or add an implementation for it. "
+ "The scheduler may be configured to automatically delete unresolved tasks "
+ "after a certain period of time.",
taskName);
}
continue;
}

Expand All @@ -612,24 +642,32 @@ public Void map(ResultSet rs) throws SQLException {
Instant lastSuccess = jdbcCustomization.getInstant(rs, "last_success");
Instant lastFailure = jdbcCustomization.getInstant(rs, "last_failure");
int consecutiveFailures =
rs.getInt("consecutive_failures"); // null-value is returned as 0 which is the preferred
rs.getInt("consecutive_failures"); // null-value is returned as 0 which is the preferred
// default
Instant lastHeartbeat = jdbcCustomization.getInstant(rs, "last_heartbeat");
long version = rs.getLong("version");

Supplier dataSupplier =
memoize(() -> serializer.deserialize(task.get().getDataClass(), data));
memoize(
() -> {
if (!task.isPresent()) {
// return the data raw if the type is not known
// a case for standalone clients, with no "known tasks"
return data;
}
return serializer.deserialize(task.get().getDataClass(), data);
});
this.consumer.accept(
new Execution(
executionTime,
new TaskInstance(taskName, instanceId, dataSupplier),
picked,
pickedBy,
lastSuccess,
lastFailure,
consecutiveFailures,
lastHeartbeat,
version));
new Execution(
executionTime,
new TaskInstance(taskName, instanceId, dataSupplier),
picked,
pickedBy,
lastSuccess,
lastFailure,
consecutiveFailures,
lastHeartbeat,
version));
}

return null;
Expand All @@ -638,12 +676,11 @@ public Void map(ResultSet rs) throws SQLException {

private static <T> Supplier<T> memoize(Supplier<T> original) {
return new Supplier<T>() {
Supplier<T> delegate = this::firstTime;
boolean initialized;

public T get() {
return delegate.get();
}
} Supplier<T> delegate = this::firstTime;

private synchronized T firstTime() {
if (!initialized) {
Expand All @@ -653,6 +690,8 @@ private synchronized T firstTime() {
}
return delegate.get();
}


};
}

Expand Down
Loading

0 comments on commit f93afa0

Please sign in to comment.