2016-08-23 2 views
1

그래서 스프링 배치 3.0.7.RELEASE 및 스프링 4.3.2.RELEASE에 문제가 있습니다. 여기서는 동시성을 사용할 때 ItemProcessor에 prototype 범위를 사용하려고합니다.스프링 배치 프로토 타입 범위 항목 프로세서

내가 appBatchCreationProcessorprototype의 범위를 만들기 위해 노력했습니다, appBatchCreationProcessor()BatchCreationStep() 참조하지만, 동일한 항목 프로세서는 10 개 개의 스레드에 걸쳐 사용되는 어떤 영향을 미칠 것 같지 않습니다.

이 방법이 있습니까? 또는 디자인에 의한 것인가? 이 싱글 BatchCreationStep()로 주입되기 때문에

AppBatchConfiguration.java

@Configuration 
@EnableBatchProcessing 
@ComponentScan(basePackages = "our.org.base") 
public class AppBatchConfiguration { 

    private final static SimpleLogger LOGGER = SimpleLogger.getInstance(AppBatchConfiguration.class); 

    private final static String OUTPUT_XML_FILE_PATH_PLACEHOLDER = null; 
    private final static String INPUT_XML_FILE_PATH_PLACEHOLDER = null; 

    @Autowired 
    public JobBuilderFactory jobBuilderFactory; 

    @Autowired 
    public StepBuilderFactory stepBuilderFactory; 

    @Bean(name = "cimAppXmlReader") 
    @StepScope 
    public <T> ItemStreamReader<T> appXmlReader(@Value("#{jobParameters[inputXmlFilePath]}") 
    String inputXmlFilePath) { 
     LOGGER.info("Job Parameter => App XML File Path :" + inputXmlFilePath); 
     StaxEventItemReader<T> reader = new StaxEventItemReader<T>(); 
     reader.setResource(new FileSystemResource(inputXmlFilePath)); 
     reader.setUnmarshaller(mecaUnMarshaller()); 
     reader.setFragmentRootElementNames(getAppRootElementNames()); 
     reader.setSaveState(false); 

     // Make the StaxEventItemReader thread-safe 
     SynchronizedItemStreamReader<T> synchronizedItemStreamReader = new SynchronizedItemStreamReader<T>(); 
     synchronizedItemStreamReader.setDelegate(reader); 

     return synchronizedItemStreamReader; 
    } 

    @Bean 
    @StepScope 
    public ItemStreamReader<JAXBElement<AppIBTransactionHeaderType>> appXmlTransactionHeaderReader(@Value("#{jobParameters[inputXmlFilePath]}") 
    String inputXmlFilePath) { 
     LOGGER.info("Job Parameter => App XML File Path for Transaction Header :" + inputXmlFilePath); 
     StaxEventItemReader<JAXBElement<AppIBTransactionHeaderType>> reader = new StaxEventItemReader<>(); 
     reader.setResource(new FileSystemResource(inputXmlFilePath)); 
     reader.setUnmarshaller(mecaUnMarshaller()); 

     String[] fragmentRootElementNames = new String[] {"AppIBTransactionHeader"}; 
     reader.setFragmentRootElementNames(fragmentRootElementNames); 
     reader.setSaveState(false); 

     return reader; 
    } 

    @Bean 
    public Unmarshaller mecaUnMarshaller() { 
     Jaxb2Marshaller marshaller = new Jaxb2Marshaller(); 
     marshaller.setPackagesToScan(ObjectFactory.class.getPackage().getName()); 
     return marshaller; 
    } 

    @Bean 
    public Marshaller uberMarshaller() { 
     Jaxb2Marshaller marshaller = new Jaxb2Marshaller(); 
     marshaller.setClassesToBeBound(ServiceRequestType.class); 
     marshaller.setSupportJaxbElementClass(true); 
     return marshaller; 
    } 

