2017-03-20 1 views
0

저는 Kafka 및 SparkStreaming에 매우 익숙합니다. 이 스파크 스트리밍 소비자는 Kafka (키가 문자열이고 값이 문자열 임)에서 데이터를 읽었을 때 데이터에 이스케이프 된 UTF8 문자열이 없으면 정상적으로 작동합니다. 그것은 그렇게되면, 그것은 다음과 같은 오류 메시지와 함께 실패합니다SparkStream의 MalformedInputException 오류 카프카 항목 읽기 앱

Los m\\xC3\\xA1s vendidos en ... 

처음에 내가 의심 : 여기

java.nio.charset.MalformedInputException: Input length = 1 
at java.nio.charset.CoderResult.throwException(CoderResult.java:281) 
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339) 
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) 
at java.io.InputStreamReader.read(InputStreamReader.java:184) 
at java.io.BufferedReader.fill(BufferedReader.java:161) 
at java.io.BufferedReader.readLine(BufferedReader.java:324) 
at java.io.BufferedReader.readLine(BufferedReader.java:389) 
at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72) 
at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:172) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461) 
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461) 
at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935) 
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
at org.apache.spark.scheduler.Task.run(Task.scala:99) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

탈출 UTF8 문자열 그 라인의 예입니다 (이 두 개의 슬래시가 포함되어 있습니다) 인코딩 구성을 스트리밍하는 Kafka이지만 변경하면 도움이되지 않았습니다. 결국 우리는 파이썬 UDF에서 반환 된 데이터에 대해 collect가 실행될 때 오류가 발생한다는 것을 발견했습니다.

자세한 정보가 필요하면 의견을 말하십시오. 미리 감사드립니다.

답변

0

우리의 JVM 인코딩이 데이터 수집 측 (Python UDF)에서 ISO8859-1로 설정 되었음이 밝혀졌습니다. 그래서 우리는 호출 파이프에 utf-8을 지정하여이를 해결했습니다.

rdd.pipe(command, encoding = Codec.UTF8.name)