2016-11-18 1 views
3

IoT 애플리케이션의 JSON 데이터가있는 Kafka 브로커가 있습니다. 몇 가지 처리를 수행하기 위해 Spark Streaming 애플리케이션에서이 서버에 연결합니다.Spark Streaming 응용 프로그램에서 캐시 된 데이터에 액세스하는 방법은 무엇입니까?

메모리 (RAM)에 cache()persist() 연산자를 사용하여 달성 할 수 있다고 생각되는 내 json 데이터의 일부 특정 필드를 저장하고 싶습니다.

다음 번에 Spark Streaming 응용 프로그램에서 새 JSON 데이터를받을 때 검색 할 수있는 공통 필드가 있으면 메모리 (RAM)를 체크인합니다. 그리고 그렇다면 몇 가지 간단한 계산을 수행하고 마침내 메모리 (RAM)에 저장 한 필드의 값을 업데이트합니다.

따라서 이전에 내린 설명이 가능한지 알고 싶습니다. 그렇다면 cache() 또는 persist()를 사용해야합니까? 그리고 어떻게하면 메모리에서 내 필드를 검색 할 수 있습니까?

답변

2

Spark 응용 프로그램의 데이터에 메모리 또는 디스크를 사용하는 cache/persist (가능하면 caching in Spark의 일반적인 사용)은 가능합니다.

하지만 ... Spark Streaming에서는 상태 계산 인이라고하는 사용 사례를 특별히 지원합니다. 가능한지 알아 보려면 Spark Streaming Programming Guide을 참조하십시오.

사용 사례가 mapWithState 인 것으로 생각합니다.

0

스파크가 그렇게 작동하지 않습니다. 분산 된 방식으로 생각하십시오.

RAM에 보관하는 첫 번째 부분입니다. cache() 또는 persist()을 사용하면 기본적으로 작업자의 메모리에 데이터를 보관합니다.

아파치 스파크 코드에서 확인할 수 있습니다.

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ 
    def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) 

    /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ 
    def cache(): this.type = persist() 

귀하의 유스 케이스를 이해하는 한, 두 번째 유스 케이스를 구현하려면 UpdateStateByKey 연산이 필요합니다!

Windowing에 대한 자세한 내용은 here을 참조하십시오.

관련 문제