2016-10-20 1 views
2

Kafka를 0.9.0에서 0.10.0으로 업그레이드하면서 서로 다른 토픽에 서로 다른 프로듀서를 구성하는 중에 문제가 발생했습니다. 두 토픽에 각각 개별적으로 메시지를 게시할 때 오류가 발생합니다. spring-integration-kafka 2.1.0.RELEASE 및 Kafka 0.10.0에서 서로 다른 토픽에 대해 서로 다른 프로듀서를 구성하는 방법은 무엇입니까? XML 설정은 다음과 같습니다.

<?xml version="1.0" encoding="UTF-8"?> 
 
<beans xmlns="http://www.springframework.org/schema/beans" 
 
\t xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" 
 
\t xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" 
 
\t xmlns:task="http://www.springframework.org/schema/task" 
 
\t xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd 
 
\t \t http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
 
\t \t http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
 
\t \t http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> 
 

 
\t <!-- Publish-subscribe channel to which both outbound channel adapters in this file subscribe 
\t      (fcmOutboundChannelAdapter and masOutboundChannelAdapter each declare channel="inputToKafka"). --> 
\t <int:publish-subscribe-channel id="inputToKafka" /> 
 

 
\t <!-- Producer Config: outbound channel adapters and the KafkaTemplate beans they use to publish to Kafka. --> 
 

 
\t <!-- Subscribes to the inputToKafka channel and sends to topic "trigger-fcm-notification" 
\t      through the fcmNotificationTemplate bean. The advice chain applies a 
\t      RequestHandlerCircuitBreakerAdvice around the message handler. --> 
\t <int-kafka:outbound-channel-adapter 
 
\t \t id="fcmOutboundChannelAdapter" kafka-template="fcmNotificationTemplate" topic="trigger-fcm-notification" 
 
\t \t auto-startup="true" channel="inputToKafka"> 
 
\t \t <int-kafka:request-handler-advice-chain> 
 
\t \t \t <bean 
 
\t \t \t \t class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" /> 
 
\t \t </int-kafka:request-handler-advice-chain> 
 
\t </int-kafka:outbound-channel-adapter> 
 
\t 
 
\t <!-- Subscribes to the same inputToKafka channel as fcmOutboundChannelAdapter and sends to topic 
\t      "sync-microsoft-account" through the microsoftAccountSyncTemplate bean. 
\t      NOTE(review): inputToKafka is a publish-subscribe channel, so this adapter presumably also receives 
\t      payloads intended for the FCM adapter, which its value.serializer cannot handle; confirm and 
\t      consider a dedicated channel for this adapter. --> 
\t <int-kafka:outbound-channel-adapter 
 
\t \t id="masOutboundChannelAdapter" kafka-template="microsoftAccountSyncTemplate" topic="sync-microsoft-account" 
 
\t \t auto-startup="true" channel="inputToKafka"> 
 
\t \t <int-kafka:request-handler-advice-chain> 
 
\t \t \t <bean 
 
\t \t \t \t class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" /> 
 
\t \t </int-kafka:request-handler-advice-chain> 
 
\t </int-kafka:outbound-channel-adapter> 
 
\t 
 
\t <!-- KafkaTemplate referenced by fcmOutboundChannelAdapter. It is built from an inline 
\t      DefaultKafkaProducerFactory whose producer properties point at localhost:9092, use 
\t      StringSerializer for keys and common.serializer.FcmNotificationVoSerializer for values. --> 
\t <bean id="fcmNotificationTemplate" class="org.springframework.kafka.core.KafkaTemplate"> 
 
\t \t <constructor-arg> 
 
\t \t \t <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> 
 
\t \t \t \t <constructor-arg> 
 
\t \t \t \t \t <map> 
 
\t \t \t \t \t \t <entry key="bootstrap.servers" value="localhost:9092" /> 
 
\t \t \t \t \t \t <entry key="retries" value="0" /> 
 
\t \t \t \t \t \t <entry key="batch.size" value="16384" /> \t \t \t \t \t \t 
 
\t \t \t \t \t \t <entry key="linger.ms" value="0" /> 
 
