2014-12-19 4 views
3

나는 Apache SparkCassandra으로 일하고 있으며, spark-cassandra-connector으로 내 RDD를 카산드라에 저장하고 싶습니다.RDD를 카산드라에 삽입 (저장 또는 업데이트하지 않음)하는 방법은 무엇입니까?

여기에 코드입니다 :

def saveToCassandra(step: RDD[(String, String, Date, Int, Int)]) = { 
    step.saveToCassandra("keyspace", "table") 
} 

이 시간을 잘 대부분 작동하지만, 이미 DB에 존재하는 데이터보다 우선합니다. 나는 어떤 데이터도 무시하지 않을 것이다. 어떻게 든 가능합니까?

rdd.foreachPartition(x => connector.WithSessionDo(session => { 
    someUpdater.UpdateEntries(x, session) 
    // or 
    x.foreach(y => someUpdater.UpdateEntry(y, session)) 
})) 

connectorCassandraConnector(sparkConf)입니다 :

+0

저장해야하는 데이터를 결정하는 방법이 있습니까? – maasg

+0

나는이 질문과 비슷한 질문을한다고 생각합니다. [https://stackoverflow.com/questions/41307386/how-to-insertrows-into-cassandra-if-they-dont-exist-using-spark-cassandra -dri/48985224 # 48985224] (https://stackoverflow.com/questions/41307386/how-to-insert-rows-into-cassandra-if-they-dont-exist-using-spark-cassandra-dri/48985224# 48985224) –

답변

4

은 내가 할 것은 이것이다.

단순한 saveToCassandra만큼 좋지는 않지만 세밀한 제어가 가능합니다.

0

foreach 파티션 외부에서 WithSessionDo를 사용하는 것이 더 좋습니다. 그 호출에 포함 된 오버 헤드가 반복 될 필요가 없습니다.

관련 문제