-1
hdfs : // localhost : 8020/sample.zip에 압축이 있습니다. &을 hdfs : // localhost : 8020/sample 디렉토리에 압축을 해제해야합니다. 아래의 코드는
zipLoc를 작동 할 수 있습니다hdfs에서 zip 파일 읽기 및 spark java를 사용하여 추출
hdfs : // localhost : 8020/sample.zip에 압축이 있습니다. &을 hdfs : // localhost : 8020/sample 디렉토리에 압축을 해제해야합니다. 아래의 코드는
zipLoc를 작동 할 수 있습니다hdfs에서 zip 파일 읽기 및 spark java를 사용하여 추출
hdfsBasePath이 파일
public void readWriteZipContents(String zipLoc,String hdfsBasePath){
JavaSparkContext jsc = new JavaSparkContext(new SparkContext(new SparkConf()));
JavaPairRDD<String, PortableDataStream> zipFilesRdd = jsc.binaryFiles(zipLoc);
zipFilesRdd.collect().forEach(file -> {
ZipInputStream zipStream = new ZipInputStream(file._2.open());
ZipEntry zipEntry = null;
Scanner sc = new Scanner(zipStream);
try {
while ((zipEntry = zipStream.getNextEntry()) != null) {
String entryName = zipEntry.getName();
if (!zipEntry.isDirectory()) {
//create the path in hdfs and write its contents
Configuration configuration = new Configuration();
configuration.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
configuration.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020"), configuration);
FSDataOutputStream hdfsfile = fs.create(new Path(hdfsBasePath + "/" + entryName));
while(sc.hasNextLine()){
hdfsfile.writeBytes(sc.nextLine());
}
hdfsfile.close();
hdfsfile.flush();
}
zipStream.closeEntry();
}
} catch (IllegalArgumentException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
sc.close();
//return fileNames.iterator();
});
}
을 쓰기 위해 사용 HDFS의 디렉토리입니다 ZIP 파일의 위치입니다