그래서 스프링 배치 3.0.7.RELEASE
및 스프링 4.3.2.RELEASE
에 문제가 있습니다. 여기서는 동시성을 사용할 때 ItemProcessor에 prototype
범위를 사용하려고합니다.스프링 배치 프로토 타입 범위 항목 프로세서
내가 appBatchCreationProcessor
prototype
의 범위를 만들기 위해 노력했습니다, appBatchCreationProcessor()
및 BatchCreationStep()
참조하지만, 동일한 항목 프로세서는 10 개 개의 스레드에 걸쳐 사용되는 어떤 영향을 미칠 것 같지 않습니다.
이 방법이 있습니까? 또는 디자인에 의한 것인가? 이 싱글 BatchCreationStep()로 주입되기 때문에
AppBatchConfiguration.java
@Configuration
@EnableBatchProcessing
@ComponentScan(basePackages = "our.org.base")
public class AppBatchConfiguration {
private final static SimpleLogger LOGGER = SimpleLogger.getInstance(AppBatchConfiguration.class);
private final static String OUTPUT_XML_FILE_PATH_PLACEHOLDER = null;
private final static String INPUT_XML_FILE_PATH_PLACEHOLDER = null;
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean(name = "cimAppXmlReader")
@StepScope
public <T> ItemStreamReader<T> appXmlReader(@Value("#{jobParameters[inputXmlFilePath]}")
String inputXmlFilePath) {
LOGGER.info("Job Parameter => App XML File Path :" + inputXmlFilePath);
StaxEventItemReader<T> reader = new StaxEventItemReader<T>();
reader.setResource(new FileSystemResource(inputXmlFilePath));
reader.setUnmarshaller(mecaUnMarshaller());
reader.setFragmentRootElementNames(getAppRootElementNames());
reader.setSaveState(false);
// Make the StaxEventItemReader thread-safe
SynchronizedItemStreamReader<T> synchronizedItemStreamReader = new SynchronizedItemStreamReader<T>();
synchronizedItemStreamReader.setDelegate(reader);
return synchronizedItemStreamReader;
}
@Bean
@StepScope
public ItemStreamReader<JAXBElement<AppIBTransactionHeaderType>> appXmlTransactionHeaderReader(@Value("#{jobParameters[inputXmlFilePath]}")
String inputXmlFilePath) {
LOGGER.info("Job Parameter => App XML File Path for Transaction Header :" + inputXmlFilePath);
StaxEventItemReader<JAXBElement<AppIBTransactionHeaderType>> reader = new StaxEventItemReader<>();
reader.setResource(new FileSystemResource(inputXmlFilePath));
reader.setUnmarshaller(mecaUnMarshaller());
String[] fragmentRootElementNames = new String[] {"AppIBTransactionHeader"};
reader.setFragmentRootElementNames(fragmentRootElementNames);
reader.setSaveState(false);
return reader;
}
@Bean
public Unmarshaller mecaUnMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setPackagesToScan(ObjectFactory.class.getPackage().getName());
return marshaller;
}
@Bean
public Marshaller uberMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(ServiceRequestType.class);
marshaller.setSupportJaxbElementClass(true);
return marshaller;
}
@Bean(destroyMethod="") // To stop multiple close calls, see: http://stackoverflow.com/a/23089536
@StepScope
public ResourceAwareItemWriterItemStream<JAXBElement<ServiceRequestType>> writer(@Value("#{jobParameters[outputXmlFilePath]}")
String outputXmlFilePath) {
SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>> writer = new SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>>();
writer.setResource(new FileSystemResource(outputXmlFilePath));
writer.setMarshaller(uberMarshaller());
writer.setSaveState(false);
HashMap<String, String> rootElementAttribs = new HashMap<String, String>();
rootElementAttribs.put("xmlns:ns1", "http://some.org/corporate/message/2010/1");
writer.setRootElementAttributes(rootElementAttribs);
writer.setRootTagName("ns1:SetOfServiceRequests");
return writer;
}
@Bean
@StepScope
public <T> ItemProcessor<T, JAXBElement<ServiceRequestType>> appNotificationProcessor() {
return new AppBatchNotificationItemProcessor<T>();
}
@Bean
@Scope(scopeName=ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public ItemProcessor<JAXBElement<AppIBTransactionHeaderType>, Boolean> appBatchCreationProcessor() {
return new AppBatchCreationItemProcessor();
}
public String[] getAppRootElementNames() {
//get list of App Transaction Element Names
return AppProcessorEnum.getValues();
}
@Bean
public Step AppStep() {
// INPUT_XML_FILE_PATH_PLACEHOLDER and OUTPUT_XML_FILE_PATH_PLACEHOLDER will be overridden
// by injected jobParameters using late binding (StepScope)
return stepBuilderFactory.get("AppStep")
.<Object, JAXBElement<ServiceRequestType>> chunk(10)
.reader(appXmlReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
.processor(appNotificationProcessor())
.writer(writer(OUTPUT_XML_FILE_PATH_PLACEHOLDER))
.taskExecutor(concurrentTaskExecutor())
.throttleLimit(1)
.build();
}
@Bean
public Step BatchCreationStep() {
return stepBuilderFactory.get("BatchCreationStep")
.<JAXBElement<AppIBTransactionHeaderType>, Boolean>chunk(1)
.reader(appXmlTransactionHeaderReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
.processor(appBatchCreationProcessor())
.taskExecutor(concurrentTaskExecutor())
.throttleLimit(10)
.build();
}
@Bean
public Job AppJob() {
return jobBuilderFactory.get("AppJob")
.incrementer(new RunIdIncrementer())
.listener(AppJobCompletionNotificationListener())
.flow(AppStep())
.next(BatchCreationStep())
.end()
.build();
}
@Bean
public JobCompletionNotificationListener AppJobCompletionNotificationListener() {
return new JobCompletionNotificationListener();
}
@Bean
public TaskExecutor concurrentTaskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(10);
return taskExecutor;
}
}
스프링이 수명주기를 더 이상 제어하지 못하기 때문에 스레드 로컬 프로세서가 아무 것도 주입 할 수 없다고 생각합니까? 가장 좋은 방법은 프로토 타입 범위를 가져야하는 항목에 대해 ItemProcessor 내에'@ Lookup' 메서드를 갖는 것입니다. –
답안에 예제를 추가했습니다. 이렇게하면 스레드 당 유효한 springBean 인스턴스가 생성됩니다. –