2016-08-01 2 views
2

Cassandra 및 Spark를 처음 사용합니다. DataFrames Java에서 Embedded Cassandra Server를 사용하여 Cassandra-Spark 작업을 테스트하는 예

  • 합니까 일부 필터링,
  • 로드를 그룹화하고 이러한 DataFrames에 집계 결과에 테이블 A에서

    1. 데이터를로드 : 나는 다음과 같은 작업을 수행 내 스파크 작업에 대한 테스트를 설정하려고 테이블 B

    Cassandra 데이터베이스의 로컬 인스턴스에 연결하지 않고 테스트를 실행하려면 Embedded Cassandra Server를 사용하고 싶습니다. 누구든지 전에 이것을 했습니까? 그렇다면, 누군가가 제게 좋은 모범을 보일 수 있습니까? 미리 도움을 주셔서 감사합니다!

  • +0

    명백하게, 백본 카산드라 데이터베이스의 테스트를 용이하게하는 카산드라 단위 라이브러리가 있습니다. https://github.com/jsevellec/cassandra-unit/wiki/What-is-it –

    답변

    1
    this code does 
    
    package cassspark.clt; 
    
    import java.io.*; 
    import javafx.application.Application; 
    import java.util.concurrent.Executors ; 
    import java.util.concurrent.ExecutorService; 
    import org.apache.cassandra.service.CassandraDaemon; 
    import com.datastax.driver.core.exceptions.ConnectionException; 
    import java.util.Properties; 
    import org.apache.log4j.PropertyConfigurator; 
    import org.apache.spark.sql.SparkSession; 
    
    public class EmbeddedCassandraDemo extends Application { 
    
        private ExecutorService executor = Executors.newSingleThreadExecutor(); 
        private CassandraDaemon cassandraDaemon; 
    
        public EmbeddedCassandraDemo() { 
        } 
    
        public static void main(String[] args) { 
         try { 
          new EmbeddedCassandraDemo().run(); 
         } 
         catch(java.lang.InterruptedException e) 
         { 
          ; 
         } 
        } 
    
        @Override public void start(javafx.stage.Stage stage) throws Exception 
        { 
         stage.show(); 
        } 
    
        private void run() throws InterruptedException, ConnectionException { 
         setProperties(); 
         activateDeamon(); 
        } 
    
        private void activateDeamon() { 
         executor.execute(new Runnable() { 
    
          @Override 
          public void run() { 
           cassandraDaemon = new CassandraDaemon(); 
           cassandraDaemon.activate(); 
           SparkSession spark = SparkSession .builder().master("local").appName("ASH").getOrCreate(); 
          } 
         }); 
        } 
    
        private void setProperties() { 
    
         final String yaml = System.getProperty("user.dir") + File.separator +"conf"+File.separator+"cassandra.yaml"; 
         final String storage = System.getProperty("user.dir") + File.separator +"storage" + File.separator +"data"; 
    
         System.setProperty("cassandra.config", "file:"+ yaml); 
         System.setProperty("cassandra.storagedir", storage); 
         System.setProperty("cassandra-foreground", "true"); 
    
         String log4JPropertyFile = "./conf/log4j.properties"; 
         Properties p = new Properties(); 
         try { 
          p.load(new FileInputStream(log4JPropertyFile)); 
          PropertyConfigurator.configure(p); 
         } catch (IOException e) { 
          System.err.println("./conf/log4j.properties not found "); 
          System.exit(1); 
          ; 
         } 
        } 
    } 
    
    관련 문제