Add retry and parameter increment logic to job runner

The existing behaviour of JobLauncherCommandLineRunner was really too
basic. It has now been enhanced (at the expense of duplicating a lot
of code in Spring Batch it seems) to automatically increment job
parameters if it can, and to retry a failed or stopped execution
if it can (without incrementing, but with additional job parameters
added from command line if they are non-identifying).

The JobLauncherCommandLineRunner is more extendable and exposes
its DI wiring points now as well, so hopefully users can make
use of it independently of autoconfig (by providing a @Bean of
that type).

Not everything from the wishlist in gh-325 is implememented yet,
but it should be a good platform to work with and to extend.
This commit is contained in:
Dave Syer 2014-02-24 18:15:20 +00:00
parent 09f3ee14a4
commit 8947307702
3 changed files with 289 additions and 23 deletions

View File

@ -44,14 +44,15 @@ import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.util.StringUtils;
/**
* {@link EnableAutoConfiguration Auto-configuration} for Spring Batch. By default a Runner
* will be created and all jobs in the context will be executed on startup.
* {@link EnableAutoConfiguration Auto-configuration} for Spring Batch. By default a
* Runner will be created and all jobs in the context will be executed on startup.
*
* Disable this behaviour with <code>spring.batch.job.enabled=false</code>).
* Disable this behaviour with <code>spring.batch.job.enabled=false</code>).
*
* Alternatively, discrete Job names to execute on startup can be supplied by the User with
* a comma-delimited list: <code>spring.batch.job.names=job1,job2</code>. In this case the
* Runner will first find jobs registered as Beans, then those in the existing JobRegistry.
* Alternatively, discrete Job names to execute on startup can be supplied by the User
* with a comma-delimited list: <code>spring.batch.job.names=job1,job2</code>. In this
* case the Runner will first find jobs registered as Beans, then those in the existing
* JobRegistry.
*
* @author Dave Syer
*/
@ -76,8 +77,10 @@ public class BatchAutoConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnExpression("${spring.batch.job.enabled:true}")
public JobLauncherCommandLineRunner jobLauncherCommandLineRunner() {
JobLauncherCommandLineRunner runner = new JobLauncherCommandLineRunner();
public JobLauncherCommandLineRunner jobLauncherCommandLineRunner(
JobLauncher jobLauncher, JobExplorer jobExplorer) {
JobLauncherCommandLineRunner runner = new JobLauncherCommandLineRunner(
jobLauncher, jobExplorer);
if (StringUtils.hasText(this.jobNames)) {
runner.setJobNames(this.jobNames);
}

View File

@ -19,23 +19,32 @@ package org.springframework.boot.autoconfigure.batch;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.converter.JobParametersConverter;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobParametersNotFoundException;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
@ -58,25 +67,25 @@ public class JobLauncherCommandLineRunner implements CommandLineRunner,
private static Log logger = LogFactory.getLog(JobLauncherCommandLineRunner.class);
@Autowired(required = false)
private final JobParametersConverter converter = new DefaultJobParametersConverter();
private JobParametersConverter converter = new DefaultJobParametersConverter();
@Autowired
private JobLauncher jobLauncher;
@Autowired(required = false)
private JobRegistry jobRegistry;
@Autowired
private JobRepository jobRepository;
private JobExplorer jobExplorer;
private String jobNames;
@Autowired(required = false)
private final Collection<Job> jobs = Collections.emptySet();
private Collection<Job> jobs = Collections.emptySet();
private ApplicationEventPublisher publisher;
public JobLauncherCommandLineRunner(JobLauncher jobLauncher, JobExplorer jobExplorer) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
}
public void setJobNames(String jobNames) {
this.jobNames = jobNames;
}
@ -86,6 +95,21 @@ public class JobLauncherCommandLineRunner implements CommandLineRunner,
this.publisher = publisher;
}
@Autowired(required = false)
public void setJobRegistry(JobRegistry jobRegistry) {
this.jobRegistry = jobRegistry;
}
@Autowired(required = false)
public void setJobParametersConverter(JobParametersConverter converter) {
this.converter = converter;
}
@Autowired(required = false)
public void setJobs(Collection<Job> jobs) {
this.jobs = jobs;
}
@Override
public void run(String... args) throws JobExecutionException {
logger.info("Running default command line with: " + Arrays.asList(args));
@ -99,6 +123,61 @@ public class JobLauncherCommandLineRunner implements CommandLineRunner,
executeRegisteredJobs(jobParameters);
}
private JobParameters getNextJobParameters(Job job, JobParameters additionalParameters) {
String jobIdentifier = job.getName();
JobParameters jobParameters = new JobParameters();
List<JobInstance> lastInstances = this.jobExplorer.getJobInstances(jobIdentifier,
0, 1);
JobParametersIncrementer incrementer = job.getJobParametersIncrementer();
Map<String, JobParameter> additionals = additionalParameters.getParameters();
if (lastInstances.isEmpty()) {
// Start from a completely clean sheet
if (incrementer != null) {
jobParameters = incrementer.getNext(new JobParameters());
}
}
else {
List<JobExecution> lastExecutions = this.jobExplorer
.getJobExecutions(lastInstances.get(0));
JobExecution previousExecution = lastExecutions.get(0);
if (previousExecution == null) {
// Normally this will not happen - an instance exists with no executions
if (incrementer != null) {
jobParameters = incrementer.getNext(new JobParameters());
}
}
else if (previousExecution.getStatus() == BatchStatus.STOPPED
|| previousExecution.getStatus() == BatchStatus.FAILED) {
// Retry a failed or stopped execution
jobParameters = previousExecution.getJobParameters();
for (Entry<String, JobParameter> parameter : additionals.entrySet()) {
// Non-identifying additional parameters can be added to a retry
if (!parameter.getValue().isIdentifying()) {
additionals.remove(parameter.getKey());
}
}
}
else if (incrementer != null) {
// New instance so increment the parameters if we can
if (incrementer != null) {
jobParameters = incrementer.getNext(previousExecution
.getJobParameters());
}
}
}
Map<String, JobParameter> map = new HashMap<String, JobParameter>(
jobParameters.getParameters());
map.putAll(additionals);
jobParameters = new JobParameters(map);
return jobParameters;
}
private void executeRegisteredJobs(JobParameters jobParameters)
throws JobExecutionException {
if (this.jobRegistry != null && StringUtils.hasText(this.jobNames)) {
@ -121,12 +200,11 @@ public class JobLauncherCommandLineRunner implements CommandLineRunner,
protected void execute(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException {
String jobName = job.getName();
JobExecution previousExecution = this.jobRepository.getLastJobExecution(jobName,
jobParameters);
if (previousExecution == null || previousExecution.getStatus().isUnsuccessful()) {
JobExecution execution = this.jobLauncher.run(job, jobParameters);
JobInstanceAlreadyCompleteException, JobParametersInvalidException,
JobParametersNotFoundException {
JobParameters nextParameters = getNextJobParameters(job, jobParameters);
if (nextParameters != null) {
JobExecution execution = this.jobLauncher.run(job, nextParameters);
if (this.publisher != null) {
this.publisher.publishEvent(new JobExecutionEvent(execution));
}

View File

@ -0,0 +1,185 @@
/*
* Copyright 2012-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.batch;
import org.junit.Before;
import org.junit.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import static org.junit.Assert.assertEquals;
/**
* @author Dave Syer
*/
public class JobLauncherCommandLineRunnerTests {
private JobLauncherCommandLineRunner runner;
private AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
private JobExplorer jobExplorer;
private JobLauncher jobLauncher;
private JobBuilderFactory jobs;
private StepBuilderFactory steps;
private Job job;
private Step step;
@Before
public void init() throws Exception {
this.context.register(BatchConfiguration.class);
this.context.refresh();
JobRepository jobRepository = this.context.getBean(JobRepository.class);
this.jobLauncher = this.context.getBean(JobLauncher.class);
this.jobs = new JobBuilderFactory(jobRepository);
PlatformTransactionManager transactionManager = this.context
.getBean(PlatformTransactionManager.class);
this.steps = new StepBuilderFactory(jobRepository, transactionManager);
this.step = this.steps.get("step").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
return null;
}
}).build();
this.job = this.jobs.get("job").start(this.step).build();
this.jobExplorer = this.context.getBean(JobExplorer.class);
this.runner = new JobLauncherCommandLineRunner(this.jobLauncher, this.jobExplorer);
this.context.getBean(BatchConfiguration.class).clear();
}
@Test
public void basicExecution() throws Exception {
this.runner.execute(this.job, new JobParameters());
assertEquals(1, this.jobExplorer.getJobInstances("job", 0, 100).size());
this.runner.execute(this.job, new JobParametersBuilder().addLong("id", 1L)
.toJobParameters());
assertEquals(2, this.jobExplorer.getJobInstances("job", 0, 100).size());
}
@Test
public void incrementExistingExecution() throws Exception {
this.job = this.jobs.get("job").start(this.step)
.incrementer(new RunIdIncrementer()).build();
this.runner.execute(this.job, new JobParameters());
this.runner.execute(this.job, new JobParameters());
assertEquals(2, this.jobExplorer.getJobInstances("job", 0, 100).size());
}
@Test
public void retryFailedExecution() throws Exception {
this.job = this.jobs.get("job")
.start(this.steps.get("step").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
throw new RuntimeException("Planned");
}
}).build()).incrementer(new RunIdIncrementer()).build();
this.runner.execute(this.job, new JobParameters());
this.runner.execute(this.job, new JobParameters());
assertEquals(1, this.jobExplorer.getJobInstances("job", 0, 100).size());
}
@Test
public void retryFailedExecutionWithNonIdentifyingParameters() throws Exception {
this.job = this.jobs.get("job")
.start(this.steps.get("step").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
throw new RuntimeException("Planned");
}
}).build()).incrementer(new RunIdIncrementer()).build();
JobParameters jobParameters = new JobParametersBuilder().addLong("id", 1L, false)
.toJobParameters();
this.runner.execute(this.job, jobParameters);
this.runner.execute(this.job, jobParameters);
assertEquals(1, this.jobExplorer.getJobInstances("job", 0, 100).size());
}
@Configuration
@EnableBatchProcessing
protected static class BatchConfiguration implements BatchConfigurer {
private ResourcelessTransactionManager transactionManager = new ResourcelessTransactionManager();
private JobRepository jobRepository;
private MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean(
this.transactionManager);
public BatchConfiguration() throws Exception {
this.jobRepository = this.jobRepositoryFactory.getJobRepository();
}
public void clear() {
this.jobRepositoryFactory.clear();
}
@Override
public JobRepository getJobRepository() throws Exception {
return this.jobRepository;
}
@Override
public PlatformTransactionManager getTransactionManager() throws Exception {
return this.transactionManager;
}
@Override
public JobLauncher getJobLauncher() throws Exception {
SimpleJobLauncher launcher = new SimpleJobLauncher();
launcher.setJobRepository(this.jobRepository);
launcher.setTaskExecutor(new SyncTaskExecutor());
return launcher;
}
@Bean
public JobExplorer jobExplorer() throws Exception {
return (JobExplorer) new MapJobExplorerFactoryBean(this.jobRepositoryFactory)
.getObject();
}
}
}