0
저는 Kafka 및 SparkStreaming에 매우 익숙합니다. 이 스파크 스트리밍 소비자는 Kafka (키가 문자열이고 값이 문자열 임)에서 데이터를 읽었을 때 데이터에 이스케이프 된 UTF8 문자열이 없으면 정상적으로 작동합니다. 그것은 그렇게되면, 그것은 다음과 같은 오류 메시지와 함께 실패합니다SparkStream의 MalformedInputException 오류 카프카 항목 읽기 앱
Los m\\xC3\\xA1s vendidos en ...
처음에 내가 의심 : 여기
java.nio.charset.MalformedInputException: Input length = 1
at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:172)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
탈출 UTF8 문자열 그 라인의 예입니다 (이 두 개의 슬래시가 포함되어 있습니다) 인코딩 구성을 스트리밍하는 Kafka이지만 변경하면 도움이되지 않았습니다. 결국 우리는 파이썬 UDF에서 반환 된 데이터에 대해 collect가 실행될 때 오류가 발생한다는 것을 발견했습니다.
자세한 정보가 필요하면 의견을 말하십시오. 미리 감사드립니다.