2016-09-15 1 views
-1

나는 DB2에서 압축 된 스트림으로 5 천만 개의 레코드를 읽으 려하고 다음 중 하나 인 2 개의 jar 파일을 만들고 싶습니다. 그것들은 고정 너비 포맷으로되어 있고 다른 하나는 BSON 포맷으로되어있다.엄청난 양의 데이터를 데이터베이스에서 읽어서 고정 너비와 BSON에 쓰기 직렬 쓰기보다 느린 멀티 쓰레드

코드를 만들 수 있었지만 코드를 가져 와서이 파일에 쓰는 데 걸리는 시간이 120 분에 가까워 제작자 소비자 모델을 사용하도록 코드를 다시 디자인했습니다. 필자는이 모델을 사용하여 성능이 저하되는 것을 보았습니다. 어떤 이유로 멀티 스레딩이 예상대로 작동하지 않습니다.

생산자 생산자 = 새로운 생산자 (queue1, queue2, URL, 사용자, 패스, 드라이버, strQuery);

  Consumer1 consumer1 = new Consumer1(queue1, outputDatFile, fileDat.getName()); 
      Consumer2 consumer2 = new Consumer2(queue2, outputDatBSONFile, fileBSONDat.getName()); 
      ExecutorService threadPool = Executors.newFixedThreadPool(3); 
      Future producerStatus = threadPool.submit(producer); 
      threadPool.execute(consumer1); 
      threadPool.execute(consumer2); 
      try { 
       System.out.println("This will wait for the producer to wait " + producerStatus.get()); 

      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } catch (ExecutionException e) { 
       e.printStackTrace(); 
      } 
      threadPool.shutdown(); 
      long end = System.currentTimeMillis(); 
      System.out.println("End Time: " + end); 
      long elapsedTimeMillis = end - start; 
      float elapsedTimeSec = (float) elapsedTimeMillis/1000.0F; 
      System.out.println("Total Time: " + elapsedTimeSec + " seconds.."); 
      if (!(errorStatus)) { 
       System.out.println("Successful exit..."); 
       System.exit(0); 
      } else { 
       System.out.println("Exiting - Fatal Errors Encountered!"); 
       System.exit(1); 
      } 
    } 

    private static JarOutputStream getJar(String outputDatFile, String fileName) 
       throws FileNotFoundException, IOException { 
      JarOutputStream jarOutPutStream = new JarOutputStream(
         new BufferedOutputStream(new FileOutputStream(new File(outputDatFile + ".jar")))); 

      jarOutPutStream.setMethod(JarOutputStream.DEFLATED); 
      JarEntry ze = new JarEntry(fileName); 
      jarOutPutStream.putNextEntry(ze); 
      return jarOutPutStream; 
    } 

    public class Consumer1 implements Runnable { 

      private BlockingQueue<String> queue; 
      private String outputDatFile; 
      private String datFileName; 
      public Consumer1(BlockingQueue<String> queue, String outputDatFile, String datFileName) { 
       this.queue = queue; 
       this.outputDatFile = outputDatFile; 
       this.datFileName = datFileName; 
      } 
      @Override 
      public void run() { 
       JarOutputStream jarOutPutStreamText = null; 
       try { 
         jarOutPutStreamText = getJar(outputDatFile, datFileName); 
         int recordsWritten = 0; 
         while (true) { 
           recordsWritten++; 
           try { 
            String objectRetrieved = queue.take(); 
            jarOutPutStreamText.write(objectRetrieved.getBytes()); 
            jarOutPutStreamText.flush(); 

            if (recordsWritten % 100000 == 0) { 
              System.out.println("Written Records Count Queue 1 " + recordsWritten); 
            } 

           } catch (InterruptedException e) { 
            e.printStackTrace(); 
           } 
         } 
       } catch (FileNotFoundException e1) { 
         e1.printStackTrace(); 
       } catch (IOException e1) { 
         e1.printStackTrace(); 
       } finally { 
         if (jarOutPutStreamText != null) { 
           try { 
            jarOutPutStreamText.close(); 
           } catch (IOException e) { 
            // TODO Auto-generated catch block 
            e.printStackTrace(); 
          } 
         } 

       } 

      } 

    } 

    public class Consumer2 implements Runnable { 
      private BlockingQueue<DBObject> queue2; 
      private String outputDatBSONFile; 
      private String bsonFileName; 
      public Consumer2(BlockingQueue<DBObject> queue2, String outputDatBSONFile, String bsonFileName) { 

       this.queue2 = queue2; 
       this.outputDatBSONFile = outputDatBSONFile; 
       this.bsonFileName = bsonFileName; 
      } 
      @Override 
      public void run() { 
       JarOutputStream jarOutPutStreamBSON = null; 
       try { 
         jarOutPutStreamBSON = getJar(outputDatBSONFile, bsonFileName); 
         BSONFileWriter bsonWriter = new BSONFileWriter(jarOutPutStreamBSON); 

         int recordsWritten = 0; 
         while (true) { 
           recordsWritten++; 
           try { 
            DBObject objectRetrieved = queue2.take(); 
            bsonWriter.write(objectRetrieved); 
            bsonWriter.flush(); 
            if (recordsWritten % 100000 == 0) { 
              System.out.println("Written Records Count Queue 2 " + recordsWritten); 

            } 
           } catch (InterruptedException e) { 
            e.printStackTrace(); 
           } 
         } 
       } catch (FileNotFoundException e1) { 
         e1.printStackTrace(); 
       } catch (IOException e1) { 
         e1.printStackTrace(); 
       } finally { 
         if (jarOutPutStreamBSON != null) { 
           try { 
            jarOutPutStreamBSON.close(); 
           } catch (IOException e) { 
            e.printStackTrace(); 
          } 
         } 
       } 
      } 
    } 

    public class Producer implements Runnable { 
      private BlockingQueue<String> queue1; 
      private BlockingQueue<DBObject> queue2; 
     private String url; 
     private String user; 
      private String pass; 
      private String driver; 
      private String strQuery; 
      public Producer(BlockingQueue<String> queue1, BlockingQueue<DBObject> queue2, String url, String user, 
         String pass, String driver, String strQuery) { 

       this.queue1 = queue1; 
       this.queue2 = queue2; 
       this.url = url; 
       this.pass = pass; 
       this.driver = driver; 
       this.strQuery = strQuery; 
      } 
      @Override 
      public void run() { 
       Connection con = null; 
       Statement st = null; 
       ResultSet rs = null; 
       try { 
         Class.forName(driver); 
         con = DriverManager.getConnection(url, user, pass); 
         con.setAutoCommit(false); 
         Map<String, Object> mapper = new HashMap<String, Object>(); 
         try { 
           st = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); 
           st.setFetchSize(20000); 
           System.out.println(

              "Attempting to execute statement: " + System.getProperty("line.separator") + strQuery); 

           strQuery = strQuery.replace(System.getProperty("line.separator"), " "); 

           rs = st.executeQuery(strQuery); 

           ResultSetMetaData md = rs.getMetaData(); 
           try { 
            int col = md.getColumnCount(); 
            int resultSetCounter = 0; 
            while (rs.next()) { 
              String strQueryOutput = ""; 
              resultSetCounter++; 
              for (int x = 1; x <= col; ++x) { 

                String outPut = ""; 
                if (md.getColumnTypeName(x).equals("DECIMAL")) { 
                  StringAlignUtils util = new StringAlignUtils(md.getColumnDisplaySize(x), 
                     Alignment.RIGHT); 
                  outPut = util.format(rs.getString(x)); 
                  strQueryOutput = strQueryOutput + outPut; 
                  mapper.put(md.getColumnName(x), outPut); 
                } else if (md.getColumnTypeName(x).equals("NUMERIC")) { 

                  StringAlignUtils util = new StringAlignUtils(md.getColumnDisplaySize(x), 
                     Alignment.RIGHT); 
                  outPut = util.format(rs.getString(x)); 
                  strQueryOutput = strQueryOutput + outPut; 
                  mapper.put(md.getColumnName(x), outPut); 
                } else if (md.getColumnTypeName(x).equals("CHAR() FOR BIT DATA")) { 

                  char charData = rs.getString(x).charAt(0); 
                  outPut = charData + ""; 
                  StringAlignUtils util = new StringAlignUtils(md.getColumnDisplaySize(x), 
                     Alignment.RIGHT); 
                  outPut = util.format(outPut); 
                  strQueryOutput = strQueryOutput + outPut; 
                  mapper.put(md.getColumnName(x), outPut); 
                } else { 

                 StringAlignUtils util = new StringAlignUtils(md.getColumnDisplaySize(x), 
                     Alignment.RIGHT); 
                  outPut = util.format(rs.getString(x)); 
                  strQueryOutput = strQueryOutput + outPut; 
                  mapper.put(md.getColumnName(x), outPut); 
                } 
              } 
              if (resultSetCounter % 100000 == 0) { 

                System.out.println(" The counter is " + resultSetCounter); 

              } 

              strQueryOutput = strQueryOutput + '\n'; 

              queue1.put(strQueryOutput); 
              queue2.put(new BasicDBObject(mapper)); 
            } 
           } catch (Exception e) { 
            e.printStackTrace(); 
            System.err.println("Error: " + e.getMessage()); 
            System.err.println("Exiting!"); 
            System.exit(1); 
           } 
           System.out.println("Query results successfully returned..."); 
         } catch (SQLException s) { 
           System.err.println("SQL statement is not executed!"); 
           System.err.println("Error: " + s.getMessage()); 
         } finally { 
           System.out.println("Trying to Close ResultSet and Statement..."); 
           if (rs != null) { 
            System.out.println("Closing ResultSet.."); 
            rs.close(); 
           } 
           if (st != null) { 
            System.out.println("Closing Statement.."); 
            st.close(); 
           } 
         } 
       } catch (Exception exception) { 
         exception.printStackTrace(); 
       } finally { 
         try { 
           System.out.println("Trying to Close database connection.."); 
           if (con != null) { 
            System.out.println("Closing database connection.."); 
            con.close(); 
           } 
         } catch (SQLException exception) { 
          exception.printStackTrace(); 
         } 
       } 
      } 
    } 
