저는 Apache Spark의 새로운 버전입니다. Spark RDD 및 DataFrames에 CSV 파일을로드하려고합니다.RDD에서 스키마를 시행하면서 DataFrame으로 변환 중
저는 RDD를 사용하여 데이터를 조작하고 데이터 프레임을 SQL과 같은 방식으로 데이터 프레임에서 조작합니다.
RDD를 Spark DataFrame으로 변환하는 중에 문제가 발생합니다. 문제는 다음과 같습니다.
# to load data
dataRDD = sc.textFile(trackfilepath)
# To use it as a csv
dataRDD = testData.mapPartitions(lambda x: csv.reader(x))
# To load into data frame and capture the schema
dataDF = sqlContext.read.load(trackfilepath,
format='com.databricks.spark.csv',
header='true',
inferSchema='true')
schema = dataDF.schema
는 데이터 보이는
print (dataRDD.take(3))
[['Name', 'f1', 'f2', 'f3', 'f4'], ['Joe', '5', '7', '8', '3'], ['Jill', '3', '2', '2', '23']]
print (dataDF.take(3))
[Row(_c0='Name', _c1='f1', _c2='f2', _c3='f3', _c4='f4'), Row(_c0='Joe', _c1='5', _c2='7', _c3='8', _c4='3'), Row(_c0='Jill', _c1='3', _c2='2', _c3='2', _c4='23')]
print schema
StructType(List(StructField(Name,StringType,true),StructField(f1,IntegerType,true),StructField(f2,IntegerType,true),StructField(f3,IntegerType,true),StructField(f4,IntegerType,true)))
데이터 조작
def splitWords(line):
return ['Jillwa' if item=='Jill' else item for item in line]
dataCleanRDD = dataRDD.map(splitWords)
문제점 등 :
이제나는 아래의 코드를 사용하여 DataFrame로 조작 RDD를 저장하는 것을 시도하고있다 및 스키마. 때문에 RDD 및 스키마에있는 값의 데이터 유형의 불일치로
TypeError: IntegerType can not accept object 'f1' in type <class 'str'>
오류는 다음과 같습니다
dataCleanDF = sqlContext.createDataFrame(dataCleanRDD, schema=schema)
이 나에게 아래의 오류를 제공합니다. RDD는 모든 것을 String으로 취급하고 스키마는 field1 field2에 정수를 갖습니다. 이것은 더미 데이터 세트입니다. 실제 데이터 세트는 200 개의 열과 100,000 개의 행으로 구성됩니다. 따라서 RDD 값을 정수로 수동으로 변경하기가 어렵습니다.
RDD 값에 스키마를 강제 적용하는 방법이 있는지 궁금합니다. 어떤 도움을 주시면 감사하겠습니다. 당신이 스키마 CSV를 읽으려면
"udf"에 대한 귀하의 제안은 실제로 도움이되었습니다. – Sam