2016-12-15 2 views
1

TcpOutboundGateway 및 클라이언트 TcpConnectionFactory을 사용하여 외부 TCP 서버와 통신하려고합니다. 내 시나리오에서 각 연결은 서로 다른 스레드와 연결되어야합니다 (스레드의 각 연결은 하나 이상의 요청/응답에 사용될 수 있습니다). 스프링 통합의 맞춤 동시 TcpOutboundGateway

그래서 나는이 주제에서 ThreadAffinityClientConnectionFactory를 사용 : 나는 4 개 이상의 동시 연결을 열려고 할 때까지 Spring Integration tcp client multiple connections

그것은 괜찮 았는데, 다섯 번째 (이상) 연결이 타임 아웃에 실패합니다. 나는 org.springframework.integration.ip.tcp.TcpOutboundGateway 연결을 취득 handleRequestMessage 방법에 세마포어를 사용하는 알아 낸, 그래서 나는이 같은 TcpOuboundGateway를 오버라이드 (override) :

public class NoSemaphoreTcpOutboundGateway extends TcpOutboundGateway { 

    private volatile AbstractClientConnectionFactory connectionFactory; 
    private final Map<String, NoSemaphoreTcpOutboundGateway.AsyncReply> pendingReplies = new ConcurrentHashMap(); 

    @Override 
    public boolean onMessage(Message<?> message) { 
     String connectionId = (String)message.getHeaders().get("ip_connectionId"); 
     if(connectionId == null) { 
      this.logger.error("Cannot correlate response - no connection id"); 
      this.publishNoConnectionEvent(message, (String)null, "Cannot correlate response - no connection id"); 
      return false; 
     } 

     if(this.logger.isTraceEnabled()) { 
      this.logger.trace("onMessage: " + connectionId + "(" + message + ")"); 
     } 

     NoSemaphoreTcpOutboundGateway.AsyncReply reply = (NoSemaphoreTcpOutboundGateway.AsyncReply)this.pendingReplies.get(connectionId); 
     if(reply == null) { 
      if(message instanceof ErrorMessage) { 
       return false; 
      } else { 
       String errorMessage = "Cannot correlate response - no pending reply for " + connectionId; 
       this.logger.error(errorMessage); 
       this.publishNoConnectionEvent(message, connectionId, errorMessage); 
       return false; 
      } 
     } else { 
      reply.setReply(message); 
      return false; 
     } 

    } 

    @Override 
    protected Message handleRequestMessage(Message<?> requestMessage) { 
     connectionFactory = (AbstractClientConnectionFactory) this.getConnectionFactory(); 
     Assert.notNull(this.getConnectionFactory(), this.getClass().getName() + " requires a client connection factory"); 

     TcpConnection connection = null; 
     String connectionId = null; 

     Message var7; 
     try { 
      /*if(!this.isSingleUse()) { 
       this.logger.debug("trying semaphore"); 
       if(!this.semaphore.tryAcquire(this.requestTimeout, TimeUnit.MILLISECONDS)) { 
        throw new MessageTimeoutException(requestMessage, "Timed out waiting for connection"); 
       } 

       haveSemaphore = true; 
       if(this.logger.isDebugEnabled()) { 
        this.logger.debug("got semaphore"); 
       } 
      }*/ 

      connection = this.getConnectionFactory().getConnection(); 
      NoSemaphoreTcpOutboundGateway.AsyncReply e = new NoSemaphoreTcpOutboundGateway.AsyncReply(10000); 
      connectionId = connection.getConnectionId(); 
      this.pendingReplies.put(connectionId, e); 
      if(this.logger.isDebugEnabled()) { 
       this.logger.debug("Added pending reply " + connectionId); 
      } 

      connection.send(requestMessage); 

      //connection may be closed after send (in interceptor) if its disconnect message 
      if (!connection.isOpen()) 
       return null; 

      Message replyMessage = e.getReply(); 
      if(replyMessage == null) { 
       if(this.logger.isDebugEnabled()) { 
        this.logger.debug("Remote Timeout on " + connectionId); 
       } 

       this.connectionFactory.forceClose(connection); 
       throw new MessageTimeoutException(requestMessage, "Timed out waiting for response"); 
      } 

      if(this.logger.isDebugEnabled()) { 
       this.logger.debug("Response " + replyMessage); 
      } 

      var7 = replyMessage; 
     } catch (Exception var11) { 
      this.logger.error("Tcp Gateway exception", var11); 
      if(var11 instanceof MessagingException) { 
       throw (MessagingException)var11; 
      } 

      throw new MessagingException("Failed to send or receive", var11); 
     } finally { 
      if(connectionId != null) { 
       this.pendingReplies.remove(connectionId); 
       if(this.logger.isDebugEnabled()) { 
        this.logger.debug("Removed pending reply " + connectionId); 
       } 
      } 
     } 
     return var7; 
    } 

