2016-06-27 2 views
0

RDD에서 입력을 받아서 drl 파일에서 읽는 스파이웨어 프로그램을 몇 가지 실행합니다. 내가 개체의 Hz에서 속성이 0 인 곳은 내가 그 작동되지 않는 이유 단서가 없다 1.Spark and Drools integration (drl 파일의 규칙 읽기)

에 의해 카운터 속성을 증가해야한다는 규칙을 만들어 한 DRL 파일에

, 나을 제공합니다 스트림의 모든 데이터에 대해 0을 출력합니다 (예, hz 속성이 0이고 yes 인 데이터가 있습니다. 모든 속성을 인쇄하고 카운터가 0인지도 확인할 수 있음)

KieSessionFactory 여기에있는 자식 프로젝트에서 발견 한 클래스 https://github.com/mganta/sprue/blob/master/src/main/java/com/cloudera/sprue/KieSessionFactory.java

하지만이 부분이 아니라고 확신합니다. 문제는 단지 drl 파일에서 읽고 규칙을 적용한다는 것입니다. 아래

내 스칼라 코드입니다 (나는 문제가있다 생각하는 부분을 표시하지만, 먼저 DRL 파일에서 봐 주시기 바랍니다있다)

package com.streams.Scala_Consumer 

import org.apache.kafka.clients.consumer.ConsumerConfig 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.{ SparkConf, SparkContext } 
import org.apache.spark.SparkContext._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream.{ DStream, InputDStream, ConstantInputDStream } 
import org.apache.spark.streaming.kafka.v09.KafkaUtils 
import org.apache.spark.streaming.{ Seconds, StreamingContext } 
import org.apache.spark.sql.functions.avg 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.streaming.kafka.producer._ 
import org.apache.kafka.common.serialization.{ Deserializer, Serializer } 
import org.apache.kafka.common.serialization.StringSerializer 
import org.kie.api.runtime.StatelessKieSession 
//import KieSessionFactory.getKieSession; 
//import Sensor 

object scala_consumer extends Serializable { 

// schema for sensor data 
class Sensor(resid_1: String, date_1: String, time_1: String, hz_1: Double, disp_1: Double, flo_1: Double, sedPPM_1: Double, psi_1: Double, chlPPM_1: Double, counter_1: Int) extends Serializable 
{ 
var resid = resid_1 
var date = date_1 
var time = time_1 
var hz = hz_1 
var disp = disp_1 
var flo = flo_1 
var sedPPM = sedPPM_1 
var psi = psi_1 
var chlPPM = chlPPM_1 
var counter = counter_1 

def IncrementCounter (param: Int) = 
{ 
    counter = counter + param 
} 
} 

// function to parse line of sensor data into Sensor class 
def parseSensor(str: String): Sensor = { 
    val p = str.split(",") 
    //println("printing p: " + p) 
    new Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8).toDouble, 0) 
} 

var counter = 0 
val timeout = 10 // Terminate after N seconds 
val batchSeconds = 2 // Size of batch intervals 