\t \t \t \t \t \t <entry key="buffer.memory" value="33554432" /> 
 
\t \t \t \t \t \t <entry key="key.serializer" 
 
\t \t \t \t \t \t \t value="org.apache.kafka.common.serialization.StringSerializer" /> 
 
\t \t \t \t \t \t <entry key="value.serializer" 
 
\t \t \t \t \t \t \t value="common.serializer.FcmNotificationVoSerializer" /> 
 
\t \t \t \t \t </map> 
 
\t \t \t \t </constructor-arg> 
 
\t \t \t </bean> 
 
\t \t </constructor-arg> 
 
\t </bean> 
 

 
\t <!-- KafkaTemplate referenced by masOutboundChannelAdapter. Same producer properties as 
\t      fcmNotificationTemplate except for the value serializer, which is 
\t      common.serializer.MicrosoftAccountSyncRequestVoSerializer. --> 
\t <bean id="microsoftAccountSyncTemplate" class="org.springframework.kafka.core.KafkaTemplate"> 
 
\t \t <constructor-arg> 
 
\t \t \t <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> 
 
\t \t \t \t <constructor-arg> 
 
\t \t \t \t \t <map> 
 
\t \t \t \t \t \t <entry key="bootstrap.servers" value="localhost:9092" /> 
 
\t \t \t \t \t \t <entry key="retries" value="0" /> 
 
\t \t \t \t \t \t <entry key="batch.size" value="16384" /> 
 
\t \t \t \t \t \t <entry key="buffer.memory" value="33554432" /> \t 
 
\t \t \t \t \t \t <entry key="linger.ms" value="0" /> 
 
\t \t \t \t \t \t <entry key="key.serializer" 
 
\t \t \t \t \t \t \t value="org.apache.kafka.common.serialization.StringSerializer" /> 
 
\t \t \t \t \t \t <entry key="value.serializer" 
 
\t \t \t \t \t \t \t value="common.serializer.MicrosoftAccountSyncRequestVoSerializer" /> 
 
\t \t \t \t \t </map> 
 
\t \t \t \t </constructor-arg> 
 
\t \t \t </bean> 
 
\t \t </constructor-arg> 
 
\t </bean> 
 

 
\t <!-- Inbound side for FCM notifications: takes records from the fcmContainer listener container 
\t      (mode="record") and sends them to channel ip-chanel-trigger-fcm-notification, converting 
\t      them with the messageConverter bean. --> 
\t <int-kafka:message-driven-channel-adapter 
 
\t \t id="kafka-message-channel-adapter-FCM" listener-container="fcmContainer" 
 
\t \t auto-startup="true" phase="100" send-timeout="5000" 
 
\t \t channel="ip-chanel-trigger-fcm-notification" mode="record" 
 
\t \t message-converter="messageConverter" /> 
 

 
\t <!-- Inbound side for Microsoft account sync: takes records from the microsoftAccountSyncContainer 
\t      listener container (mode="record") and sends them to channel ip-chanel-sync-microsoft-account, 
\t      converting them with the messageConverter bean. --> 
\t <int-kafka:message-driven-channel-adapter 
 
\t \t id="kafka-message-channel-adapter-SMA" listener-container="microsoftAccountSyncContainer" 
 
\t \t auto-startup="true" phase="100" send-timeout="5000" 
 
\t \t channel="ip-chanel-sync-microsoft-account" mode="record" 
 
\t \t message-converter="messageConverter" /> 
 
\t \t 
 
\t <!-- MessagingMessageConverter shared by both message-driven channel adapters in this file. --> 
\t <bean id="messageConverter" 
 
\t \t class="org.springframework.kafka.support.converter.MessagingMessageConverter" /> 
 

 
\t <!-- Consumer Config: service activators and listener containers for the two consumed topics. --> 
 
\t <!-- Hands messages arriving on ip-chanel-trigger-fcm-notification to the fcmNotificationConsumer bean. 
\t      That bean is not defined in this file. --> 
\t <int:service-activator input-channel="ip-chanel-trigger-fcm-notification" 
 
