2014-11-09 4 views
1

HBase에서 읽고 쓰는 데 Spyglass로 Scalding을 사용하고 있습니다.HBase Scalding 작업에서 가져 오기/검색

저는 테이블 1과 테이블 2의 왼쪽 외부 조인을 수행하고 열을 변환 한 후 table1에 다시 쓰려고합니다. table1과 table2는 모두 Spyglass HBaseSource로 선언됩니다.

잘 작동합니다. 그러나 rowkey를 사용하여 table1의 다른 행에 액세스하여 변환 된 값을 계산해야합니다. 이 링크에서 언급 한 바와 같이 나는 끓는 작업의 구성에 대한 액세스 권한을 얻고있다 val hTable = new HTable(conf, TABLE_NAME) val result = hTable.get(new Get(rowKey.getBytes()))

을 : 내가 끓는을 실행할 때

https://github.com/twitter/scalding/wiki/Frequently-asked-questions#how-do-i-access-the-jobconf

이 작동

는 내가 얻을 HBase를 위해 다음과 같은 노력 작업을 로컬로 수행합니다. 하지만 클러스터에서 실행하면이 코드가 Reducer에서 실행될 때 conf가 null입니다.

이런 경우에 대해 HBase에서 스캐닝/캐스케이드 작업을 수행하는 더 좋은 방법이 있습니까? 당신은 관리 자원을 사용할 수 있습니다

1)이 작업을 수행하는

답변

0

방법은 ...

class SomeJob(args: Args) extends Job(args) {  
    val someConfig = HBaseConfiguration.create().addResource(new Path(pathtoyourxmlfile)) 
    lazy val hPool = new HTablePool(someConfig, 3) 

    def getConf = { 
    implicitly[Mode] match { 
     case Hdfs(_, conf) => conf 
     case _ => whateveryou are doing for a local conf... 
    } 
    } 
    ... somePipe.someOperation.... { 
     val gets = key.map { key => new Get(key) } 
     managed(hPool.getTable("myTableName")) acquireAndGet { table => 
      val results = table.get(gets) 
      ...do something with these results 
     } 
    }  
} 

2) 당신은 당신이 그 안에 사용자 정의 방식과 쓰기 좀 더 구체적인 계단식 코드를 사용할 수 있습니다 소스 메소드와 필요에 따라 다른 메소드를 대체합니다. 거기에서 당신은 JobConf에 다음과 같이 접근 할 수 있습니다 :

class MyScheme extends Scheme[JobConf, SomeRecordReader, SomeOutputCollector, ..] { 

    @transient var jobConf: Configuration = super.jobConfiguration 

    override def source(flowProcess: FlowProcess[JobConf], ...): Boolean = { 
    jobConf = flowProcess match { 
    case h: HadoopFlowProcess => h.getJobConf 
    case _ => jconf 
    } 

    ... dosomething with the jobConf here 

} 

} 
관련 문제