Google Cloud Dataproc에서 실행되는 간단한 바닐라 협업 필터링 애플리케이션을 작성하려고합니다. 데이터는 BigQuery에 있습니다. 이 자습서에 따라이 코드를 구현했습니다. https://cloud.google.com/dataproc/docs/tutorials/bigquery-sparkmlGoogle Cloud Dataproc의 IllegalStateException
이제이 문제 (약간 수정 됨)를 실행할 때 IllegalStateException이 발생합니다. 더 구체적으로 여기에 스택 트레이스입니다 :
17/09/25 10:55:37 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Traceback (most recent call last):
File "/tmp/af84ad68-0259-4ca1-b464-a118a96f0742/marketing-pages-collaborative-filtering.py", line 109, in <module>
compute_recommendations()
File "/tmp/af84ad68-0259-4ca1-b464-a118a96f0742/marketing-pages-collaborative-filtering.py", line 59, in compute_recommendations
conf=conf)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 646, in newAPIHadoopRDD
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, marketing-pages-collaborative-filtering-w-1.c.dg-dev-personalization.internal): java.lang.IllegalStateException: Found known file 'data-000000000002.json' with index 2, which isn't less than or equal to than endFileNumber 1!
at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:197)
at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.setEndFileMarkerFile(DynamicFileListRecordReader.java:327)
at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:177)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1324)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.take(RDD.scala:1298)
at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:203)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:582)
at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Found known file 'data-000000000002.json' with index 2, which isn't less than or equal to than endFileNumber 1!
at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:197)
at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.setEndFileMarkerFile(DynamicFileListRecordReader.java:327)
at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:177)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
17/09/25 10:55:37 INFO org.spark_project.jetty.server.ServerConnector: Stopped [email protected]{HTTP/1.1}{0.0.0.0:4040}
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [af84ad68-0259-4ca1-b464-a118a96f0742] entered state [ERROR] while waiting for [DONE].
내가 문제를 식별 한 생각하지만, 나는 문제의 원인을 찾을 수 없습니다. 관련 코드 스 니펫은 다음과 같습니다.
table_rdd = spark.sparkContext.newAPIHadoopRDD(
"com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
"org.apache.hadoop.io.LongWritable",
"com.google.gson.JsonObject",
conf=conf)
table_json = table_rdd.map(lambda x: x[1])
visit_data = sparkSession.read.json(table_json)
먼저 Google의 튜토리얼에 따라 RDD를 만듭니다. 다음 단계는 RDD에서 JSON 요소를 추출한 다음이를 테이블로 읽어 들여 쿼리 할 수 있습니다. stacktrace는 conf를 할당 할 때 예외가 발생하지만 코드가 sparkSession.read.json(table_json)
으로 호출 될 때까지 코드가 작동 함을 보여줍니다. 지연이 느리게 작동한다는 것을 이해하고 BigQuery에서 내 보낸 실제 JSON 파일에 액세스하려고하기 때문입니다.
이제는 Spark가 있어야하는 것보다 많은 JSON 파일을 찾습니다. BigQuery Hadoop 라이브러리의 코드에서이 comment에 따르면 모든 것이 하나의 샤드에 맞아도 최소값은 2이므로 BigQuery가 내보내기를 이와 같이 인식합니다. 또한 말하자면 끝 마커 파일을 생성합니다.이 파일은 비어있는 JSON 파일입니다.
그러나 코드를 실행하면 BigQuery에서 생성 한 내보내기에 필요한 파일 2 개 (데이터가 포함 된 파일 1 개와 끝 마커로 1 개)가 있습니다. BigQuery에서 1 ~ 2 행만 포함하는 최대 5 개의 JSON 파일을 생성합니다.
나는 이것이 문제라고 확신하며, 수출이 어떻게 든 잘못되었다고 확신한다. 하지만 왜 이런 일이 일어나고 어떻게 수정해야하는지 알 수 없습니다. 어떤 도움을 주셔서 감사합니다.
업데이트 :
나는 다른 것을 시도했다. BigQuery에서 표를 삭제하고 처음부터 다시 채 웠습니다. 이로 인해 수출 문제가 해결되었습니다. 현재 두 개의 파일 만 있습니다. 그러나 문제는 여전히 지속된다고 생각합니다. 클라우드 기능을 통해 행을 추가하고 (응용 프로그램에서 발생) 행태를 업데이트합니다.
UPDATE 2 :
그래서 하루를 기다리고 클라우드 기능을 사용하여 스트리밍 삽입을 통해 일부 행을 추가 한 후, 문제가 다시 발생합니다. 어떻게 든 수출은 하루 단위로 분할됩니다. 매일 매일 자체 파편을 얻는다면 그건 문제가되지 않지만, 불행하게도 그렇게되지는 않습니다.
공유 할 수있는 BigQuery 작업 ID가 있습니까? 또한 [email protected]에서 직접 Google 팀에 연락하여 프로젝트 ID를 공유 할 수도 있습니다. "end-marker"길이가 0 인 파일 뒤에 번호가 매겨진 파일이 없어야한다는 귀하의 평가가 정확합니다. BigQuery "스트리밍 삽입"을 사용하여 행을 추가 하시겠습니까? 아니면 중량이 큰 "로드"작업을 추가하고 있습니까? –
@DennisHuo 답변 해 주셔서 감사합니다. 이미 Google 팀에 연락했지만 처리하는 데 많은 시간이 걸립니다. 삽입하려면 클라우드 기능보다 스트리밍 삽입을 사용합니다. 어떻게 든 추가 파일은 하루 동안 기다렸다가 추가 행을 추가하는 경우에만 나타납니다. –
ML 시스템을 빌드하기 위해 BQ에 대한 쿼리를 실행하는 것이 좋은 방법인지 확실하지 않습니다. 여기 [이 프로젝트] (https://github.com/WillianFuks/PySpark-RecSys)에서 내가 BQ에서 쿼리를 실행하고 GCS로 결과를 내보내는 일부 수출 업체가 있다는 것을 알게되었습니다. 그런 다음이를 스파크에서 읽습니다. 전혀 문제가 없었으며 매우 빠르게 작동합니다. 또한 쿼리를 항상 실행하여 돈을내는 것을 피했습니다.이 방법은 단 한번만 실행됩니다. 우연히도 권장 시스템을 구현하지만 DIMSUM 알고리즘을 사용합니다. –