    @Bean(destroyMethod="") // To stop multiple close calls, see: http://stackoverflow.com/a/23089536 
    @StepScope 
    public ResourceAwareItemWriterItemStream<JAXBElement<ServiceRequestType>> writer(@Value("#{jobParameters[outputXmlFilePath]}") 
    String outputXmlFilePath) { 
     SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>> writer = new SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>>(); 

     writer.setResource(new FileSystemResource(outputXmlFilePath)); 
     writer.setMarshaller(uberMarshaller()); 
     writer.setSaveState(false); 
     HashMap<String, String> rootElementAttribs = new HashMap<String, String>(); 
     rootElementAttribs.put("xmlns:ns1", "http://some.org/corporate/message/2010/1"); 
     writer.setRootElementAttributes(rootElementAttribs); 
     writer.setRootTagName("ns1:SetOfServiceRequests"); 

     return writer; 
    } 

    @Bean 
    @StepScope 
    public <T> ItemProcessor<T, JAXBElement<ServiceRequestType>> appNotificationProcessor() { 
     return new AppBatchNotificationItemProcessor<T>(); 
    } 

    @Bean 
    @Scope(scopeName=ConfigurableBeanFactory.SCOPE_PROTOTYPE) 
    public ItemProcessor<JAXBElement<AppIBTransactionHeaderType>, Boolean> appBatchCreationProcessor() { 
     return new AppBatchCreationItemProcessor(); 
    } 


    public String[] getAppRootElementNames() {   
     //get list of App Transaction Element Names   
     return AppProcessorEnum.getValues();   
    } 

    @Bean 
    public Step AppStep() { 
     // INPUT_XML_FILE_PATH_PLACEHOLDER and OUTPUT_XML_FILE_PATH_PLACEHOLDER will be overridden 
     // by injected jobParameters using late binding (StepScope) 
     return stepBuilderFactory.get("AppStep") 
       .<Object, JAXBElement<ServiceRequestType>> chunk(10) 
       .reader(appXmlReader(INPUT_XML_FILE_PATH_PLACEHOLDER)) 
       .processor(appNotificationProcessor()) 
       .writer(writer(OUTPUT_XML_FILE_PATH_PLACEHOLDER)) 
       .taskExecutor(concurrentTaskExecutor()) 
       .throttleLimit(1) 
       .build(); 

    } 

    @Bean 
    public Step BatchCreationStep() { 
     return stepBuilderFactory.get("BatchCreationStep") 
       .<JAXBElement<AppIBTransactionHeaderType>, Boolean>chunk(1) 
       .reader(appXmlTransactionHeaderReader(INPUT_XML_FILE_PATH_PLACEHOLDER)) 
       .processor(appBatchCreationProcessor()) 
       .taskExecutor(concurrentTaskExecutor()) 
       .throttleLimit(10) 
       .build(); 
    } 

    @Bean 
    public Job AppJob() { 
     return jobBuilderFactory.get("AppJob") 
       .incrementer(new RunIdIncrementer()) 
       .listener(AppJobCompletionNotificationListener()) 
       .flow(AppStep()) 
       .next(BatchCreationStep()) 
       .end() 
       .build(); 
    } 

    @Bean 
    public JobCompletionNotificationListener AppJobCompletionNotificationListener() { 
     return new JobCompletionNotificationListener(); 
    } 

    @Bean 
    public TaskExecutor concurrentTaskExecutor() { 
     SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); 
     taskExecutor.setConcurrencyLimit(10); 
     return taskExecutor; 
    } 
} 

답변

3

예,이 디자인입니다 사용할 수 있습니다.

작업이 실행되기 전에 작성된 정적 구조로 작업자와 리더, 프로세서 및 작성자의 작업 단계를 생각하십시오. 즉, 적절한 createReader, createProcessor 메소드가 호출되었고 작업이 실행되기 전에 bean 인스턴스가 작성되었다는 것을 의미합니다.

SCOPE_PROTOTYPE은이 단계에서 평가되며이 단계에서 createProcessor 메소드가 한 번만 호출되기 때문에 인스턴스가 하나만 존재합니다.

작업이 시작되면이 구조는 "안정적"으로 유지됩니다.

이제 스프링 배치는 단계 작성을 시작하기 위해 bean 작성을 연기하는 "단계 범위"를 제공하여 약간 수정합니다. 그러나 다중 스레드로 단계를 실행하는 경우에는 도움이되지 않습니다. 여전히 인스턴스가 하나뿐입니다. 예제의 프로세서는 모든 스레드에 사용됩니다.

