2016-12-28 1 views
1

나는 Flume을 Python 스크립트로 로그를 수집하기 위해 netcat 소스로 Flume을 설정하는 사용자 가이드를 따르기 위해 telnet과 nc 테스트를 사용한다. 잘 작동한다.flume이 파이썬 소켓이나 텔넷으로 작업하는 동안 이벤트를 올바르게 가져올 수 없음

내 설정 코드 :

a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 
Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 
a1.sinks.k1.type = logger 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

가 그럼 난 파이썬이 수로를 연결하고 이런 식으로 몇 가지 단어를 보내는 데 사용 :

import socket 
def netcat(hostname, port): 
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
    s.connect((hostname, port)) 
    s.send("test words 1\n") 
    s.send("test words 2\n") 
    s.send("test words 3\n") 
    s.send("test words 4\n") 
    s.shutdown(socket.SHUT_WR) 
    s.close() 

if _name_ == "_main_": 
    netcat("127.0.0.1",44444) 

문제가 발생, 수로는 2 개 행을받을 수 있습니다. 수로 로그 :

2016-12-28 16:44:32,248 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:169)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444] 2016-12-28 16:44:41,814 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 31 test words 1 } 2016-12-28 16:44:41,815 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 32 test words 2 }

우분투 & Java1.8 및 CentOS의 & 자바 1.7과 파이썬에서 텔넷 모델과 같은 결과 모두에서 같은 결과를 얻었다.

config 또는 Python 스크립트에 문제가 있습니까? 아니면 누군가이 사건에 대한 조언을 받았습니까?

답변

2

응답이 다시 오기를 기다리지 않기 때문에 발생합니다. 기본적으로 Flume의 netcat 소스는 모든 이벤트에 대해 "확인"메시지를 보냅니다. 해당 응답을 보내기 전에 연결을 종료하면 추가 메시지 처리가 실패하게됩니다 (파이프가 클라이언트 쪽에서 끊어짐).

a1.sources.r1.ack-every-event=false 

이가 "OK"전송하기위한 요구 사항을 제거하고, 따라서 실패를 중지 :

는이 문제를 해결하려면, 당신은 당신의 flume.conf에 다음과 같은 변화가 필요합니다.

또는 연결을 닫기 전에 "확인"메시지를 보낼 때마다 대기하도록 Python을 변경할 수 있습니다. 인위적으로 에 sleep 문을 추가하면도 문제를 해결해야하지만 메시지를 처리하는 데 걸리는 시간을 가정합니다. 정상적으로는 좋지만 처리가 지연되는 다른 상황이있을 수 있습니다.

관련 문제