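I am having a problem with Camel's FTP2 component when the consumer actor lives inside an Akka system: Akka + Camel + FTP2 + localWorkDirectory does not work reliably.
The basic idea is to monitor an FTP directory for files and spawn a child actor to handle each file individually. Akka is used to manage concurrency and reliability. The parent consumer actor polls the directory with noop=true, so it does nothing with the files itself; the child consumer actor should then download the file, filtered with Camel's 'include' option. It is important that the downloads are concurrent and that the file is not loaded into memory (hence the use of localWorkDirectory).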
package camelrepro;

import java.io.InputStream;

import org.mockftpserver.core.command.Command;
import org.mockftpserver.core.command.ReplyCodes;
import org.mockftpserver.core.session.Session;
import org.mockftpserver.core.session.SessionKeys;
import org.mockftpserver.fake.FakeFtpServer;
import org.mockftpserver.fake.UserAccount;
import org.mockftpserver.fake.command.AbstractFakeCommandHandler;
import org.mockftpserver.fake.filesystem.FileEntry;
import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.testkit.JavaTestKit;

public class Main {

    public static class ParentActor extends UntypedConsumerActor {
        public ParentActor() {
            System.out.println("Parent started");
        }

        @Override
        public String getEndpointUri() {
            return "ftp://anonymous@localhost:8021?password=password&readLock=changed&initialDelay=0&delay=200&noop=true";
        }

        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof CamelMessage) {
                getContext().actorOf(new Props(ChildActor.class), "0");
            } else {
                unhandled(msg);
            }
        }
    }

    public static class ChildActor extends UntypedConsumerActor {
        public ChildActor() {
            System.out.println("Child started");
        }

        @Override
        public String getEndpointUri() {
            return "ftp://anonymous@localhost:8021?password=password&readLock=changed&initialDelay=0&delay=200&include=test.txt&localWorkDirectory=/tmp";
        }

        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof CamelMessage) {
                System.out.println("Child got message");
                CamelMessage camelMsg = (CamelMessage) msg;
                InputStream source = camelMsg.getBodyAs(InputStream.class, getCamelContext());
                System.out.println(source.getClass().getName());
                System.exit(0);
            } else {
                unhandled(msg);
            }
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("default");
        FakeFtpServer ftpServer = new FakeFtpServer();
        UnixFakeFileSystem ftpFileSystem = new UnixFakeFileSystem();
        ftpServer.setFileSystem(ftpFileSystem);
        ftpServer.addUserAccount(new UserAccount("anonymous", "password", "/"));
        ftpServer.setServerControlPort(8021);

        // fix bug in PWD handling (either Apache FTP client or mock server depending on opinion)
        ftpServer.setCommandHandler("PWD", new AbstractFakeCommandHandler() {
            @Override
            protected void handle(Command command, Session session) {
                String currentDirectory = (String) session.getAttribute(SessionKeys.CURRENT_DIRECTORY);
                this.replyCodeForFileSystemException = ReplyCodes.READ_FILE_ERROR;
                verifyFileSystemCondition(notNullOrEmpty(currentDirectory), currentDirectory, "filesystem.currentDirectoryNotSet");
                int replyCode = ReplyCodes.PWD_OK;
                String replyText = String.format("\"%s\" OK", currentDirectory.replaceAll("\"", "\"\""));
                session.sendReply(replyCode, replyText);
            }
        });

        ftpFileSystem.add(new FileEntry("/test.txt", "hello world"));
        ftpServer.start();

        new JavaTestKit(system) {{
            getSystem().actorOf(new Props(ParentActor.class));
        }};
    }
}
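To make the "not loaded into memory" requirement concrete, here is a minimal plain-Java sketch of the behaviour I am relying on localWorkDirectory for: the remote content is spooled to a local file in small chunks, and the consumer then reads a file-backed stream. This is only an illustration of the idea, not Camel's actual implementation; `WorkFileSketch`, `spoolToWorkFile` and `openWorkFile` are hypothetical names, and a `ByteArrayInputStream` stands in for the FTP data connection.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for illustration; not part of Camel or Akka.
final class WorkFileSketch {

    // Copies the remote stream to a local file in fixed-size chunks, so only
    // one buffer's worth of data is held in memory at any time.
    static long spoolToWorkFile(InputStream remote, Path workFile) throws IOException {
        byte[] buffer = new byte[8192];
        long total = 0;
        try (OutputStream out = Files.newOutputStream(workFile)) {
            int read;
            while ((read = remote.read(buffer)) != -1) {
                out.write(buffer, 0, read);
                total += read;
            }
        }
        return total;
    }

    // Opens the spooled file as a buffered, file-backed stream.
    static InputStream openWorkFile(Path workFile) throws IOException {
        return new BufferedInputStream(Files.newInputStream(workFile));
    }

    public static void main(String[] args) throws IOException {
        Path workFile = Files.createTempFile("work", ".txt");
        try {
            // Stand-in for the remote file content.
            InputStream remote = new ByteArrayInputStream(
                    "hello world".getBytes(StandardCharsets.UTF_8));
            long copied = spoolToWorkFile(remote, workFile);
            try (InputStream in = openWorkFile(workFile)) {
                System.out.println(copied + " bytes, stream type " + in.getClass().getName());
            }
        } finally {
            Files.deleteIfExists(workFile);
        }
    }
}
```

The point of the sketch is that the stream handed to the consumer is only valid while the spooled file still exists on disk.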
Maven dependencies, showing versions:
<dependencies>
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-remote_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-camel_2.10</artifactId>
        <version>2.1.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-testkit_2.10</artifactId>
        <version>2.1.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-ftp</artifactId>
        <version>2.10.3</version>
    </dependency>
    <dependency>
        <groupId>org.mockftpserver</groupId>
        <artifactId>MockFtpServer</artifactId>
        <version>2.4</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-io</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>commons-net</groupId>
        <artifactId>commons-net</artifactId>
        <version>3.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.2</version>
    </dependency>
</dependencies>
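I have written a simple reproduction (the Main class above). I would expect to see BufferedInputStream written to standard out, which would also let me check that the body is not an in-memory stream.
Instead, I see a file-not-found exception: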
[ERROR] [02/15/2013 10:53:32.951] [default-akka.actor.default-dispatcher-7] [akka://default/user/$a/0] Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory)
org.apache.camel.TypeConversionException: Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory)
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:162)
A couple of times it has worked, which leads me to suspect a race somewhere. But it almost always fails.
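To show the kind of race I have in mind, here is a small plain-Java sketch. It assumes, which I have not confirmed, that something removes the local work file before the child actor gets around to opening it; `WorkFileRaceSketch` and `canOpen` are hypothetical names used only for this illustration. Opening a file that has already been deleted fails with the same `java.io.FileNotFoundException` type that appears in the trace above.

```java
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration; not taken from Camel or Akka.
final class WorkFileRaceSketch {

    // Returns true if the work file could be opened, false if it was already gone.
    static boolean canOpen(Path workFile) throws IOException {
        try (InputStream in = new FileInputStream(workFile.toFile())) {
            return true;
        } catch (FileNotFoundException e) {
            System.out.println("open failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path workFile = Files.createTempFile("test", ".txt");
        Files.write(workFile, "hello world".getBytes(StandardCharsets.UTF_8));

        // Reader wins the race: the file is still there.
        System.out.println("before delete: " + canOpen(workFile));

        // Assumed cleanup step that runs before the reader in the failing case.
        Files.delete(workFile);

        // Reader loses the race: the file is gone.
        System.out.println("after delete: " + canOpen(workFile));
    }
}
```

If this is what is happening, logging whether /tmp/test.txt exists at the moment the child receives its CamelMessage should distinguish the working runs from the failing ones.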
Any clues, ideas, or suggestions?
FWIW:
uname -a: Linux 3.2.0-37-generic #58-Ubuntu SMP Thu Jan 24 15:28:10 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
java: 1.7.0_11-b21
Good idea, but unfortunately the behavior is the same with 2.10.2.