나는 스파크 SQL을 연구 중이다. JavaPairRDD를 사용하여 HBase에서 데이터를 얻은 다음지도를 작성했습니다. 지도에서 모든 키를 세트에 저장했습니다. 이 맵을 강제 실행하려면 collect()를 수행하십시오. 그런 다음 Set의 값을 사용하여 다른 작업을 수행했습니다.스파크 지연 변환 실행 장애
이 프로그램은 내 로컬 PC에서 완벽하게 작동 할 수 있습니다. 그러나 그것을 클러스터 (2 명)에 넣을 때, 실행 장애가 있습니다. 맵 변환 전에 Set 조작이 실행됩니다. 위의지도가 실행
JavaRDD<Map<String, String>> data = hBaseRDD.map(
new Function<Tuple2<ImmutableBytesWritable, Result>, Map<String, String>>(){
public Map<String, String> call(
Tuple2<ImmutableBytesWritable, Result> re)
throws Exception {
byte[] payload =re._2().getValue(Bytes.toBytes("ContentInfo"), Bytes.toBytes("Payload"));
Map<String, String> map = new ConcurrentHashMap<String, String>();
String primaryKey = new String(re._1().get());
map.put("primaryKey", primaryKey);
if(payload != null)
map.put("payload", new String(payload));
Map<byte[], byte[]> tmpMetaMap = re._2().getFamilyMap(Bytes.toBytes("MetaInfo"));
if(tmpMetaMap != null){
for(Entry<byte[], byte[]> entry : tmpMetaMap.entrySet()){
String tmpKey = Bytes.toString(entry.getKey());
String tmpValue = Bytes.toString(entry.getValue());
map.put(tmpKey, tmpValue);
//save result to the set
keySet.add(tmpKey);
}
}
return map;
}
});
힘 :
data.collect();
는 결과를 가져
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(hbase_conf,
TableInputFormat.class, ImmutableBytesWritable.class,
Result.class);
데이터를 변환 : 는 HBase를에서 데이터를 가져 오기 :
코드의 흐름은 같다 세트의 :
StringBuilder sb = new StringBuilder();
for(String fieldName: keySet){
sb.append(fieldName).append(",");
}
코드를 로컬에서 실행할 때 모든 결과를 얻을 수 있습니다. 그러나 클러스터에서 실행할 때 sb는 아무런 가치가 없습니다.
또 다른 질문 : 로컬에서 실행할 때 keySet 값을 설정할 수있는 이유는 무엇입니까? – user2965590