2014-12-02 4 views
1

나는 스파크 SQL을 연구 중이다. JavaPairRDD를 사용하여 HBase에서 데이터를 얻은 다음지도를 작성했습니다. 지도에서 모든 키를 세트에 저장했습니다. 이 맵을 강제 실행하려면 collect()를 수행하십시오. 그런 다음 Set의 값을 사용하여 다른 작업을 수행했습니다.스파크 지연 변환 실행 장애

이 프로그램은 내 로컬 PC에서 완벽하게 작동 할 수 있습니다. 그러나 그것을 클러스터 (2 명)에 넣을 때, 실행 장애가 있습니다. 맵 변환 전에 Set 조작이 실행됩니다. 위의지도가 실행

JavaRDD<Map<String, String>> data = hBaseRDD.map(
       new Function<Tuple2<ImmutableBytesWritable, Result>, Map<String, String>>(){ 
        public Map<String, String> call(
          Tuple2<ImmutableBytesWritable, Result> re) 
          throws Exception { 
         byte[] payload =re._2().getValue(Bytes.toBytes("ContentInfo"), Bytes.toBytes("Payload")); 
         Map<String, String> map = new ConcurrentHashMap<String, String>(); 

         String primaryKey = new String(re._1().get()); 
         map.put("primaryKey", primaryKey); 

         if(payload != null) 
          map.put("payload", new String(payload)); 

         Map<byte[], byte[]> tmpMetaMap = re._2().getFamilyMap(Bytes.toBytes("MetaInfo")); 
         if(tmpMetaMap != null){ 
          for(Entry<byte[], byte[]> entry : tmpMetaMap.entrySet()){ 

           String tmpKey = Bytes.toString(entry.getKey()); 
           String tmpValue = Bytes.toString(entry.getValue()); 

           map.put(tmpKey, tmpValue); 
    //save result to the set 
           keySet.add(tmpKey); 
          } 
         } 
         return map; 
        } 
       }); 

힘 :

data.collect(); 

는 결과를 가져

JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =  jsc.newAPIHadoopRDD(hbase_conf, 
       TableInputFormat.class, ImmutableBytesWritable.class, 
       Result.class); 

데이터를 변환 : 는 HBase를에서 데이터를 가져 오기 :

코드의 흐름은 같다 세트의 :

StringBuilder sb = new StringBuilder(); 

     for(String fieldName: keySet){ 

      sb.append(fieldName).append(","); 
     } 

코드를 로컬에서 실행할 때 모든 결과를 얻을 수 있습니다. 그러나 클러스터에서 실행할 때 sb는 아무런 가치가 없습니다.

답변

0

을 시도하지만, 클러스터 등을 오히려 작업이 진행 중입니다.

스파크에는 변환과 동작의 두 가지 유형이 있습니다.

변환은 내용에 일부 기능을 적용하여 다른 RDD로 변환하고 RDD로 변환합니다. 이것은 순수한 기능적 접근법이며 부작용이 없습니다. 작업은 RDD를 가져 와서 파일 또는 로컬 데이터 구조와 같은 다른 것을 생성합니다. 이러한 작업은 RDD의 데이터를 다른 형식으로 구체화합니다.

이 경우 맵 변환 중에 keyset이 변이 될 것으로 예상되므로 변형 기능 : map이 부작용과 함께 사용됩니다. keyset이 변환 함수의 범위 밖에서 정의되면이 변수는 직렬화되어 실행자에게 전송되지만 원격에서 발생하는 모든 변이는 이 아니며은 드라이버에서 복구되지 않습니다.

우리가 그것에 대해 생각해 보면, 모든 실행자는 데이터 파티션에 변환을 적용 할 것이므로 '키 세트'가 끝나는 모든 내용은 각 파티션의 부분보기 일뿐입니다.

이것을 모델링하는 올바른 방법은 RDD 변환 및 동작 측면에서 작업을 재정의하는 것입니다.

위의 코드에서 우리는 입력을 RDD[Map[String,String]]으로 변환하는 것처럼 보이고 드라이버에서 "기본 키"및 "페이로드"가 아닌 모든 항목에 걸쳐 키 집합을 수집하는 데 관심이 있습니다 결과.

// data = RDD[Map[String, String]] 
// first we get all the keys from all the maps 
val keys = data.map{entry => entry.keys} 
// now we collect that information on the driver 
val allKeys = keys.collect 
// we transform the resulting array into a set - this will remove duplicates by definition 
val allKeySet = allKeys.toSet 
// We need still to remove "primaryKey" and "payload" 
val keySet = fullKeySet.diff(Set("primaryKey","payload")) 

자바의 코드가 좀 더 자세한이지만, 구조와 아이디어는 동일합니다 :

은 스파크 이것은 뭔가처럼 될 수있다.

+0

또 다른 질문 : 로컬에서 실행할 때 keySet 값을 설정할 수있는 이유는 무엇입니까? – user2965590

0

키 집합을 어떻게 정의 했습니까? 이 문제는 작업의 순서에 관련되지 않은 질문이 답변이 정적으로 정의하거나 그렇지 않으면 DriverSide.Hope에 모든 데이터를 가져올 것이다 대신 mapforeach를 사용하는

+0

예, keySet을 최종 정적 HashSet으로 정의했습니다. hBaseRDD 뒤에 간단한 foreach를 시도했습니다. 그냥 foreach와 아무것도 따르지 않았다. 스파크 서버에서도 작동하지 않습니다. – user2965590