2017-11-05 7 views
2

Flink : Monitoring the Wikipedia Edit Stream의 빠른 시작 예제를 따르고 있습니다.Flink : 사용 중지 된 폴드를 집계하는 방법은 무엇입니까?

예는 자바에, 나는 다음과 같은 스칼라에 구현하고 그러나

/** 
* Wikipedia Edit Monitoring 
*/ 
object WikipediaEditMonitoring { 
    def main(args: Array[String]) { 
    // set up the execution environment 
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource) 

    val result = edits.keyBy(_.getUser) 
     .timeWindow(Time.seconds(5)) 
     .fold(("", 0L)) { 
     (acc: (String, Long), event: WikipediaEditEvent) => { 
      (event.getUser, acc._2 + event.getByteDiff) 
     } 
     } 

    result.print 

    // execute program 
    env.execute("Wikipedia Edit Monitoring") 
    } 
} 

, FLINK의 fold 기능은 사용되지 않는 이미하고 aggregate 기능을 권장합니다.

enter image description here

하지만 aggregrate에 사용되지 않는 fold을 변환하는 방법에 대한 예제 또는 자습서를 찾을 수 없습니다.

어떻게하면 좋을까요? 아마도 aggregrate을 적용했을뿐만 아니라

UPDATE 나는 다음과 같은 또 다른 구현을 가지고 :

/** 
* Wikipedia Edit Monitoring 
*/ 
object WikipediaEditMonitoring { 
    def main(args: Array[String]) { 
    // set up the execution environment 
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource) 

    val result = edits 
     .map(e => UserWithEdits(e.getUser, e.getByteDiff)) 
     .keyBy("user") 
     .timeWindow(Time.seconds(5)) 
     .sum("edits") 

    result.print 

    // execute program 
    env.execute("Wikipedia Edit Monitoring") 
    } 

    /** Data type for words with count */ 
    case class UserWithEdits(user: String, edits: Long) 
} 

나는 또한 사용하여 구현을하는 방법을 알고 싶습니다 자기 정의 AggregateFunction을.

UPDATE

나는이 문서 다음 : AggregateFunction,하지만 다음과 같은 질문이 : 릴리스 1.3 인터페이스 AggregateFunction의 소스 코드에서

을, 당신은 참으로 반환 add을 볼 수 void :

void add(IN value, ACC accumulator); 

버전 1.4의 경우 AggregateFunction 인 경우 :

ACC add(IN value, ACC accumulator); 

어떻게 처리해야합니까?

사용중인 Flink 버전이 1.3.2이고이 버전의 설명서에 AggregateFunction이 없지만 아직 artifactory에 릴리스 1.4가 없습니다.

enter image description here

답변

3

당신은 예를 포함, AggregateFunctionin the Flink 1.4 docs에 대한 몇 가지 문서를 찾을 수 있습니다.

1.3.2에 포함 된 버전은 추가 조작으로 누적기를 수정하는 변경 가능한 누적 기 유형과 함께 사용하는 것으로 제한됩니다. 이것은 fixed for Flink 1.4이지만 아직 공개되지 않았습니다.

+0

내 새 업데이트를 참조하십시오. – fluency03

1
import org.apache.flink.api.common.functions.AggregateFunction 
import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.api.common.serialization.SimpleStringSchema 
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08 
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource} 

class SumAggregate extends AggregateFunction[WikipediaEditEvent, (String, Int), (String, Int)] { 
    override def createAccumulator() = ("", 0) 

    override def add(value: WikipediaEditEvent, accumulator: (String, Int)) = (value.getUser, value.getByteDiff + accumulator._2) 

    override def getResult(accumulator: (String, Int)) = accumulator 

    override def merge(a: (String, Int), b: (String, Int)) = (a._1, a._2 + b._2) 
} 

object WikipediaAnalysis extends App { 
    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 
    val edits: DataStream[WikipediaEditEvent] = see.addSource(new WikipediaEditsSource()) 

    val result: DataStream[(String, Int)] = edits 
    .keyBy(_.getUser) 
    .timeWindow(Time.seconds(5)) 
    .aggregate(new SumAggregate) 
// .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff)) 
    result.print() 

    result.map(_.toString()).addSink(new FlinkKafkaProducer08[String]("localhost:9092", "wiki-result", new SimpleStringSchema())) 
    see.execute("Wikipedia User Edit Volume") 
} 
관련 문제