0
결과적으로 정규 파일과 다른 파일을 나열하는 주 파일을 생성하는 장기 실행 작업이 있습니다.멀티 스레딩 + RxJava는 조건을 준수합니다.
스케줄러는 하루에 한 번이 파일을 cron을 통해 재생성합니다.
작업 흐름은 rx-java
을 사용하여 구현됩니다.
하나의 요청이 들어 와서 시작되거나 스케줄러가 작업을 시작한 다음 다른 작업이 진행되는 동안 다른 작업이 완료 될 때까지 기다리지 않고 다른 실행을 시작하는 경우 문제가 발생합니다.
그래서 작업 실행시 동기화하는 방법이 문제이므로 한 번만 수행됩니다. 아래의 샘플 코드,
@Service
public class FileService {
@Autowired FileRepository fileRepository;
@Autowired List<Pipeline> pipelines;
public Observable<File> getMainFile() {
if (fileRepository.isMainFileExists())
return Observable.just(fileRepository.getMainFile());
else
return generate(() -> fileRepository.getMainFile());
}
public Observable<File> getFile(String fileName) {
if (fileRepository.isMainFileExists())
return Observable.just(fileRepository.getFile(fileName));
else
return generate(() -> fileRepository.getFile(fileName));
}
Observable<File> generate(Func0<File> whenGenerated) {
return Observable.from(pipelines)
// other business logic goes here
// after task execution finished just get needed file
.map(isAllPipelinesSuccessful -> {
return whenGenerated.call();
});
}
@Scheduled(cron = "0 0 4 * * ?")
void scheduleGeneration() {
generate(() -> fileRepository.getMainFile()).subscribe();
}
}
그리고 그것은 컨트롤러라고 :
@RestController
public class FileController {
private static final Long TIMEOUT = 1_000 * 60 * 10L; //ten mins
@Autowired FileService fileService;
@RequestMapping(value = "/mainfile", produces = "application/xml")
public DeferredResult<ResponseEntity<InputStreamResource>> getMainFile() {
DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT);
Observable<File> observableMainFile = fileService.getMainFile();
observableMainFile
.map(this::fileToInputStreamResource)
.map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource))
.subscribe(deferredResult::setResult, ex -> {
deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null));
});
return deferredResult;
}
@RequestMapping(value = "/files/{filename:.+}", produces = "application/xml")
public DeferredResult<ResponseEntity<InputStreamResource>> getFile(@PathVariable("filename") String filename) {
DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT);
Observable<File> observableFile = fileService.getFile(filename);
observableFile
.map(this::fileToInputStreamResource)
.map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource))
.subscribe(deferredResult::setResult, ex -> {
boolean isFileNotFound = FileNotFoundException.class.isInstance(ex.getCause());
HttpStatus status = isFileNotFound ? HttpStatus.NOT_FOUND : HttpStatus.INTERNAL_SERVER_ERROR;
deferredResult.setErrorResult(ResponseEntity.status(status).body(null));
});
return deferredResult;
}
}
내가 틀렸다면 수정 : 모든 스레드가 제한됩니다 생성하기 위해 호출하지만 그들이 대기열에 포함됩니다하지 않은이 방법으로, 하나가 완료 그렇게 할 때 다른 하나는 부름받을 것인가? – marknorkin
네, 맞습니다. 나는 내 대답을 편집 할 것이다. 나는 당신이 수표를 Callable에서 추가 하겠지만. –
코드가 좀 지저분하지만 아이디어는 하나의 스레드에 대한 구독을 큐에 넣고 실행을 연기하는 것입니다. 따라서 큐에서 각 코드를 가져 오면 추가 확인이 수행됩니까? – marknorkin