\t \t ref="fcmNotificationConsumer"> 
 
\t </int:service-activator> 
 
\t 
 
\t <!-- Hands messages arriving on ip-chanel-sync-microsoft-account to the syncMicrosoftAccountConsumer bean. 
\t      That bean is not defined in this file. --> 
\t <int:service-activator input-channel="ip-chanel-sync-microsoft-account" 
 
\t \t ref="syncMicrosoftAccountConsumer"> 
 
\t </int:service-activator> 
 
\t 
 
\t <!-- Listener container used by kafka-message-channel-adapter-FCM. First constructor argument: 
\t      an inline DefaultKafkaConsumerFactory (localhost:9092, group.id "trigger-fcm-notification", 
\t      auto-commit enabled with a 100 ms interval, values read with 
\t      common.deserializer.FcmNotificationVoDeserializer). Second constructor argument: 
\t      ContainerProperties naming the topic "trigger-fcm-notification". --> 
\t <bean id="fcmContainer" 
 
\t \t class="org.springframework.kafka.listener.KafkaMessageListenerContainer"> 
 
\t \t <constructor-arg> 
 
\t \t \t <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> 
 
\t \t \t \t <constructor-arg> 
 
\t \t \t \t \t <map> 
 
\t \t \t \t \t \t <entry key="bootstrap.servers" value="localhost:9092" /> 
 
\t \t \t \t \t \t <entry key="enable.auto.commit" value="true" /> 
 
\t \t \t \t \t \t <entry key="auto.commit.interval.ms" value="100" /> 
 
\t \t \t \t \t \t <entry key="session.timeout.ms" value="15000" /> 
 
\t \t \t \t \t \t <entry key="group.id" value="trigger-fcm-notification" /> 
 
\t \t \t \t \t \t <entry key="key.deserializer" 
 
\t \t \t \t \t \t \t value="org.apache.kafka.common.serialization.StringDeserializer" /> 
 
\t \t \t \t \t \t <entry key="value.deserializer" 
 
\t \t \t \t \t \t \t value="common.deserializer.FcmNotificationVoDeserializer" /> 
 
\t \t \t \t \t </map> 
 
\t \t \t \t </constructor-arg> 
 
\t \t \t </bean> 
 
\t \t </constructor-arg> 
 
\t \t <constructor-arg> 
 
\t \t \t <bean class="org.springframework.kafka.listener.config.ContainerProperties"> 
 
\t \t \t \t <constructor-arg name="topics" value="trigger-fcm-notification" /> 
 
\t \t \t </bean> 
 
\t \t </constructor-arg> 
 
\t </bean> 
 
\t \t 
 
\t <!-- Listener container used by kafka-message-channel-adapter-SMA. First constructor argument: 
\t      an inline DefaultKafkaConsumerFactory (localhost:9092, group.id "sync-microsoft-account", 
\t      auto-commit enabled with a 100 ms interval, values read with 
\t      common.deserializer.MicrosoftAccountSyncRequestVoDeserializer). Second constructor argument: 
\t      ContainerProperties naming the topic "sync-microsoft-account". --> 
\t <bean id="microsoftAccountSyncContainer" 
 
\t \t class="org.springframework.kafka.listener.KafkaMessageListenerContainer"> 
 
\t \t <constructor-arg> 
 
\t \t \t <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> 
 
\t \t \t \t <constructor-arg> 
 
\t \t \t \t \t <map> 
 
\t \t \t \t \t \t <entry key="bootstrap.servers" value="localhost:9092" /> 
 
\t \t \t \t \t \t <entry key="enable.auto.commit" value="true" /> 
 
\t \t \t \t \t \t <entry key="auto.commit.interval.ms" value="100" /> 
 
\t \t \t \t \t \t <entry key="session.timeout.ms" value="15000" /> 
 
\t \t \t \t \t \t <entry key="group.id" value="sync-microsoft-account" /> 
 
\t \t \t \t \t \t <entry key="key.deserializer" 
 
\t \t \t \t \t \t \t value="org.apache.kafka.common.serialization.StringDeserializer" /> 
 
