2014-05-12 4 views
1

나는 스파크 & 스칼라 초보자입니다. 이 직렬화 내 개체 (myScalaObject)으로 파일을 생성하기위한 의사 코드는Spark RDD에서 Kryo 파일 읽기

import com.esotericsoftware.kryo.Kryo 
import com.esotericsoftware.kryo.io.Output 

val kryo:Kryo = new Kryo() 
val output:Output = new Output(new FileOutputStream("filename.ext",true)) 

//kryo.writeObject(output, feed) (tested both line) 
kryo.writeClassAndObject(output, myScalaObject) 

:

나는 읽고는 Kryo 내 스칼라 코드로 작성했다고 스파크에서 파일을 분석 직렬화 필요 , 그것은 복잡한 객체입니다.

파일은 잘 쓰는 것 같다,하지만 난 스파크 RDD에서 스파크에서

의사 코드를 읽을 때 나는 문제가 : 나는이 오류가 그것을 실행하려고하면

val conf = new SparkConf() 
    .setMaster("local") 
    .setAppName("My application") 
    .set("spark.executor.memory", "1g") 


conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
conf.set("spark.kryo.registrator", "myScalaObject") 

val sc = new SparkContext(conf) 

val file=sc.objectFile[myScalaObject]("filename.ext") 

val counts = file.count() 

를 :

org.apache.spark.SparkException : 작업이 중단 : 작업 0.0 : 0 실패 1 회 (대부분의 실패 최근 : 예외 실패 : 때 java.io.IOException : 파일 : 여기서 filename.ext 아닌 SequenceFile)

Spark에서이 유형의 파일을 읽을 수 있습니까?

이 솔루션을 사용할 수없는 경우 복잡한 파일 구조를 만들어 Spark에서 읽을 수있는 좋은 해결책은 무엇입니까?

+1

'objectFile'는 직렬화 된 객체를 포함하는 SequenceFile로 저장된'RDD'를로드하는 데 사용됩니다. 각 파일을 별도의 파티션으로 읽어들이려면 파일 이름을 별도의 파티션으로 병렬 처리하십시오. 왜 Kryo를 사용하여 객체를 읽고 '병렬'을 사용하여 'RDD'를 생성하는 것이 좋을까요? – zsxwing

+0

@zsxwing 감사합니다. 아주 좋은 생각입니다. 시도해보십시오. 그러나 나는 많은 작은 (5-20mb) 파일의 내용을 병렬 처리하고 싶지 않습니다. 파일 이름을 병렬 처리하는 방법이 있습니까? 그런 다음 각 서버가 파일을 읽습니다. – faster2b

+1

파일 이름으로 RDD를 만들고'map'으로 내용을 읽으시겠습니까? – zsxwing

답변

2

당신이 objectFile 읽을 saveAsObjectFile으로 데이터를 기록 할 경우 감사합니다. zsxwing 말한대로

val myObjects: Seq[MyObject] = ... 
val rddToSave = sc.parallelize(myObjects) // Or better yet: construct as RDD from the start. 
rddToSave.saveAsObjectFile("/tmp/x") 
val rddLoaded = sc.objectFile[MyObject]("/tmp/x") 

은 또는, 당신은 파일 이름의 RDD를 만들고 각의 내용을 읽어 map를 사용할 수 있습니다.

def loadFiles(filenames: Seq[String]): RDD[Object] = { 
    def load(filename: String): Object = { 
    val input = new Input(new FileInputStream(filename)) 
    return kryo.readClassAndObject(input) 
    } 
    val partitions = filenames.length 
    return sc.parallelize(filenames, partitions).map(load) 
} 
+1

이제'sc.wholeTextFiles'를 사용할 수 있습니다. 언젠가는 대답을 업데이트해야합니다. –

관련 문제