2017-04-26 3 views
0

3 개의 파일을 결합하려고하는데, pyspark를 사용하여 콘솔에서 최종 파일을 출력하려고합니다. 저는 RDD를 쌍으로 변환했습니다. 문제없이 2 개를 RDD에 연결할 수 있습니다. 하지만 어떤 이유로 3 번째로 연결된 RDD에 이전에 참여한 RDD에 참여할 수 없습니다. 아래는 3 개의 파일 구조입니다.pyspark에서 여러 쌍의 RDD 결합하기

EmployeeManager.csv

E01,John 
E02,Kate 
E03,Emily 

EmployeeName.csv

E01,Brick 
E02,Blunt 
E03,Leo 

EmployeeSalary.csv

E01,50000 
E02,50000 
E03,45000 

다음은 지금까지했습니다 pyspark 코드입니다.

from pyspark import SparkConf, SparkContext 
sc = SparkContext(conf=SparkConf()) 

manager = sc.textFile('spark1/EmployeeManager.csv') 
name = sc.textFile('spark1/EmployeeName.csv') 
salary = sc.textFile('spark1/EmployeeSalary.csv') 

managerPairRDD = manager.map(lambda x: x.split(',')) 
namePairRDD = name.map(lambda x: x.split(',')) 
salaryPairRDD = salary.map(lambda x: x.split(',')) 

ns = namePairRDD.join(salaryPairRDD) 
print 'After name and salary join: \n %s' %ns.collect() 

nsm = managerPairRDD.join(ns) 
print 'After joining 3 files: %s' %nsm.collect() 

마지막 단계에서 프로그램 실행이 중지됩니다. 아래 콘솔 출력입니다

[[email protected] Spark]$ pyspark q7.py 
WARNING: Running python applications through 'pyspark' is deprecated as of Spark 1.0. 
Use ./bin/spark-submit <python file> 
SLF4J: Class path contains multiple SLF4J bindings. 
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 
After name and salary join:              
[(u'E02', (u'Blunt', u'50000')), (u'E03', (u'Leo', u'45000')), (u'E01', (u'Brick', u'50000'))] 
[Stage 3:=======================================>     (2 + 0)/3] 

이 문제를 해결하는 방법을 알려주십시오. 어떤 도움이라도 대단히 감사합니다.

감사합니다.

+0

Spark 버전에 대한 자세한 정보를 제공 할 수 있습니까? 저는 Spark 2.1.0에서이 프로그램을 실행하고 있으며 저에게 도움이되었습니다. –

+0

나는 내 Cloudera VM에서 Spark 1.6.0을 실행 중입니다. –

+0

디버깅을 위해 spark UI를 사용하여 작업이 실패한 단계에 대한 세부 정보를 확인할 수 있습니다. – voldy

답변

0

마지막으로, 입력 파일을 데이터 프레임으로 변환하여 알아 냈습니다.

from pyspark import SparkConf, SparkContext 
from pyspark import SQLContext 

sc = SparkContext(conf=SparkConf()) 
sqlContext = SQLContext(sc) 

manager = sc.textFile('spark1/EmployeeManager.csv') 
name = sc.textFile('spark1/EmployeeName.csv') 
salary = sc.textFile('spark1/EmployeeSalary.csv') 

manager_df = manager.map(lambda x: list(x.split(','))).toDF(["col1","col2"]) 
name_df = name.map(lambda x: list(x.split(','))).toDF(["col1","col2"]) 
salary_df = salary.map(lambda x: list(x.split(','))).toDF(["col1","col2"]) 

nsm = name_df.alias('name_df') \ 
.join(salary_df.alias('salary_df'), name_df.col1==salary_df.col1) \ 
.join(manager_df.alias('manager_df'), name_df.col1==manager_df.col1) \ 
.select(name_df.col1, name_df.col2, salary_df.col2, manager_df.col2) 

nsm.saveAsTextFile('/spark1/q7sol')