2016-08-02 2 views
1

스트리밍 데이터를 찾고 향상시키는 Spark 스트림 코드의 HBase 데이터를 읽고 싶습니다. spark-hbase-connector_2.10-1.0.3.jar을 사용하고 있습니다. 내 코드에서스칼라에서 HBase 읽기 - it.nerdammer

다음 줄은

val docRdd = 
    sc.hbaseTable[(Option[String], Option[String])]("hbase_customer_profile") 
    .select("id","gender").inColumnFamily("data") 

docRdd.count 오른쪽 수를 반환 성공한 것입니다.

docRdd

유형의

HBaseReaderBuilder (org.apache.spark.SparkContext @ 3a49e5, hbase_customer_profile, 일부 (데이터), WrappedArray (ID, 성), 없음, 없음,()리스트)

입니다

id, gender 열의 모든 행을 어떻게 읽을 수 있습니까? 또한 docRdd을 데이터 프레임으로 변환하여 SparkSQL을 사용할 수 있습니다. 내가 행 키를 추가 한

case class Customer(rowKey: String, id: Option[String], gender: Option[String]) 

:

답변

1

당신은 DataFrameRDD을 변환하려면

docRdd.collect().foreach(println) 

를 사용하여 RDD에서 모든 행을 읽을 수 있습니다 당신은 케이스 클래스를 정의 할 수 있습니다 케이스 클래스에; 그건 꼭 필요한 것은 아니므로 필요하지 않으면 생략 할 수 있습니다.

그런 다음 mapRDD 오버 :

: 다음

// Row key, id, gender 
type Record = (String, Option[String], Option[String]) 

val rdd = 
    sc.hbaseTable[Record]("customers") 
    .select("id","gender") 
    .inColumnFamily("data") 
    .map(r => Customer(r._1, r._2, r._3)) 

와 - - 케이스 클래스를 기반으로 RDDDataFrame

import sqlContext.implicits._ 
val df = rdd.toDF() 
df.show() 
df.printSchema() 

spark-shell의 출력은 다음과 같습니다로 변환

scala> df.show() 
+---------+----+------+ 
| rowKey| id|gender| 
+---------+----+------+ 
|customer1| 1| null| 
|customer2|null|  f| 
|customer3| 3|  m| 
+---------+----+------+ 

scala> df.printSchema() 
root 
|-- rowKey: string (nullable = true) 
|-- id: string (nullable = true) 
|-- gender: string (nullable = true) 
+0

감사합니다. @ 베릴륨. 나는 이것을 시도 할 것이다. SparkStream에서 RDD를 사용하고 싶습니다. 나는 그것이 직렬화되기를 희망한다. 도움에 다시 한번 감사드립니다 –

+0

이 질문에 대한 도움이 필요하십니까? – Beryllium

+0

나는 이것 모두에 놓인다. 고맙습니다.. –