2016-09-07 4 views
0

나는 가장 큰 라인과 인덱스를 찾아야 만하는 문제에 직면 해있다. 여기 내 접근 방식이다Spark java를 사용하여 가장 큰 라인 번호 찾기

SparkConf conf = new SparkConf().setMaster("local").setAppName("basicavg"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> rdd = sc.textFile("/home/impadmin/ravi.txt"); 
    JavaRDD<Tuple2<Integer,String>> words = rdd.map(new Function<String, Tuple2<Integer,String>>() { 

     @Override 
     public Tuple2<Integer,String> call(String v1) throws Exception { 
      // TODO Auto-generated method stub 
      return new Tuple2<Integer, String>(v1.split(" ").length, v1); 
     } 
    }); 
    JavaPairRDD<Integer, String> linNoToWord = JavaPairRDD.fromJavaRDD(words).sortByKey(false); 

    System.out.println(linNoToWord.first()._1+" ********************* "+linNoToWord.first()._2); 
+2

특정 문제를 설명하십시오. 당신의 접근 방식은 실패하고 어떻게됩니까? – YakovL

+0

문제는 줄과 함께 spark를 사용하여 파일에서 가장 큰 줄의 색인을 찾는 것입니다. –

+0

@RaviShankar 아래 답변은 0부터 시작하는 라인 인덱스를 제공합니다. –

답변

0

정렬 후의 새로운 rdd의 최대 길이는 다음과 같습니다.

JavaRDD<String> rdd = sc.textFile("/home/impadmin/ravi.txt"); 
JavaRDD<Tuple2<Integer,String>> words = rdd.map(new Function<String, Tuple2<Integer,String>>() { 

    @Override 
    public Tuple2<Integer,String> call(String v1) throws Exception { 
     // TODO Auto-generated method stub 
     return new Tuple2<Integer, String>(v1.split(" ").length, v1); 
    } 
}); 
JavaRDD<Tuple2<Integer,String>> tupleRDD1= tupleRDD.sortBy(new Function<Tuple2<Integer,String>, Integer>() { 

     @Override 
     public Integer call(Tuple2<Integer, String> v1) throws Exception { 
      // TODO Auto-generated method stub 
      return v1._1; 
     } 
    }, false, 1); 
    System.out.println(tupleRDD1.first()); 
} 
0

줄 번호와 텍스트 모두에 관심이 있으니 시도해보십시오.

먼저 직렬화 클래스 라인을 만듭니다

public static class Line implements Serializable { 
    public Line(Long lineNo, String text) { 
     lineNo_ = lineNo; 
     text_ = text; 
    } 
    public Long lineNo_; 
    public String text_; 
} 

그런 다음 다음을 수행 작업하십시오 tupleRDD 키의 기초 및 첫 번째 요소에 정리 될 것입니다 이런 식으로

SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("basicavg"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> rdd = sc.textFile("/home/impadmin/words.txt"); 
    JavaPairRDD<Long, Line> linNoToWord2 = rdd.zipWithIndex().mapToPair(new PairFunction<Tuple2<String,Long>, Long, Line>() { 
     public Tuple2<Long, Line> call(Tuple2<String, Long> t){ 

      return new Tuple2<Long, Line>(Long.valueOf(t._1.split(" ").length), new Line(t._2, t._1)); 
     } 
    }).sortByKey(false); 

    System.out.println(linNoToWord2.first()._1+" ********************* "+linNoToWord2.first()._2.text_);