\t \t \t \t \t \t <entry key="value.deserializer" 
 
\t \t \t \t \t \t \t value="common.deserializer.MicrosoftAccountSyncRequestVoDeserializer" /> 
 
\t \t \t \t \t </map> 
 
\t \t \t \t </constructor-arg> 
 
\t \t \t </bean> 
 
\t \t </constructor-arg> 
 
\t \t <constructor-arg> 
 
\t \t \t <bean class="org.springframework.kafka.listener.config.ContainerProperties"> 
 
\t \t \t \t <constructor-arg name="topics" value="sync-microsoft-account" /> 
 
\t \t \t </bean> 
 
\t \t </constructor-arg> 
 
\t </bean> 
 

 
</beans>

스택 추적은 아래와 같습니다.

(java.lang.String,java.lang.String,java.lang.String,java.util.Locale,org.springframework.ui.Model,java.security.Principal)]: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer 
 
2016-10-20 18:12:53,849 [http-nio-8080-exec-4] DEBUG org.springframework.web.servlet.DispatcherServlet - Could not complete request 
 
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer 
 
\t at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139) 
 
\t at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
 
\t at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
 
\t at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
 
\t at java.lang.reflect.Method.invoke(Method.java:498) 
 
\t at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) 
 
\t at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) 
 
\t at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) 
 
\t at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$1.execute(AbstractRequestHandlerAdvice.java:75) 
 
\t at org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice.doInvoke(RequestHandlerCircuitBreakerAdvice.java:62) 
 
\t at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70) 
 
\t at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
 
\t at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) 
 
\t at com.sun.proxy.$Proxy52.handleMessage(Unknown Source) 
 
\t at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) 
 
\t at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) 
 
\t at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 
 
\t at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
 
\t at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
 
\t at service.impl.AdminOperationsServiceImpl.publishToQueue(AdminOperationsServiceImpl.java:1191) 
 
\t at service.impl.AdminOperationsServiceImpl.update(AdminOperationsServiceImpl.java:1366) 
 
\t at service.TenantDocumentsController.update(TenantDocumentsController.java:277) 
 
\t at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
 
\t at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
 
\t at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
 
\t at java.lang.reflect.Method.invoke(Method.java:498) 
 
\t at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) 
 
\t at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) 
 
\t at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:114) 
 
\t at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827) 
 
\t at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738) 
 
\t at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)

위 스택 추적에서 보듯이, 시리얼라이저 클래스와 디시리얼라이저 클래스를 각각 두 개씩 정의했는데도 내부적으로는 다른 클래스를 참조하고 있습니다. 어떻게 이런 일이 발생합니까? 제가 누락한 설정이 있습니까?

답변

1

이 오류는 Kafka로 메시지를 보내는 과정에서 발생한 것이므로 Deserializer는 이 문제와 관련이 없습니다. 스택 추적에 따르면 어떤 REST 서비스가 GcmNotificationVo 객체를 inputToKafka 채널로 보내고 있습니다.

inputToKafka는 publish-subscribe 채널이므로 이 메시지는 두 어댑터 모두에 전달됩니다. 그런데 두 번째 구독자(masOutboundChannelAdapter)는 common.serializer.MicrosoftAccountSyncRequestVoSerializer를 사용하기 때문에 해당 객체를 Kafka용으로 직렬화할 수 없습니다.

masOutboundChannelAdapter는 다른 작업에 사용하려는 것입니까? 그렇다면 이 어댑터를 위한 별도의 채널을 새로 만드는 것을 고려하십시오.

+0

예, 다른 작업을 위해 masOutboundChannelAdapter를 사용합니다. – Bhat

+0

제가 언급한 "디시리얼라이저"는 코드에 있는 value.deserializer 설정의 값 부분을 가리킵니다. – Bhat

+0

말씀하신 내용을 이해했습니다. 별도의 채널을 추가했더니 예상대로 동작합니다. 다시 한번 감사드립니다. 이 질문은 해결된 것으로 표시해 주십시오. – Bhat

관련 문제