def main(args: Array[String]): Unit = { 

    val brokers = "maprdemo:9092" // not needed for MapR Streams, needed for Kafka 
    val groupId = "testgroup" 
    val offsetReset = "latest" 
    val batchInterval = "2" 
    val pollTimeout = "1000" 
    val topics = "/user/vipulrajan/streaming/original:sensor" 
    val topica = "/user/vipulrajan/streaming/fail:test" 
    val xlsFileName = "./src/main/Rules.drl" 

    val sparkConf = new SparkConf().setAppName("SensorStream").setMaster("local[1]").set("spark.testing.memory", "536870912") 
                    .set("spark.streaming.backpressure.enabled", "true") 
            .set("spark.streaming.receiver.maxRate", Integer.toString(2000000)) 
            .set("spark.streaming.kafka.maxRatePerPartition", Integer.toString(2000000)); 

    val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt)) 

    // Create direct kafka stream with brokers and topics 
    val topicsSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String](
     ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, 
     ConsumerConfig.GROUP_ID_CONFIG -> groupId, 
     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> 
      "org.apache.kafka.common.serialization.StringDeserializer", 
     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
      "org.apache.kafka.common.serialization.StringDeserializer", 
     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset, 
     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false", 
     "spark.kafka.poll.time" -> pollTimeout 
    ) 

    val producerConf = new ProducerConf(
     bootstrapServers = brokers.split(",").toList 
    ) 

    val messages = KafkaUtils.createDirectStream[String, String](ssc, kafkaParams, topicsSet) 

    val values: DStream[String] = messages.map(_._2) 
    println("message values received") 
    //values.print(10) 
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************///////////// 
    values.foreachRDD(x => try{ 
           print("did 1\n")  //markers for manual and minor debugging 
           val myData = x.mapPartitions(s => {s.map(sens => {parseSensor(sens)})}) 
           //myData.collect().foreach(println) 
           //println(youData.date) 
           print("did 2\n") 
           val evalData = myData.mapPartitions(s => { 
           val ksession = KieSessionFactory.getKieSession(xlsFileName) 
           val retData = s.map(sens => {ksession.execute(sens); sens;}) 
           retData 
           }) 
           evalData.foreach(t => {println(t.counter)}) 
           print("did 3\n") 
           } 

    catch{case e1: ArrayIndexOutOfBoundsException => println("exception in line ")}) 
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************///////////// 
    println("filtered alert messages ") 

    // Start the computation 
    ssc.start() 
    // Wait for the computation to terminate 
    ssc.awaitTermination() 

} 
} 

DRL 파일

package droolsexample 

import com.streams.Scala_Consumer.Sensor; 
import scala.com.streams.Scala_Consumer.Sensor; //imported because my rules file lies in the src/main folder 
              //and code lies in src/main/scala 

// declare any global variables here 
dialect "java" 
rule "Counter Incrementer" 

when 
    sens : Sensor (hz == 0) 

then 
    sens.IncrementCounter(1); 
end 

나는 drls 파일 대신 xls 파일을 사용해 보았습니다. java 클래스를 만들고 scala로 객체를 만들려고했습니다. 나는 많은 다른 것들을 시도했지만 결과물에 나타나는 모든 것은 경고이다 :

6/06/27 16 : 38 : 30.462 Executor task launch worker-0 WARN AbstractKieModule : KieBase defaultKieBase에 대한 파일이 없다.

그리고 카운터 값을 인쇄 할 때 모두 0이됩니다. 구조 대원?

+0

간단한 규칙'rule x 인 경우 System.out.println ("Hello"); end'가 해당 파일에 있고 발사하지 않으면 대부분 지식베이스를 올바르게 작성하지 않은 것입니다. 동일한 간단한 이름으로 두 개의 다른 클래스를 가져 오는 것이 좋은 생각이라고 생각합니까? – laune

+0

개별적으로 가져 오기를 시도했습니다. 또한, 나는 작동하지 않았던 인쇄 ("Hello") 만 시도했다. 미안하지만 지식 기반이 무엇인지 전혀 모른다는 단서가 있습니다. Google에 링크해야합니다. 링크 나 리소스가 있으면 여기에 게시 할 수 있다면 정말 감사 할 것입니다. :) –

답변

1

spark 제출을 수행하고 실행을 위해 JAR을 전달할 때 pls는 KIE 등의 다른 종속성 JAR도 동일한 JAR에 포함되어 있는지 확인한 다음 Spark-Submit을 사용하여 실행합니다.

다른 두 개의 별도 프로젝트 당신이 당신의 스파크 프로그램을 ahve 일을하는 것입니다 당신이 두 개의 항아리있을 것이다 당신이 어떤 것을 아래와 같이 실행할 수 있도록 다른 사용자 KIE 프로젝트입니다 : 불꽃 제출 nohup을 --conf

" spark.driver.extraJavaOptions -Dlog4j.configuration = 파일 : /log4j.properties "\ --queue ABC \ --master 실 \ \ 가이 샘-KIE 프로젝트 - 0.0 --jars --deploy 모드 클러스터. 1-SNAPSHOT.jar - 클래스 com.abc.DroolsSparkJob SparkCallingDrools-0.0.1-SNAPSHOT.jar \ -inputfile/user/hive/warehouse/abc/* -output/user/hive/warehouse/drools-Op> app .log &