2016-09-20 4 views
2

spark-kafka 소비자의 주제 목록을 동적으로 업데이트 할 수 있습니까?spark kafka 소비자에 대한 주제 목록을 동적으로 업데이트하십시오.

나는 spark-kafka 소비자를 사용하는 Spark Streaming 응용 프로그램을 가지고 있습니다. 처음에는 spark-kakfa 소비자가 주제를 듣고 있다고 말합니다. [ "test"] 잠시 후 내 주제 목록이 [ "test", "testNew"]로 업데이트되었습니다. 지금 스파크 카프카 소비자 주제 목록을 업데이트하고 상황에 맞는

답변

1

을 sparkStreaming 응용 프로그램을 중지하거나 sparkStreaming없이 주제의 업데이트 된 목록에 대한 데이터를 소비하는 스파크 카프카에게 소비자를 요청하는 방법이 동적으로 불꽃의 주제 목록을 업데이트 할 수있다 - 카프카 소비자

아니요. 수신기 및 수신기없는 접근 방식은 모두 KafkaUtils을 사용하여 카프카 스트림을 초기화하면 고정됩니다. DAG가 고정 될 때 새로운 주제를 전달할 수있는 방법은 없습니다.

동적으로 읽으려는 경우 반복적으로 예약 된 일괄 k 작업을 고려해보십시오. 주제를 동적으로 읽고 그 중 RDD을 만들 수 있습니다.

추가 솔루션은 Akka Stream과 같이 소비에 비해 유연성을 제공하는 기술을 사용하는 것입니다.

0

Yuval이 말했듯이, 카프카에서 다루고있는 데이터의 구조/형식을 아는 경우 해결 방법이있을 수 있습니다. 예를 들어

,

  • 스트리밍 응용 프로그램 [ "테스트", "testNew"] 항목을 수신하는 경우
  • 엎드려 라인 당신이로, [테스트 4]라는 새 항목을 추가 할 이를 해결하기 위해서는 포함되어있는 고유 키를 간단히 추가하여 기존 주제에 전달하면됩니다.
  • 는 데이터 구조를 사용하여 캐시를 정의하면
0

당신은 스레드 기반의 접근 방식
1. 사용할 수 TEST2 데이터에 추가 된 키를 기반으로 데이터를 필터링/인식 할 수있는 방식으로 스트리밍 응용 프로그램을 설계 주제 목록이 포함되어 있습니다
2.이 캐시에 요소를 추가하는 방법
3. B는 모든 스파크 관련 논리가있는 클래스 A와 B가 있어야합니다.
4 클래스 A는 장기 실행 작업이고 A에서 호출 중입니다. B, 새로운 주제가있을 때마다 B

01로 새 스레드를 생성합니다.
+0

, 그러나이 정상적으로 내가 주제 목록을 업데이트 할 때마다 컨텍스트를 스트리밍을 중지 할 필요 같은 많은 합병증이있다. 이것은 비동기 프로세스이므로, 멈추는 데 소요되는 시간은 예측할 수 없습니다. 이 모든 것은 들어오는 데이터 스트림을 처리 할 수 ​​없지만 스트리밍 컨텍스트는 계산을 시작하고 다시 시작해야합니다. –

+0

@ rohith-yeravothula 어떤 대안 솔루션을 찾았습니까? 저는 배우 시스템을 사용하여 Akka 스트림을 사용해야 만 생각할 수 있습니다.SubscribePattern을 시도했지만 시작 중에 DAG 중에 항목을 추가하지 않고 스트림 만 예약하도록 필터 종류를 지정합니다. – ASe

-1

Spark-Kafka integration (0.10) API 버전의 ConsumerStrategies.SubscribePattern을 사용해 보시기 바랍니다.

과 같을 것이다

: 나는 현재 유사한 접근 방식을 사용하고

KafkaUtils.createDirectStream(
mySparkStreamingContext, 
PreferConsistent, 
SubscribePattern("test.*".r.pattern, myKafkaParamsMap)) 
+0

나는 똑같이 시도했다. 동적으로 주제를 선택하지 않는다. 예를 들어 스트림을 시작할 때 정규식을 사용하여 필터처럼 모든 주제를 일치시키고 그 스트림을 만든다. 우리는 스트림이 이미 실행 중일 때 새로운 주제를 동적으로 추가 할 수있는 솔루션을 찾고 있습니다. 스파크 클러스터 작업 방식으로 작업 일정을 조정하고 DAG를 스트리밍 할 수 있으므로 불가능한 것처럼 보입니다. – ASe

관련 문제