2016-08-15 2 views
2

pyspark와 함께 Apache Spark 2.0을 사용하면 1000 행의 데이터 행을 포함하는 DataFrame을 가지며이 DataFrame을 2 개의 개별 DataFrames로 분할/분할하고자합니다. 임의의 씨앗이 충분하지 않습니다, 나는이 분할을 반복 의도 한대로 :Apache Spark의 데이터 프레임 분할

  • 첫 번째 DataFrame는 처음 750 개 행이
  • 두 번째 DataFrame 나머지 250 개 행을

주를 포함해야합니다 포함해야 메서드를 여러 번 사용하고 첫 번째 및 두 번째 DataFrame에 대해 어떤 데이터가 사용되는지 제어하려고합니다.

첫 번째 결과를 생성하는 데 유용한 take (n) 메서드를 발견했습니다.
하지만 두 번째 DataFrame을 얻으려면 올바른 방법 (또는 그 어떤 방법 으로든)을 찾을 수 없습니다.

올바른 방향으로 어떤 포인터가 크게 감사하겠습니다.

미리 감사드립니다.

업데이트 : 이제 take (n)을 다시 정렬하고 적용하여 해결책을 찾을 수있었습니다. 이것은 여전히 ​​비록 최적 솔루션 같은 느낌 : 당신은 그것을 드라이버에 데이터를 그립니다 다음 createDataFrame가 클러스터에서 그것을 재분배하기 때문에 테이크를 사용하여 질문하는 것이 옳다

# First DataFrame, simply take the first 750 rows 
part1 = spark.createDataFrame(df.take(750)) 
# Second DataFrame, sort by key descending, then take 250 rows 
part2 = spark.createDataFrame(df.rdd.sortByKey(False).toDF().take(250)) 
# Then reverse the order again, to maintain the original order 
part2 = part2.rdd.sortByKey(True).toDF() 
# Then rename the columns as they have been reset to "_1" and "_2" by the sorting process 
part2 = part2.withColumnRenamed("_1", "label").withColumnRenamed("_2", "features") 

답변

3

. 이는 비효율적이며 드라이버에 데이터를 저장할 메모리가 충분하지 않은 경우 실패 할 수 있습니다.

from pyspark.sql.functions import monotonicallyIncreasingId 

idxDf = df.withColumn("idx", monotonicallyIncreasingId()) 
part1 = idxDf.filter('idx < 750') 
part2 = idxDf.filter('idx >= 750') 
: 여기

해당에 행 인덱스 컬럼 슬라이스를 생성하는 해결책
관련 문제