0

toDF TempTable에서 일부 분석을 저장하려고하는데 다음 오류가 발생합니다. ": 215 : 오류 : toDF가 Double의 멤버가 아닙니다." 카산드라 테이블의 데이터를 읽고 있는데 계산 중입니다. 이 결과를 임시 테이블에 저장하려고합니다. 저는 스칼라를 처음 접했고, 누군가 제발 저를 도울 수 있습니까? 내 코드스칼라 : 결과를 toDf 임시 테이블에 저장하십시오.

case class Consumo(consumo:Double, consumo_mensal: Double, mes: org.joda.time.DateTime,ano: org.joda.time.DateTime, soma_pf: Double,empo_gasto: Double); 

object Analysegridata{ 

val conf = new SparkConf(true) 

.set("spark.cassandra.connection.host","127.0.0.1").setAppName("LiniarRegression") 
.set("spark.cassandra.connection.port", "9042") 
.set("spark.driver.allowMultipleContexts", "true") 
.set("spark.streaming.receiver.writeAheadLog.enable", "true"); 
val sc = new SparkContext(conf); 

val ssc = new StreamingContext(sc, Seconds(1)) 

val sqlContext = new org.apache.spark.sql.SQLContext(sc); 
val checkpointDirectory = "/var/lib/cassandra/data" 
ssc.checkpoint(checkpointDirectory) // set checkpoint directory 

// val context = StreamingContext.getOrCreate(checkpointDirectory)  
import sqlContext.implicits._ 
JavaSparkContext.fromSparkContext(sc); 

def rddconsumo(rddData: Double): Double = { 

val rddData: Double = { 
    implicit val data = conf 
    val grid = sc.cassandraTable("smartgrids", "analyzer").as((r:Double) => (r)).collect 

def goto(cs: Array[Double]): Double = { 

    var consumo = 0.0; 
    var totaldias = 0; 
    var soma_pf = 0.0; 
    var somamc = 0.0; 
    var tempo_gasto = 0.0; 
    var consumo_mensal = 0.0; 
    var i=0 
for (i <- 0 until grid.length) {  
    val minutos = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "MINUTE"); 
    val horas = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol","HOUR_OF_DAY"); 
    val dia = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "DAY_OF_MONTH"); 
    val ano = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "YEAR"); 
    val mes = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "MONTH"); 
    val potencia = sc.cassandraTable("smartgrids","analyzer_temp").select("n_pf1", "n_pf2", "n_pf3") 

def convert_minutos (minuto : Int) : Double ={ 
    minuto/60 
} 
    dia.foreach (i => { 

    def adSum(potencia: Array[Double]) = { 

    var i=0; 
     while (i < potencia.length) { 
     soma_pf += potencia(i); 
     i += 1; 
     soma_pf; 
     println("Potemcia =" + soma_pf) 
    } 
} 
    def tempo(minutos: Array[Int]) = { 
     var i=0; 
     while (i < minutos.length) { 
     somamc += convert_minutos(minutos(i)) 
     i += 1; 
     somamc 
    } 
    } 

    def tempogasto(horas: Array[Int]) = { 
     var i=0; 
     while (i < horas.length) { 
     tempo_gasto = horas(i) + somamc; 
     i += 1; 
     tempo_gasto; 
     println("Temo que o aparelho esteve ligado =" + tempo_gasto) 
    } 
} 

def consumof(dia: Array[Int]) = { 
    var i=0; 
    while (i < dia.length) { 
     consumo = soma_pf * tempo_gasto; 
     i += 1; 
     consumo; 
     println("Consumo diario =" + consumo)  
    } 
    } 
}) 

mes.foreach (i => { 

def totaltempo(dia: Array[Int]) = { 
    var i = 0; 
    while(i < dia.length){ 
     totaldias += dia(i); 
     i += 1; 
     totaldias; 
     println("Numero total de dias =" + totaldias) 
    } 
} 
def consumomensal(mes: Array[Int]) = { 
    var i = 0; 
    while(i < mes.length){ 
     consumo_mensal = consumo * totaldias; 
     i += 1; 
    consumo_mensal; 
    println("Consumo Mensal =" + consumo_mensal); 
    } 
} 
}) 

} 
    consumo; 
    totaldias; 
    consumo_mensal; 
    soma_pf; 
    tempo_gasto; 
    somamc 

} 

rddData 

    } 
    rddData.toDF().registerTempTable("rddData") 
} 
     ssc.start() 
     ssc.awaitTermination() 




error: value toDF is not a member of Double" 
+0

전체 오류 스택을 게시하십시오! 그게 도움이 될거야 –

+0

: 260 : 오류 : 값 toDF는 Double rddData.toDF()의 멤버가 아닙니다. registerTempTable ("rddData") –

답변

0

그것은 당신이 (하는 최소한의 예를 제공하려고 너무 많은 코드) 정확하게 일을하려고하는지 다소 불분명하지만, 몇 가지 분명 문제가 있습니다

  1. rddData 유형이 Double 인 경우 : RDD[Double] (Double 값의 분산 컬렉션)이어야합니다. 하나의 테이블로 하나의 Double 값을 저장하려고 시도하는 것은별로 의미가 없으며 실제로 작동하지 않습니다. 은 RDD에서 호출 할 수 있으며 어떤 형식이든 Double이 아니라 컴파일러에서 경고합니다.
  2. 당신은 데이터collect() : 당신은 RDD를로드 할 경우, 일부 조작을 사용하여 변환 한 다음 테이블로 저장 - collect()은 아마 RDD에 호출 할 수 없습니다. collect()은 모든 데이터 (클러스터를 통해 배포 됨)를 단일 "드라이버"머신 (이 코드를 실행하는 머신)에 보냅니다. 그 후에는 클러스터를 이용하지 않고 다시 RDD 데이터 구조를 사용하지 않으므로 ' toDF을 사용하여 데이터를 DataFrame으로 변환하십시오.
관련 문제