2014-10-23 4 views
3

일부 ETL에 대해 spark을 사용하려고합니다. 대부분 "update"문으로 구성됩니다 (열은 집합이므로 첨부 될 예정이므로 간단한 삽입은 없을 것입니다). 일하다). 따라서 데이터를 가져 오기 위해 CQL 쿼리를 실행하는 것이 가장 좋은 방법 인 것 같습니다. 스파크 카산드라 커넥터를 사용하여, 나는이 작업을 수행 할 수 있습니다 참조 : 이제Spark Cassandra Connector 올바른 사용법

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra

내가 세션을 열고 소스의 모든 행을 닫습니다 싶지 않아 (난이 원하지 않는 바로 무엇입니까? 일반적으로 전체 프로세스에 대해 하나의 세션을 보유하고 있으며이를 "일반"응용 프로그램에서 계속 사용합니다. 그러나 커넥터가 직렬화 가능하다고하지만 세션은 분명히 아닙니다. 그래서, 하나의 "withSessionDo"안에 전체 가져 오기를 래핑하면 문제가 발생할 것처럼 보입니다. 다음과 같은 것을 사용하려고 생각했습니다 :

class CassandraStorage(conf:SparkConf) { 
    val session = CassandraConnector(conf).openSession() 
    def store (t:Thingy) : Unit = { 
    //session.execute cql goes here 
    } 
} 

이 방법이 좋은 방법입니까? 세션 종료에 대해 걱정할 필요가 있습니까? 어디서/어떻게하면 좋을까요? 모든 포인터는 감사하겠습니다.

+0

나는 Spark Conf 객체를 만들고 싶지 않을뿐만 아니라 거기에 언급 된 커넥터 페이지에서 예제와 같은 스파크 컨텍스트에서 참조하는 참조를 왜 만들고 싶지 않을까? conf 객체를 생성하고 질의를 실행하는 동안 컨텍스트를 열어 둘 수 있어야합니다. – markc

답변

1

실제로 모든 액세스시 세션을 열고 닫지 않으므로 withSessionDo을 사용하려고합니다. 후드 아래에서 withSessionDo은 JVM 레벨 세션에 액세스합니다. 이는 클러스터 구성 PER 노드 당 하나의 세션 개체 만 가질 수 있음을 의미합니다.

이것은 오직 관계없이 각 기계가 얼마나 많은 코어의 각 집행 JVM에 1 개 클러스터 및 세션 개체를 만들 것 코드

val connector = CassandraConnector(sc.getConf) 
sc.parallelize(1 to 10000000L).map(connector.withSessionDo(Session => stuff) 

같은 의미합니다.

효율성을 위해 캐시 검사를 최소화하기 위해 mapPartitions을 사용하는 것이 좋습니다.

sc.parallelize(1 to 10000000L) 
    .mapPartitions(it => connector.withSessionDo(session => 
     it.map(row => do stuff here))) 

는 또한 세션 객체는 당신이 당신의 직렬화 코드에서 준비된 명령문을 캐시 할 수있는 준비 캐시를 사용하고, JVM 당 (다른 모든 호출이 캐시 참조를 반환합니다 일단은 오직 준비됩니다.)

관련 문제