포트에서 데이터를 읽을 때 스톰 스파우트를 작성해야합니다. 논리적으로 가능한지 알고 싶었습니다.폭풍 : 포트에서 데이터를 읽는 데 사용되는 스파우트
마음에, 나는 하나의 주둥이와 하나의 볼트로 똑같이 설계된 간단한 토폴로지를 설계했습니다. 스파우트는 wget을 사용하여 전송 된 HTTP 요청을 수집하며 볼트는 요청을 표시합니다.
public class ProxySpout extends BaseRichSpout{
//The O/P collector
SpoutOutputCollector sc;
//The socket
Socket clientSocket;
//The server socket
ServerSocket sc;
public ProxySpout(int port){
this.sc=new ServerSocket(port);
try{
clientSocket=sc.accept();
}catch(IOException ex){
//Handle it
}
}
public void nextTuple(){
try{
InputStream ic=clientSocket.getInputStream();
byte b=new byte[8196];
int len=ic.read(b);
sc.emit(new Values(b));
ic.close();
}catch(//){
//Handle it
}finally{
clientSocket.close();
}
}
}
내가 너무 방법의 나머지 부분을 구현 한 다음과 같이
내 주둥이 구조이다.
java.lang.RuntimeException가 : java.io.NotSerializableException : java.net.Socket의
나는 첫 번째 요청을 보낼 때 오류가 발생하는, 토폴로지에이 전원을 켜고 실행하면
이 스파우트를 구현하는 방식에 문제가 있는지 알아야합니다. 스파우트가 포트에서 데이터를 수집 할 수 있습니까? 또는 스파우트가 프록시의 인스턴스로 작동하도록하려면?
편집 작업을 얻었다.
코드는 다음과 같습니다
public class ProxySpout extends BaseRichSpout{
//The O/P collector
static SpoutOutputCollector _collector;
//The socket
static Socket _clientSocket;
static ServerSocket _serverSocket;
static int _port;
public ProxySpout(int port){
_port=port;
}
public void open(Map conf,TopologyContext context, SpoutOutputCollector collector){
_collector=collector;
_serverSocket=new ServerSocket(_port);
}
public void nextTuple(){
_clientSocket=_serverSocket.accept();
InputStream incomingIS=_clientSocket.getInputStream();
byte[] b=new byte[8196];
int len=b.incomingIS.read(b);
_collector.emit(new Values(b));
}
}
쇼의 제안 @를 당으로 요청을 청취 nextTuple()
방법에 open()
방법 _serverSocket
과 _clientSocket
실행을 초기화했습니다.
그럼, 가능할까요? 스파우트 기능을 프록시처럼 만들기 위해서? –
네,하지만 작은 시간마다 nextTuple()이 호출됩니다. 스파우트가 아무것도받지 못하면 오류를 관리해야합니다. – gasparms