2016-12-20 1 views
5

저는 Scala 버전 2.10.5 Cassandra 3.0과 Spark 1.6을 사용하고 있습니다. 나는 기본적인 예Cassandra 테이블에 데이터 삽입하기 Spark DataFrame 사용하기

작동 및 Cassandra.So에 수 삽입 데이터가 내가 스키마

val person = sc.textFile("hdfs://localhost:9000/user/hduser/person") 
import org.apache.spark.sql._ 
val schema = StructType(Array(StructField("firstName",StringType,true),StructField("lastName",StringType,true),StructField("age",IntegerType,true))) 
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt)) 
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema) 
personSchemaRDD.saveToCassandra 
을 일치시켜 카산드라 테이블에 삽입 완 CSV 파일을했다
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40))) 
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count")) 

밖으로 시도 그래서 나는 카산드라에 데이터를 삽입 할

SaveToCassndra를 사용하고 있는데 saveToCassandra를 얻는 것이 personSchemaRDD의 일부가 아닙니다. 그래서 다른 방법으로 시도하는 방법을 가르쳐

df.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words_copy", "keyspace" -> "test")).save() 

하지만 ipand : port.can에서 cassandra에 연결할 수 없게되면 어느 것이 나에게 가장 좋은 방법이라고 말해줍니다. 파일에서 cassandra에 주기적으로 데이터를 저장해야합니다.

답변

4

sqlContext.applySchema(...)DataFrame이고 DataFrame에는 saveToCassandra 메서드가 없습니다.

당신 수 그것과 .write 방법 :

val personDF = sqlContext.applySchema(rowRDD, schema) 
personDF.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words_copy", "keyspace" -> "test")).save() 

우리는 savetoCassandra 방법을 사용하려는 경우 가장 좋은 방법은 경우 클래스를 사용하여, 스키마 인식 RDD하는 것입니다.

case class Person(firstname:String, lastName:String, age:Int) 
val rowRDD = person.map(_.split(",")).map(p => Person(p(0),p(1),p(2).toInt) 
rowRDD.saveToCassandra(keyspace, table) 

데이터 프레임 write이 작동해야합니다. 컨텍스트를 올바르게 구성했는지 확인하십시오.

+0

Row()의 요소 중 하나를 'val rowRDD = input.map (_. split (","))으로 변환하려면 어떻게해야합니까? (p => Row (p (0), getTimestamp ()), p (2))) YYYY-MM-DD ''HH : mm : ss format – Anji

+0

@Anji 타임 스탬프를'jodatime.DateTime'의'java.util.Date'에 매핑하는 것이 더 나을 것입니다. 형식 문제를 피할 수 있습니다. – maasg

+0

com.databricks.spark.csv를 사용할 때 "NA"를 내릴 수있는 옵션이 있습니다. 발생 원인 : java.text.ParseException : Unparseable number : "NA"' – Anji

관련 문제