2016-10-21 2 views
0

나는 일련의 단계를 통해이스칼라/스파크가있는 요소의 발생 횟수를 계산하는 방법은 무엇입니까?

00|905000|20160125204123|79644809999||HGMTC|1||22|7905000|56321647569|||34110|I||||||250995210056537|354805064211510||56191|||38704||A|||11|V|81079681404134|5||||SE|||G|144|||||||||||||||Y|b00534589.huawei_anadyr.20151231184912||1|||||79681404134|0|||[email protected]@+1{79098509982}2{2}3{2}5{79644809999}6{0000002A7A5AC635}7{79681404134}|20160125| 

같은 요소의 목록이 포함 된 파일을 가지고,이

(902996760100000,CompactBuffer(6, 5, 2, 2, 8, 6, 5, 3)) 

905000 같은 요소의리스트로 변환 관리 은 키이고 6, 5, 2, 2, 8, 6, 5, 3은 값입니다. 값은 1에서 8까지의 숫자가 될 수 있습니다. spark를 사용하여이 값의 발생 횟수를 세는 방법이 있습니까? 결과는 다음과 같습니다.

(902996760100000, 0_1, 2_2, 1_3, 0_4, 2_5, 2_6, 0_7, 1_8) 

나는 다른 블록과 직원,하지만 그 꽤하지 않을 경우 함께 할 수있는, 그래서 스칼라/스파크에서 사용할 수있는 instrumets가 있는지 궁금하네요.

이것은 내 코드입니다.

class ScalaJob(sc: SparkContext) { 
    def run(cdrPath: String) : RDD[(String, Iterable[String])] = { 
    //pass the file 
    val fileCdr = sc.textFile(cdrPath); 

    //find values in every raw cdr 
    val valuesCdr = fileCdr.map{ 
     dataRaw => 
     val p = dataRaw.split("[|]",-1) 
     (p(1), ScalaJob.processType(ScalaJob.processTime(p(2)) + "_" + p(32))) 
    } 
    val x = valuesCdr.groupByKey() 
    return x 
    } 

최적화에 대한 조언을 보내 주시면 감사하겠습니다. 나는 scala/spark에 정말로 새로운 사람입니다.

+0

질문의 범위와 관련이없는 코드와 정보를 제거하면 질문을 쉽게 읽을 수 없게됩니다. – cheseaux

+0

정식 [단어 수 예] (http://spark.apache.org/examples.html)를 볼 수 있습니다. – erip

답변

1

먼저 스칼라는 유형 안전 언어이므로 Spark의 RDD API입니다. 따라서 을 사용하면 모든 것을 문자열로 "인코딩"하여 이동하는 대신 유형 시스템을 사용하는 것이 좋습니다.

RDD[(String, Seq[(Int, Int)])] (튜플에 두 번째 항목이 (ID, 개수) 튜플 시퀀스 인) 및 덜 유용 해 보이는 RDD[(String, Iterable[String])]이 아닌 솔루션을 제안합니다.

여기에 8 일의 발생을 계산하는 간단한 기능의 Iterable[Int] 제공 :

def countValues(l: Iterable[Int]): Seq[(Int, Int)] = { 
    (1 to 8).map(i => (i, l.count(_ == i))) 
} 

이 기능 (당신이했던 것처럼, 직렬화에 대한 개체의 기능을 장소 mapValues을 사용할 수 있습니다

,536,913 : 전체 솔루션은 다음 조금 단순화 할 수

valuesCdr.groupByKey().mapValues(ScalaJob.countValues) 

: RDD[(String, Iterable[Int])]에 나머지) 결과를 얻을 수 있습니다

class ScalaJob(sc: SparkContext) { 
    import ScalaJob._ 

    def run(cdrPath: String): RDD[(String, Seq[(Int, Int)])] = { 
    val valuesCdr = sc.textFile(cdrPath) 
     .map(_.split("\\|")) 
     .map(p => (p(1), processType(processTime(p(2)), p(32)))) 

    valuesCdr.groupByKey().mapValues(countValues) 
    } 
} 

object ScalaJob { 
    val dayParts = Map((6 to 11) -> 1, (12 to 18) -> 2, (19 to 23) -> 3, (0 to 5) -> 4) 

    def processTime(s: String): Int = { 
    val hour = DateTime.parse(s, DateTimeFormat.forPattern("yyyyMMddHHmmss")).getHourOfDay 
    dayParts.filterKeys(_.contains(hour)).values.head 
    } 

    def processType(dayPart: Int, s: String): Int = s match { 
    case "S" => 2 * dayPart - 1 
    case "V" => 2 * dayPart 
    } 

    def countValues(l: Iterable[Int]): Seq[(Int, Int)] = { 
    (1 to 8).map(i => (i, l.count(_ == i))) 
    } 
} 
관련 문제