Flink : Monitoring the Wikipedia Edit Stream의 빠른 시작 예제를 따르고 있습니다.Flink : 사용 중지 된 폴드를 집계하는 방법은 무엇입니까?
예는 자바에, 나는 다음과 같은 스칼라에 구현하고 그러나
/**
* Wikipedia Edit Monitoring
*/
object WikipediaEditMonitoring {
def main(args: Array[String]) {
// set up the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)
val result = edits.keyBy(_.getUser)
.timeWindow(Time.seconds(5))
.fold(("", 0L)) {
(acc: (String, Long), event: WikipediaEditEvent) => {
(event.getUser, acc._2 + event.getByteDiff)
}
}
result.print
// execute program
env.execute("Wikipedia Edit Monitoring")
}
}
, FLINK의 fold
기능은 가 사용되지 않는 이미하고 aggregate
기능을 권장합니다.
aggregrate
에 사용되지 않는
fold
을 변환하는 방법에 대한 예제 또는 자습서를 찾을 수 없습니다.
어떻게하면 좋을까요? 아마도 aggregrate
을 적용했을뿐만 아니라
UPDATE 나는 다음과 같은 또 다른 구현을 가지고 :
/**
* Wikipedia Edit Monitoring
*/
object WikipediaEditMonitoring {
def main(args: Array[String]) {
// set up the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)
val result = edits
.map(e => UserWithEdits(e.getUser, e.getByteDiff))
.keyBy("user")
.timeWindow(Time.seconds(5))
.sum("edits")
result.print
// execute program
env.execute("Wikipedia Edit Monitoring")
}
/** Data type for words with count */
case class UserWithEdits(user: String, edits: Long)
}
나는 또한 사용하여 구현을하는 방법을 알고 싶습니다 자기 정의 AggregateFunction
을.
UPDATE
나는이 문서 다음 : AggregateFunction,하지만 다음과 같은 질문이 : 릴리스 1.3 인터페이스 AggregateFunction
의 소스 코드에서
을, 당신은 참으로 반환 add
을 볼 수 void
:
void add(IN value, ACC accumulator);
버전 1.4의 경우 AggregateFunction
인 경우 :
ACC add(IN value, ACC accumulator);
어떻게 처리해야합니까?
사용중인 Flink 버전이 1.3.2
이고이 버전의 설명서에 AggregateFunction
이 없지만 아직 artifactory에 릴리스 1.4가 없습니다.
내 새 업데이트를 참조하십시오. – fluency03