2017-11-28 2 views
-1

방금 ​​GraphFrames로 시작했지만 설명서를 따르고 있지만 aggregateMessages 함수에서 결과를 가져올 수 없습니다 (빈 데이터 프레임을 반환 함). 내 vertexRDD 더 정점 속성 없습니다 만 하나의 정점 Y 구성하도록 testGraph라는 객체 GraphFrames, 내 edgeRDD이 같은 두 개의 레코드 구성 : 여기 내 문제의 단순화 된 예는 지금GraphFrames의 aggregateMessages에서 출력이 없습니다.

| src | dst | min_ts1 | min_ts2 | 
| X | Y | 20 | null | 
| Y | X | null | -10 | 

, I min_ts1의 값을 dst으로 보내는 간단한 알고리즘을 구현하고 min_ts2src에 보냅니다. 나는이 알고리즘을 구현하기 위해 사용하고있는 코드는 다음과 같습니다 최초의 기록을 보면, 그리고를 보내 내가 거기에 일부 null 값이 여기에 있습니다,하지만 상관없이 나는 다음과 같은 작업을 수행 할 알고리즘을 통과하는 메시지를 기대 실현

import org.graphframes.lib.AggregateMessages 
import org.apache.spark.sql.functions._ 
val AM = AggregateMessages 

val msgToSrc = AM.edge("min_ts2) 
val msgToDst = AM.edge("min_ts1") 

val delay = testGraph 
.aggregateMessages 
    .sendToSrc(msgToSrc) 
    .sendToDst(msgToDst) 
    .agg(sum(AM.msg).as("avg_time_delay")) 

메시지는 20에서 Y이고 메시지는 null에서 X입니다. 그런 다음 두 번째 레코드를보고 null이라는 메시지를 X에 보내고 -10부터 Y의 메시지를 보냅니다. 마지막으로 나는 결과가 Y에 대한 메시지의 합이 10이고 결과에 X에 대한 레코드가 없기 때문에 그 결과가 vertexRDD에 포함되지 않았기 때문에 그 결과가 기대됩니다. 그리고 만약 X이 vertexRDD에 포함 되었다면, 두 메시지가 모두 null이기 때문에 결과는 단순히 null 일 것으로 기대합니다.

그러나 내가 얻는 것은 빈 RDD입니다. 누군가 내가 빈 결과를 얻는 이유를 이해하도록 도와 줄 수 있습니까?

답변

0

좋아,이 문제의 이유는 실제로 내 VertexRDD에 X이 없다는 것입니다. 내 edgeRDD에있는 꼭지점을오고가는 가장자리가 있더라도 내 aggregatemessages는 가장자리 속성에만 의존하며, 알고리즘은 그 메시지를 보낼 수 없습니다.

관련 문제