TcpOutboundGateway
및 클라이언트 TcpConnectionFactory
을 사용하여 외부 TCP 서버와 통신하려고합니다. 내 시나리오에서 각 연결은 서로 다른 스레드와 연결되어야합니다 (스레드의 각 연결은 하나 이상의 요청/응답에 사용될 수 있습니다). 스프링 통합의 맞춤 동시 TcpOutboundGateway
ThreadAffinityClientConnectionFactory
를 사용 : 나는 4 개 이상의 동시 연결을 열려고 할 때까지
Spring Integration tcp client multiple connections
그것은 괜찮 았는데, 다섯 번째 (이상) 연결이 타임 아웃에 실패합니다. 나는 org.springframework.integration.ip.tcp.TcpOutboundGateway
연결을 취득 handleRequestMessage
방법에 세마포어를 사용하는 알아 낸, 그래서 나는이 같은 TcpOuboundGateway
를 오버라이드 (override) :
public class NoSemaphoreTcpOutboundGateway extends TcpOutboundGateway {
private volatile AbstractClientConnectionFactory connectionFactory;
private final Map<String, NoSemaphoreTcpOutboundGateway.AsyncReply> pendingReplies = new ConcurrentHashMap();
@Override
public boolean onMessage(Message<?> message) {
String connectionId = (String)message.getHeaders().get("ip_connectionId");
if(connectionId == null) {
this.logger.error("Cannot correlate response - no connection id");
this.publishNoConnectionEvent(message, (String)null, "Cannot correlate response - no connection id");
return false;
}
if(this.logger.isTraceEnabled()) {
this.logger.trace("onMessage: " + connectionId + "(" + message + ")");
}
NoSemaphoreTcpOutboundGateway.AsyncReply reply = (NoSemaphoreTcpOutboundGateway.AsyncReply)this.pendingReplies.get(connectionId);
if(reply == null) {
if(message instanceof ErrorMessage) {
return false;
} else {
String errorMessage = "Cannot correlate response - no pending reply for " + connectionId;
this.logger.error(errorMessage);
this.publishNoConnectionEvent(message, connectionId, errorMessage);
return false;
}
} else {
reply.setReply(message);
return false;
}
}
@Override
protected Message handleRequestMessage(Message<?> requestMessage) {
connectionFactory = (AbstractClientConnectionFactory) this.getConnectionFactory();
Assert.notNull(this.getConnectionFactory(), this.getClass().getName() + " requires a client connection factory");
TcpConnection connection = null;
String connectionId = null;
Message var7;
try {
/*if(!this.isSingleUse()) {
this.logger.debug("trying semaphore");
if(!this.semaphore.tryAcquire(this.requestTimeout, TimeUnit.MILLISECONDS)) {
throw new MessageTimeoutException(requestMessage, "Timed out waiting for connection");
}
haveSemaphore = true;
if(this.logger.isDebugEnabled()) {
this.logger.debug("got semaphore");
}
}*/
connection = this.getConnectionFactory().getConnection();
NoSemaphoreTcpOutboundGateway.AsyncReply e = new NoSemaphoreTcpOutboundGateway.AsyncReply(10000);
connectionId = connection.getConnectionId();
this.pendingReplies.put(connectionId, e);
if(this.logger.isDebugEnabled()) {
this.logger.debug("Added pending reply " + connectionId);
}
connection.send(requestMessage);
//connection may be closed after send (in interceptor) if its disconnect message
if (!connection.isOpen())
return null;
Message replyMessage = e.getReply();
if(replyMessage == null) {
if(this.logger.isDebugEnabled()) {
this.logger.debug("Remote Timeout on " + connectionId);
}
this.connectionFactory.forceClose(connection);
throw new MessageTimeoutException(requestMessage, "Timed out waiting for response");
}
if(this.logger.isDebugEnabled()) {
this.logger.debug("Response " + replyMessage);
}
var7 = replyMessage;
} catch (Exception var11) {
this.logger.error("Tcp Gateway exception", var11);
if(var11 instanceof MessagingException) {
throw (MessagingException)var11;
}
throw new MessagingException("Failed to send or receive", var11);
} finally {
if(connectionId != null) {
this.pendingReplies.remove(connectionId);
if(this.logger.isDebugEnabled()) {
this.logger.debug("Removed pending reply " + connectionId);
}
}
}
return var7;
}
private void publishNoConnectionEvent(Message<?> message, String connectionId, String errorMessage) {
ApplicationEventPublisher applicationEventPublisher = this.connectionFactory.getApplicationEventPublisher();
if(applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new TcpConnectionFailedCorrelationEvent(this, connectionId, new MessagingException(message, errorMessage)));
}
}
private final class AsyncReply {
private final CountDownLatch latch;
private final CountDownLatch secondChanceLatch;
private final long remoteTimeout;
private volatile Message<?> reply;
private AsyncReply(long remoteTimeout) {
this.latch = new CountDownLatch(1);
this.secondChanceLatch = new CountDownLatch(1);
this.remoteTimeout = remoteTimeout;
}
public Message<?> getReply() throws Exception {
try {
if(!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) {
return null;
}
} catch (InterruptedException var2) {
Thread.currentThread().interrupt();
}
for(boolean waitForMessageAfterError = true; this.reply instanceof ErrorMessage; waitForMessageAfterError = false) {
if(!waitForMessageAfterError) {
if(this.reply.getPayload() instanceof MessagingException) {
throw (MessagingException)this.reply.getPayload();
}
throw new MessagingException("Exception while awaiting reply", (Throwable)this.reply.getPayload());
}
NoSemaphoreTcpOutboundGateway.this.logger.debug("second chance");
this.secondChanceLatch.await(2L, TimeUnit.SECONDS);
}
return this.reply;
}
public void setReply(Message<?> reply) {
if(this.reply == null) {
this.reply = reply;
this.latch.countDown();
} else if(this.reply instanceof ErrorMessage) {
this.reply = reply;
this.secondChanceLatch.countDown();
}
}
}
}
SpringContext의 구성은 다음과 같습니다
@Configuration
@ImportResource("gateway.xml")
public class Conf {
@Bean
@Autowired
@ServiceActivator(inputChannel = "clientOutChannel")
public NoSemaphoreTcpOutboundGateway noSemaphoreTcpOutboundGateway(ThreadAffinityClientConnectionFactory cf, DirectChannel clientInChannel){
NoSemaphoreTcpOutboundGateway gw = new NoSemaphoreTcpOutboundGateway();
gw.setConnectionFactory(cf);
gw.setReplyChannel(clientInChannel);
gw.setRequestTimeout(10000);
return gw;
}
<int-ip:tcp-connection-factory
id="delegateCF"
type="client"
host="${remoteService.host}"
port="${remoteService.port}"
single-use="true"
lookup-host="false"
ssl-context-support="sslContext"
deserializer="clientDeserializer"
serializer="clientSerializer"
interceptor-factory-chain="clientLoggingTcpConnectionInterceptorFactory"
using-nio="false"/>
delegateCF
이 전달됩니다 ThreadAffinityClientConnectionFactory
생성자
질문은 다음과 같습니다.
ThreadAffinityClientConnectionFactory
과 함께 사용하면NoSemaphoreTcpOutboundGateway
을 동시성 측면에서 사용할 수 있습니까?
스프링 TcpOtboundGateway는 handleRequestMessage에 다음 코드가 있습니다. 'finally { if (this.isSingleUse) { connection.close(); }' 그리고 연결을 유지해야합니다 ... – GennadyJ