2016-11-02 2 views
2

Spark은 rdd.saveAsObjectFile("file")으로 파일에 rdd를 저장할 수 있습니다. 스파크 밖에서이 파일을 읽어야합니다. doc에 따르면,이 파일은 기본 spark serializer를 사용하여 표준 Java 직렬화로 직렬화 된 일련의 객체입니다. 그러나, 파일을 헤더와 개체 사이에 구분 기호가있는 것 같아요. 이 파일을 읽고 jdeserialize을 사용하여 클래스 정의가 없으므로 각 Java/Scala 객체를 deserialize해야합니다.spark rdd.saveAsObjectFile의 파일 형식에 대한 설명서

rdd.saveAsObjectFile("file") (Kryo 시리얼 라이저가 아닌 표준 시리얼 라이저) 파일 형식에 대한 설명서는 어디에서 찾을 수 있습니까? VladoDemcak의 답변에 따라


업데이트 근무 예 : 그것은 매우 흥미로운 질문은

import org.apache.hadoop.io._ 
import org.apache.hadoop.conf._ 
import org.apache.hadoop.fs._ 
import org.apache.hadoop.io._ 

def deserialize(data: Array[Byte]) = 
    new ObjectInputStream(new ByteArrayInputStream(data)).readObject() 

val path = new Path("/tmp/part-00000") 
val config = new Configuration() 
val reader = new SequenceFile.Reader(FileSystem.get(new Configuration()), path, config) 
val key = NullWritable.get 
val value = new BytesWritable 

while (reader.next(key, value)) { 
    println("key: {} and value: {}.", key, value.getBytes) 
    println(deserialize(value.getBytes())) 
} 
reader.close() 
+1

https://gist.github.com/dportabella/dd8886ebb8d5f0eddd1196e1c30e34f6 –

답변

1

그래서 나는이 직원에 대해 알고있는 것을 설명하려고합니다. 당신은 saveAsObjectFile 만 문서를 확인하실 수 있습니다 나는 saveAsObjectFileSequenceFile을 생산하고 알 수 있도록 일부 세부 사항

/** 
    * Save this RDD as a SequenceFile of serialized objects. 
    */ 
    def saveAsObjectFile(path: String): Unit = withScope { 
    this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) 
     .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) 
     .saveAsSequenceFile(path) 
    } 

is API javadoc에 대해 보았다.

압축되지 않은 키/값 기록 : 그리고 sequenceFile에 대한 설명서에 따라 그것은 3 가지 SequenceFile 형식이 있습니다

... version, classname, metadata와 헤더있다. 압축 된 키/값 레코드 기록 - '값'만 압축됩니다. 블록 압축 키/값 레코드 - 키와 값이 모두 '블록'으로 수집되고 압축됩니다. '블록'의 크기는 구성 가능합니다.

위의 모든 형식은 공통 헤더 (적절한 키/값 쌍을 반환하기 위해 SequenceFile.Reader에서 사용함)를 공유합니다.

시퀀스 파일을 읽으려면 hadoop SequenceFile.Reader 구현을 사용할 수 있습니다.

Path path = new Path("/hdfs/file/path/seqfile"); 
SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(new Configuration()), path, config); 
WritableComparable key = (WritableComparable) reader.getKeyClass().newInstance(); 
Writable value = (Writable) reader.getValueClass().newInstance(); 

while (reader.next(key, value)){ 
    logger.info("key: {} and value: {}.", key, value.getBytes()); 
    // (MyObject) deserialize(value.getBytes()); 
} 

reader.close(); 

나는이 테스트를하지만 당신은 당신의 질문에 발견 doc 링크를 기반으로하지 않은 : 기본적으로

, 스파크 루프 프레임 워크 그래서

을 자바의 ObjectOutputStream에 를 사용하여 객체를 직렬화하면 값을위한 바이트를 가져올 수 있고 ObjectInputStream

당신이 역 직렬화 방법에 라이브러리 (jdeserialize)를 사용할 필요가 귀하의 경우 10

- 나는 run(InputStream is, boolean shouldConnect)

+0

큰, 난이 내일 테스트 겠네 아침. 고마워! –

+0

'reader.getKeyClass()를 호출하면 런타임 오류가 발생합니다.newInstance()', 내 질문에 업데이트 된 참조하십시오. 어떤 생각? –

+1

그것은 작동합니다; 질문을 코드로 업데이트했습니다. 감사! –