Apache Spark를 사용하여 레코드를 더 빠르게 일치시키는 방법을 생각했지만 select 문을 사용하여 SQL 일치보다 비효율적 인 방법을 찾고 있습니다. 사용
,
JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setAppName("AIRecordLinkage").setMaster("local[*]"));<br> Dataset<Row> sourceFileContent = spark.read().jdbc("jdbc:oracle:thin:@//Connection_IP/stage", "Database_name.Table_name", connectionProperties);
Apache Spark 성능 문제
우리는 DataSet 개체에 저장되어있는 환경을 촉발하는 약 180 만 기록을 가져올 수 있습니다. 지금 필터 함수 targetFileContent.filter (COL ("TARGETUPC"참조). EqualTo가 (upcValue))를 사용
상기 필터 문 upcValue 약 46K ID에 대해 업데이트되는 루프이다.
이 프로그램은 몇 시간 동안 실행되지만 우리는 1 분 안에 실행되는 46k UPC ID를 모두 보관하는 sql IN 연산자를 사용하여 동일하게 시도했습니다.
구성 :
불꽃-SQL 2.11
불꽃 코어 2.11
JDK 8
윈도우 10, 단일 노드 4 개 코어 3GHz의 16 기가 바이트 RAM.
C 드라이브 -> 12GB의 여유 공간.
Eclipse -> 구성 실행 -> -Xms15000m.
실수가 있는지 분석하고 이해하여 성능 향상을 위해 무엇을해야하는지 알려주세요.
@Component("upcExactMatch")
public class UPCExactMatch {
@Autowired
private Environment envirnoment;
@Autowired
private LoadCSV loadCSV;
@Autowired
private SQLHandler sqlHandler;
public ArrayList<Row> perform(){
ArrayList<Row> upcNonMatchedItemIDs=new ArrayList<Row>();
ArrayList<Row> upcMatchedItemIDs=new ArrayList<Row>();
JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
SQLContext sqlContext = new SQLContext(javaSparkContext);
SparkSession sparkSession = SparkSession.builder().appName("JavaStopWordshandlerTest").getOrCreate();
try{
Dataset<Row> sourceFileContent =loadCSV.load(sourceFileName,sourceFileLocation,javaSparkContext,sqlContext);
// load target from database
Dataset<Row> targetFileContent = spark.read().jdbc("jdbc:oracle:thin:@//Connection_IP/stage", "Database_name.Table_name", connectionProperties);
System.out.println("File counts :"+sourceFileContent.count()+" : "+targetFileContent.count());
ArrayList<String> upcMatched = new ArrayList<String>();
ArrayList<String> partNumberMatched = new ArrayList<String>();
List<Row> sourceFileContents = sourceFileContent.collectAsList();
int upcColumnIndex=-1;
int itemIDColumnIndex=-1;
int partNumberTargetIndex=-1;
String upcValue="";
StructType schema = targetFileContent.schema();
List<Row> data = Arrays.asList();
Dataset<Row> upcMatchedRows = sparkSession.createDataFrame(data, schema);
for(Row rowSourceFileContent: sourceFileContents){
upcColumnIndex=rowSourceFileContent.fieldIndex("Vendor UPC");
if(!rowSourceFileContent.isNullAt(upcColumnIndex)){
upcValue=rowSourceFileContent.get(upcColumnIndex).toString();
upcMatchedRows=targetFileContent.filter(col("TARGETUPC").equalTo(upcValue));
if(upcMatchedRows.count() > 0){
for(Row upcMatchedRow: upcMatchedRows.collectAsList()){
partNumberTargetIndex=upcMatchedRow.fieldIndex("PART_NUMBER");
if(partNumberTargetIndex != -1){
upcMatched.add(upcValue);
partNumberMatched.add(upcMatchedRow.get(partNumberTargetIndex).toString());
System.out.println("Source UPC : "+upcValue +"\tTarget part number :"+ upcMatchedRow.get(partNumberTargetIndex));
}
}
}
}
}
for(int i=0;i<upcMatched.size();i++){
System.out.println("Matched Exact UPC ids are :"+upcMatched.get(i) + "\t:Target\t"+partNumberMatched.get(i));
}
}catch(Exception e){
e.printStackTrace();
}finally{
sparkSession.stop();
sqlContext.clearCache();
javaSparkContext.close();
}
return upcMatchedItemIDs;
}
}
감사합니다. 기록 매칭을 위해 내부 조인이 작동했습니다. – Nischay