방금 GraphFrames로 시작했지만 설명서를 따르고 있지만 aggregateMessages 함수에서 결과를 가져올 수 없습니다 (빈 데이터 프레임을 반환 함). 내 vertexRDD 더 정점 속성 없습니다 만 하나의 정점 Y
구성하도록 testGraph
라는 객체 GraphFrames, 내 edgeRDD이 같은 두 개의 레코드 구성 : 여기 내 문제의 단순화 된 예는 지금GraphFrames의 aggregateMessages에서 출력이 없습니다.
| src | dst | min_ts1 | min_ts2 |
| X | Y | 20 | null |
| Y | X | null | -10 |
, I min_ts1
의 값을 dst
으로 보내는 간단한 알고리즘을 구현하고 min_ts2
을 src
에 보냅니다. 나는이 알고리즘을 구현하기 위해 사용하고있는 코드는 다음과 같습니다 최초의 기록을 보면, 그리고를 보내 내가 거기에 일부 null 값이 여기에 있습니다,하지만 상관없이 나는 다음과 같은 작업을 수행 할 알고리즘을 통과하는 메시지를 기대 실현
import org.graphframes.lib.AggregateMessages
import org.apache.spark.sql.functions._
val AM = AggregateMessages
val msgToSrc = AM.edge("min_ts2)
val msgToDst = AM.edge("min_ts1")
val delay = testGraph
.aggregateMessages
.sendToSrc(msgToSrc)
.sendToDst(msgToDst)
.agg(sum(AM.msg).as("avg_time_delay"))
메시지는 20
에서 Y
이고 메시지는 null
에서 X
입니다. 그런 다음 두 번째 레코드를보고 null
이라는 메시지를 X에 보내고 -10
부터 Y
의 메시지를 보냅니다. 마지막으로 나는 결과가 Y
에 대한 메시지의 합이 10
이고 결과에 X
에 대한 레코드가 없기 때문에 그 결과가 vertexRDD에 포함되지 않았기 때문에 그 결과가 기대됩니다. 그리고 만약 X
이 vertexRDD에 포함 되었다면, 두 메시지가 모두 null
이기 때문에 결과는 단순히 null
일 것으로 기대합니다.
그러나 내가 얻는 것은 빈 RDD입니다. 누군가 내가 빈 결과를 얻는 이유를 이해하도록 도와 줄 수 있습니까?