
How can I get the results of the following query into an RDD using the neo4j-spark-connector with Apache Spark? I want to find specific nodes with the neo4j-spark-connector and work with them in an RDD, not just count them:

MATCH (n)-[r]-() 
WITH n AS Nodes, COUNT(Distinct r) as Degree 
RETURN Degree, count(Nodes) 
ORDER BY Degree ASC 

The example on GitHub shows how to return a count of the nodes; I checked this in Spark using .collect():

import org.neo4j.spark._ 

Neo4jRowRDD(sc, "MATCH (n) where id(n) < {maxId} return id(n)", 
  Seq(("maxId", 100000))).count 

Can't we load the result into an RDD as well? When I try to do so, I get the following error:

scala> xyz.take(2) 
16/09/19 15:04:46 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4) 
java.io.NotSerializableException: org.neo4j.driver.internal.InternalNode 
Serialization stack: 
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<10516047>) 
    - element of array (index: 0) 
    - array (class [Ljava.lang.Object;, size 1) 
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;) 
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<10516047>]) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.spark.sql.Row;, size 1) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
16/09/19 15:04:46 ERROR TaskSetManager: Task 0.0 in stage 4.0 (TID 4) had a not serializable result: org.neo4j.driver.internal.InternalNode 
Serialization stack: 
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<10516047>) 
    - element of array (index: 0) 
    - array (class [Ljava.lang.Object;, size 1) 
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;) 
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<10516047>]) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.spark.sql.Row;, size 1); not retrying 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 4.0 (TID 4) had a not serializable result: org.neo4j.driver.internal.InternalNode 
Serialization stack: 
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<10516047>) 
    - element of array (index: 0) 
    - array (class [Ljava.lang.Object;, size 1) 
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;) 
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<10516047>]) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.spark.sql.Row;, size 1) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1305) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.take(RDD.scala:1279) 
    ... 50 elided 

Answer


Sure you can; you get an RDD back and can do whatever you want with it.

Btw, there is an update for Spark 2.0 with a new API.
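
For example, the degree-distribution query from the question should come back as an RDD of rows you can take or collect, as long as the Cypher statement returns only scalar columns: the NotSerializableException in the question is raised because the collected query returns whole nodes, and org.neo4j.driver.internal.InternalNode is not serializable. A minimal sketch, assuming the same SparkContext sc and the Neo4jRowRDD API shown in the question:

import org.neo4j.spark._ 

// Return only integer columns (Degree and the node count per degree), 
// so the resulting rows hold plain values that Spark can serialize. 
val degreeDistribution = Neo4jRowRDD(sc, 
  "MATCH (n)-[r]-() WITH n AS Nodes, COUNT(DISTINCT r) AS Degree " + 
  "RETURN Degree, count(Nodes) ORDER BY Degree ASC", 
  Seq.empty) 

// take/collect now work because no InternalNode objects 
// have to be serialized and shipped to the driver. 
degreeDistribution.take(2).foreach(println) 

If you do need data from the nodes themselves, return their ids or individual properties (e.g. RETURN id(n), n.name) rather than the node objects.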


I think the OP will need a bit more guidance than that. – zero323


Could you please check my edit? When I try to use .collect(), I get the error mentioned in the edit. – kaxil


@kaxil Is there any update on this issue? –
