RDD에서 입력을 받아서 drl 파일에서 읽는 스파이웨어 프로그램을 몇 가지 실행합니다. 내가 개체의 Hz에서 속성이 0 인 곳은 내가 그 작동되지 않는 이유 단서가 없다 1.Spark and Drools integration (drl 파일의 규칙 읽기)
에 의해 카운터 속성을 증가해야한다는 규칙을 만들어 한 DRL 파일에
, 나을 제공합니다 스트림의 모든 데이터에 대해 0을 출력합니다 (예, hz 속성이 0이고 yes 인 데이터가 있습니다. 모든 속성을 인쇄하고 카운터가 0인지도 확인할 수 있음)KieSessionFactory 여기에있는 자식 프로젝트에서 발견 한 클래스 https://github.com/mganta/sprue/blob/master/src/main/java/com/cloudera/sprue/KieSessionFactory.java
하지만이 부분이 아니라고 확신합니다. 문제는 단지 drl 파일에서 읽고 규칙을 적용한다는 것입니다. 아래
내 스칼라 코드입니다 (나는 문제가있다 생각하는 부분을 표시하지만, 먼저 DRL 파일에서 봐 주시기 바랍니다있다)package com.streams.Scala_Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{ DStream, InputDStream, ConstantInputDStream }
import org.apache.spark.streaming.kafka.v09.KafkaUtils
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.sql.functions.avg
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.producer._
import org.apache.kafka.common.serialization.{ Deserializer, Serializer }
import org.apache.kafka.common.serialization.StringSerializer
import org.kie.api.runtime.StatelessKieSession
//import KieSessionFactory.getKieSession;
//import Sensor
object scala_consumer extends Serializable {
// schema for sensor data
class Sensor(resid_1: String, date_1: String, time_1: String, hz_1: Double, disp_1: Double, flo_1: Double, sedPPM_1: Double, psi_1: Double, chlPPM_1: Double, counter_1: Int) extends Serializable
{
var resid = resid_1
var date = date_1
var time = time_1
var hz = hz_1
var disp = disp_1
var flo = flo_1
var sedPPM = sedPPM_1
var psi = psi_1
var chlPPM = chlPPM_1
var counter = counter_1
def IncrementCounter (param: Int) =
{
counter = counter + param
}
}
// function to parse line of sensor data into Sensor class
def parseSensor(str: String): Sensor = {
val p = str.split(",")
//println("printing p: " + p)
new Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8).toDouble, 0)
}
var counter = 0
val timeout = 10 // Terminate after N seconds
val batchSeconds = 2 // Size of batch intervals
def main(args: Array[String]): Unit = {
val brokers = "maprdemo:9092" // not needed for MapR Streams, needed for Kafka
val groupId = "testgroup"
val offsetReset = "latest"
val batchInterval = "2"
val pollTimeout = "1000"
val topics = "/user/vipulrajan/streaming/original:sensor"
val topica = "/user/vipulrajan/streaming/fail:test"
val xlsFileName = "./src/main/Rules.drl"
val sparkConf = new SparkConf().setAppName("SensorStream").setMaster("local[1]").set("spark.testing.memory", "536870912")
.set("spark.streaming.backpressure.enabled", "true")
.set("spark.streaming.receiver.maxRate", Integer.toString(2000000))
.set("spark.streaming.kafka.maxRatePerPartition", Integer.toString(2000000));
val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
"spark.kafka.poll.time" -> pollTimeout
)
val producerConf = new ProducerConf(
bootstrapServers = brokers.split(",").toList
)
val messages = KafkaUtils.createDirectStream[String, String](ssc, kafkaParams, topicsSet)
val values: DStream[String] = messages.map(_._2)
println("message values received")
//values.print(10)
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************/////////////
values.foreachRDD(x => try{
print("did 1\n") //markers for manual and minor debugging
val myData = x.mapPartitions(s => {s.map(sens => {parseSensor(sens)})})
//myData.collect().foreach(println)
//println(youData.date)
print("did 2\n")
val evalData = myData.mapPartitions(s => {
val ksession = KieSessionFactory.getKieSession(xlsFileName)
val retData = s.map(sens => {ksession.execute(sens); sens;})
retData
})
evalData.foreach(t => {println(t.counter)})
print("did 3\n")
}
catch{case e1: ArrayIndexOutOfBoundsException => println("exception in line ")})
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************/////////////
println("filtered alert messages ")
// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()
}
}
DRL 파일
package droolsexample
import com.streams.Scala_Consumer.Sensor;
import scala.com.streams.Scala_Consumer.Sensor; //imported because my rules file lies in the src/main folder
//and code lies in src/main/scala
// declare any global variables here
dialect "java"
rule "Counter Incrementer"
when
sens : Sensor (hz == 0)
then
sens.IncrementCounter(1);
end
나는 drls 파일 대신 xls 파일을 사용해 보았습니다. java 클래스를 만들고 scala로 객체를 만들려고했습니다. 나는 많은 다른 것들을 시도했지만 결과물에 나타나는 모든 것은 경고이다 :
6/06/27 16 : 38 : 30.462 Executor task launch worker-0 WARN AbstractKieModule : KieBase defaultKieBase에 대한 파일이 없다.
그리고 카운터 값을 인쇄 할 때 모두 0이됩니다. 구조 대원?
간단한 규칙'rule x 인 경우 System.out.println ("Hello"); end'가 해당 파일에 있고 발사하지 않으면 대부분 지식베이스를 올바르게 작성하지 않은 것입니다. 동일한 간단한 이름으로 두 개의 다른 클래스를 가져 오는 것이 좋은 생각이라고 생각합니까? – laune
개별적으로 가져 오기를 시도했습니다. 또한, 나는 작동하지 않았던 인쇄 ("Hello") 만 시도했다. 미안하지만 지식 기반이 무엇인지 전혀 모른다는 단서가 있습니다. Google에 링크해야합니다. 링크 나 리소스가 있으면 여기에 게시 할 수 있다면 정말 감사 할 것입니다. :) –