-1

Apache Spark를 사용하여 레코드를 더 빠르게 일치시키는 방법을 생각했지만 select 문을 사용하여 SQL 일치보다 비효율적 인 방법을 찾고 있습니다. 사용
,
JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setAppName("AIRecordLinkage").setMaster("local[*]"));<br> Dataset<Row> sourceFileContent = spark.read().jdbc("jdbc:oracle:thin:@//Connection_IP/stage", "Database_name.Table_name", connectionProperties);Apache Spark 성능 문제

우리는 DataSet 개체에 저장되어있는 환경을 촉발하는 약 180 만 기록을 가져올 수 있습니다. 지금 필터 함수 targetFileContent.filter (COL ("TARGETUPC"참조). EqualTo가 (upcValue))를 사용

상기 필터 문 upcValue 약 46K ID에 대해 업데이트되는 루프이다.

이 프로그램은 몇 시간 동안 실행되지만 우리는 1 분 안에 실행되는 46k UPC ID를 모두 보관하는 sql IN 연산자를 사용하여 동일하게 시도했습니다.

구성 :
불꽃-SQL 2.11
불꽃 코어 2.11
JDK 8
윈도우 10, 단일 노드 4 개 코어 3GHz의 16 기가 바이트 RAM.
C 드라이브 -> 12GB의 여유 공간.
Eclipse -> 구성 실행 -> -Xms15000m.

실수가 있는지 분석하고 이해하여 성능 향상을 위해 무엇을해야하는지 알려주세요.

@Component("upcExactMatch") 
    public class UPCExactMatch { 
     @Autowired 
     private Environment envirnoment; 

     @Autowired 
     private LoadCSV loadCSV; 

     @Autowired 
     private SQLHandler sqlHandler; 

     public ArrayList<Row> perform(){ 

      ArrayList<Row> upcNonMatchedItemIDs=new ArrayList<Row>(); 
      ArrayList<Row> upcMatchedItemIDs=new ArrayList<Row>(); 

      JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]")); 
      SQLContext sqlContext = new SQLContext(javaSparkContext); 
      SparkSession sparkSession = SparkSession.builder().appName("JavaStopWordshandlerTest").getOrCreate(); 

      try{ 
       Dataset<Row> sourceFileContent =loadCSV.load(sourceFileName,sourceFileLocation,javaSparkContext,sqlContext); 

       // load target from database 
       Dataset<Row> targetFileContent = spark.read().jdbc("jdbc:oracle:thin:@//Connection_IP/stage", "Database_name.Table_name", connectionProperties); 
       System.out.println("File counts :"+sourceFileContent.count()+" : "+targetFileContent.count()); 

       ArrayList<String> upcMatched = new ArrayList<String>(); 
       ArrayList<String> partNumberMatched = new ArrayList<String>(); 

       List<Row> sourceFileContents = sourceFileContent.collectAsList(); 

       int upcColumnIndex=-1; 
       int itemIDColumnIndex=-1; 
       int partNumberTargetIndex=-1; 
       String upcValue=""; 

       StructType schema = targetFileContent.schema(); 
       List<Row> data = Arrays.asList(); 
       Dataset<Row> upcMatchedRows = sparkSession.createDataFrame(data, schema); 

       for(Row rowSourceFileContent: sourceFileContents){ 

        upcColumnIndex=rowSourceFileContent.fieldIndex("Vendor UPC"); 

        if(!rowSourceFileContent.isNullAt(upcColumnIndex)){ 

         upcValue=rowSourceFileContent.get(upcColumnIndex).toString(); 
         upcMatchedRows=targetFileContent.filter(col("TARGETUPC").equalTo(upcValue)); 

         if(upcMatchedRows.count() > 0){ 

          for(Row upcMatchedRow: upcMatchedRows.collectAsList()){ 
           partNumberTargetIndex=upcMatchedRow.fieldIndex("PART_NUMBER"); 

           if(partNumberTargetIndex != -1){ 
            upcMatched.add(upcValue); 
            partNumberMatched.add(upcMatchedRow.get(partNumberTargetIndex).toString()); 
            System.out.println("Source UPC : "+upcValue +"\tTarget part number :"+ upcMatchedRow.get(partNumberTargetIndex)); 

           } 
          } 

         } 

        } 

       } 

       for(int i=0;i<upcMatched.size();i++){ 
        System.out.println("Matched Exact UPC ids are :"+upcMatched.get(i) + "\t:Target\t"+partNumberMatched.get(i)); 

       } 

      }catch(Exception e){ 
       e.printStackTrace(); 
      }finally{ 
       sparkSession.stop(); 
       sqlContext.clearCache(); 
       javaSparkContext.close(); 
      } 

      return upcMatchedItemIDs; 

     } 

    } 

답변

0

일치하는 레코드에 대해 두 데이터 프레임 간의 내부 결합을 시도하십시오.

+0

감사합니다. 기록 매칭을 위해 내부 조인이 작동했습니다. – Nischay