2017-01-09 1 views
2

아파치 NiFi와 Apache Spark를 사용하는 대학교 용으로 작은 프로젝트를 진행하고 있습니다. HDFS에서 TSV 파일을 읽고 Spark Streaming을 사용하는 NiFi로 워크 플로우를 만들고 싶습니다. 파일을 처리하고 필요한 정보를 MySQL에 저장할 수 있습니다. 이미 NiFi에서 워크 플로를 만들었으며 저장 부분은 이미 작동 중입니다. 문제는 내가 그들을 사용할 수 있도록 NiFi 패키지를 파싱 할 수 없다는 것입니다.스파크를 사용하여 NiFi 데이터 패킷을 구문 분석하십시오.

파일은 다음과 같이 행을 포함 :

val ssc = new StreamingContext(config, Seconds(10)) 
val packet = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY)) 
val file = packet.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8)) 

:

linea1File1 TheReceptionist 653 Entertainment 424 13021 4.34 1305 744 DjdA-5oKYFQ NxTDlnOuybo c-8VuICzXtU 

각 공간 탭 ("\의 t")이 스칼라를 사용하여 불꽃에 내 코드

입니다 경우 여기까지는 하나의 문자열에서 전체 파일 (7000 개 이상의 행)을 얻을 수 있습니다 ... 불행히도 행에서 해당 문자열을 나눌 수 없습니다. 전체 파일을 행으로 가져와 개체에서 구문 분석하고 일부 작업을 적용하고 원하는 내용을 저장할 수 있습니다.

누구든지이 문제를 해결할 수 있습니까?

답변

3

각 데이터 패킷은 NiFi의 하나의 플로 파일 내용이 될 것입니다. 따라서 NiFi가 많은 행이있는 HDFS에서 하나의 TSV 파일을 선택하면 모든 행이 하나의 데이터 패킷에있게됩니다.

NiFi 흐름을 보지 않고 말하기는 어렵지만 스트리밍을 시작하기 전에 라인 수가 1 인 SplitText를 사용하여 NiFi로 TSV를 분할 할 수 있습니다.

+0

많이 고마워요.이게 내 문제를 완전히 해결했습니다 ... 저는 NiFi로 해결할 생각이 전혀 없었습니다. 나는 불꽃에 중점을 두었습니다 ... –

관련 문제