2017-09-25 4 views
1

Google Cloud Dataproc에서 실행되는 간단한 바닐라 협업 필터링 애플리케이션을 작성하려고합니다. 데이터는 BigQuery에 있습니다. 이 자습서에 따라이 코드를 구현했습니다. https://cloud.google.com/dataproc/docs/tutorials/bigquery-sparkmlGoogle Cloud Dataproc의 IllegalStateException

이제이 문제 (약간 수정 됨)를 실행할 때 IllegalStateException이 발생합니다. 더 구체적으로 여기에 스택 트레이스입니다 :

17/09/25 10:55:37 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 
Traceback (most recent call last): 
File "/tmp/af84ad68-0259-4ca1-b464-a118a96f0742/marketing-pages-collaborative-filtering.py", line 109, in <module> 
compute_recommendations() 
File "/tmp/af84ad68-0259-4ca1-b464-a118a96f0742/marketing-pages-collaborative-filtering.py", line 59, in compute_recommendations 
conf=conf) 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 646, in newAPIHadoopRDD 
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco 
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, marketing-pages-collaborative-filtering-w-1.c.dg-dev-personalization.internal): java.lang.IllegalStateException: Found known file 'data-000000000002.json' with index 2, which isn't less than or equal to than endFileNumber 1! 
    at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:197) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.setEndFileMarkerFile(DynamicFileListRecordReader.java:327) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:177) 
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1324) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.take(RDD.scala:1298) 
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:203) 
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:582) 
    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: java.lang.IllegalStateException: Found known file 'data-000000000002.json' with index 2, which isn't less than or equal to than endFileNumber 1! 
    at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:197) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.setEndFileMarkerFile(DynamicFileListRecordReader.java:327) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:177) 
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

17/09/25 10:55:37 INFO org.spark_project.jetty.server.ServerConnector: Stopped [email protected]{HTTP/1.1}{0.0.0.0:4040} 
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [af84ad68-0259-4ca1-b464-a118a96f0742] entered state [ERROR] while waiting for [DONE]. 

내가 문제를 식별 한 생각하지만, 나는 문제의 원인을 찾을 수 없습니다. 관련 코드 스 니펫은 다음과 같습니다.

table_rdd = spark.sparkContext.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat", 
    "org.apache.hadoop.io.LongWritable", 
    "com.google.gson.JsonObject", 
    conf=conf) 

table_json = table_rdd.map(lambda x: x[1]) 
visit_data = sparkSession.read.json(table_json) 

먼저 Google의 튜토리얼에 따라 RDD를 만듭니다. 다음 단계는 RDD에서 JSON 요소를 추출한 다음이를 테이블로 읽어 들여 쿼리 할 수 ​​있습니다. stacktrace는 conf를 할당 할 때 예외가 발생하지만 코드가 sparkSession.read.json(table_json)으로 호출 될 때까지 코드가 작동 함을 보여줍니다. 지연이 느리게 작동한다는 것을 이해하고 BigQuery에서 내 보낸 실제 JSON 파일에 액세스하려고하기 때문입니다.

이제는 Spark가 있어야하는 것보다 많은 JSON 파일을 찾습니다. BigQuery Hadoop 라이브러리의 코드에서이 comment에 따르면 모든 것이 하나의 샤드에 맞아도 최소값은 2이므로 BigQuery가 내보내기를 이와 같이 인식합니다. 또한 말하자면 끝 마커 파일을 생성합니다.이 파일은 비어있는 JSON 파일입니다.

그러나 코드를 실행하면 BigQuery에서 생성 한 내보내기에 필요한 파일 2 개 (데이터가 포함 된 파일 1 개와 끝 마커로 1 개)가 있습니다. BigQuery에서 1 ~ 2 행만 포함하는 최대 5 개의 JSON 파일을 생성합니다.

나는 이것이 문제라고 확신하며, 수출이 어떻게 든 잘못되었다고 확신한다. 하지만 왜 이런 일이 일어나고 어떻게 수정해야하는지 알 수 없습니다. 어떤 도움을 주셔서 감사합니다.

