2017-01-27 3 views
0

내가 나중에 parquet 파일로 저장할 수 있지만, 코드 실행이 createDataFrame 함수 호출에 정차하도록 DataFrameJavaRDD<PageRankCase> 클래스를 변환하려고하지 않고 정지, null을주는 때를 예외를 처리합니다. 내 수업이있어.스파크 JavaRdd 오류

public static class PageRanksCase implements Serializable{ 

    private String node; 
    private Double importance; 


    public void setNode(String node) { this.node = node; } 
    public void setImportance(Double importance) { this.importance = importance; } 

    public String getNode(String node) { return this.node; } 
    public Double getImportance(String node) { return this.importance; } 
} 

그리고 이것은 내가 DataFrame에 클래스를 변환 실행하려고 코드.

try{ 
    JavaRDD<PageRanksCase> finalData = GetTopNNodes(pairedrdd); 
    System.out.println("coming here"); 
    DataFrame finalFrame = Service.sqlCtx().createDataFrame(finalData,PageRanksCase.class); 
    System.out.println("coming here too"); 
    finalFrame.write().parquet(rdfanalyzer.spark.Configuration.storage() + "sib200PageRank.parquet"); 
} 
catch(Exception e) 
{ 
    System.out.println(e.getMessage()); // gives null here. 
} 

그것은 인쇄 coming here 않습니다하지만 coming here too 인쇄되지도 오류를 포기하지 않을거야. JavaPairRDD를 JavaRDD로 변환하는 방법은 다음과 같습니다.

public static JavaRDD<PageRanksCase> GetTopNNodes(JavaPairRDD<String,Double> pairedrdd){ 

    return pairedrdd.map(new Function<Tuple2<String,Double>, PageRanksCase>() { 

     @Override 
     public PageRanksCase call(Tuple2<String, Double> line) throws Exception { 
      PageRanksCase pgrank = new PageRanksCase(); 
      pgrank.setImportance(line._2); 
      pgrank.setNode(line._1()); 
      return pgrank; 
     } 
    }); 
} 

아무도 아이디어가 있습니까?

답변

0

Spark website에 언급 된 예제를 사용하여 코드를 다시 구현했습니다.