2016-08-05 1 views
WebClient ClassCastException when unmarshalling JSON from a Spring Reactive Controller that streams data

This question is related to this one, which asks how to stream data from a Reactive Spring Controller.

As Rossen pointed out, you have to use text/event-stream to send the streamed results as server-sent events. So far, so good.

I have a service like this:

@GetMapping(value="/accounts/alertsStreaming", headers="accept=text/event-stream") 
public Flux<Alert> getAccountAlertsStreaming() { 
    return Flux.fromArray(new Alert[]{new Alert((long)1, "Alert message"), 
             new Alert((long)2, "Alert message2"), 
             new Alert((long)3, "Alert message3")}) 
       .delayMillis(1000) 
       .log(); 
} 

When I call it from a browser, the three results start arriving with a one-second delay between them.

Now I want to call this service from a WebClient, and I implemented it this way:

@Component 
public class AccountsServiceClient { 

    @Autowired 
    private WebClient webClient; 

    public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){ 
     Flux<Alert> response = webClient 
       .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream")) 
       .extract(bodyStream(Alert.class)); 
     return response; 
    }  
} 

And this is the test code:

@Test 
@ContextConfiguration(classes={WebClientConfig.class, AccountsServiceClient.class}) 
public class AccountsServiceClientTest extends AbstractTestNGSpringContextTests{ 

    private Logger logger = LoggerFactory.getLogger(getClass()); 

    @Autowired 
    private AccountsServiceClient client; 

    public void testNumbersServiceClientStreamingTest() throws InterruptedException{ 

     CountDownLatch latch = new CountDownLatch(1); 

     Flux<Alert> alerts = client.getAccountAlertsStreaming("http://localhost:8080"); 
     alerts.doOnComplete(() -> { 
      latch.countDown(); 
     }).subscribe((n) -> { 
      logger.info("------------> GOT ALERT {}", n); 
     }); 

     latch.await(); 
    } 
} 

The problem is that when the client tries to extract the results as they arrive, none of the HttpMessageReaders can read text/event-stream + Alert.class:

public class ResponseExtractors { 

    protected static HttpMessageReader<?> resolveMessageReader(List<HttpMessageReader<?>> messageReaders, 
       ResolvableType responseType, MediaType contentType) { 

      return messageReaders.stream() 
        .filter(e -> e.canRead(responseType, contentType)) 
        .findFirst() 
        .orElseThrow(() -> 
          new WebClientException(
            "Could not decode response body of type '" + contentType 
              + "' with target type '" + responseType.toString() + "'")); 
    } 

Exception:

reactor.core.Exceptions$BubblingException: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert' 
    at reactor.core.Exceptions.bubble(Exceptions.java:97) 
    at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263) 
    at reactor.core.publisher.LambdaSubscriber.onError(LambdaSubscriber.java:126) 
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:183) 
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:128) 
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:169) 
    at reactor.core.publisher.FluxLog$LoggerSubscriber.doNext(FluxLog.java:161) 
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) 
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) 
    at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:103) 
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) 
    at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:71) 
    at reactor.ipc.netty.http.NettyHttpClientHandler.channelRead(NettyHttpClientHandler.java:120) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) 
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435) 
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) 
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280) 
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) 
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) 
    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) 
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert' 
    at org.springframework.web.client.reactive.ResponseExtractors.lambda$resolveMessageReader$23(ResponseExtractors.java:203) 
    at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$61/1950155746.get(Unknown Source) 
    at java.util.Optional.orElseThrow(Optional.java:290) 
    at org.springframework.web.client.reactive.ResponseExtractors.resolveMessageReader(ResponseExtractors.java:200) 
    at org.springframework.web.client.reactive.ResponseExtractors.decodeResponseBody(ResponseExtractors.java:181) 
    at org.springframework.web.client.reactive.ResponseExtractors.lambda$null$12(ResponseExtractors.java:89) 
    at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$36/70386506.apply(Unknown Source) 
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:126) 
    ... 37 common frames omitted 
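
The lookup that fails here can be reproduced in isolation. The following is only a minimal sketch of the first-match selection shown in resolveMessageReader, not Spring's actual code: the Reader interface, the two readers and what each of them accepts are assumptions made for illustration (a string reader that accepts any media type, and a JSON reader that accepts only application/json).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ReaderResolutionSketch {

    // Hypothetical stand-in for HttpMessageReader: it only answers whether it
    // can decode a given media type into a given target class.
    interface Reader {
        boolean canRead(Class<?> targetType, String mediaType);
        String name();
    }

    // Assumption: reads any media type, but only into String.
    static final Reader STRING_READER = new Reader() {
        public boolean canRead(Class<?> targetType, String mediaType) {
            return targetType == String.class;
        }
        public String name() { return "string"; }
    };

    // Assumption: reads any class, but only from application/json.
    static final Reader JSON_READER = new Reader() {
        public boolean canRead(Class<?> targetType, String mediaType) {
            return "application/json".equals(mediaType);
        }
        public String name() { return "json"; }
    };

    // Placeholder for the DTO from the question.
    static class Alert { }

    // Same shape as resolveMessageReader: the first reader that accepts the
    // (target type, content type) pair wins; an empty result is the error case.
    static Optional<Reader> resolve(List<Reader> readers, Class<?> type, String mediaType) {
        return readers.stream()
                .filter(r -> r.canRead(type, mediaType))
                .findFirst();
    }

    public static void main(String[] args) {
        List<Reader> readers = Arrays.asList(STRING_READER, JSON_READER);
        System.out.println(resolve(readers, Alert.class, "text/event-stream").isPresent());
        System.out.println(resolve(readers, String.class, "text/event-stream").isPresent());
        System.out.println(resolve(readers, Alert.class, "application/json").isPresent());
    }
}
```

Under these assumptions the Alert lookup for text/event-stream comes back empty, which corresponds to the WebClientException above, while asking for String with the same content type finds a reader.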
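From the server's point of view, what is the difference between WebClient and a web browser? –

Besides the new semantics, if the results are not streamed but arrive all at once, what is the point of using the new 'WebClient' instead of the old 'RestTemplate'? – codependent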

Answers

0

Maybe this should be handled automatically by the framework. In any case, I solved it by unmarshalling the JSON stream data myself:

WebClientConfig:

@Configuration 
public class WebClientConfig { 

    @Bean 
    public ObjectMapper jacksonObjectMapper(){ 
     return new ObjectMapper(); 
    } 

    @Bean 
    public WebClient webClient(){ 
     WebClient webClient = new WebClient(new ReactorClientHttpConnector()); 
     return webClient; 
    } 

} 

Service client:

@Component 
public class AccountsServiceClient { 

    @Autowired 
    private WebClient webClient; 

    @Autowired 
    private ObjectMapper jacksonObjectMapper; 

    public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){ 
     Flux<Alert> response = webClient 
       .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream")) 
       .extract(bodyStream(String.class)) 
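       .map((e -> { 
        try { 
         // Drop the SSE field name ("data:") and keep only the JSON payload 
         e = e.substring(e.indexOf(":")+1); 
         return jacksonObjectMapper.readValue(e, Alert.class); 
        } catch (Exception e1) { 
         // map() must not return null, so surface the parse failure as an error 
         throw new IllegalStateException("Could not parse alert: " + e, e1); 
        } 
       })); 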
     return response; 
    } 

} 
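
For reference, the prefix stripping above depends on how a text/event-stream body is laid out: each event is a "data:" line followed by a blank line. Below is a rough, framework-free sketch of that step. The class name, the method name and the JSON field names (id, message) are made up for illustration and are not taken from the Alert class or from Spring.

```java
import java.util.ArrayList;
import java.util.List;

public class SseDataLines {

    // Collects the payload of every "data:" line in a raw text/event-stream body.
    // Other lines (blank separators, "id:", "event:", comments) are skipped.
    static List<String> extractData(String body) {
        List<String> payloads = new ArrayList<>();
        for (String line : body.split("\r?\n")) {
            if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // In the SSE format a single space after the colon is not part of the value
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                payloads.add(value);
            }
        }
        return payloads;
    }

    public static void main(String[] args) {
        // Hypothetical wire content; the field names are assumptions.
        String body = "data:{\"id\":1,\"message\":\"Alert message\"}\n\n"
                    + "data:{\"id\":2,\"message\":\"Alert message2\"}\n\n";
        extractData(body).forEach(System.out::println);
    }
}
```

Checking for the "data:" prefix instead of cutting at the first colon keeps non-data lines out of the JSON parser; each extracted string could then be passed to jacksonObjectMapper.readValue as in the client above.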

UPDATE: As of Spring 5 M4 this is done by the framework. You can see the solution with the new API in: Spring 5 Web Reactive - How can we use WebClient to retrieve streamed data in a Flux?

0

There is already an issue for this. Please comment and vote on SPR-14539.
