2016-06-28 3 views
-1

XML 파일이 거의 없습니다 & 카프카 항목에 넣었습니다 & 카프카 항목의 Dstream 개체를 만들었습니다. 주제에서 xml 데이터를 구문 분석하려고하므로 더 이상 진행할 수 없습니다. Spark 스트리밍에서 XML 처리를 담당 한 사람이라면 누구나 진행할 수있는 입력을 제공 할 수 있습니다. 나는 지난 2 일 동안 이것에 붙어있다.Kafka Spark 스트리밍 XML 구문 분석/처리

내가 취하는 접근법은 XML 파일 -> Kafka 주제 -> Spark 스트리밍 처리 -> 다시 카프카에 넣는 것입니다.

나는 카프카의 주제로 다시 데이터를 입력 할 수 있지만, 처리 또는 스파크 스트리밍 화제의 데이터와 아무것도 할 수 없습니다입니다.

당신이 기대하는 어떤 처리
+0

코드를 추가하고 문제가 무엇이고 어떤 오류나 예외가 발생했는지 구체적으로 설명해 줄 수 있습니까? – maasg

+0

@Harsha는 kafka에서 메시지를 읽는 동안 동일한 문제로 실행 중이며 메시지를 각 태그로 메시지로 받고 있습니다. 어떻게 문제를 해결했는지 알려주십시오. –

+0

@ankush는 해당 스키마 – Harsha

답변

0

.?

어떤 종류의 데이터 추출이 필요한 경우 foreach 메시지는 json으로 변환하고 (json은 매우 간단합니다) jsonRDD 및 JsonRDD를 DF로 변환하는 것은 직접 변환입니다. 따라서 데이터 프레임에 대한 선택이나 다른 작업을 수행 할 수 있습니다.

나는 정확한 솔루션을 제공하기 위해, 당신에게서 몇 가지 더 입력을 필요

1) 당신이 데이터에서합니다.? 2) 데이터에서 데이터 프레임이 충분합니까?

입력으로 설명 할 수 있다면 매우 유용 할 것입니다.

xml 데이터에서 데이터 프레임을 가져 오려면 샘플 코드를 추가했습니다.

val jsonStream = kafkaStream.transform(
     y => { 
     y.filter(x => x._1 != null && x._2 != null).map(x => { 
      XML.toJSONObject(x).toString(4); 
     } 
     ) 
     }) 


jsonStream.foreachRDD(x => { 
     val sqlContext = SQLContextSingleton.getInstance(x.sparkContext) 
     if (x != null) { 

     val df = sqlContext.read.json(x) 
     // Your DF Operations 
     } 
     } 
    } 
) 

object SQLContextSingleton { 

    @transient private var instance: HiveContext = _ 

    def getInstance(sparkContext: SparkContext): HiveContext = { 
    if (instance == null) { 
     sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false"); 
     sparkContext.hadoopConfiguration.set("spark.sql.parquet.mergeSchema", "true"); 
     sparkContext.hadoopConfiguration.set("spark.sql.parquet.cacheMetadata","false"); 
     instance = new HiveContext(sparkContext) 
    } 
    instance 
    } 
} 
+0

안녕 Srini, 빠른 답장을 보내 주셔서 감사합니다 사용하는 XML의 유효성을 검사 JAXB를 사용 레디. 이 문제는 해결되었지만 매우 복잡한 사용 사례였으며 스파크 스트리밍을 사용하는 xml의 3 가지 유형을 연결하고자했습니다. 마침내 끝났다. 우리는 JAXB를 사용하여 각각의 스키마로 XML의 유효성을 검사했습니다. 내가 말했듯이, 유즈 케이스가 꽤 복잡하다고 말했기 때문에 많은 코딩이 있었기 때문에 나에게 도움이되는 어떤 코드도 공유하지 않는다. 다시 한번 감사드립니다. – Harsha

관련 문제