    private void publishNoConnectionEvent(Message<?> message, String connectionId, String errorMessage) { 
     ApplicationEventPublisher applicationEventPublisher = this.connectionFactory.getApplicationEventPublisher(); 
     if(applicationEventPublisher != null) { 
      applicationEventPublisher.publishEvent(new TcpConnectionFailedCorrelationEvent(this, connectionId, new MessagingException(message, errorMessage))); 
     } 
    } 

    private final class AsyncReply { 
     private final CountDownLatch latch; 
     private final CountDownLatch secondChanceLatch; 
     private final long remoteTimeout; 
     private volatile Message<?> reply; 

     private AsyncReply(long remoteTimeout) { 
      this.latch = new CountDownLatch(1); 
      this.secondChanceLatch = new CountDownLatch(1); 
      this.remoteTimeout = remoteTimeout; 
     } 

     public Message<?> getReply() throws Exception { 
      try { 
       if(!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) { 
        return null; 
       } 
      } catch (InterruptedException var2) { 
       Thread.currentThread().interrupt(); 
      } 
      for(boolean waitForMessageAfterError = true; this.reply instanceof ErrorMessage; waitForMessageAfterError = false) { 
       if(!waitForMessageAfterError) { 
        if(this.reply.getPayload() instanceof MessagingException) { 
         throw (MessagingException)this.reply.getPayload(); 
        } 

        throw new MessagingException("Exception while awaiting reply", (Throwable)this.reply.getPayload()); 
       } 
       NoSemaphoreTcpOutboundGateway.this.logger.debug("second chance"); 
       this.secondChanceLatch.await(2L, TimeUnit.SECONDS); 
      } 
      return this.reply; 
     } 

     public void setReply(Message<?> reply) { 
      if(this.reply == null) { 
       this.reply = reply; 
       this.latch.countDown(); 
      } else if(this.reply instanceof ErrorMessage) { 
       this.reply = reply; 
       this.secondChanceLatch.countDown(); 
      } 
     } 
    } 
} 

SpringContext의 구성은 다음과 같습니다

@Configuration 
@ImportResource("gateway.xml") 
public class Conf { 

@Bean 
@Autowired 
@ServiceActivator(inputChannel = "clientOutChannel") 
public NoSemaphoreTcpOutboundGateway noSemaphoreTcpOutboundGateway(ThreadAffinityClientConnectionFactory cf, DirectChannel clientInChannel){ 
    NoSemaphoreTcpOutboundGateway gw = new NoSemaphoreTcpOutboundGateway(); 
    gw.setConnectionFactory(cf); 
    gw.setReplyChannel(clientInChannel); 
    gw.setRequestTimeout(10000); 
    return gw; 
} 

<int-ip:tcp-connection-factory 
     id="delegateCF" 
     type="client" 
     host="${remoteService.host}" 
     port="${remoteService.port}" 
     single-use="true" 
     lookup-host="false" 
     ssl-context-support="sslContext" 
     deserializer="clientDeserializer" 
     serializer="clientSerializer" 
     interceptor-factory-chain="clientLoggingTcpConnectionInterceptorFactory" 
     using-nio="false"/> 

delegateCF이 전달됩니다 ThreadAffinityClientConnectionFactory 생성자

질문은 다음과 같습니다.

  • ThreadAffinityClientConnectionFactory과 함께 사용하면 NoSemaphoreTcpOutboundGateway을 동시성 측면에서 사용할 수 있습니까?

답변

0

올바른 방법으로 보이지만 동시에 사용자 정의가 필요하지 않은 것 같습니다 TcpOutboundGateway. semaphore 논리에 기초한다 : 주석에

@Bean 
public TcpNetClientConnectionFactory delegateCF() { 
    TcpNetClientConnectionFactory clientCF = new TcpNetClientConnectionFactory("localhost", 1234); 
    clientCF.setSingleUse(true); // so each thread gets his own connection 
    return clientCF; 
} 

@Bean 
public ThreadAffinityClientConnectionFactory affinityCF() { 
    return new ThreadAffinityClientConnectionFactory(delegateCF()); 
} 

에주의 : 게리 년대 ThreadAffinityClientConnectionFactory에 대한 솔루션에서 동시에 모양에

if (!this.isSingleUse) { 
      logger.debug("trying semaphore"); 
      if (!this.semaphore.tryAcquire(this.requestTimeout, TimeUnit.MILLISECONDS)) { 
       throw new MessageTimeoutException(requestMessage, "Timed out waiting for connection"); 
      } 

. 위임자 인 isSingleUse() 만 필요합니다.

+0

스프링 TcpOtboundGateway는 handleRequestMessage에 다음 코드가 있습니다. 'finally { if (this.isSingleUse) { connection.close(); }' 그리고 연결을 유지해야합니다 ... – GennadyJ

관련 문제