우리는 매일 한 번씩 업데이트되는 작은 하이브 테이블 (약 50000 개의 레코드)을 가지고 있습니다.캐시 된 데이터 프레임을 새로 고칩니다.
이 테이블에 대해 캐시 된 데이터 프레임이 있으며 스파크 스트리밍 데이터와 결합됩니다. 기본 하이브에 새 데이터가로드되면 데이터 프레임을 어떻게 새로 고 칩니 까?
DataFrame tempApp = hiveContext.table("emp_data");
//Get Max Load-Date
Date max_date = max_date = tempApp.select(max("load_date")).collect()[0].getDate(0);
//Get data for latest date and cache. This will be used to join with stream data.
DataFrame emp= hiveContext.table("emp_data").where("load_date='" + max_date + "'").cache();
// Get message from Kafka Stream
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(....);
JavaDStream<MobileFlowRecord> rddMobileFlorRecs = messages.map(Record::parseFromMessage);
kafkaRecs.foreachRDD(rdd->{DataFrame recordDataFrame = hiveContext.createDataFrame(rdd, Record.class);
DataFrame joinedDataSet = recordDataFrame.join(emp,
recordDataFrame.col("application").equalTo(app.col("emp_id"));
joinedDataSet. <Do furthur processing>
});
귀하의 해결책을 이해할 수 있을지 의심 스럽습니다. 캐싱 및 캐싱 데이터 세트는 문제를 해결할 수 있지만 캐시는 한 번의 반복에만 유효하기 때문에 캐싱의 목적을 무효화합니다. 더 명확히하기 위해 샘플 코드를 추가했습니다. 둘째, 테스트 한 결과 각 반복마다 캐싱과 캐싱을 해제하는 데 약 3 초의 지연이 추가되었습니다. 이것을 달성 할 다른 방법이 있다면 궁금하십니까? – Akhil