How can I load the results of the following query into an RDD in Apache Spark using the neo4j-spark-connector? I want the actual result rows in an RDD, not just a count of matching nodes:
MATCH (n)-[r]-()
WITH n AS Nodes, count(DISTINCT r) AS Degree
RETURN Degree, count(Nodes)
ORDER BY Degree ASC
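Since this query returns only numeric columns (no node objects), one might expect its rows to be serializable by Spark. A minimal sketch of how it could be submitted through Neo4jRowRDD, assuming a running Neo4j instance and following the connector's (sc, query, parameters) call shape shown further below:

```scala
import org.neo4j.spark._

// Sketch (assumption, not from the original post): the degree-distribution
// query returns two numeric columns, so the resulting Rows contain only
// serializable values (unlike rows that carry whole node objects).
val degreeDist = Neo4jRowRDD(sc,
  """MATCH (n)-[r]-()
    |WITH n AS Nodes, count(DISTINCT r) AS Degree
    |RETURN Degree, count(Nodes)
    |ORDER BY Degree ASC""".stripMargin,
  Seq.empty)

// Each row is (Degree, node count) — both plain numbers.
degreeDist.collect().foreach(println)
```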
The example on GitHub shows how to return a count for the nodes, which I checked in Spark using .collect():
import org.neo4j.spark._

Neo4jRowRDD(sc, "MATCH (n) where id(n) < {maxId} return id(n)", Seq(("maxId", 100000))).count
Can't we load the results into an RDD? I get the following error when I try to:
scala> xyz.take(2)
16/09/19 15:04:46 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.io.NotSerializableException: org.neo4j.driver.internal.InternalNode
Serialization stack:
- object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<10516047>)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<10516047>])
- element of array (index: 0)
- array (class [Lorg.apache.spark.sql.Row;, size 1)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/09/19 15:04:46 ERROR TaskSetManager: Task 0.0 in stage 4.0 (TID 4) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
- object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<10516047>)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<10516047>])
- element of array (index: 0)
- array (class [Lorg.apache.spark.sql.Row;, size 1); not retrying
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 4.0 (TID 4) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
- object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<10516047>)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<10516047>])
- element of array (index: 0)
- array (class [Lorg.apache.spark.sql.Row;, size 1)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1305)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.take(RDD.scala:1279)
... 50 elided
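The stack trace points at the cause: the query behind xyz returns whole nodes, and the driver's org.neo4j.driver.internal.InternalNode class is not Serializable, so Spark cannot ship the result rows back from executors. A common workaround (a sketch, not from the original post) is to return only primitive columns from Cypher instead of node objects:

```scala
import org.neo4j.spark._

// Sketch (assumption): RETURN id(n) and scalar properties rather than
// RETURN n — the rows then contain only serializable values, so
// .take() / .collect() no longer hit NotSerializableException.
val xyz = Neo4jRowRDD(sc,
  "MATCH (n) WHERE id(n) < {maxId} RETURN id(n), labels(n)[0]",
  Seq(("maxId", 100000)))

xyz.take(2).foreach(println)
```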
I think the OP will need a bit more guidance than that. – zero323
Could you check my edit? When I try to use .collect(), I get the error mentioned in the edit. – kaxil
@kaxil Any update on this issue? –