2017-09-20 6 views
1

POST 데이터 (반드시 폼 데이터는 아님)를 처리하기 위해 Jersey로 Vert.X를 설정하려고합니다.Vert.x ReadStream <Buffer> to InputStream

저지 ContainerRequest.setEntityStream은 제가 구축하려고 시도한 InputStream입니다. 그러나, 나는 bodyHandler 또는 유사한 무언가를하지만, 입력

final Buffer body = Buffer.buffer(); 
    event 
     .handler(buffer -> { 
      if (!event.response().headWritten()) { 
       body.appendBuffer(buffer); 
       if (body.length() > 10 * 1024 * 1024) { 
        event.response() 
         .setStatusCode(REQUEST_ENTITY_TOO_LARGE.getStatusCode()) 
         .setStatusMessage(REQUEST_ENTITY_TOO_LARGE.getReasonPhrase()) 
         .end(); 
       } 
      } 
     }) 
     .endHandler(aVoid -> { 
      request.setEntityStream(new VertxBufferInputStream(body)); 
      appHandler.handle(request); 
     }); 

VertxBufferInputStream을 제한하는 내 자신 만의 방법을 사용하여 메모리에 전체를 읽지 않고 데이터를 전달하는 주위를 얻이 수없는 것은 VertXbuffer에 대한 간단한 래퍼입니다 . ByteArrayInputStream()으로 변환하지 않도록하여 메모리를 절약하십시오. 그러나 그것은 온 몸을 가지고 있습니다.

전신을 피하고 스트리밍을하고 싶습니다. 나는 꽤 많은 해커와 나쁜 코드를 시도했다. 왜냐하면 handler이 호출되지 않고 기다리고 있기 때문에 이벤트 루프를 차단하기 때문에 결국에는 작동하지 않는다.

답변

0

이러한 아름다운 문제 해결 :
은 당신의 Netty와 뉴저지 통합이 구현 https://github.com/jersey/jersey/blob/12e5d8bdf22bcd2676a1032ed69473cf2bbc48c7/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java#L124 에서 영감을 얻을 수 있습니다.

  1. HTTP 요청과 같이, (인 Netty 의해) 이벤트 루프 처리 (다른 웹 서버에 대해,이 경우의 Netty가 있음), 즉 I 믿어

    정확하게 해결 같은 문제가있다 그러한 ApplicationHandler.handle() 메서드는 다른 스레드 (이벤트 루프가 아닌)에서 호출되어야합니다.

  2. 비 블로킹 API는 블로킹 InputStream으로 변환되어야합니다. 이는 NettyInputStream에서 구현됩니다. Netty의 ByteBufInputStream으로 쉽게 변환 될 수 있으며 결과적으로 LinkedBlockingDeque을 사용함으로써 이러한 InputStream이 단일 것으로 변환됩니다. (그냥 fyi, 공급자가 이벤트 루프 스레드는 버그가 결코 차단되지 않도록 100 % 확실하지 않습니다.)

그런데 이것은 수신되는 것의 처리입니다. 데이터. 응답을 얻으려면 OutputStream (Jersey에서 사용)을 Vert.X 논 블로킹 API로 변환해야합니다.

+0

감사합니다. 내 시도 중 하나는 비슷하지만 "take()"가 차단되므로 여전히 차단됩니다. –

+0

이해가 안됩니다. 괜찮습니다. 'take()'가 블로킹하고 있습니다 (이것은'InputStream.read()'의 계약입니다).중요한 부분은 [1]에서 설명한 것인데, 'ApplicationHandler.handle()'을 다른 스레드에 위임해야합니다. 결과적으로,'take()'메서드는이 다른 스레드에 의해 호출되며 모든 것이 완벽하게 작동합니다. Jersey가 non-blocking API를 가지고 있지 않으면 (불행하게도 그것은 가까운 것이었다.) –

+0

가장 가까운 것이 맞습니다. (방금 작동했습니다.) –

0

두 가지 구성 요소가 필요합니다. 데이터의 새로운 버퍼에 올 때 그것이 끝날 때 :

  1. 당신은 vertx.executeBlocking는 두 개의 이벤트를 처리 할 필요가

  2. https://github.com/trajano/app-ms/blob/master/ms-engine/src/main/java/net/trajano/ms/engine/JaxRsRoute.java#L128를 참조 사용 차단할 수 있습니다 아무것도 처리를 분리 있는지 확인해야합니다. https://github.com/trajano/app-ms/blob/master/ms-engine/src/main/java/net/trajano/ms/engine/JaxRsRoute.java#L123

  3. 다른 스레드에서 데이터를 받아 들일 수있는 입력 스트림을 구현해야하며 더 이상 입력이 없다는 메시지를받을 수있는 기능이없는 데이터를 차단해야합니다. https://github.com/trajano/app-ms/blob/master/ms-engine/src/main/java/net/trajano/ms/engine/internal/VertxBlockingInputStream.java