spark-sql을 사용하는 공통 매개 변수에 두 개의 파일을 결합하는 프로그램을 작성하고 있습니다. 내 코드는 괜찮지 만 텍스트 파일로 저장하려고 할 때 오류가 발생한다고 생각합니다.spark-sql의 NullPointerException
import java.util.regex.Pattern;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import java.io.Serializable;
public class JoinCSV {
@SuppressWarnings("serial")
public static class CompleteSample implements Serializable {
private String ASSETNUM;
private String ASSETTAG;
private String CALNUM;
public String getASSETNUM() {
return ASSETNUM;
}
public void setASSETNUM(String aSSETNUM) {
ASSETNUM = aSSETNUM;
}
public String getASSETTAG() {
return ASSETTAG;
}
public void setASSETTAG(String aSSETTAG) {
ASSETTAG = aSSETTAG;
}
public String getCALNUM() {
return CALNUM;
}
public void setCALNUM(String cALNUM) {
CALNUM = cALNUM;
}
}
@SuppressWarnings("serial")
public static class ExtendedSample implements Serializable {
private String ASSETNUM;
private String CHANGEBY;
private String CHANGEDATE;
public String getASSETNUM() {
return ASSETNUM;
}
public void setASSETNUM(String aSSETNUM) {
ASSETNUM = aSSETNUM;
}
public String getCHANGEBY() {
return CHANGEBY;
}
public void setCHANGEBY(String cHANGEBY) {
CHANGEBY = cHANGEBY;
}
public String getCHANGEDATE() {
return CHANGEDATE;
}
public void setCHANGEDATE(String cHANGEDATE) {
CHANGEDATE = cHANGEDATE;
}
}
private static final Pattern comma = Pattern.compile(",");
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
String path="C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv";
String path1="C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv";
JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
JavaSQLContext sqlCtx = new JavaSQLContext(ctx);
JavaRDD<CompleteSample> cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
new Function<String, CompleteSample>() {
public CompleteSample call(String line) throws Exception {
String[] parts = line.split(",");
CompleteSample cs = new CompleteSample();
cs.setASSETNUM(parts[0]);
cs.setASSETTAG(parts[1]);
cs.setCALNUM(parts[2]);
return cs;
}
});
JavaRDD<ExtendedSample> es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
new Function<String, ExtendedSample>() {
public ExtendedSample call(String line) throws Exception {
String[] parts = line.split(",");
ExtendedSample es = new ExtendedSample();
es.setASSETNUM(parts[0]);
es.setCHANGEBY(parts[1]);
es.setCHANGEDATE(parts[2]);
return es;
}
});
JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
complete.registerAsTable("cs");
JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
extended.registerAsTable("es");
JavaSchemaRDD fs= sqlCtx.sql("SELECT ASSETTAG, CALNUM FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;");
fs.saveAsTextFile("result"); //Here I am getting error
}
}
을 내 오류는 다음과 같습니다 : - - : 나는 아래 내 코드를 걸었습니다
14/07/19 00:40:13 INFO TaskSchedulerImpl: Cancelling stage 0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 4 on host localhost: java.lang.NullPointerException
java.lang.ProcessBuilder.start(Unknown Source)
org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
org.apache.hadoop.util.Shell.run(Shell.java:379)
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
------------
------------
및
14/07/19 00:40:11 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
-----------------
-----------------
이 두 번째 오류는 모든 곳에서오고 나는 불꽃을 사용하고 있는지 여부 , spark-sql 또는 spark-streaming. 나는이 오류가 무엇인지 전혀 모른다. 그러나이 두 번째 오류는 코드에 아무런 영향을 미치지 않습니다. 왜냐하면이 오류가 발생해도 결과가 정상적으로 나오기 때문입니다. 하지만 여전히 매우 자극적이어서 프로그램을 실행할 때마다 알려지지 않은 오류가 발생합니다.
누구든지 문제를 이해하는 데 저를 도울 수 있습니까? 나는 이것을 매우 심하게 붙잡고있다. 감사합니다
Linux에서 같은 오류가 발생합니까? –
아니요, "saveAsTextFile"을 사용하여 Windows OS의 로컬 파일 시스템에 파일을 저장하려고합니다. 로컬 파일 시스템의 경우 "saveAs"옵션이 작동하지 않습니다. 그러나 이러한 옵션은 파일을 hdfs에 저장하는 동안 매우 완벽하게 작동합니다. –
나에게 Windows 관련 문제가있는 것 같습니다. 문제를 해결하는 방법을 모르지만 로컬 저장 만하는 경우 문제를 해결할 수 있습니다. 'RDD.collect()'로 데이터를 꺼내서 일반 Java'FileOutputStream'을 통해 저장하십시오. –