2016-12-28 1 views
0

우리는 매일 한 번씩 업데이트되는 작은 하이브 테이블 (약 50000 개의 레코드)을 가지고 있습니다.캐시 된 데이터 프레임을 새로 고칩니다.

이 테이블에 대해 캐시 된 데이터 프레임이 있으며 스파크 스트리밍 데이터와 결합됩니다. 기본 하이브에 새 데이터가로드되면 데이터 프레임을 어떻게 새로 고 칩니 까?

DataFrame tempApp = hiveContext.table("emp_data"); 

//Get Max Load-Date 
Date max_date = max_date = tempApp.select(max("load_date")).collect()[0].getDate(0); 

//Get data for latest date and cache. This will be used to join with stream data. 
DataFrame emp= hiveContext.table("emp_data").where("load_date='" + max_date + "'").cache(); 

// Get message from Kafka Stream 
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(....); 

JavaDStream<MobileFlowRecord> rddMobileFlorRecs = messages.map(Record::parseFromMessage); 

kafkaRecs.foreachRDD(rdd->{DataFrame recordDataFrame = hiveContext.createDataFrame(rdd, Record.class); 

DataFrame joinedDataSet = recordDataFrame.join(emp, 
recordDataFrame.col("application").equalTo(app.col("emp_id")); 
joinedDataSet. <Do furthur processing> 
}); 

답변

0

스파크 자동으로 RDD 또는 데이터 프레임이 더 이상 사용되지 않는 경우. RDD 나 Dataframe이 캐싱되어 있는지를 알기 위해 Spark UI -> Storage tabl에 들어가서 메모리 세부 사항을 볼 수 있습니다. df.unpersist() 또는 sqlContext.uncacheTable("sparktable")uncacheTable APi을 사용하여 메모리에서 df 또는 테이블을 제거 할 수 있습니다.이 옵션은 새 SparksessionAPi에서는 사용할 수 없지만 이전 버전과의 호환성은 항상 있습니다. Lazy Evaluation 용 스파크 (Spaz)는 어떤 액션을 말할 때까지 RDD 나 DataFrame에 데이터를로드하거나 처리하지 않는 한 말입니다.

귀하를 위해 join을 수행 한 후 데이터 프레임으로 unpersist()을 수행하십시오. 성능을 향상시키고 문제를 해결할 수 있습니다.

Databricks

+0

귀하의 해결책을 이해할 수 있을지 의심 스럽습니다. 캐싱 및 캐싱 데이터 세트는 문제를 해결할 수 있지만 캐시는 한 번의 반복에만 유효하기 때문에 캐싱의 목적을 무효화합니다. 더 명확히하기 위해 샘플 코드를 추가했습니다. 둘째, 테스트 한 결과 각 반복마다 캐싱과 캐싱을 해제하는 데 약 3 초의 지연이 추가되었습니다. 이것을 달성 할 다른 방법이 있다면 궁금하십니까? – Akhil

0

수동으로 설정할 수 있습니다. 이런 식으로 뭔가가 :

DataFrame refresh(DataFrame orig) { 
    if (orig != null) { 
     orig.unpersist(); 
    } 
    DataFrame res = get the dataframe as you normally would 
    res.cache() 
    return res 

지금 하루에 한 번이 전화를 걸거나이 같은 새로 고침 할 때마다이 기본적으로 무엇을

DataFrame join_df = refresh(join_df) 

이 unpersists있는 이전 버전의 (캐시 제거), 읽고 새로운 것이고 그 후에 그것을 캐싱한다. 따라서 실제로는 데이터 프레임이 새로 고쳐집니다.

데이터 프레임은 캐싱이 느리므로 새로 고침 후에 처음 사용 된 후에 만 ​​메모리에 유지됩니다.

관련 문제