2017-12-21 3 views
1

이 코드에는 두 개의 파일이 있습니다. 이름이 들어있는 athletes.csv와 트윗 메시지가 포함 된 twitter.test입니다. 우리는 athletes.csv의 이름과 일치하는 twitter.test의 모든 단일 행의 이름을 찾고 싶습니다. athletes.csv의 이름을 저장하는 map 함수를 적용했으며 테스트의 모든 행에 모든 이름을 반복하려고합니다 파일.Map Spark를 통한 루핑 Scala

object twitterAthlete { 

    def loadAthleteNames() : Map[String, String] = { 

    // Handle character encoding issues: 
    implicit val codec = Codec("UTF-8") 
    codec.onMalformedInput(CodingErrorAction.REPLACE) 
    codec.onUnmappableCharacter(CodingErrorAction.REPLACE) 

    // Create a Map of Ints to Strings, and populate it from u.item. 
    var athleteInfo:Map[String, String] = Map() 
    //var movieNames:Map[Int, String] = Map() 
    val lines = Source.fromFile("../athletes.csv").getLines() 
    for (line <- lines) { 
     var fields = line.split(',') 
     if (fields.length > 1) { 
     athleteInfo += (fields(1) -> fields(7)) 
     } 
    } 

    return athleteInfo 
    } 

    def parseLine(line:String): (String)= { 
    var athleteInfo = loadAthleteNames() 
    var hello = new String 
    for((k,v) <- athleteInfo){ 
     if(line.toString().contains(k)){ 
     hello = k 
     } 
    } 
    return (hello) 
    } 


    def main(args: Array[String]){ 
    Logger.getLogger("org").setLevel(Level.ERROR) 

    val sc = new SparkContext("local[*]", "twitterAthlete") 

    val lines = sc.textFile("../twitter.test") 
    var athleteInfo = loadAthleteNames() 

    val splitting = lines.map(x => x.split(";")).map(x => if(x.length == 4 && x(2).length <= 140)x(2)) 

    var hello = new String() 
    val container = splitting.map(x => for((key,value) <- athleteInfo)if(x.toString().contains(key)){key}).cache 


    container.collect().foreach(println) 

    // val mapping = container.map(x => (x,1)).reduceByKey(_+_) 
    //mapping.collect().foreach(println) 
    } 
} 

같은 첫 번째 파일의 모양을

id,name,nationality,sex,height........ 
001,Michael,USA,male,1.96 ... 
002,Json,GBR,male,1.76 .... 
003,Martin,female,1.73 . ... 

두 번째 파일보기가 좋아 :

time, id , tweet ..... 
12:00, 03043, some message that contain some athletes names , ..... 
02:00, 03023, some message that contain some athletes names , ..... 

일부는 다음과 같이 생각 ...

하지만 난 후 빈 결과를 얻었다 이 코드를 실행하면 어떤 제안이든지 크게 환영합니다.

내가 가진

결과는 비어 :

().... 
()... 
()... 

하지만 난 같은 예상 결과 : 나는 그냥 먼저 간단한 방법으로 시작해야한다고 생각

(name,1) 
(other name,1) 
+0

로 값을 반환 yield를 사용할 필요가? – philantrovert

+0

그냥 편집 된 질문 pls 좀 봐 주셔서 감사합니다 – amprie286

+0

당신은 '루프'와 함께 그냥 '키'대신'yield 키'를 사용해 볼 수 있습니까? – philantrovert

답변

1

당신은 당신이 파일과 예상 출력 모두의 샘플을 게시 할 수 귀하의 map

val container = splitting.map(x => for((key,value) <- athleteInfo ; if(x.toString().contains(key))) yield (key, 1)).cache 
+1

답변을 주셔서 감사합니다. – amprie286

1

을 ...

I DataFrames를 사용하여 내장 CSV 파싱을 사용하고 Catalyst, Tungsten 등을 활용할 수 있습니다.

그런 다음 내장 To 트위터를 단어로 분할하고, 폭발시키고, 간단한 참여를하는 kenizer. 선수 이름을 가진 데이터의 크기에 따라 최적의 브로드 캐스트 참여로 끝나고 셔플을 피할 수 있습니다.

import org.apache.spark.sql.functions._ 
import org.apache.spark.ml.feature.Tokenizer 

val tweets = spark.read.format("csv").load(...) 
val athletes = spark.read.format("csv").load(...) 

val tokenizer = new Tokenizer() 
tokenizer.setInputCol("tweet") 
tokenizer.setOutputCol("words") 

val tokenized = tokenizer.transform(tweets) 

val exploded = tokenized.withColumn("word", explode('words)) 

val withAthlete = exploded.join(athletes, 'word === 'name) 

withAthlete.select(exploded("id"), 'name).show() 
+0

답해 주셔서 감사합니다. – amprie286

+0

DF 솔루션을 손으로 코딩 한 솔루션과 비교한다면 이것이 하나 일 것입니다. 특히 데이터 볼륨이 증가할수록 성능과 확장 성이 향상됩니다. 유용하다고 생각되면 upvote하십시오. – Silvio