2017-03-02 1 views
2

커다란 카산드라 테이블을 스파크로 가져 오기 위해 sparklyr과 함께 작업 해 왔으며이를 R로 등록하고 dplyr 작업을 수행했습니다. sparklyr을 통해 cassandra 테이블을 spark로 가져 오기 - 일부 열만 선택할 수 있습니까?

내가 성공적으로 다음과 같습니다 코드와 카산드라 테이블을 가져왔다

:

# import cassandra table into spark 

cass_df <- sparklyr:::spark_data_read_generic(
    sc, "org.apache.spark.sql.cassandra", "format", 
    list(keyspace = "cass_keyspace", table = "cass_table") 
) %>% 
    invoke("load") 


# register table in R 

cass_tbl <- sparklyr:::spark_partition_register_df(
     sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE) 
     ) 

이 카산드라 테이블의 일부는 꽤 큰 (> 8.5bn 행) 및 등록/가져 오기 시간이 걸릴하고, 6 개의 노드가 총 60 개의 코어와 192gb의 RAM을 실행하는 경우에도 메모리 오버런이 발생합니다. 그러나 일반적으로 각 cassandra 데이터베이스에서 몇 개의 열만 필요합니다.

내 질문은 : 그것은 단지 정도가 SQL/CQL를 전달하여 즉, 기본 키 (에 여과 몇 가지 열을 가져 있도록

  1. 가져 오기/등록에 카산드라 데이터베이스를 필터링 할 수 있나요 SELECT name FROM cass_table WHERE id = 5과 같은 형식의 쿼리?
  2. 위의 코드에서 위와 같은 쿼리가 어디에 쓰이고 구문에서 어떤 형식이 사용됩니까?

나는, 옵션 목록에 추가 옵션과 같은 쿼리를 추가하는 시도 예 :

list(. . . , select = "id") 

뿐만 아니라 즉, %>% invoke("load") 전에 별도의 파이프로 호출 :

invoke("option", "select", "id") %>% 

# OR 

invoke("option", "query", s"select id from cass_table") %>% 

하지만 이것들은 작동하지 않습니다. 어떤 제안?

답변

3

당신이 열망 캐시와 관심의 선택 열을 건너 뛸 수 있습니다 :

session <- spark_session(sc) 

# Some columns to select 
cols <- list("x", "y", "z") 

cass_df <- session %>% 
    invoke("read") %>% 
    invoke("format", "org.apache.spark.sql.cassandra") %>% 
    invoke("options", as.environment(list(keyspace="test"))) %>% 
    invoke("load") %>% 
    # We use select(col: String, cols* String) so the first column 
    # has to be used separately. If you want only one column the third argument 
    # has to be an empty list 
    invoke("select", cols[[1]], cols[2:length(cols)]) %>% 
    # Standard lazy cache if you need one 
    invoke("cache") 

이 크게 가져온 데이터의 양이 "true" (기본값) pushdown 옵션을 설정 줄이고 캐싱하기 전에 filter을 사용할 수 있습니다 술어를 사용하는 경우

.

당신이 임시보기 및 sql 방법을 등록 더 복잡한 쿼리를 전달하려는 경우

:

session %>% 
    invoke("read") %>% 
    ... 
    invoke("load") %>% 
    invoke("createOrReplaceTempView", "some_name") 

cass_df <- session %>% 
    invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>% 
    invoke("cache") 
+0

멋진,이 게시물은 내게 너무 많은 도움이되었습니다 ... 나는로드 딱 맞는하지만이에서 영감을 짓을 한 열을 CSV 파일에서 가져옵니다. 나는 case'df를 등록하기를 원할 수도 있으므로 dplyr 동사를 사용할 수있다. (sparklyr에는 dplyr 백엔드가 붙어 있기 때문에). 'R_cass_df = sdf_register (cass_df "spark_cass_df ')'동사는, 예를 적용 할 수 dplyr이어서 : 등록이 이루어집니다 '라이브러리 ("dplyr "); R_cass_df %> % 필터 (foo == "bar") %> % select (id)' – Raphvanns

관련 문제