2017-05-18 1 views
1

나는 다음과 같은 스키마 불꽃 Dataframe df이 다음과 같은 스키마를 얻을 기대 : 지금까지불꽃 Dataframe [벡터]

root 
    |-- features: vector (nullable = true) 

내가 (이 게시물의 영향 : Converting Spark Dataframe(with WrappedArray) to RDD[labelPoint] in scala) 다음 코드 조각을 가지고 있지만 나는 두려워 뭔가 내가 잘못이다 왜냐하면 합리적인 양의 행을 계산하는 데 매우 긴 시간이 걸리기 때문입니다. 또한 행이 너무 많으면 응용 프로그램이 힙 공간 예외로 인해 충돌합니다.

val clustSet = df.rdd.map(r => { 
      val arr = r.getAs[mutable.WrappedArray[Double]]("features") 
      val features: Vector = Vectors.dense(arr.toArray) 
      features 
      }).map(Tuple1(_)).toDF() 

이 경우 지침 arr.toArray이 좋지 않은 것으로 생각됩니다. 모든 설명이 도움이 될 것입니다.

감사합니다.

답변

4

.rdd은 내부 메모리 포맷에서 객체를 비 직렬화해야하기 때문에 시간이 많이 걸립니다.

.toArray을 사용하는 것이 좋습니다. 드라이버 노드에 모든 것을 수집하지 않고 행 수준에서 작동하고 있습니다.

당신은 UDF를 가진이 아주 쉽게 할 수

:

import org.apache.spark.ml.linalg._ 
val convertUDF = udf((array : Seq[Double]) => { 
    Vectors.dense(array.toArray) 
}) 
val withVector = dataset 
    .withColumn("features", convertUDF('features)) 

코드이 응답에서입니다 : Convert ArrayType(FloatType,false) to VectorUTD

그러나 질문이 저자는

+0

매우 감사 차이에 대해 물어 보지 않았다 , 그것은 많은 도움을 주었고 답으로 표시했습니다. 지금은 더 많은 행을 실행할 수 있으며 시간면에서 만족 스럽습니다. 예외는 있지만 여전히 발생합니다. __org.apache.spark.SparkException : Kryo serialization failed : 버퍼 오버플로입니다. 사용 가능 : 0, 필수 : ​​200,000 행을 시도 할 때 1__ 이것에 대해 통찰력을 가지겠습니까? 다시 한번 감사드립니다. – user159941

+0

@ user159941 http://stackoverflow.com/questions/31947335/how-kryo-serializer-allocates-buffer-in-spark를 확인하십시오. –

+1

다음 코드를 설정하십시오. ** val conf = new SparkConf() . set ("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set ("spark.kryoserializer.buffer.max.mb", "256") ** 그리고 작동했습니다! 고맙습니다. – user159941