2016-11-27 1 views
0

Flume 1.7을 사용하여 HDFS로 데이터를로드하려고합니다.Flume HDFS 싱크는 netcat 소스를 사용하여 한 줄의 데이터 소스 만 저장합니다.

# Starting with: /opt/flume/bin/flume-ng agent -n Agent -c conf -f /opt/flume/conf/test.conf 
# Naming the components on the current agent 
Agent.sources = Netcat 
Agent.channels = MemChannel 
Agent.sinks = LoggerSink hdfs-sink LocalOut 

# Describing/Configuring the source 
Agent.sources.Netcat.type = netcat 
Agent.sources.Netcat.bind = 0.0.0.0 
Agent.sources.Netcat.port = 56565 

# Describing/Configuring the sink 
Agent.sinks.LoggerSink.type = logger 

# Define a sink that outputs to hdfs. 
Agent.sinks.hdfs-sink.type = hdfs 
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>:8020/user/admin/flume_folder/%y-%m-%d/%H%M/ 
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true 
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream 
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text 
Agent.sinks.hdfs-sink.hdfs.batchSize = 100 
Agent.sinks.hdfs-sink.hdfs.rollSize = 0 
Agent.sinks.hdfs-sink.hdfs.rollCount = 0 
Agent.sinks.hdfs-sink.hdfs.rollInterval = 0 
Agent.sinks.hdfs-sink.hdfs.idleTimeout = 0 

# Schreibt input into local Filesystem 
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink 
Agent.sinks.LocalOut.type = file_roll 
Agent.sinks.LocalOut.sink.directory = /tmp/flume 
Agent.sinks.LocalOut.sink.rollInterval = 0 


# Describing/Configuring the channel 
Agent.channels.MemChannel.type = memory 
Agent.channels.MemChannel.capacity = 1000 
Agent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel 
Agent.sources.Netcat.channels = MemChannel 
Agent.sinks.LoggerSink.channel = MemChannel 
Agent.sinks.hdfs-sink.channel = MemChannel 
Agent.sinks.LocalOut.channel = MemChannel 

나는 다음과 같은 파일을 보내 그 후 소스에 netcat을 사용 : :

cat textfile.csv | nc <IP of flume agent> 56565 

파일은 다음과 같은 요소로 포함되어

Name1,1 
Name2,2 
Name3,3 
Name4,4 
Name5,5 
Name6,6 
Name7,7 
Name8,8 
Name9,9 
Name10,10 
Name11,11 
Name12,12 
Name13,13 
Name14,14 
Name15,15 
Name16,16 
Name17,17 
Name18,18 
Name19,19 
Name20,20 
... 
Name490,490 
Name491,491 
Name492,492 

문제 나는 다음과 같은 구성을 생성 내가 겪고있는 것은 오류없이 flume이 hdfs에 기록하지만 전송 된 파일의 한 줄만 쓰는 것입니다. nectat를 사용하여 여러 번 파일을 소스로 푸시하기 시작하면 때때로 flume은 둘 이상의 파일을 하나 이상의 행을 포함하는 hdfs에 씁니다. 그러나 드물게 모든 행.

rollSize, 배치 크기 및 기타에 대한 hdfs 매개 변수를 변경하려고했지만 실제로 동작이 변경되지 않습니다.

또한 구성된 싱크 로컬 파일도 올바르게 작동합니다.

누구나 항목을 삭제하지 않고 모든 항목을 hdfs에 기록하도록 구성하는 방법을 알고 있었습니까?

도움 주셔서 감사합니다. 2016년 1월 12일


업데이트 나는 HDFS의 싱크를 제외한 모든 싱크를 제거하고 일부 매개 변수를 변경했습니다. 이 후 HDFS 싱크가 있어야대로 수행합니다. 여기

구성 :

# Naming the components on the current agent 
Agent.sources = Netcat 
Agent.channels = MemChannel 
Agent.sinks = hdfs-sink 

# Describing/Configuring the source 
Agent.sources.Netcat.type = netcat 
Agent.sources.Netcat.bind = 0.0.0.0 
Agent.sources.Netcat.port = 56565 


# Define a sink that outputs to hdfs. 
Agent.sinks.hdfs-sink.type = hdfs 
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>/user/admin/flume_folder/%y-%m-%d/%H%M/ 
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true 
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream 
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text 
Agent.sinks.hdfs-sink.hdfs.batchSize = 100 
Agent.sinks.hdfs-sink.hdfs.rollSize = 0 
Agent.sinks.hdfs-sink.hdfs.rollCount = 100 


# Describing/Configuring the channel 
Agent.channels.MemChannel.type = memory 
Agent.channels.MemChannel.capacity = 1000 
Agent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel 
Agent.sources.Netcat.channels = MemChannel 
Agent.sinks.hdfs-sink.channel = MemChannel 

가지고 누군가가이 구성하지만 더 이상 작동하지 않습니다 두 개 이상의 싱크 작업을 왜 생각?

답변

0

나는 내 자신으로 해결책을 찾았습니다. 내가 이해할 때, 나는 두 싱크 모두에 대해 동일한 채널을 사용했습니다. 따라서 더 빨라진 싱크대는 모든 항목을 차지하며 항목 중 일부만 hdfs 싱크로 전달되었습니다. 예상대로

파라미터와 소스의 패닝을 상이한 채널을 사용하여 포함되면

Agent.sources.Netcat.selector.type = replicating 

수로는 로컬 파일에 기록하고 HDFS.

관련 문제