2013-02-15 3 views
1

Camel의 FTP2 구성 요소에 문제가 있습니다. 소비자 행위자는 Akka 시스템 내에 있습니다.Akka + Camel + FTP2 + localWorkingDirectory가 안정적으로 작동하지 않습니다.

기본적인 아이디어는 FTP 디렉토리에서 파일을 모니터링하고 하위 액터를 만들어 각 파일을 개별적으로 처리하는 것입니다. Akka는 동시성과 신뢰성을 관리하는 데 사용되고 있습니다. 부모 소비자 액터는 noop = true로 디렉토리를 폴링하므로 아무 것도하지 않으면 하위 소비자 액터가 'include'Camel 옵션으로 필터링 된 파일을 다운로드해야합니다. 다운로드가 동시 적이어야하며 파일이 메모리에로드되지 않아야합니다 (따라서 localWorkDirectory 사용). 버전을 보여주는

package camelrepro; 

import java.io.InputStream; 

import org.mockftpserver.core.command.Command; 
import org.mockftpserver.core.command.ReplyCodes; 
import org.mockftpserver.core.session.Session; 
import org.mockftpserver.core.session.SessionKeys; 
import org.mockftpserver.fake.FakeFtpServer; 
import org.mockftpserver.fake.UserAccount; 
import org.mockftpserver.fake.command.AbstractFakeCommandHandler; 
import org.mockftpserver.fake.filesystem.FileEntry; 
import org.mockftpserver.fake.filesystem.UnixFakeFileSystem; 

import akka.actor.ActorSystem; 
import akka.actor.Props; 
import akka.camel.CamelMessage; 
import akka.camel.javaapi.UntypedConsumerActor; 
import akka.testkit.JavaTestKit; 

public class Main { 

    public static class ParentActor extends UntypedConsumerActor { 

     public ParentActor() { 
      System.out.println("Parent started"); 
     } 
     @Override 
     public String getEndpointUri() { 
      return "ftp://[email protected]:8021?password=password&readLock=changed&initialDelay=0&delay=200&noop=true"; 
     } 

     @Override 
     public void onReceive(Object msg) throws Exception { 
      if (msg instanceof CamelMessage) { 
       getContext().actorOf(new Props(ChildActor.class), "0"); 
      } else { 
       unhandled(msg); 
      } 
     } 
    } 

    public static class ChildActor extends UntypedConsumerActor { 

     public ChildActor() { 
      System.out.println("Child started"); 
     } 

     @Override 
     public String getEndpointUri() { 
      return "ftp://[email protected]:8021?password=password&readLock=changed&initialDelay=0&delay=200&include=test.txt&localWorkDirectory=/tmp"; 
     } 

     @Override 
     public void onReceive(Object msg) throws Exception { 
      if (msg instanceof CamelMessage) { 
       System.out.println("Child got message"); 
       CamelMessage camelMsg = (CamelMessage) msg; 

       InputStream source = camelMsg.getBodyAs(InputStream.class, getCamelContext()); 
       System.out.println(source.getClass().getName()); 
       System.exit(0); 
      } else { 
       unhandled(msg); 
      } 
     } 
    } 

    public static void main(String[] args) { 

     ActorSystem system = ActorSystem.create("default"); 

     FakeFtpServer ftpServer = new FakeFtpServer(); 
     UnixFakeFileSystem ftpFileSystem = new UnixFakeFileSystem(); 
     ftpServer.setFileSystem(ftpFileSystem); 
     ftpServer.addUserAccount(new UserAccount("anonymous", "password", "/")); 
     ftpServer.setServerControlPort(8021); 

     // fix bug in PWD handling (either Apache FTP client or mock server depending on opinion) 
     ftpServer.setCommandHandler("PWD", new AbstractFakeCommandHandler() { 
      @Override 
      protected void handle(Command command, Session session) { 
       String currentDirectory = (String) session.getAttribute(SessionKeys.CURRENT_DIRECTORY); 
       this.replyCodeForFileSystemException = ReplyCodes.READ_FILE_ERROR; 
       verifyFileSystemCondition(notNullOrEmpty(currentDirectory), currentDirectory, "filesystem.currentDirectoryNotSet"); 
       int replyCode = ReplyCodes.PWD_OK; 
       String replyText = String.format("\"%s\" OK", currentDirectory.replaceAll("\"", "\"\"")); 
       session.sendReply(replyCode, replyText); 
      } 
     }); 
     ftpFileSystem.add(new FileEntry("/test.txt", "hello world")); 
     ftpServer.start(); 

     new JavaTestKit(system) {{ 
      getSystem().actorOf(new Props(ParentActor.class)); 
     }}; 
    } 
} 

