2016-10-25 1 views
0

결과적으로 정규 파일과 다른 파일을 나열하는 주 파일을 생성하는 장기 실행 작업이 있습니다.멀티 스레딩 + RxJava는 조건을 준수합니다.

스케줄러는 하루에 한 번이 파일을 cron을 통해 재생성합니다.

작업 흐름은 rx-java을 사용하여 구현됩니다.

하나의 요청이 들어 와서 시작되거나 스케줄러가 작업을 시작한 다음 다른 작업이 진행되는 동안 다른 작업이 완료 될 때까지 기다리지 않고 다른 실행을 시작하는 경우 문제가 발생합니다.

그래서 작업 실행시 동기화하는 방법이 문제이므로 한 번만 수행됩니다. 아래의 샘플 코드,

@Service 
public class FileService { 
    @Autowired FileRepository fileRepository; 
    @Autowired List<Pipeline> pipelines; 

    public Observable<File> getMainFile() { 
     if (fileRepository.isMainFileExists()) 
      return Observable.just(fileRepository.getMainFile()); 
     else 
      return generate(() -> fileRepository.getMainFile()); 
    } 

    public Observable<File> getFile(String fileName) { 
     if (fileRepository.isMainFileExists()) 
      return Observable.just(fileRepository.getFile(fileName)); 
     else 
      return generate(() -> fileRepository.getFile(fileName)); 
    } 

    Observable<File> generate(Func0<File> whenGenerated) { 
     return Observable.from(pipelines) 
       // other business logic goes here 
       // after task execution finished just get needed file 
       .map(isAllPipelinesSuccessful -> { 
        return whenGenerated.call(); 
       }); 
    } 

    @Scheduled(cron = "0 0 4 * * ?") 
    void scheduleGeneration() { 
     generate(() -> fileRepository.getMainFile()).subscribe(); 
    } 
} 

그리고 그것은 컨트롤러라고 :

@RestController 
public class FileController { 
    private static final Long TIMEOUT = 1_000 * 60 * 10L; //ten mins 
    @Autowired FileService fileService; 

    @RequestMapping(value = "/mainfile", produces = "application/xml") 
    public DeferredResult<ResponseEntity<InputStreamResource>> getMainFile() { 
     DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT); 
     Observable<File> observableMainFile = fileService.getMainFile(); 
     observableMainFile 
       .map(this::fileToInputStreamResource) 
       .map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource)) 
       .subscribe(deferredResult::setResult, ex -> { 
       deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null)); 
       }); 
     return deferredResult; 
    } 
    @RequestMapping(value = "/files/{filename:.+}", produces = "application/xml") 
    public DeferredResult<ResponseEntity<InputStreamResource>> getFile(@PathVariable("filename") String filename) { 
     DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT); 
     Observable<File> observableFile = fileService.getFile(filename); 
     observableFile 
       .map(this::fileToInputStreamResource) 
       .map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource)) 
       .subscribe(deferredResult::setResult, ex -> { 
        boolean isFileNotFound = FileNotFoundException.class.isInstance(ex.getCause()); 
        HttpStatus status = isFileNotFound ? HttpStatus.NOT_FOUND : HttpStatus.INTERNAL_SERVER_ERROR; 
        deferredResult.setErrorResult(ResponseEntity.status(status).body(null)); 
       }); 
     return deferredResult; 
    } 
} 

답변

1

나는 다음과 같은 것을 가지고,하지만 난 이것에 더 나은 방법 해결책이 있다고 생각

샘플 코드 . RxJava2-RC5를 사용하고 있습니다.

  1. 답변이 없으면 해당 작업이 실행되었습니다. https://gist.github.com/anonymous/7b4717cea7ddce270a2e39850a3bd2a4

UPDATE :

interface FileRepository { 
     String getFile(); 

     Boolean isMainFileExists(); 
} 

private static Scheduler executorService = Schedulers.from(Executors.newFixedThreadPool(1)); 

@org.junit.Test 
public void schedulerTest123() throws Exception { 
     FileRepository fRepo = mock(FileRepository.class); 

     when(fRepo.getFile()).thenReturn(""); 
     when(fRepo.isMainFileExists()).thenReturn(false); 

     Thread t1 = new Thread(() -> { 
      getFile(fRepo, executorService).subscribe(); 
     }); 

     Thread t2 = new Thread(() -> { 
      getFile(fRepo, executorService).subscribe(); 
     }); 

     t1.start(); 
     t2.start(); 

     Thread.sleep(3_000); 

     when(fRepo.getFile()).thenReturn("DasFile"); 
     when(fRepo.isMainFileExists()).thenReturn(true); 

     Thread t3 = new Thread(() -> { 
      getFile(fRepo, executorService).subscribe(); 
     }); 

     t3.start(); 

     Thread.sleep(5_000); 
} 

private Observable<String> getFile(FileRepository fileRepo, Scheduler scheduler) { 
     return Observable.defer(() -> { 
      try { 
       if (fileRepo.isMainFileExists()) { 
        return Observable.fromCallable(fileRepo::getFile) 
          .subscribeOn(Schedulers.io()) 
          .doOnNext(s -> printCurrentThread("Get File from Repo")); 
       } else { 
        return startLongProcess().doOnNext(s -> printCurrentThread("Push long processValue")); 
       } 

      } catch (Exception ex) { 
       return Observable.error(ex); 
      } 
     }).subscribeOn(scheduler).doOnSubscribe(disposable -> printCurrentThread("SUB")); 
    } 

private Observable<String> startLongProcess() { 
     return Observable.fromCallable(() -> { 
      printCurrentThread("Doing LongProcess"); 

      Thread.sleep(5_000); 

      return "leFile"; 
     }); 
} 

private void printCurrentThread(String additional) { 
     System.out.println(additional + "_" + Thread.currentThread()); 
} 
+0

내가 틀렸다면 수정 : 모든 스레드가 제한됩니다 생성하기 위해 호출하지만 그들이 대기열에 포함됩니다하지 않은이 방법으로, 하나가 완료 그렇게 할 때 다른 하나는 부름받을 것인가? – marknorkin

+0

네, 맞습니다. 나는 내 대답을 편집 할 것이다. 나는 당신이 수표를 Callable에서 추가 하겠지만. –

+0

코드가 좀 지저분하지만 아이디어는 하나의 스레드에 대한 구독을 큐에 넣고 실행을 연기하는 것입니다. 따라서 큐에서 각 코드를 가져 오면 추가 확인이 수행됩니까? – marknorkin