2016-11-28 1 views
3

저는 200 차원 단위로 매우 중요한 개별 용어 (~ 100k)를 가진 Word2Vec 모델을 교육하고 있습니다.Spark Word2VecModel이 저장을위한 최대 RPC 크기를 초과합니다.

스파크의 전형적인 W2V 모델은 현재 각 단어의 벡터로 주로 구성되는 메모리 사용량을 합한 것입니다 (numberOfDimensions*sizeof(float)*numberOfWords). 수학을해라. 위의 크기는 100MB로주고 받는다.
여전히 토크 나이저를 사용하고 있으며 최적의 벡터 크기를 유지하면서 여전히 75k-150k 단어와 100-300 크기의 사전을 계산하고 있으므로 모델이 ~ 500MB에 도달 할 수 있다고 가정 해 보겠습니다. .

이제이 모델을 저장할 때까지 모든 것이 정상입니다. 현재 점화 방식이 구현된다 :

override protected def saveImpl(path: String): Unit = { 
    DefaultParamsWriter.saveMetadata(instance, path, sc) 
    val data = Data(instance.wordVectors.wordIndex, instance.wordVectors.wordVectors.toSeq) 
    val dataPath = new Path(path, "data").toString 
    sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) 
} 

즉 1 행의 dataframe이 생성되고, 모든 벡터의 어레이에 큰 F (L)를 포함하는 행. 데이터 프레임은 마루로 저장됩니다. 괜찮 으면 ... 집행자에게 보내야 만 ... 클러스터 모드에서 수행 할 작업. (당신이 불꽃 쉘 로컬로하지만, 클러스터에 제공 할 필요가 없습니다)

16/11/28 11:29:00 INFO scheduler.DAGScheduler: Job 3 failed: parquet at Word2Vec.scala:311, took 5,208453 s 
16/11/28 11:29:00 ERROR datasources.InsertIntoHadoopFsRelationCommand: Aborting job. 
org.apache.spark.SparkException: Job aborted due to stage failure: 
    Serialized task 32:5 was 204136673 bytes, 
    which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). 
    Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values. 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 

간단한 코드가 재현 :

이 지금과 같은 스택 추적과 함께 작업을 날려 버린다 :

object TestW2V { 

def main(args: Array[String]): Unit = { 
    val spark = SparkSession.builder().appName("TestW2V").getOrCreate() 
    import spark.implicits._ 

    // Alphabet 
    val randomChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTYVWXTZ".toCharArray 
    val random = new java.util.Random() 

    // Dictionnary 
    def makeWord(wordLength: Int): String = new String((0 until wordLength).map(_ => randomChars(random.nextInt(randomChars.length))).toArray) 
    val randomWords = for (wordIndex <- 0 to 100000) // Make approx 100 thousand distinct words 
        yield makeWord(random.nextInt(10)+5) 

    // Corpus (make it fairly non trivial) 
    def makeSentence(numberOfWords: Int): Seq[String] = (0 until numberOfWords).map(_ => randomWords(random.nextInt(randomWords.length))) 
    val allWordsDummySentence = randomWords // all words at least once 
    val randomSentences = for (sentenceIndex <- 0 to 100000) 
         yield makeSentence(random.nextInt(10) +5) 
    val corpus: Seq[Seq[String]] = allWordsDummySentence +: randomSentences 

    // Train a W2V model on the corpus 
    val df = spark.createDataFrame(corpus.map(Tuple1.apply)) 
    import org.apache.spark.ml.feature.Word2Vec 
    val w2v = new Word2Vec().setVectorSize(250).setMinCount(1).setInputCol("_1").setNumPartitions(4) 
    val w2vModel = w2v.fit(df) 
    w2vModel.save("/home/Documents/w2v") 

    spark.stop 
} 
} 

지금 ... 나는이 발생하는 이유 을 이해하는 것 같아요, 내부는 충분히 이해합니다. 질문은 다음과 같습니다

  • 나는
  • 나는 주위 어떻게 일할 수있는 (? 내 API 사용이 올바른지)이 권리를하고 있는가? spark.mllib.feature.Word2VecModel ("deprecated"RDD 기반 1.x 버전)에는 필자가 수동으로 작업 할 수있는 공용 생성자가 있으며, 적절하게 분할 저장 /로드 구현을 롤백합니다. 그러나 새로운 spark.ml.feature.Word2VecModel은 내가 볼 수있는 공용 생성자를 제공하지 않습니다.
  • 스파크 기고가가이 방법으로 오는 경우 : 버그/가능한 개선으로 간주 되나요? https://issues.apache.org/jira/browse/SPARK-11994 (1.x에서의 API에 대한 인), 나는 그들이 2.0 API에 한 번 확인했다 생각하고, 내가 뭔가 잘못 :-)를하고 있어요 :

이 JIRA 고정 스파크 팀을 고려.

내가 로컬 모드에서 실행할 수 있고 최종 작업 직렬화를 피할 수 있다고 생각하지만 프로덕션 수준 (데이터 접근성 및 모두 ...)에서는 불가능한 최상의 임시 솔루션입니다. 또는 RPC 크기를 512MB로 균열하십시오.

PS : 위의 사항은 Spark 2.0.1 및 스파크 독립 실행 형 클러스터 (로컬 모드에서는 재생성 불가)에서 발생합니다.
저는 보통 이런 종류의 메시지를 사용자 메일 링리스트에 올리 겠지만, Spark encourages the use of SO을보고 있습니다 ...

답변

0

나는 당신과 똑같은 경험을 가지고 있습니다. 로컬에서는 잘 작동하지만 클러스터 모드에서는 RPC 크기를 512MB로 높이 지 않고 죽습니다.

spark.rpc.message.maxSize=512을 전달하면 나를 알아 듣습니다.

그리고 저장 구현이 의심 스러우며 특히 repartition(1) 비트로 의심됩니다.

관련 문제