2016-10-10 2 views
0

저는 Apache Flink를 처음 사용하고 있으며, Kafka와 함께 Flink 스트리밍 작업을 확장하는 것과 관련된 모범 사례를 이해하려고합니다. 적절한 답을 찾을 수없는 몇 가지 질문은 다음과 같습니다.Apache Flink 스트림 스케일링

  1. 몇 개의 스트리밍 작업을 실행할 수 있습니까? 너무 많은 스트림을 실행하는 데 확장 성 문제가 있습니까? 얼마나 많은가?
  2. 비즈니스 요구를 충족시키기 위해 2,000 개의 스트림을 실행한다고 가정하면이 스트림을 관리하는 가장 좋은 방법은 무엇입니까?
  3. 한 스트림에서 다른 스트림으로 스트림 데이터를 읽는 가장 좋은 방법은 무엇입니까? 스트림에 참여하고 지속적인 쿼리를 수행 할 수 있습니까?

사전 지원에 감사 드리며이 질문이 다소 기본적인 것처럼 보이지만 사과 드리며이 기술에 대한 더 나은 조치를 취하려고합니다. 나는 많은 문서를 읽었지만,이 분야에 대한 저의 경험 부족 때문에 일부 개념을 함께 모으지는 못했을 것입니다. 어떤 도움을 주셔서 감사합니다!

답변

1

-> 스트림 수에는 제한이 없으며, 작업 관리자/작업 관리자의 메모리/CPU, 병렬 처리 사용 횟수 및 슬롯 수에 따라 플링 크기가 조정됩니다. 나는 자원을 관리하기 위해 원사를 사용합니다. 연결되는 스트림의 수가 많으면 프로세스의 속도가 느려지므로 일부/전체 프로세스가 일부 작업 관리자에서 발생하지 않는다는 점에 조금주의를 기울여야합니다. 카프카 스트림 자체에 지연이있을 수 있습니다. 또는 일부 작업 관리자가 과중한 부하로 인해 내부 지연이 발생할 수 있으며이를 방지하기 위해 예방 점검을 수행해야합니다.

-> 지속적인 쿼리 지원은 최신 flink 버전의 일부로 작성되었으므로 flink 설명서에서 해당 항목을 확인할 수 있습니다.

-> 하나의 데이터 스트림을 다른 스트림으로 읽는 것이 플 링크 용어로 두 개의 스트림을 연결한다는 것을 의미하는 경우 공통 키로 연결할 수 있으며 값 상태를 유지할 수 있습니다. 값 상태는 작업 관리자에서 유지되며 작업 관리자간에 공유되지 않습니다. 그렇지 않으면 두 개 이상의 스트림을 결합하는 것을 의미하는 경우 flatmap 함수를 빌드하여 이러한 스트림의 데이터가 표준 형식으로 제공되는 방식으로 빌드 할 수 있습니다. 연합

예 : 브로 의 스트림 1 : 데이터 스트림 [UserBookingEvent = BookingClosure.getSource (RunMode에) .getSource (ENV) .MAP (새 ClosureMapFunction)

의 Val stream2 : 데이터 스트림 [UserBookingEvent = BookingCancel.getSource (RunMode에) .getSource (ENV) .MAP (새 CancelMapFunction)

브로 unionStream : 데이터 스트림 [UserBookingEvent = stream1.union (stream2)

import org.apache.flink.api.common.functions.MapFunction 
import org.json4s.jackson.JsonMethods.{parse => _, parseOpt => _} 
import org.json4s.native.JsonMethods._ 
import org.slf4j.{Logger, LoggerFactory} 

class CancelMapFunction extends MapFunction[String, Option[UserBookingEvent]] { 
    override def map(in: String): Option[UserBookingEvent] = { 
    val LOG: Logger = LoggerFactory.getLogger(classOf[CancelMapFunction]) 
    try { 
     implicit lazy val formats = org.json4s.DefaultFormats 

     val json = parse(in) 
     .............. 
    } catch { 
     case e: Exception => { 
     LOG.error("Could not parse Cancel Event= " + in + e.getMessage) 
     None 
     } 
    } 
관련 문제