2016-10-09 2 views
0

나는 스파크 스트리밍으로 트위터 통합을 배우고있다.스파크에서 트위터 데이터

import org.apache.spark.streaming.{Seconds, StreamingContext} 
    import org.apache.spark.SparkContext._ 
    import org.apache.spark.streaming.twitter._ 
    import org.apache.spark.SparkConf 

    /** 
    * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter 
    * stream. The stream is instantiated with credentials and optionally filters supplied by the 
    * command line arguments. 
    * 
    * Run this on your local machine as 
    * 
    */ 
    object TwitterPopularTags { 
     def main(args: Array[String]) { 


     if (args.length < 4) { 
      System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " + 
      "<access token> <access token secret> [<filters>]") 
      System.exit(1) 
     } 

     StreamingExamples.setStreamingLogLevels() 

     val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) 
     val filters = args.takeRight(args.length - 4) 

     // Set the system properties so that Twitter4j library used by twitter stream 
     // can use them to generat OAuth credentials 
     System.setProperty("twitter4j.oauth.consumerKey", consumerKey) 
     System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret) 
     System.setProperty("twitter4j.oauth.accessToken", accessToken) 
     System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret) 

     val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[2]") 
     val ssc = new StreamingContext(sparkConf, Seconds(2)) 
     val stream = TwitterUtils.createStream(ssc, None, filters)//Dstream 

     val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) 

     val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) 
         .map{case (topic, count) => (count, topic)} 
         .transform(_.sortByKey(false)) 

     val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10)) 
         .map{case (topic, count) => (count, topic)} 
         .transform(_.sortByKey(false)) 


     // Print popular hashtags 
     topCounts60.foreachRDD(rdd => { 
      val topList = rdd.take(10) 
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) 
      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} 
     }) 

     topCounts10.foreachRDD(rdd => { 
      val topList = rdd.take(10) 
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) 
      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} 
     }) 

     ssc.start() 
     ssc.awaitTermination() 
     } 
    } 

나는 아래 완전히 2 코드 라인을 이해할 수 없습니다입니다 :

val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) 
val filters = args.takeRight(args.length - 4) 

누군가가 나에게이 두 라인을 설명시겠습니까?

감사 및 감사

val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) 

답변

2
args

배열이고; take(4)은 첫 번째 (맨 왼쪽) 네 개의 요소가있는 하위 배열을 반환합니다. 이 네 요소를 Array(consumerKey, consumerSecret, accessToken, accessTokenSecret)에 할당하면 consumerKey 값이 첫 번째 요소의 값을 보유하게됩니다. consumerSecret은 두 번째 값을 보유합니다. 이것은 이름이 지정된 값으로 Array (다른 컬렉션에서도 가능)를 "정리하는"스칼라 트릭입니다.

val filters = args.takeRight(args.length - 4) 

takeRight(n) 배열의 마지막 요소를 의미 n 오른쪽에서 서브 어레이를 리턴한다. 여기서 첫 번째 네 요소를 제외한 모든 요소가있는 배열은 filters이라는 새 값에 할당됩니다.

+0

설명을 주셔서 대단히 감사합니다. 그것은 많은 도움이된다. – subho

관련 문제