2017-09-13 1 views
1

내가 각 라인에서 JSON을 포함하는 몇 개의 파일을HDFS-싱크는 다음 HDFS에 수로에 의해 모든 이벤트에 추가 된 타임 스탬프를 제거하는 방법은

[[email protected] vp_flume]# more vp_170801.txt.finished | awk '{printf("%s\n", substr($0,0,20))}' 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 

내 수로의 설정이

[[email protected] flume]# cat flume_test.conf 
agent.sources = seqGenSrc 
agent.channels = memoryChannel 
agent.sinks = loggerSink 

agent.sources.seqGenSrc.type = spooldir 
agent.sources.seqGenSrc.spoolDir = /moveitdata/dong/vp_flume 
agent.sources.seqGenSrc.deserializer.maxLineLength = 10000000 
agent.sources.seqGenSrc.fileSuffix = .finished 
agent.sources.seqGenSrc.deletePolicy = never 

agent.sources.seqGenSrc.channels = memoryChannel 
agent.sinks.loggerSink.channel = memoryChannel 
agent.channels.memoryChannel.type = memory 
agent.channels.memoryChannel.capacity = 100 

agent.sinks.loggerSink.type = hdfs 
agent.sinks.loggerSink.hdfs.path = /home/dong/vp_flume 

agent.sinks.loggerSink.hdfs.writeFormat = Text 
agent.sinks.loggerSink.hdfs.rollInterval = 0 
agent.sinks.loggerSink.hdfs.rollSize = 1000000000 
agent.sinks.loggerSink.hdfs.rollCount = 0 
되어있는 파일

HDFS의 파일은 다음과 같습니다

[[email protected] flume]# hadoop fs -text /home/dong/vp_flume/* | awk '{printf("%s\n", substr($0,0,20))}' | more 
1505276698665 {"stat 
1505276698665 {"stat 
1505276698666 {"stat 
1505276698666 {"stat 
1505276698666 {"stat 
1505276698667 {"stat 
1505276698667 {"stat 
1505276698667 {"stat 
1505276698668 {"stat 
1505276698668 {"stat 
1505276698668 {"stat 
1505276698668 {"stat 
1505276698669 {"stat 
1505276698669 {"stat 
1505276698669 {"stat 
1505276698669 {"stat 
1505276698670 {"stat 
1505276698670 {"stat 
1505276698670 {"stat 
1505276698670 {"stat 

질문 : 각 이벤트에 수로에 의해 추가 된 타임 스탬프를 좋아하지 않는다. 그러나, 어떻게 적절하게 flume을 구성하여 제거 할 수 있습니까?

답변

1

Flume이 SequenceFile을 기본값으로 사용하도록 에이전트 구성 파일에 hdfs.fileType 속성을 명시 적으로 언급하지 않았습니다. SequenceFile은 TextWritable의 두 가지 쓰기 형식을 지원합니다. hdfs.writeFormat = Text을 설정 했으므로 Flume이 HDFSTextSerializer을 사용하여 이벤트를 직렬화합니다. its source (53 행)을 살펴보면 타임 스탬프가 기본 키로 추가되었음을 알 수 있습니다.

hdfs.writeFormat = Writable을 사용하면 동일하지 않기 때문에 도움이되지 않습니다. 소스 here (라인 52)을 확인할 수 있습니다.

키는 항상 SequenceFile에 필요합니다. 따라서, SequenceFile을 사용하는 좋은 이유가 없다면 hdfs.fileType = DataStream을 에이전트 구성에 사용하는 것이 좋습니다.

관련 문제