.?
어떤 종류의 데이터 추출이 필요한 경우 foreach 메시지는 json으로 변환하고 (json은 매우 간단합니다) jsonRDD 및 JsonRDD를 DF로 변환하는 것은 직접 변환입니다. 따라서 데이터 프레임에 대한 선택이나 다른 작업을 수행 할 수 있습니다.
나는 정확한 솔루션을 제공하기 위해, 당신에게서 몇 가지 더 입력을 필요
1) 당신이 데이터에서합니다.? 2) 데이터에서 데이터 프레임이 충분합니까?
입력으로 설명 할 수 있다면 매우 유용 할 것입니다.
xml 데이터에서 데이터 프레임을 가져 오려면 샘플 코드를 추가했습니다.
val jsonStream = kafkaStream.transform(
y => {
y.filter(x => x._1 != null && x._2 != null).map(x => {
XML.toJSONObject(x).toString(4);
}
)
})
jsonStream.foreachRDD(x => {
val sqlContext = SQLContextSingleton.getInstance(x.sparkContext)
if (x != null) {
val df = sqlContext.read.json(x)
// Your DF Operations
}
}
}
)
object SQLContextSingleton {
@transient private var instance: HiveContext = _
def getInstance(sparkContext: SparkContext): HiveContext = {
if (instance == null) {
sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false");
sparkContext.hadoopConfiguration.set("spark.sql.parquet.mergeSchema", "true");
sparkContext.hadoopConfiguration.set("spark.sql.parquet.cacheMetadata","false");
instance = new HiveContext(sparkContext)
}
instance
}
}
코드를 추가하고 문제가 무엇이고 어떤 오류나 예외가 발생했는지 구체적으로 설명해 줄 수 있습니까? – maasg
@Harsha는 kafka에서 메시지를 읽는 동안 동일한 문제로 실행 중이며 메시지를 각 태그로 메시지로 받고 있습니다. 어떻게 문제를 해결했는지 알려주십시오. –
@ankush는 해당 스키마 – Harsha