2017-10-17 5 views
5

처리를 위해 X? -GB 대용량 파일을 스트림으로 ActiveMQ 대기열로 보내려고합니다.낙타를 사용하여 대용량 파일을 ActiveMQ로 보내는 방법

저는 ActiveMQ supports streams을 알고 있습니다. 따라서 camel-jms도 있습니다.하지만 대기열에 설정하려고 시도하는 것은 아무런 차이가없는 것 같습니다. 변화가있는 유일한 것은 스트림 캐싱 결과를 끄는 것이 대신 "스트림 폐쇄"예외입니다.

프로세서 나 사용자 정의 클래스를 사용하는 것이 좋습니다 (리소스를 정리할 수있는 한).하지만 청사진을 통해 가능해야합니다. OutOfMemoryError를받지 않고도 camel-activemq를 통해 대용량 파일을 올바르게 처리하려면 어떻게해야합니까? 사용

  • ServiceMix를-7.0.0
  • 낙타 2.16.4
  • ActiveMQ를-5.14.3 여기

내 낙타 청사진

<?xml version="1.0" encoding="UTF-8"?> 
<blueprint 
xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" 
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0" 

    <!-- just calls exchange.setBody(exchange.getBody(InputStream.class)) -->  
    <bean id="toStreamBody" class="my.test.toInputStream"/> 

    <!-- define a bean of type StreamCachingStrategy which CamelContext will automaticly use --> 
    <bean id="streamStrategy" class="org.apache.camel.impl.DefaultStreamCachingStrategy"> 
     <property name="spoolDirectory" value="${java.io.tmpdir}TestTemp/#uuid#/"/> 
     <property name="spoolThreshold" value="131072"/> 
     <property name="spoolUsedHeapMemoryThreshold" value="70"/> 
     <property name="anySpoolRules" value="true"/> 
    </bean> 

    <!-- streamCaching="true" is "not a valid attribute" --> 
    <camelContext streamCache="true" xmlns="http://camel.apache.org/schema/blueprint"> 

     <route id="file_route"> 
      <from uri="file://FileUploadBin?delete=false&amp;moveFailed=.error"/> 
      <!-- just calls exchange.setBody(exchange.getBody(InputStream.class)) --> 
      <bean ref="toStreamBody"/> 
      <to uri="activemq:queue:TestQ"/> 
     </route> 

     <route id="myTestQ"> 
      <from uri="activemq:queue:TestQ?concurrentConsumers=1&amp;maxConcurrentConsumers=64&amp;maxMessagesPerTask=100&amp;asyncConsumer=true&amp;jmsMessageType=Stream&amp;mapJmsMessage=false"/> 
      <bean ref="toStreamBody"/> 
      <log message="FINISHED" loggingLevel="WARN"/> 
     </route> 

    </camelContext> 
</blueprint> 

여기에 계속 오류가 있습니다

2017-10-17 08:46:53,859 | ERROR | - RecipientList | DefaultErrorHandler    | 43 - org.apache.camel.camel-core - 2.16.4 | Failed delivery for (MessageId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-4 on ExchangeId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-5). Exhausted after delivery attempt: 1 caught: org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space 

Message History 
--------------------------------------------------------------------------------------------------------------------------------------- 
RouteId    ProcessorId   Processor                  Elapsed (ms) 
[file_route  ] [file_route  ] [file://FileUploadBin?delete=false&moveFailed=.error       ] [  3764] 

Exchange 
--------------------------------------------------------------------------------------------------------------------------------------- 
Exchange[ 
     Id     ID-DESKTOP-H2O66PO-62468-1508242908251-4-5 
     ExchangePattern  InOnly 
     Headers    {breadcrumbId=ID-DESKTOP-H2O66PO-62468-1508242908251-4-4, fileName=Die.txt} 
     BodyType   org.apache.camel.converter.stream.FileInputStreamCache 
     Body    [Body is instance of org.apache.camel.StreamCache] 
] 

Stacktrace 
--------------------------------------------------------------------------------------------------------------------------------------- 
org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space 
     at org.apache.camel.impl.converter.BaseTypeConverterRegistry.createTypeConversionException(BaseTypeConverterRegistry.java:610)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:137)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108)[40:org.apache.camel.camel-blueprint:2.16.4] 
     at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:823)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:84)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:319)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:304)[43:org.apache.camel.camel-core:2.16.4] 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)[:1.8.0_121] 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121] 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121] 
     at java.lang.Thread.run(Thread.java:745)[:1.8.0_121] 
Caused by: org.apache.camel.RuntimeCamelException: java.lang.OutOfMemoryError: Java heap space 
     at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1652)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1247)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4] 
     ... 24 more 
Caused by: java.lang.OutOfMemoryError: Java heap space 
     at java.util.Arrays.copyOf(Arrays.java:3236)[:1.8.0_121] 
     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)[:1.8.0_121] 
     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)[:1.8.0_121] 
     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)[:1.8.0_121] 
     at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)[:1.8.0_121] 
     at sun.nio.ch.FileChannelImpl.transferToArbitraryChannel(FileChannelImpl.java:567)[:1.8.0_121] 
     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:616)[:1.8.0_121] 
     at org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:108)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102)[43:org.apache.camel.camel-core:2.16.4] 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_121] 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_121] 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_121] 
     at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_121] 
     at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1243)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108) 
     at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4] 
2017-10-17 08:46:57,641 | WARN | ://FileUploadBin | GenericFileOnCompletion   | 43 - org.apache.camel.camel-core - 2.16.4 | Rollback file strategy: org.apache[email protected]294a3d48 for file: GenericFile[Die.txt] 
+1

관련 항목 : [이 질문에 관련] (https://stackoverflow.com/questions/46422970/streaming-using-camel-jms). 그러나 아무런 대답도, 나는 호넷/퓨즈를 사용하지 않고 있으며, 그 질문은 도움이되지 않습니다. – Tezra

+0

낙타를 시도해 보셨습니까? http://camel.apache.org/stream.html. 그냥 파일 구성 요소를 "stream : file? fileName = fileName.in"으로 바꾸고 바로 대기열로 보내십시오. – ltsallas

+1

@Itsallas 스트림 구성 요소가 더 이상 작동하지 않습니다. – Tezra

답변

3

이것은 고전적인 ActiveMQ 브로커에서 실제로 지원되지 않습니다.

그러나 차세대 ActiveMQ Artemis는 대형 메시지를 지원하므로 camel-jms에서도이 기능을 추가했습니다. 나는 이것에 대한 블로그 항목을 작성했습니다 : http://www.davsclaus.com/2017/10/working-with-large-messages-using.html

그리고 우리는 또한 javax.jms.StreamMessage 유형에 대한 지원을 camel-jms에 추가했습니다. 그러나이 API는 대형 메시지에 이상적이지 않으므로 사용이 제한적입니다. 그럼에도 불구하고 Camel 2.21 이후의 새로운 옵션 인 streamMessageTypeEnabled으로 구성 요소를 켜면 메시지 본문이 스트리밍 유형이면 BytesMessage 대신에 StreamMessage으로 전송됩니다.

관련 문제