Task not serializable while running an Apache Spark job

The following Java program was written to experiment with Apache Spark.

The program reads lists of positive and negative words from the corresponding files, compares them against a master file, and filters the results accordingly.

import java.io.File;
import java.io.FileNotFoundException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

public class SimpleApp implements Serializable {
    public static void main(String[] args) {
        String logFile = "/tmp/master.txt";     // Should be some file on your system
        String positive = "/tmp/positive.txt";  // Should be some file on your system
        String negative = "/tmp/negative.txt";  // Should be some file on your system

        JavaSparkContext sc = new JavaSparkContext("local[4]", "Twitter Analyzer",
            "/home/welcome/Downloads/spark-1.1.0/",
            new String[]{"target/scala-2.10/Simple-assembly-0.1.0.jar"});

        JavaRDD<String> positiveComments = sc.textFile(logFile).cache();

        List<String> positiveList = GetSentiments(positive);
        List<String> negativeList = GetSentiments(negative);

        final Iterator<String> iterator = positiveList.iterator();
        int i = 0;
        while (iterator.hasNext()) {
            JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>() {
                public Boolean call(String s) {
                    return s.contains(iterator.next());
                }
            });

            numAs.saveAsTextFile("/tmp/output/" + i);
            i++;
        }
    }

    public static List<String> GetSentiments(String fileName) {
        List<String> input = new ArrayList<String>();
        try {
            Scanner sc = new Scanner(new File(fileName));
            while (sc.hasNextLine()) {
                input.add(sc.nextLine());
            }
        } catch (FileNotFoundException e) {
            // do stuff here..
        }
        return input;
    }
}

The following error occurs while running the Spark job:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) 
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282) 
    at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78) 
    at SimpleApp.main(SimpleApp.java:37) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: java.util.ArrayList$Itr 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
    ... 12 more 

Any pointers?

답변 (Answers)

11

When you create the anonymous class, the compiler does some work behind the scenes:

JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>() 
     { 
     public Boolean call(String s) 
     { 
      return s.contains(iterator.next()); 
     } 
     }); 

is rewritten as:

JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>() 
     { 
     private Iterator<...> $iterator; 
     public Boolean call(String s) 
     { 
      return s.contains($iterator.next()); 
     } 
     }); 

The reason you can get a NotSerializableException is that the captured iterator is not serializable (java.util.ArrayList$Itr does not implement Serializable, as the stack trace shows). To avoid that, simply extract the result of next() before constructing the closure:

String value = iterator.next(); 
JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>() 
     { 
     public Boolean call(String s) 
     { 
      return s.contains(value); 
     } 
     }); 
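An alternative with the same effect is a static nested class that receives the current word through its constructor (the WordFilter name below is our sketch, not part of the original answer). A static nested class carries no hidden reference to the enclosing instance, and Spark's Java Function interface already extends Serializable, so only the String field is shipped to the executors:

// Inside SimpleApp: no implicit reference to the enclosing class exists,
// so serializing the filter serializes nothing but the word itself.
public static class WordFilter implements Function<String, Boolean> {
    private final String word;

    public WordFilter(String word) {
        this.word = word;
    }

    public Boolean call(String s) {
        return s.contains(word);
    }
}

// In the loop:
// JavaRDD<String> numAs = positiveComments.filter(new WordFilter(iterator.next()));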
4

Some Java facts:

1. Any anonymous class defined inside an outer class holds a reference to that outer class. 
    2. If the anonymous class needs to be serialized, it compels you to make the outer class serializable (see the sketch after this list). 
    3. If a lambda function uses a method of the enclosing class, the enclosing class needs to be serialized whenever the lambda itself is serialized. 
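A plain-JDK sketch of facts 1 and 2 (the OuterRef class is our illustration, not from the answer): serializing the anonymous task drags the enclosing instance along and fails because that instance is not serializable.

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class OuterRef {                    // note: OuterRef is NOT Serializable
    private final String tag = "outer";

    interface SerializableTask extends Runnable, Serializable {}

    public SerializableTask makeTask() {
        // The anonymous class keeps an implicit OuterRef.this reference.
        return new SerializableTask() {
            public void run() { System.out.println(tag); }
        };
    }

    public static void main(String[] args) throws Exception {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        try {
            out.writeObject(new OuterRef().makeTask());
        } catch (java.io.NotSerializableException e) {
            System.out.println("failed: " + e);  // java.io.NotSerializableException: OuterRef
        }
    }
}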

Some facts about Spark:

1. On the same executor, multiple tasks can run at the same time in the same JVM, as tasks are spawned as threads in Spark. 
2. Any lambda or anonymous class used with a Spark transformation (map, mapPartitions, keyBy, reduceByKey, ...) is instantiated on the driver, serialized, and sent to the executors. 
3. To serialize an object means to convert its state to a byte stream so that the byte stream can be reverted back into a copy of the object (a round-trip sketch follows this list). 
4. A Java object is serializable if its class or any of its superclasses implements either the java.io.Serializable interface or its subinterface, java.io.Externalizable. 
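A minimal round-trip (our plain-JDK example) showing facts 3 and 4 in code: the object's state goes out as bytes and comes back as a copy.

import java.io.*;

public class RoundTrip {
    // Serializable because the class implements java.io.Serializable (fact 4).
    static class Word implements Serializable {
        final String text;
        Word(String text) { this.text = text; }
    }

    public static void main(String[] args) throws Exception {
        // State -> byte stream (fact 3).
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(new Word("good"));
        out.flush();

        // Byte stream -> a copy of the object.
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        Word copy = (Word) in.readObject();
        System.out.println(copy.text);   // prints "good"
    }
}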

To avoid serialization issues, and to understand where they come from, follow these rules of thumb:

1. Avoid anonymous classes and use static nested classes instead, as an anonymous class forces the outer class to be serialized. 
    2. Avoid static variables as a workaround for serialization issues: multiple tasks can run inside the same JVM, and a static instance might not be thread safe. 
    3. Use transient variables to avoid serialization issues, and initialize them inside the function call, not in the constructor. The constructor runs only on the driver; on the executor the object is deserialized, so the only place to initialize such a field is inside the function call (see the sketch after this list). 
    4. Use a static nested class in place of an anonymous class. 
    5. Attach "implements Serializable" only to the classes that actually need to be serialized. 
    6. Inside a lambda function, never refer to an outer-class method directly, as this leads to serialization of the outer class. 
    7. Make a method static if it needs to be used directly within a lambda; otherwise use the Class::func() notation rather than calling func() directly. 
    8. java.util.Map does not extend Serializable, but HashMap implements it. 
    9. Be deliberate when choosing between broadcast variables and raw data structures; use broadcast only when you see a real benefit. 
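A sketch of rule 3 (SentimentFilter and NonSerializableParser are our hypothetical names, not from the answer): the transient field is skipped during serialization and arrives as null on the executor, so it is created lazily inside call() instead of in the constructor.

import org.apache.spark.api.java.function.Function;

// Stand-in for any dependency that cannot be serialized
// (database client, parser, connection pool, ...).
class NonSerializableParser {
    boolean isPositive(String s) { return s.contains("good"); }
}

public class SentimentFilter implements Function<String, Boolean> {
    // Skipped by serialization; null after deserialization on the executor.
    private transient NonSerializableParser parser;

    public Boolean call(String s) {
        if (parser == null) {
            // First use on the executor builds the dependency locally;
            // the constructor only ever runs on the driver.
            parser = new NonSerializableParser();
        }
        return parser.isPositive(s);
    }
}

// Usage: positiveComments.filter(new SentimentFilter());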

http://bytepadding.com/big-data/spark/understanding-spark-serialization/
