2016-10-17 7 views
0

TaskExecutor를 사용하는 FAILED 상태의 스프링 배치 작업을 다시 시작하는 적절한 방법은 무엇입니까? (스프링 배치 - TaskExecutor를 사용하는 FAILED 작업)

HTTP로 데이터를 로드하는 작업이 있는데, 때때로 500 오류가 발생하여 이 작업이 실패합니다. 이 작업이 성공적으로 완료될 때까지 작업을 다시 시작하고 싶습니다.

JobExecutionListener를 만들고 afterJob() 메서드 안에 재시작 로직을 구현하면, 이 작업이 아직 실행 중이라는 오류 메시지가 나타납니다. Spring의 RetryTemplate을 사용해도 작업이 TaskExecutor 안에서 실행되기 때문에 역시 동작하지 않습니다.

코드 샘플이 있다면 큰 도움이 될 것입니다.

+0

`restartable="true"` — restartable: 작업을 재시작할 수 있는지 여부를 지정합니다 – Pau

답변

0

결국 JobLauncher를 다시 구현하여 문제를 해결했습니다. (참고: 작업을 재시작 가능(restartable)하도록 구성해야 합니다.)

/**
 * A {@link JobLauncher} that runs a job through a {@link RetryTemplate}: when an execution
 * finishes with {@link BatchStatus#FAILED} and the job is restartable, the callback signals
 * the retry template, which (subject to its retry policy) starts a new execution with the
 * same parameters.
 *
 * <p>The work is handed to the configured {@link TaskExecutor}; when none is set,
 * {@link #afterPropertiesSet()} installs a {@link SyncTaskExecutor}.
 */
public class FaultTolerantJobLauncher implements JobLauncher, InitializingBean {

    protected static final Log logger = LogFactory.getLog(FaultTolerantJobLauncher.class);

    private JobRepository jobRepository;

    private RetryTemplate retryTemplate;

    private TaskExecutor taskExecutor;