+1

코드를 읽을 수 있도록 모든 빈 줄을 제거하십시오. – EJP

답변

1

파일이 잘못된 형식으로 생성 된 이유가 있다고 생각합니다.

2 가지 문제가 있습니다.

1) 완전한 데이터가 쓰여지지 않았습니다. 2) 스트림이 닫히지 않았습니다.

문제가 해결되었지만 프로듀서 1 명과 소비자 2 명이있는 경우에도 처리 시간이 50 백만 레코드의 경우 90 분 이상인 경우에도이 프로그램을 더 빠르게 할 수있는 부분을 지적 할 수 있습니다.

0

느려지는 요소가 충분합니다.

하나의 작은 오류 마지막 else에 대한

String objectRetrieved = queue.take(); 
jarOutPutStreamText.write(objectRetrieved.getBytes()); 

어쩌면이 왼쪽 정렬?

가장 쉬운은 발견 할 수있는 인코딩

jarOutPutStreamText.write(objectRetrieved.getBytes(StandardCharsets.UTF_8)); 

지정해야합니다 :

String strQueryOutput = ""; 

이어야를

StringBuilder strQueryOutput = new StringBuilder(1000 /* output size */); 

는 DB2 SQL은 마지막 줄 WITH ur (미트되지 않은 읽기)에 의해 혜택을 누릴 수 있습니다 .

StringAlignUtils은 루프 외부에서 한 번 만들어야합니다. 실제로 포맷팅을 위해 데이터베이스에 남겨 두는 것이 가장 빠른 솔루션 일 수 있습니다.

모든 레코드에 대해 맵을 전달해야하는 경우 맵을 준비하고 Map.Entry 개의 항목을 열 인덱스 (마이너스 1)로 유지할 수 있습니다. 열 인덱스를 사용하여 값을 즉시 변경합니다.

더 작은 패치 크기 (?) 및 더 많은 Java 힙이 도움이 될 수 있습니다.

이 분야에서의 저의 경험은 주로 GzippedOutputStream입니다 (여기서도 기대했던 것 같습니다). 가장 빠른 압축은 최고 압축이 아닙니다.

0

위와 같이 제안 된 몇 가지 변경 사항을 통해 필자는 아웃 압축 방식을 사용하는 파일에 쓰기가 압축 된 스트림에 쓰기보다 빠릅니다.BufferedWriter에 쓰기 작업은 1 백만 개의 레코드를 처리하는 데 약 14 분이 걸리고 압축 된 스트림에 쓰는 데 22.8 분이 소요됩니다.

관련 문제