2014-03-19 2 views
1

spark shell을 사용하여 HDFS에서 두 파일을 결합하려고합니다. 두 파일 탭 분리되고 난Spark에서 두 개의 HDFS 파일 결합

코드 을 시도하지만 모든 출력

val ny_daily= sc.parallelize(List("hdfs://localhost:8020/user/user/NYstock /NYSE_daily")) 

val ny_daily_split = ny_daily.map(line =>line.split('\t')) 

val enKeyValuePair = ny_daily_split.map(line => (line(0).substring(0, 5), line(3).toInt)) 


val ny_dividend= sc.parallelize(List("hdfs://localhost:8020/user/user/NYstock/NYSE_dividends")) 

val ny_dividend_split = ny_dividend.map(line =>line.split('\t')) 

val enKeyValuePair1 = ny_dividend_split.map(line => (line(0).substring(0, 4),  line(3).toInt)) 

enKeyValuePair1.join(enKeyValuePair) 

을 포기하지 않을하지만 특정 열 에서 파일을 결합하는 방법에 대한 정보를 얻고 있지 않다 두 번째 열에서 가입 할 나는 특정 컬럼에 파일을 결합하는 방법에 대한 정보를 얻고 있지 않다

답변

4

제안하십시오

RDDs는 자신의 키에 연결된다, 그래서 당신 은 당신이 쓴시에 가입 열을 결정 :

val enKeyValuePair = ny_daily_split.map(line => (line(0).substring(0, 5), line(3).toInt)) 
... 
val enKeyValuePair1 = ny_daily_split.map(line => (line(0).substring(0, 4), line(3).toInt)) 

귀하의 RDDs는 line(0).substring(0, 5)line(0).substring(0, 4)에서 오는 값에 합류한다.

join 기능 (및 기타 많은 유용한 기능) hereSpark Programming Guide은 Spark 작동 방식을 이해하는 데 훌륭한 참고 자료입니다.

enKeyValuePair1.join(enKeyValuePair).foreach(println) 

참고 :가에서 데이터를로드 할 수

코드를 시도했지만 출력을보기 위해 모든 출력

을 포기하지 않을, 당신은 그것을 인쇄 불꽃을 요청해야 sc.textFile() : sc.parallelize()을 사용해야하는 파일은 스칼라 컬렉션에서 RDD를 만드는 데에만 사용됩니다.

다음 코드는 작업을 수행해야합니다 그런데

val ny_daily_split = sc.textFile("hdfs://localhost:8020/user/user/NYstock/NYSE_daily").map(line =>line.split('\t')) 
val ny_dividend_split = sc.textFile("hdfs://localhost:8020/user/user/NYstock/NYSE_dividends").map(line =>line.split('\t')) 

val enKeyValuePair = ny_daily_split.map(line => line(0).substring(0, 5) -> line(3).toInt) 
val enKeyValuePair1 = ny_dividend_split.map(line => line(0).substring(0, 4) -> line(3).toInt) 

enKeyValuePair1.join(enKeyValuePair).foreach(println) 

을, 당신은 당신이 두 번째 열에서 가입 할 것을 언급하지만 실제로 line(0)을 사용하고,이 의도?

희망이 도움이됩니다.

+0

정확하게 내가 조인의 키와 값에 넣어야하는 것은 내가 열에 합류하고 출력으로 합쳐진 전체 데이터 세트를 볼 수 있어야합니다. –

+0

'map' 함수를'ny_daily_split.map (line => line (1) -> line.mkString ("\ t"))'ny_dividend_split.map (line => line (1) -> line.mkString ("\ t")) – fedragon