업데이트 :

나는 다른 것을 시도했다. BigQuery에서 표를 삭제하고 처음부터 다시 채 웠습니다. 이로 인해 수출 문제가 해결되었습니다. 현재 두 개의 파일 만 있습니다. 그러나 문제는 여전히 지속된다고 생각합니다. 클라우드 기능을 통해 행을 추가하고 (응용 프로그램에서 발생) 행태를 업데이트합니다.

UPDATE 2 :

그래서 하루를 기다리고 클라우드 기능을 사용하여 스트리밍 삽입을 통해 일부 행을 추가 한 후, 문제가 다시 발생합니다. 어떻게 든 수출은 하루 단위로 분할됩니다. 매일 매일 자체 파편을 얻는다면 그건 문제가되지 않지만, 불행하게도 그렇게되지는 않습니다.

+0

공유 할 수있는 BigQuery 작업 ID가 있습니까? 또한 [email protected]에서 직접 Google 팀에 연락하여 프로젝트 ID를 공유 할 수도 있습니다. "end-marker"길이가 0 인 파일 뒤에 번호가 매겨진 파일이 없어야한다는 귀하의 평가가 정확합니다. BigQuery "스트리밍 삽입"을 사용하여 행을 추가 하시겠습니까? 아니면 중량이 큰 "로드"작업을 추가하고 있습니까? –

+0

@DennisHuo 답변 해 주셔서 감사합니다. 이미 Google 팀에 연락했지만 처리하는 데 많은 시간이 걸립니다. 삽입하려면 클라우드 기능보다 스트리밍 삽입을 사용합니다. 어떻게 든 추가 파일은 하루 동안 기다렸다가 추가 행을 추가하는 경우에만 나타납니다. –

+0

ML 시스템을 빌드하기 위해 BQ에 대한 쿼리를 실행하는 것이 좋은 방법인지 확실하지 않습니다. 여기 [이 프로젝트] (https://github.com/WillianFuks/PySpark-RecSys)에서 내가 BQ에서 쿼리를 실행하고 GCS로 결과를 내보내는 일부 수출 업체가 있다는 것을 알게되었습니다. 그런 다음이를 스파크에서 읽습니다. 전혀 문제가 없었으며 매우 빠르게 작동합니다. 또한 쿼리를 항상 실행하여 돈을내는 것을 피했습니다.이 방법은 단 한번만 실행됩니다. 우연히도 권장 시스템을 구현하지만 DIMSUM 알고리즘을 사용합니다. –

답변

2

이것은 BigQuery의 버그 (제로 레코드 파일을 포함하지 않는 출력 파일 수 통계를 반환 함)입니다. 이 문제에 대한 수정이 제출되었으며 약 1 주 후에 롤아웃이 완료됩니다.

그 동안 DataProc 작업을 구성 할 때 hadoop 구성에서 플래그 "mapred.bq.input.sharded.export.enable" (a.k.a. ENABLE_SHARDED_EXPORT_KEY)을 false로 설정할 수 있습니다.

업데이트 :
오늘 2017년 10월 6일로, 수정은 100 %의 BigQuery에 출시 지금이다.

+0

어딘가에 대한 티켓이나 버그 리포트가 있습니까? DataProc에서 PySpark와 함께 BigQuery 커넥터를 사용하여 테이블에서 데이터를 읽으려고 할 때이 문제가 발생합니다. – zo7

+0

공개 버그 보고서가 아직 제출되지 않았지만이 오류가 계속 표시되는 경우 테이블이 특정 상태가되면 동일한 오류가 발생할 수 있습니다. 이 버그를 추적하는 내부 버그가 있지만 유용하다고 생각되면 BigQuery에 버그 보고서를 제출하십시오. 그 동안 답변에서 제안 된 방법을 사용하여 차단을 해제 할 수 있습니다. 감사! –

관련 문제