2017-03-10 1 views
0

저는 Apache Spark의 새로운 버전입니다. Spark RDD 및 DataFrames에 CSV 파일을로드하려고합니다.RDD에서 스키마를 시행하면서 DataFrame으로 변환 중

저는 RDD를 사용하여 데이터를 조작하고 데이터 프레임을 SQL과 같은 방식으로 데이터 프레임에서 조작합니다.

RDD를 Spark DataFrame으로 변환하는 중에 문제가 발생합니다. 문제는 다음과 같습니다.

# to load data 
dataRDD = sc.textFile(trackfilepath) 
# To use it as a csv  
dataRDD = testData.mapPartitions(lambda x: csv.reader(x)) 
# To load into data frame and capture the schema 
dataDF = sqlContext.read.load(trackfilepath, 
         format='com.databricks.spark.csv', 
         header='true', 
         inferSchema='true') 
schema = dataDF.schema 

는 데이터 보이는

print (dataRDD.take(3)) 
[['Name', 'f1', 'f2', 'f3', 'f4'], ['Joe', '5', '7', '8', '3'], ['Jill', '3', '2', '2', '23']] 

print (dataDF.take(3)) 
[Row(_c0='Name', _c1='f1', _c2='f2', _c3='f3', _c4='f4'), Row(_c0='Joe', _c1='5', _c2='7', _c3='8', _c4='3'), Row(_c0='Jill', _c1='3', _c2='2', _c3='2', _c4='23')] 

print schema 
StructType(List(StructField(Name,StringType,true),StructField(f1,IntegerType,true),StructField(f2,IntegerType,true),StructField(f3,IntegerType,true),StructField(f4,IntegerType,true))) 

데이터 조작

def splitWords(line): 
    return ['Jillwa' if item=='Jill' else item for item in line] 

dataCleanRDD = dataRDD.map(splitWords) 

문제점 등 :

이제

나는 아래의 코드를 사용하여 DataFrame로 조작 RDD를 저장하는 것을 시도하고있다 및 스키마. 때문에 RDD 및 스키마에있는 값의 데이터 유형의 불일치로

TypeError: IntegerType can not accept object 'f1' in type <class 'str'> 

오류는 다음과 같습니다

dataCleanDF = sqlContext.createDataFrame(dataCleanRDD, schema=schema) 

이 나에게 아래의 오류를 제공합니다. RDD는 모든 것을 String으로 취급하고 스키마는 field1 field2에 정수를 갖습니다. 이것은 더미 데이터 세트입니다. 실제 데이터 세트는 200 개의 열과 100,000 개의 행으로 구성됩니다. 따라서 RDD 값을 정수로 수동으로 변경하기가 어렵습니다.

RDD 값에 스키마를 강제 적용하는 방법이 있는지 궁금합니다. 어떤 도움을 주시면 감사하겠습니다. 당신이 스키마 CSV를 읽으려면

답변

1

, 내가 좋아하는 뭔가를 제안 : 당신이 당신에게 스키마와 데이터를해야합니다 그래서

df = sqlContext.read.format("com.databricks.spark.csv") 
    .schema(dataSchema) 
      .option("header", "false") 
      .option("delimiter", ",") 
      .option("inferSchema", "true") 
      .option("treatEmptyValuesAsNulls", "true") 
      .option("nullValue", "null") 
      .load("data.csv") 

당신은 그들에 대신 열지도의 사용과의 작동 할 수 있습니다 그 안에 udf가 있으므로 열 이름이 항상 있습니다.

또한 큰 데이터 집합이있는 경우 먼저 마루 또는 ORC 형식으로 저장 한 다음 다시 읽은 다음 작업을 수행하면 많은 오류가 저장되어 성능이 매우 높아집니다.

+0

"udf"에 대한 귀하의 제안은 실제로 도움이되었습니다. – Sam

관련 문제