2016-06-26 2 views
2

나는 간단한 단어 카운트 FLINK 작업을 쓰고 있어요하지만이 오류가 계속에 대한 암시 적 가치를 찾을 수 없습니다. 이 추가증거 매개 변수

object Job { 
    def main(args: Array[String]) { 
    // set up the execution environment 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val dataStream = env.readTextFile("file:///home/plivo/code/flink/scala/flinkstream/test/") 

    val count = dataStream 
       .flatMap{_.toLowerCase.split("\\W+") filter {_.nonEmpty}} 
       .map{ (_,1) } 
       .groupBy(0) 
       .sum(1) 


    dataStream.print() 
    env.execute("Flink Scala API Skeleton") 
    } 
} 
+0

이 질문에 대한 대답을 시도, 너무 당신을 도울 수 있습니다 http://stackoverflow.com/questions/29540121/flink-scala-api-not-enough-arguments – richj

+0

내가 필요한 모든 라이브러리를 가져온 flink.api.scala._ 및 flink.streaming.api.scala._ – sidd607

+0

등의 문제가 있습니다. 문제는 flink (버전 1.0.3)의 DataStream [(String, Int)]에 groupBy (...)). KeyedStream [(String, Int), Tuple]을 생성하는 keyBy (Int) 메소드가 있습니다. – richj

답변

0

: def main(args: Array[String]) {...}의 첫 번째 행은 나를 위해 그것을 고정으로 implicit val typeInfo = TypeInformation.of(classOf[(String)])

여기 내 코드입니다.

object Job { 
    def main(args: Array[String]) { 
    implicit val typeInfo = TypeInformation.of(classOf[(String)]) //Add this here 
    // set up the execution environment 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val dataStream = env.readTextFile("file:///home/plivo/code/flink/scala/flinkstream/test/") 

    val count = dataStream 
       .flatMap{_.toLowerCase.split("\\W+") filter {_.nonEmpty}} 
       .map{ (_,1) } 
       .groupBy(0) 
       .sum(1) 


    dataStream.print() 
    env.execute("Flink Scala API Skeleton") 
    } 
}