0

데이터 소스로 Cassandra (3.9)와 함께 Spark 2.1을 사용하고 있습니다. C *에는 50 개 열이있는 큰 테이블이 있는데 이는 유스 케이스의 좋은 데이터 모델이 아닙니다. 그래서 나는 각 센서에 대한 분할 테이블을 파티션 키와 클러스터링 키 열과 함께 만들었습니다.스파크 하나의 데이터 프레임에서 여러 데이터 프레임을 만듭니다.

All sensor table 
----------------------------------------------------- 
| Device | Time  | Sensor1 | Sensor2 | Sensor3 | 
| dev1 | 1507436000 | 50.3 | 1 | 1 | 
| dev2 | 1507436100 | 90.2 | 0 | 1 | 
| dev1 | 1507436100 | 28.1 | 1 | 1 | 
----------------------------------------------------- 
Sensor1 table 
------------------------------- 
| Device | Time  | value | 
| dev1 | 1507436000 | 50.3 | 
| dev2 | 1507436100 | 90.2 | 
| dev1 | 1507436100 | 28.1 | 
------------------------------- 

이제 spark를 사용하여 이전 테이블의 데이터를 새로운 테이블로 복사합니다.

df = spark.read\ 
    .format("org.apache.spark.sql.cassandra")\ 
    .options(table="allsensortables", keyspace="dataks")\ 
    .load().cache() 
df.createOrReplaceTempView("data") 
query = ('''select device,time,sensor1 as value from data ''') 
vgDF = spark.sql(query) 
vgDF.write\ 
    .format("org.apache.spark.sql.cassandra")\ 
    .mode('append')\ 
    .options(table="sensor1", keyspace="dataks")\ 
    .save() 

하나의 테이블에 대해 하나씩 데이터를 복사하는 데 많은 시간 (2.1) 시간이 소요됩니다. 내가 할 수있는 방법이있다 select * 및 각 센서에 대한 여러 df를 만들고 한 번에 저장? (또는 순차적으로). 코드에서

+0

어떻게 spark-shell 명령을 사용하여 코드를 실행하고 있습니까 ?? –

+0

@Vijay_Shinde using spark-submit – Junaid

+0

Ok @Junaid, 드라이버 메모리 및 실행 프로그램 메모리를 늘려보십시오. 그것은 당신을 도울 것입니다. –

답변

0

한 가지 문제는 캐시 여기

df = spark.read\ 
.format("org.apache.spark.sql.cassandra")\ 
.options(table="allsensortables", keyspace="dataks")\ 
.load().cache() 

내가 DF 떨어져 저장에서 여러 번 사용하는 방법을 볼 수 없습니다입니다. 그래서 여기 캐시는 생산성이 떨어집니다. 데이터를 읽고 필터링하고 별도의 카산드라 테이블에 저장합니다. 이제 데이터 프레임에서 일어나는 유일한 작업은 저장과 아무것도 아닌 것입니다.

따라서 여기에 데이터를 캐싱 할 때 이점이 없습니다. 캐시를 제거하면 속도가 향상됩니다.

여러 테이블을 순차적으로 생성하려면. 나는 partitionBy를 사용하고 HDFS에 데이터를 먼저 파티션 된 데이터 w.r.t 센서로 쓰고 그것을 다시 cassandra에 쓰도록 제안 할 것입니다.

+0

을 사용하여 캐시를 제거하고 테스트합니다. 내가 hdfs 및 partitionBy 시도해 보자. 또한 거기에 c * 사이드 필터링 커넥터에서 사용할 수 있지만 그것은 어떻게 파이썬에서 사용할 수 있는지 잘 모르겠습니다. select가 추가되면 객체에 'select'속성이 없다는 의미입니다. https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md – Junaid

+0

예, cassandra는 술어 푸시 다운을 허용합니다. 파이썬에 lib가 있는지 여부도 확실하지 않습니다. –

관련 문제