메이븐 의존성 : - 그리고이 InputStream가 아닌 확인

<dependencies> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-actor_2.10</artifactId> 
      <version>2.1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-remote_2.10</artifactId> 
      <version>2.1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-camel_2.10</artifactId> 
      <version>2.1.0</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-testkit_2.10</artifactId> 
      <version>2.1.0</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.camel</groupId> 
      <artifactId>camel-ftp</artifactId> 
      <version>2.10.3</version> 
     </dependency> 
     <dependency> 
      <groupId>org.mockftpserver</groupId> 
      <artifactId>MockFtpServer</artifactId> 
      <version>2.4</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.commons</groupId> 
      <artifactId>commons-io</artifactId> 
      <version>1.3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>commons-net</groupId> 
      <artifactId>commons-net</artifactId> 
      <version>3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-simple</artifactId> 
      <version>1.7.2</version> 
     </dependency> 
    </dependencies> 

내가 BufferedInputStream을 표준에 기록 볼 것으로 예상

나는 간단한 생식을 작성했습니다.

하지만 그 대신, 내가 발견하지 예외 파일을 참조하십시오

[ERROR] [02/15/2013 10:53:32.951] [default-akka.actor.default-dispatcher-7] [akka://default/user/$a/0] Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory) 
org.apache.camel.TypeConversionException: Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory) 
    at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:162) 

배의 커플 그것은 어딘가에 경쟁 할 수 의심 저를 선도했다. 그러나 거의 항상 실패합니다.

단서, 아이디어, 제안?

FWIW :

uname -a: Linux 3.2.0-37-generiC#58-Ubuntu SMP Thu Jan 24 15:28:10 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux 
java: 1.7.0_11-b21 

답변

2

위의 문제에 대한 해결책을 찾았습니다.

자식 소비자 autoAck()이 사실을 반환한다는 것은 사실입니다 (기본적으로). 그런 경우, akka-camel은 CamelMessage을 보내고 잊어 버리고 정리를 진행합니다. 그 사이에 자식 배우는 getBodyAs()에 의해 호출 된 유형 변환기 중 하나가 열릴 때까지 실제로는 InputStream을 열지 않습니다. 따라서 자식 배우가 getBodyAs()을 통해 파일을 여는 것과 파일을 비동기 적으로 보낸 후 Camel 정리를 통해 파일을 삭제하는 경쟁이 있습니다.

따라서 autoAck()을 재정의하여 false를 반환하고 자식 메시지 처리기 끝에 Ack.getInstance() (또는 new Status.Failure(<cause>) 원한다면)을 보내야합니다.

1

사용 낙타 2.10.2은 ftp 구성 요소 2.10.3에 문제가있는 한

+0

좋은 생각이지만, 불행히도 동작은 2.10.2와 같습니다. –

0

localWorkDirectory =/tmp를 사용하는 경우 해당 디렉토리에 임시 파일을 저장하기위한 것입니다 라우팅 중. Camel Exchange가 완료되면 파일이 삭제됩니다. 나는 이것이 비동기 이벤트 인 Akka와 어떻게 작동하는지 모르겠습니다. 따라서 Akel onReceive는 Camel Exchange가 완료된 후 비동기라고 불릴 수 있으므로 임시 파일이 삭제됩니다. 대신 : 더 permament 성격

from("ftp:...") 
    .to("file:inbox") 

을의 filke 위치에 경로 파일을 것 그리고 당신이 Akka를 가질 수 있습니다 낙타에서

는 ("받은 편지함 파일")에서 소비한다.

+0

이 경우 전송이 동시에 이루어지며 예 : FTP 서버가 가끔 죽어? 나는 그것을 한순간에 설정했었다. 그러나 나의 상사는 직접적인 낙타 루트 접근법을 좋아하지 않았고, 내결함성을 위해 akka를 사용하고 싶었다. :) –

관련 문제