"ThreadScope"와 같은 것이지만 스프링 또는 스프링 배치 내부에는 그러한 개념이 없습니다. 예를 들어 ThreadLocal 멤버를 사용하여 프로세서를 적절히 구현해야합니다.

예컨대 당신은 이런 식으로 당신의 프로세서를 래핑 수 :

public class ThreadLocalItemProcessor implements ItemProcessor { 

    private ThreadLocal<ItemProcessor> threadProcessor = ThreadLocal.withInitial(() -> new MyProcessor()); 

    @Override 
    public Object process(Object item) throws Exception { 
     return threadProcessor.get().process(item); 
    } 
} 

편집 : 프로토 타입 방법

사용자의 프로세서가 SpringBean으로서 인스턴스화되면,도를 autowire를 사용할 수 주입 용. `) (심지어 방식 삽입을 사용하여

@Configuration 
public class PrototypeFactory { 
    @Bean 
    @Scope(Prototype) 
    public YourInterfaceOrClass createInstance() { 
     return new YourInterfaceOrClass(); 
    } 
} 

public class ThreadLocalItemProcessor implements ItemProcessor { 

    @Autowired 
    private PrototypeFactory prototypeFactory; 

    private ThreadLocal<ItemProcessor> threadProcessor = ThreadLocal.withInitial(this::processorCreator); 

    @Override 
    public Object process(Object item) throws Exception { 
     return threadProcessor.get().process(item); 
    } 

    //ItemProcessor directly implemented as lambda 
    // this will only be called once per working thread 
    private Object process(Object input) { 

     // will produce a valid SpringBean instance 
     YourInterfaceOrClass inst = prototypeFactory.createInstance(); 

     ... process the input   
    } 
} 
+0

스프링이 수명주기를 더 이상 제어하지 못하기 때문에 스레드 로컬 프로세서가 아무 것도 주입 할 수 없다고 생각합니까? 가장 좋은 방법은 프로토 타입 범위를 가져야하는 항목에 대해 ItemProcessor 내에'@ Lookup' 메서드를 갖는 것입니다. –

+1

답안에 예제를 추가했습니다. 이렇게하면 스레드 당 유효한 springBean 인스턴스가 생성됩니다. –

0

appBatchCreationProcessor() 같은 모든 스레드에 사용되는 이유이다. 또한, BatchCreationStep()this documentation에 따르면 싱글 AppJob()

에 주입 :

당신은 프로토 타입 콩에 종속 싱글 범위의 콩을 사용하면 의존성이 인스턴스화 시간에 해결되는 것을주의해야합니다. 따라서 프로토 타입 범위의 bean을 단일 범위 bean에 종속성 삽입하면 새 프로토 타입 bean이 인스턴스화 된 후 종속 항목에 종속 입력됩니다. 프로토 타입 인스턴스는 싱글 톤 범위에 제공되는 유일한 인스턴스 인 입니다. 당신이 정말로 새로운 appBatchCreationProcessor()를 작성해야하는 경우

그러나 method injection

+0

, 여전히 하나의'appBatchCreationProcessor를 주입 것 : 따라서, 다음과 같이 (물론, 프로토 타입 공장 springbean로 인스턴스화하는) 프로토 타입 공장을 주입 할 수 안 그래? 왜냐하면 그것은 한 번만 부름받을 것이기 때문입니다. 나는 커스텀 태스크 실행 프로그램이나 뭔가를 주입이나 이상한 것을 다시 써야 할 것이라고 생각한다. 스레드로부터 안전해야하는 부분에 대해 ItemProcessor 내부에서'@ Lookup '을 사용하여 문제를 해결할 수있는 것처럼 보입니다. –

+0

@JoelPearson 아니요, 위의 방법 주사 링크를주의 깊게 읽으십시오. 메소드 삽입은 인스턴스화 시간이 아닌 런타임시 종속성을 해결하여 작동합니다. – Wins

+0

네, 그렇습니다. 스프링 배치 프레임 워크 만 실행/appBatchCreationProcessor()를 평가할 때 단계를 생성하지 않습니다. 그리고 그 인스턴스를 모든 스레드와 공유합니까? 아니면 각 스레드에 대해 appBatchCreationProcessor()를 호출 할 것이라고 생각합니까? –