2016-10-15 3 views
2

"01"에서 "15"까지의 여러 폴더로 구성된 데이터 세트가 있고 각 폴더에는 "00-00.txt"라는 파일이 포함됩니다 "23-59.txt"(각 폴더는 1 일을 묘사 함).텍스트 파일의 첫 번째 줄의 일부를 RDD의 키로 사용

파일에는 다음과 같은 행이 있습니다. 내가 키 - 값 쌍의 RDD 긴 값 1443650400.010568 키 라인 인 가지고 싶어

1443650400.010568 !AIVDM,1,1,,B,[email protected]>h8Jr6?vN2><,0*4B 
!AIVDM,1,1,,A,4022051uvOFD>RG7kDCm1iW0088i,0*23 
!AIVDM,1,1,,A,[email protected]@PHRwPM<[email protected]`OvN2><,0*4C 
!AIVDM,1,1,,A,13n1mSgP00Pgq3TQpibh0?vL2><,0*74 
!AIVDM,1,1,,B,177nPmw002:<Tn<gk1toGL60><,0*2B 
!AIVDM,1,1,,B,139eu9gP00PugK:N2BOP0?vL2><,0*77 
!AIVDM,1,1,,A,13bg8N0P000E2<BN15IKUOvN2><,0*34 
!AIVDM,1,1,,B,14bL20003ReKodINRret28P0><,0*16 
!AIVDM,1,1,,B,15SkVl001EPhf?VQ5SUTaCnH0><,0*00 
!AIVDM,1,1,,A,14eG;ihP00G=4CvL=7qJmOvN0><,0*25 
!AIVDM,1,1,,A,[email protected]<cKrL=6nJ9QfN2><,0*30 

(!AIVDM로 시작하는 각각의 엔트리는 숫자로 시작하는, 제 제외한 선이다) !AIVDM...으로 시작하는 값입니다. 이것을 어떻게 할 수 있습니까?

+0

! AIVDM으로 시작하는 나머지 레코드로 무엇을하고 싶습니까? –

+0

숫자가 모든 al의 키가되기를 원하면 결과 rdd는 다음과 같습니다. (1443650400.010568,! AIVDM, 1,1, B, 15NOHL0P00J @ uq6> h8Jr6? vN2><, 0 * 4B) , ('1443650400.010568, ! AIVDM, 1,1,, A, 4022051uvOFD> RG7kDCm1iW0088i, 0 * 23) –

답변

1

각 파일을 가정하면 충분히 작은 단일 RDD 기록 (2GB를 초과하지 않음), 당신은 하나의 레코드에 각 파일을 읽고 SparkContext.wholeTextFiles 다음 flatMap이 기록 사용할 수에 포함 할 수 있습니다 :

// assuming data/ folder contains folders 00, 01, ..., 15 
val result: RDD[(String, String)] = sc.wholeTextFiles("data/*").values.flatMap(file => { 
    val lines = file.split("\n") 
    val id = lines.head.split(" ").head 
    lines.tail.map((id, _)) 
}) 

또는 각 가정의 파일 크기가 큽니다 (수백 MB 이상). 그렇지 않으면 모든 데이터를 하나의 RDD에로드하고 데이터에 인덱스를 추가하고, 색인에 대해 "키"의지도를 수집 한 다음 색인을 사용하여 각 데이터 행에 맞는 키를 찾습니다.

// read files and zip with index to later match each data line to its key 
val raw: RDD[(String, Long)] = sc.textFile("data/*").zipWithIndex().cache() 

// separate data from ID rows 
val dataRows: RDD[(String, Long)] = raw.filter(_._1.startsWith("!AIVDM")) 
val idRows: RDD[(String, Long)] = raw.filter(!_._1.startsWith("!AIVDM")) 

// collect a map if Index -> ID 
val idForIndex = idRows.map { case (row, index) => (index, row.split(" ").head) }.collectAsMap() 

// optimization: if idForIndex is very large - consider broadcasting it or not collecting it and using a join 

// map each row to its key by looking up the MAXIMUM index which is < then row index 
// in other words - find the LAST id record BEFORE the row 
val result = dataRows.map { case (row, index) => 
    val key = idForIndex.filterKeys(_ < index).maxBy(_._1)._2 
    (key, row) 
} 
+0

감사합니다. –

+0

도움이 되었기 때문에 기꺼이 도와주세요 - 다른 독자가 알 수 있도록 답변을 받거나 upvote하십시오 :) –

관련 문제