2014-02-18 5 views
3

Spark와 Scala를 처음 사용했습니다. CSV 형식으로 광고 이벤트 로그 파일을 만든 다음 pkzip을 사용하여 압축합니다. Java를 사용하여 압축 파일을 압축 해제하는 방법에 대한 많은 예제를 보았습니다. 그러나 Spark에서 Scala를 사용하면 어떻게됩니까? 궁극적으로 우리는 들어오는 각 파일의 데이터를 Hbase 대상 테이블로 가져 와서 추출하고로드하려고합니다. 어쩌면 이것이 HadoopRDD로 할 수 있을까요? 이 후, 우리는이 파일을보기 위해 Spark 스트리밍을 도입 할 것입니다.Spark/Scala 압축 된 CSV 파일 열기

덕분에, 벤

답변

4

는 스파크에서, 파일 (예에 .gz gzip으로 압축을위한) 올바른 파일 이름 접미사를 제공하고, 그런 다음 당신은 그냥

sc.textFile(path) 

UPDATE를 사용할 수 org.apache.hadoop.io.compress.CompressionCodecFactory 지원하는 것 : 쓰는 중 Hadoop bzip2 라이브러리의 버그입니다. 즉, ArrayIndexOutOfBounds와 같은 이상한 예외에서 spark 결과를 사용하여 bzip2 파일을 읽으려고 시도하는 것을 의미합니다. 당신이 스파크에서 사용 가능한 기본 (하둡)입니다 압축 형식을 사용하는 경우

+0

내게는 효과가 없습니다. 나는 zip 파일 (.zip 확장자를 가짐)을 가지고 있고'sc.textFile (path)'가 예외를 던진다. – mgaido

0

기본 압축 지원

@samthebest 대답은 올바른 것입니다. 나는 내 다른 대답에 깊은이 주제를 설명했다

  • 의 bzip2
  • lz4
  • 물어 GZIP :있는 https://stackoverflow.com/a/45958182/1549135

    읽기

    을 압축 그러나, 사용자 지정 솔루션을 만드는 데 필요한 zip 파일을 읽으려고합니다. 하나는 이미 제공 한 대답에 언급되어 있습니다. 샘플에서처럼 PortableDataStream을 압축 해제 나중에 sc.binaryFiles 등을 사용하여, 모든 시간 기본적으로 https://stackoverflow.com/a/45958458/1549135

    : 당신이 당신의 아카이브에서 여러 개의 파일을 읽어해야하는 경우

    , 당신은 내가 제공 한 대답에 관심이있을 수 있습니다 :

    sc.binaryFiles(path, minPartitions) 
        .flatMap { case (name: String, content: PortableDataStream) => 
        val zis = new ZipInputStream(content.open) 
        Stream.continually(zis.getNextEntry) 
          .takeWhile(_ != null) 
          .flatMap { _ => 
           val br = new BufferedReader(new InputStreamReader(zis)) 
           Stream.continually(br.readLine()).takeWhile(_ != null) 
          }