2014-04-21 4 views
4

Java Spark API를 사용하여 일부 테스트 응용 프로그램을 작성하고 있습니다. serializable 인터페이스를 확장하지 않는 클래스를 사용하고 있습니다. 그래서 응용 프로그램을 작동 시키려면 kryo serializer를 사용하여 클래스를 직렬화합니다. 하지만 내가 디버깅하는 동안 관찰 한 문제는 디 직렬화 중에 반환 된 클래스 객체는 null이되어 결국 null 포인터 예외을 던졌습니다. 상황이 잘못되었지만 확실하지 않은 폐쇄 문제 인 것 같습니다. 이러한 직렬화에 익숙하지 않아서 어디에서 파기를 시작할 지 모르겠습니다. 내가 직렬화하고있는 클래스apache spark에서 클래스 (작업 객체)의 kryo 직렬화가 역 직렬화 중에 null을 반환합니다.

다음
package org.apache.spark.examples; 


import java.io.FileWriter; 
import java.io.IOException; 
import java.io.PrintWriter; 
import java.net.InetAddress; 
import java.net.UnknownHostException; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 




/** 
* Spark application to test the Serialization issue in spark 
*/ 
public class Test { 

    static PrintWriter outputFileWriter; 
    static FileWriter file; 
    static JavaSparkContext ssc; 

    public static void main(String[] args) { 


     String inputFile = "/home/incubator-spark/examples/src/main/scala/org/apache/spark/examples/InputFile.txt"; 

     String master = "local"; 
     String jobName = "TestSerialization"; 
     String sparkHome = "/home/test/Spark_Installation/spark-0.7.0"; 
     String sparkJar = "/home/test/TestSerializationIssesInSpark/TestSparkSerIssueApp/target/TestSparkSerIssueApp-0.0.1-SNAPSHOT.jar"; 


     SparkConf conf = new SparkConf(); 
     conf.set("spark.closure.serializer","org.apache.spark.serializer.KryoSerializer"); 
     conf.set("spark.kryo.registrator", "org.apache.spark.examples.MyRegistrator"); 
     // create the Spark context 
     if(master.equals("local")){ 
      ssc = new JavaSparkContext("local", jobName,conf); 
      //ssc = new JavaSparkContext("local", jobName); 
     } else { 
      ssc = new JavaSparkContext(master, jobName, sparkHome, sparkJar); 
     } 
     JavaRDD<String> testData = ssc.textFile(inputFile).cache(); 
     final NotSerializableJavaClass notSerializableTestObject= new NotSerializableJavaClass("Hi "); 
     @SuppressWarnings({ "serial", "unchecked"}) 
     JavaRDD<String> classificationResults = testData.map(
       new Function<String, String>() { 
        @Override 
        public String call(String inputRecord) throws Exception {     
         if(!inputRecord.isEmpty()) { 
          //String[] pointDimensions = inputRecord.split(","); 
          String result = ""; 

          try { 
           FileWriter file = new FileWriter("/home/test/TestSerializationIssesInSpark/results/test_result_" + (int) (Math.random() * 100)); 
           PrintWriter outputFile = new PrintWriter(file); 
           InetAddress ip; 
           ip = InetAddress.getLocalHost(); 
           outputFile.println("IP of the server: " + ip); 

           result = notSerializableTestObject.testMethod(inputRecord); 
           outputFile.println("Result: " + result); 

           outputFile.flush(); 
           outputFile.close(); 
           file.close(); 

          } catch (UnknownHostException e) { 
           e.printStackTrace(); 
          } 
          catch (IOException e1) { 
           e1.printStackTrace(); 
          } 

          return result; 
         } else { 
          System.out.println("End of elements in the stream."); 
          String result = "End of elements in the input data"; 
          return result; 
         } 
        } 

       }).cache(); 

     long processedRecords = classificationResults.count(); 

     ssc.stop(); 
     System.out.println("sssssssssss"+processedRecords); 
    } 
} 

인 KryoRegistrator 클래스 다음

package org.apache.spark.examples; 

import org.apache.spark.serializer.KryoRegistrator; 

import com.esotericsoftware.kryo.Kryo; 

public class MyRegistrator implements KryoRegistrator { 
    public void registerClasses(Kryo kryo) { 
     kryo.register(NotSerializableJavaClass.class); 
    } 
} 

됩니다 :

package org.apache.spark.examples; 

public class NotSerializableJavaClass { 
    public String testVariable; 

    public NotSerializableJavaClass(String testVariable) { 
     super(); 
     this.testVariable = testVariable; 
    } 

    public String testMethod(String vartoAppend){ 
     return this.testVariable + vartoAppend; 
    } 
} 
다음

내가 테스트입니다 코드입니다
+0

내가 알아 차 렸던 몇 가지; NotSerializableJavaClass에는 인수가없는 생성자가 없습니다. Kryo는 그런 것을 좋아하지 않습니다. 또한 전문가는 아니지만 직렬화가 비정상적으로 보입니다. 나는'Kryo kryo = 새로운 Kryo(); kryo.writeClassAndObject (output, objectToSerialise); 또는 비슷하게 거기에 대해 걷어차 기 –

+0

[arg가없는 생성자 객체를 serialize하는 것이 가능하지만 쉽지는 않습니다] (http://code.google.com/p/kryo/issues/detail? id = 5) –

+1

안녕하세요. Richard, Spark API에 내장 된 kryo API가 구현되어 있습니다.이 API는 'Kryo kryo = new Kryo(); kryo.writeClassAndObject (output, objectToSerialise);'. 하지만 앞으로 어떻게해야합니까? no arg 생성자가이 문제를 극복하는 데 도움이됩니까? –

답변

1

spark.closure.serializer은 Java serializer 만 지원하기 때문입니다. http://spark.apache.org/docs/latest/configuration.htmlspark.closure.serializer

+0

사실, 자바 클래스를 Kryo에 등록하면 java 시리얼 라이저가 사용됩니다. 구아바의 BiMap으로이 작업을 수행했습니다. BiMap을 등록하지 않으면 작업을 직렬화하지 못했습니다. – pferrel

+0

그래서 다른 문제로 인해 클로저에서 작업을 직렬화 할 수 없습니다. 클로저에서 외부 scala 라이브러리를 사용하고 있습니다. 어떤 클래스가 문제를 일으키는 지 어떻게 알 수 있습니까? 전이 의존성이있을 수 있습니다. – pferrel

+0

클로저에서 외부 스칼라 라이브러리를 사용하면 클로저에서 외부 스칼라 라이브러리를 사용하지 않아도됩니다. 디버그의 경우 Java 옵션'-Dsun.io.serialization.extendedDebugInfo = true'를 사용할 수 있습니다. – zsxwing

관련 문제