2017-05-18 3 views
0

., I는 같은 N 행 CSV가 'fileinput 함수'I 성공적 sql.rows의 배열을 만들었다대하여 반복 스파크 2.1.1 사용

colname datatype elems start end 
colA float  10  0  1 
colB int   10  0  9 

...

val df = spark.read.format("com.databricks.spark.csv").option("header", "true").load(fileInput) 
val rowCnt:Int = df.count.toInt 
val aryToUse = df.take(rowCnt) 
Array[org.apache.spark.sql.Row] = Array([colA,float,10,0,1], [colB,int,10,0,9]) 
그 행에 대하여

내 임의의 값 발전기 스크립트를 사용하여, 나는 성공적으로 내가 지금 [모든] ...

res170: scala.collection.mutable.ListBuffer[Any] = ListBuffer(List(0.24455154, 0.108798146, 0.111522496, 0.44311434, 0.13506883, 0.0655781, 0.8273762, 0.49718297, 0.5322746, 0.8416396), List(1, 9, 3, 4, 2, 3, 8, 7, 4, 6)) 

을 빈 ListBuffer를 채운 혼합 된 형식의 ListBuffer [모두]에 다른 형식의 목록이 있습니다. . 어떻게 되풀이하고 압축합니까? [모두]가 매핑/압축을 거부합니다. inputFile의 정의에 의해 생성 된 N 개의 목록을 가져 와서 csv 파일에 저장해야합니다. 최종 출력은 다음과 같아야합니다

ColA, ColB 
0.24455154, 1 
0.108798146, 9 
0.111522496, 3 
... etc 
입력 _ 다음, 각 유형의 1 : n 번 나타나는 데이터 형식 '(나는 그것을위한 스크립트가)'임의의 COLNAMES 년대, '임의의 수를 생성 할 수 있습니다

의 임의의 수의 행 ('elems'로 정의 됨). 임의 생성 스크립트는 '시작'당 값을 사용자 정의합니다. & 'end', 그러나이 열은이 질문과 관련이 없습니다.

답변

-1

나는 RDD.zipWithUniqueId() 또는 RDD.zipWithIndex() 방법으로 원하는 것을 수행 할 수 있다고 생각합니다.

자세한 내용은 official documentation을 참조하십시오. 당신이되는 결과를 꺼리지 않는 경우 List[List[Any]]을 감안할 때이 도움이

2

, 당신은 "우편"모든 목록을 함께 transpose를 사용 할 수 있기를 바랍니다 목록 -의 - 목록 대신 튜플의 목록 :

val result: Seq[List[Any]] = list.transpose 

당신이 다음 CSV에이를 작성하려는 경우, 당신은 쉼표로 구분 된 문자열로 각 "행"을 매핑하여 시작할 수 있습니다

val rows: Seq[String] = result.map(_.mkString(",")) 

(참고 : 나는 아파치 스파크 부분을 무시하고있어 어떤 이 질문에 전혀 관련이없는 것처럼 보입니다. "메타 데이터"는 Spar를 통해로드됩니다. k,하지만 배열로 모아 져서 무의미 해집니다)