2015-01-05 2 views
1

많은 사람들이 스파크 아트가 정렬 및 분산 컴퓨팅에 뛰어나다 고 말합니다. 현재 팀은 스파크와 스칼라에 대한 연구를하고 있습니다. 우리는 spark에 정렬 서비스를 구현하려고합니다. 지금은 스파크 클러스터를 설정하고 스파크 클러스터에서 예제를 실행하고 정렬하려고했지만 정렬에 드는 시간은 오래 걸리는 것처럼 보입니다. 여기 내 코드가있다. 스파크 클러스터로 위의 코드를 배포 한 후스파크 정렬이 스칼라 원본 정렬 방법보다 느린 이유

import org.apache.spark.{SparkConf, SparkContext} 

import scala.collection.mutable.ListBuffer 
import scala.util.Random 

/** 
* Created by on 1/1/15. 
*/ 
object AdvancedSort { 
    /** 
    * bin/spark-submit --master spark://master:7077 --executor-memory 1024M --class com.my.sortedspark.AdvancedSort lib/sortedspark.jar 100000 3 
    * @param args 
    */ 
    def main(args: Array[String]) { 
    val sampleSize = if (args.length > 0) args(0).toInt else 100000 
    val slice = if (args.length > 1) args(1).toInt else 3 

    sort(sampleSize, slice) 
    } 

    def sort(listSize: Int, slice: Int): Unit = { 
    val conf = new SparkConf().setAppName(getClass.getName) 
    val spark = new SparkContext(conf) 
    val step1 = System.currentTimeMillis() 
    val data = genRandom(listSize) 
    val step2 = System.currentTimeMillis() 
    println(">>>>>>>>>> genRandom : " + (step2 - step1)) 

    val distData = spark.parallelize(data, slice) 
    val step3 = System.currentTimeMillis() 
    println(">>>>>>>>>> parallelize : " + (step3 - step2)) 

    val result = distData.sortBy(x => x, true).collect 
    val step4 = System.currentTimeMillis() 
    println(">>>>>>>>>> sortBy and collect: " + (step4 - step3)) 
    println(">>>>>>>>>> total time : " + (step4 - step1)) 

    printlnArray(result, 0, 10) 

    spark.stop() 
    } 

    /** 
    * generate random number 
    * @return 
    */ 
    def genRandom(listSize: Int): List[Int] = { 
    val range = 100000 
    var listBuffer = new ListBuffer[Int] 
    val random = new Random() 
    for (i <- 1 to listSize) listBuffer += random.nextInt(range) 
    listBuffer.toList 
    } 

    def printlnList(list: List[Int], start: Int, offset: Int) { 
    for (i <- start until start + offset) println(">>>>>>>>> list : " + i + " | " + list(i)) 
    } 

    def printlnArray(list: Array[Int], start: Int, offset: Int) { 
    for (i <- start until start + offset) println(">>>>>>>>> list : " + i + " | " + list(i)) 
    } 
} 

, 나는 마스터의 스파크 홈에서 다음 명령을 실행

bin/spark-submit --master spark://master:7077 --executor-memory 1024M --class com.my.sortedspark.AdvancedSort lib/sortedspark.jar 100000 3 

다음

내가 마지막으로 가지고 비용 시간입니다. 내 로컬 컴퓨터에 스칼라의 정렬 방법을 통해 지능의 100000 개 임의의 데이터를 실행하는 경우, 비용 시간이 빨리 스파크의 때문에

>>>>>>>>>> genRandom : 86 
>>>>>>>>>> parallelize : 53 
>>>>>>>>>> sortBy and collect: 6756 

이, 이상한 보인다.

import scala.collection.mutable.ListBuffer 
import scala.util.Random 

/** 
* Created by on 1/5/15. 
*/ 
object ScalaSort { 
    def main(args: Array[String]) { 
    val list = genRandom(1000000) 
    val start = System.currentTimeMillis() 
    val result = list.sorted 
    val end = System.currentTimeMillis() 
    println(">>>>>>>>>>>>>>>>>> cost time : " + (end - start)) 
    } 

    /** 
    * generate random number 
    * @return 
    */ 
    def genRandom(listSize: Int): List[Int] = { 
    val range = 100000 
    var listBuffer = new ListBuffer[Int] 
    val random = new Random() 
    for (i <- 1 to listSize) listBuffer += random.nextInt(range) 
    listBuffer.toList 
    } 
} 

제 생각에는 로컬 컴퓨터

>>>>>>>>>>>>>>>>>> cost time : 169 

에 스칼라의 정렬 방법, 다음과 같은 요인 의상 불꽃의 정렬 시간 비용 시간 : 마스터와 노동자

사이

  1. 데이터 transfor

  2. 작업자 정렬은 병합이 느려질 수 있습니다.

스파크 마스터는 이것이 일어나는 이유를 알고 있습니까?

+6

100000 요소는 작습니다. 당신이 말하는 것처럼 메모리 내 정렬은 오버 헤드로 인해 병렬 버전을 이길 것입니다. 제대로 큰 배열을 사용해보십시오. –

+0

안녕하세요 폴, 더 큰 배열을 시도 스파크의 장점을 보여줍니다. 하지만 다른 질문이 있습니다. 100ms로 비용 시간을 조정할 수 있습니까? – Chan

+0

"비용 시간 조정"이란 무엇을 의미하는지 이해할 수 없습니다. 스파크는 큰 문제를 의미합니다. 큰 문제가 없다면 Spark를 사용하지 않거나 더 빠르지 않을 때 놀라지 마십시오. –

답변

0

스파크는 BigData 용입니다. 작은 숫자를 삽입하면 모든 코어/클러스터 분배가 정상적으로 정렬하는 것보다 시간이 오래 걸리기 때문에 속도가 느려집니다. 더 큰 데이터를 사용하거나 Scark에서 Spark 대신 Parallation을 사용하십시오.

collection.par.<any code here>