2016-06-03 6 views
0

나는 Scala/Spark의 초보자입니다. 다음 코드에서는 Twitter 공개 스트림 콘텐츠를 HBase에 추출하고 있습니다.
마지막 네 줄 (HBase에 명령 넣기)에 주석 달기는했지만 터미널에 짹짹 내용을 인쇄 할 수는 있지만 HBase 표에 덤프 할 수는 없습니다.스칼라/스파크 직렬화 오류 - HBASE에 데이터 스트리밍

다음과 관련하여 도움이 필요합니다.
1. serialilization 오류를 어떻게 극복 할 수 있습니까?
2.이 오류를 극복하는 효과적인 방법이 있습니까 (Kryo serialilization 사용 중일 수 있습니다)?

에 의해 발생 : java.io.NotSerializableException :
org.apache.hadoop.conf.Configuration 직렬화 스택 :
- 객체 직렬화 할 수없는 (클래스 : org.apache.hadoop.conf.Configuration, 값 : 구성 : core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site XML)

import twitter4j.auth._ 
import twitter4j.conf._ 
import twitter4j._ 
import twitter4j.json._ 
import scala.io.Source 
import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hbase.client.ConnectionFactory 
import org.apache.hadoop.hbase.client.HBaseAdmin 
import org.apache.hadoop.hbase.client.HTable; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.HColumnDescriptor 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable 
import org.apache.hadoop.hbase.KeyValue 
import org.apache.hadoop.hbase.mapred.TableOutputFormat 
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 
import org.apache.hadoop.hbase.mapreduce.TableInputFormat 
import org.apache.hadoop.hbase.TableName 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} 
import org.apache.hadoop.mapreduce.Job 
import org.apache.spark._ 
import org.apache.spark.rdd.NewHadoopRDD 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.hadoop.hbase.util.Bytes 
import java.io._ 
import org.apache.spark.streaming.twitter.TwitterUtils 

//////////////////////////// 
val conf = new SparkConf().setAppName("model1").setMaster("local[*]") 
// val sc = new SparkContext(conf) 

val TABLE_NAME = "publicrd" 
val CF_USER = "user" 
val CF_TWEET = "tweet" 
val CF_ENTITIES = "entities" 
val CF_PLACES = "places" 

val hadoopConf = new Configuration 
val conf = HBaseConfiguration.create(hadoopConf) 
val admin = new HBaseAdmin(conf) 
val tableDesc = new HTableDescriptor(Bytes.toBytes(TABLE_NAME)) 

// Define column family descriptor 
val ColumnFamilyDesc1 = new HColumnDescriptor(Bytes.toBytes(CF_USER)) 
val ColumnFamilyDesc2 = new HColumnDescriptor(Bytes.toBytes(CF_TWEET)) 
val ColumnFamilyDesc3 = new HColumnDescriptor(Bytes.toBytes(CF_ENTITIES)) 
val ColumnFamilyDesc4 = new HColumnDescriptor(Bytes.toBytes(CF_PLACES)) 

// Add column family in table descriptor 
tableDesc.addFamily(ColumnFamilyDesc1) 
tableDesc.addFamily(ColumnFamilyDesc2) 
tableDesc.addFamily(ColumnFamilyDesc3) 
tableDesc.addFamily(ColumnFamilyDesc4) 

// Check if the table exists 
if (admin.tableExists(TABLE_NAME)){ 
print(">>>>>" + TABLE_NAME + " already exists <<<<<") 
admin.disableTable(TABLE_NAME) 
admin.deleteTable(TABLE_NAME) 
} 

// Create HBASE table 
admin.createTable(tableDesc) 
val table = new HTable(conf, TABLE_NAME) 
///// 

val timewindow = 2 // seconds 

val ssc = new StreamingContext(sc, Seconds(timewindow)) 
val cb = new ConfigurationBuilder 

val ckey = "ckey" 
val csecret = "csecret" 
val atoken = "atoken" 
val atokensecret = "atokensecret" 

cb.setDebugEnabled(true). 
setOAuthConsumerKey(ckey). 
setOAuthConsumerSecret(csecret). 
setOAuthAccessToken(atoken). 
setOAuthAccessTokenSecret(atokensecret). 
setJSONStoreEnabled(true) 

val auth = new OAuthAuthorization(cb.build) 
val tweets = TwitterUtils.createStream(ssc,Some(auth)) 

val status = tweets.filter(_.getLang()=="en") 

status.foreachRDD(foreachFunc = rdd => { 
    rdd.foreachPartition { 

    records => while (records.hasNext) { 

     var record = records.next 
     print("\n\n>>>>"+record) 

     var tweetID = record.getUser().getId().toString//.isInstanceOf[Int] 
     print("\ntweetID : "+tweetID) 

     var tweetBody = record.getText()//.toString 
     print("\ntweetBody : "+tweetBody) 

     var favoritesCount = record.getFavoriteCount()//.toInt 
     print("\nfavoritesCount : "+favoritesCount) 

     var keyrow = "t_"+tweetID //"t_"+ 
     print("\nkeyrow : "+keyrow+"\n") 

     var theput= new Put(Bytes.toBytes(keyrow)) 
     theput.add(Bytes.toBytes(CF_TWEET),Bytes.toBytes("tweetid"),Bytes.toBytes(tweetID)) 
     theput.add(Bytes.toBytes(CF_TWEET),Bytes.toBytes("tweetid"),Bytes.toBytes(tweetBody)) 
     theput.add(Bytes.toBytes(CF_USER),Bytes.toBytes("tweetid"),Bytes.toBytes(favoritesCount)) 
     table.put(theput) 
     } 
    } 
} 
) 

코드가 실행된다 를 통해 터미널 : 그것은 개체 org.apache.hadoop.conf.Configuration를 말한다

spark-shell --driver-class-path /opt/hadoop/hbase-1.2.1/lib/hbase-server-1.1.4.jar:/opt/hadoop/hbase-1.2.1/lib/hbase-protocol-1.0.0-cdh5.5.0.jar:/opt/hadoop/hbase-1.2.1/lib/hbase-hadoop2-compat-1.0.0-cdh5.5.0.jar:/opt/hadoop/hbase-1.2.1/lib/hbase-client-1.0.0-cdh5.5.0.jar:/opt/hadoop/hbase-1.2.1/lib/hbase-common-1.0.0-cdh5.5.0.jar:/opt/hadoop/hbase-1.2.1/lib/htrace-core-3.2.0-incubating.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/guava-19.0.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/spark-streaming-twitter_2.10-1.6.1.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/twitter4j-async-4.0.4.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/twitter4j-core-4.0.4.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/twitter4j-examples-4.0.4.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/twitter4j-media-support-4.0.4.jar:/home/cloudera/Desktop/hbase/twitter4jJARS/twitter4j-stream-4.0.4.jar 
+0

저는 foreachPartition (http://spark.apache.org/docs/latest/streaming-programming-guide.html#tab_scala_14) 내에 HBase 연결을 만들어야한다고 생각합니다. 이유에 대한 설명도 있습니다. –

+0

일반적으로 spark-hbase를 살펴볼 것을 권장합니다. 그것은 당신에게 많은 번거 로움을 덜어 줄 것입니다. –

답변

0

은이 필요한 것 동안이 Serializable 인터페이스를 구현하지 않는 의미 serialisable 없습니다. 그걸 제거하려면 @transient 키워드를 추가하십시오.

@transient val hadoopConf = new Configuration