    /**
     * Run the provided job with the given {@link JobParameters}. The
     * {@link JobParameters} will be used to determine if this is an execution
     * of an existing job instance, or if a new one should be created.
     *
     * <p>The first {@link JobExecution} is created on the calling thread, before the work is
     * handed to the {@link TaskExecutor}. This guarantees a non-null return value even when
     * the executor is asynchronous, and lets repository exceptions reach the caller as the
     * checked exceptions declared below. Executions for retries are created later, inside the
     * retry callback.
     *
     * @param job the job to be run.
     * @param jobParameters the {@link JobParameters} for this particular
     * execution.
     * @return the most recent {@link JobExecution} known when this method returns: with a
     * synchronous executor that is the execution of the last attempt; with an asynchronous
     * executor it is normally the execution of the first attempt. Never {@code null}.
     * @throws JobExecutionAlreadyRunningException if the JobInstance already
     * exists and has an execution already running.
     * @throws JobRestartException if the execution would be a re-start, but a
     * re-start is either not allowed or not needed.
     * @throws JobInstanceAlreadyCompleteException if this instance has already
     * completed successfully
     * @throws JobParametersInvalidException if the job's parameters validator rejects
     * {@code jobParameters}
     */
    @Override
    public JobExecution run(final Job job, final JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
            JobParametersInvalidException {

        Assert.notNull(job, "The Job must not be null.");
        Assert.notNull(jobParameters, "The JobParameters must not be null.");

        JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
        if (lastExecution != null) {
            if (!job.isRestartable()) {
                throw new JobRestartException("JobInstance already exists and is not restartable");
            }
            /*
             * validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING
             * retrieve the previous execution and check
             */
            for (StepExecution execution : lastExecution.getStepExecutions()) {
                BatchStatus status = execution.getStatus();
                if (status.isRunning() || status == BatchStatus.STOPPING) {
                    throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
                            + lastExecution);
                } else if (status == BatchStatus.UNKNOWN) {
                    throw new JobRestartException(
                            "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
                                    + "The last execution ended with a failure that could not be rolled back, "
                                    + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
                }
            }
        }

        // Check the validity of the parameters before doing creating anything
        // in the repository...
        job.getJobParametersValidator().validate(jobParameters);

        /*
         * There is a very small probability that a non-restartable job can be
         * restarted, but only if another process or thread manages to launch
         * <i>and</i> fail a job execution for this instance between the last
         * assertion and the next method returning successfully.
         */
        final AtomicReference<JobExecution> executionReference =
                new AtomicReference<>(jobRepository.createJobExecution(job.getName(), jobParameters));

        try {
            taskExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        retryTemplate.execute(
                                new FaultTolerantJobRetryCallback(executionReference, job, jobParameters));
                    } catch (TaskRejectedException e) {
                        // Thrown by the callback after a FAILED execution once the retry
                        // template stops retrying: record the final failure.
                        markFailed(executionReference.get(), e);
                    }
                }
            });
        } catch (TaskRejectedException e) {
            // The executor itself refused the task: the execution created above never ran,
            // so record it as failed instead of leaving it in its initial status.
            markFailed(executionReference.get(), e);
            throw e;
        }

        return executionReference.get();
    }

    /**
     * Record the given execution as failed in the repository.
     *
     * @param execution the execution to mark as failed
     * @param cause the exception added to the exit description when the exit status is
     * still {@link ExitStatus#UNKNOWN}
     */
    private void markFailed(JobExecution execution, TaskRejectedException cause) {
        execution.upgradeStatus(BatchStatus.FAILED);
        if (execution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
            execution.setExitStatus(ExitStatus.FAILED.addExitDescription(cause));
        }
        jobRepository.update(execution);
    }

    /**
     * Set the JobRepository.
     *
     * @param jobRepository the repository used to look up, create and update job executions
     */
    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    /**
     * Set the retryTemplate
     *
     * @param retryTemplate the template that drives re-execution of failed jobs
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    /**
     * Set the TaskExecutor. (Optional)
     *
     * @param taskExecutor the executor the job (including its retries) is handed to
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Ensure the required dependencies of a {@link JobRepository} and a
     * {@link RetryTemplate} have been set, and fall back to a
     * {@link SyncTaskExecutor} when no executor was configured.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.state(jobRepository != null, "A JobRepository has not been set.");
        Assert.state(retryTemplate != null, "A RetryTemplate has not been set.");
        if (taskExecutor == null) {
            logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
            taskExecutor = new SyncTaskExecutor();
        }
    }

    /**
     * Retry callback that executes the job once per attempt. The first attempt runs the
     * execution already stored in the shared reference; every later attempt creates a new
     * execution and stores it there.
     *
     * <p>An instance is created and used inside a single {@code Runnable}, so its mutable
     * flag is only touched by the thread running that task.
     */
    private class FaultTolerantJobRetryCallback implements RetryCallback<Object, RuntimeException> {

        private final AtomicReference<JobExecution> executionReference;
        private final Job job;
        private final JobParameters jobParameters;

        /** True until the execution pre-created by the launcher has been consumed. */
        private boolean initialExecutionPending = true;

        FaultTolerantJobRetryCallback(AtomicReference<JobExecution> executionReference, Job job,
                JobParameters jobParameters) {
            this.executionReference = executionReference;
            this.job = job;
            this.jobParameters = jobParameters;
        }

        /**
         * Execute the job once.
         *
         * @param retryContext the context of the current retry operation
         * @return always {@code null}; the outcome is exposed through the shared reference
         * @throws TaskRejectedException if the job is restartable and the execution ended
         * with {@link BatchStatus#FAILED}, to signal the retry template to try again
         */
        @Override
        public Object doWithRetry(RetryContext retryContext) {
            if (!job.isRestartable()) {
                //will be set only once and in case that job can not be restarted we don't retry
                retryContext.setExhaustedOnly();
            }

            if (retryContext.getRetryCount() > 0) {
                logger.info("Job: [" + job + "] retrying/restarting with the following parameters: [" + jobParameters
                        + "]");
            }

            try {
                if (initialExecutionPending) {
                    // First attempt: run the execution the launcher already created.
                    initialExecutionPending = false;
                } else {
                    // Later attempt: a restart needs its own execution.
                    executionReference.set(jobRepository.createJobExecution(job.getName(), jobParameters));
                }
                logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
                        + "]");
                job.execute(executionReference.get());
                logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
                        + "] and the following status: [" + executionReference.get().getStatus() + "]");
            }
            catch (JobInstanceAlreadyCompleteException | JobExecutionAlreadyRunningException e) {
                retryContext.setExhaustedOnly(); //don't repeat if instance already complete or running
                rethrow(e);
            }
            catch (Throwable t) {
                logger.info("Job: [" + job
                        + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
                        + "]", t);
                rethrow(t);
            }

            if (job.isRestartable() && executionReference.get().getStatus() == BatchStatus.FAILED) {
                //if job failed and can be restarted, use retry template to restart the job
                throw new TaskRejectedException("RetryTemplate failed too many times");
            }

            return null;
        }

        /**
         * Rethrow the given throwable unchanged when it is unchecked, otherwise wrapped in
         * an {@link IllegalStateException}.
         */
        private void rethrow(Throwable t) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            }
            else if (t instanceof Error) {
                throw (Error) t;
            }
            throw new IllegalStateException(t);
        }
    }
}
관련 문제