나는 DB2에서 압축 된 스트림으로 5 천만 개의 레코드를 읽으 려하고 다음 중 하나 인 2 개의 jar 파일을 만들고 싶습니다. 그것들은 고정 너비 포맷으로되어 있고 다른 하나는 BSON 포맷으로되어있다.엄청난 양의 데이터를 데이터베이스에서 읽어서 고정 너비와 BSON에 쓰기 직렬 쓰기보다 느린 멀티 쓰레드
코드를 만들 수 있었지만 코드를 가져 와서이 파일에 쓰는 데 걸리는 시간이 120 분에 가까워 제작자 소비자 모델을 사용하도록 코드를 다시 디자인했습니다. 필자는이 모델을 사용하여 성능이 저하되는 것을 보았습니다. 어떤 이유로 멀티 스레딩이 예상대로 작동하지 않습니다.
생산자 생산자 = 새로운 생산자 (queue1, queue2, URL, 사용자, 패스, 드라이버, strQuery);
Consumer1 consumer1 = new Consumer1(queue1, outputDatFile, fileDat.getName());
Consumer2 consumer2 = new Consumer2(queue2, outputDatBSONFile, fileBSONDat.getName());
ExecutorService threadPool = Executors.newFixedThreadPool(3);
Future producerStatus = threadPool.submit(producer);
threadPool.execute(consumer1);
threadPool.execute(consumer2);
try {
System.out.println("This will wait for the producer to wait " + producerStatus.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
threadPool.shutdown();
long end = System.currentTimeMillis();
System.out.println("End Time: " + end);
long elapsedTimeMillis = end - start;
float elapsedTimeSec = (float) elapsedTimeMillis/1000.0F;
System.out.println("Total Time: " + elapsedTimeSec + " seconds..");
if (!(errorStatus)) {
System.out.println("Successful exit...");
System.exit(0);
} else {
System.out.println("Exiting - Fatal Errors Encountered!");
System.exit(1);
}
}
private static JarOutputStream getJar(String outputDatFile, String fileName)
throws FileNotFoundException, IOException {
JarOutputStream jarOutPutStream = new JarOutputStream(
new BufferedOutputStream(new FileOutputStream(new File(outputDatFile + ".jar"))));
jarOutPutStream.setMethod(JarOutputStream.DEFLATED);
JarEntry ze = new JarEntry(fileName);
jarOutPutStream.putNextEntry(ze);
return jarOutPutStream;
}
public class Consumer1 implements Runnable {
private BlockingQueue<String> queue;
private String outputDatFile;
private String datFileName;
public Consumer1(BlockingQueue<String> queue, String outputDatFile, String datFileName) {
this.queue = queue;
this.outputDatFile = outputDatFile;
this.datFileName = datFileName;
}
@Override
public void run() {
JarOutputStream jarOutPutStreamText = null;
try {
jarOutPutStreamText = getJar(outputDatFile, datFileName);
int recordsWritten = 0;
while (true) {
recordsWritten++;
try {
String objectRetrieved = queue.take();
jarOutPutStreamText.write(objectRetrieved.getBytes());
jarOutPutStreamText.flush();
if (recordsWritten % 100000 == 0) {
System.out.println("Written Records Count Queue 1 " + recordsWritten);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (FileNotFoundException e1) {
e1.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
} finally {
if (jarOutPutStreamText != null) {
try {
jarOutPutStreamText.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
public class Consumer2 implements Runnable {
private BlockingQueue<DBObject> queue2;
private String outputDatBSONFile;
private String bsonFileName;
public Consumer2(BlockingQueue<DBObject> queue2, String outputDatBSONFile, String bsonFileName) {
this.queue2 = queue2;
this.outputDatBSONFile = outputDatBSONFile;
this.bsonFileName = bsonFileName;
}
@Override
public void run() {
JarOutputStream jarOutPutStreamBSON = null;
try {
jarOutPutStreamBSON = getJar(outputDatBSONFile, bsonFileName);
BSONFileWriter bsonWriter = new BSONFileWriter(jarOutPutStreamBSON);
int recordsWritten = 0;
while (true) {
recordsWritten++;
try {
DBObject objectRetrieved = queue2.take();
bsonWriter.write(objectRetrieved);
bsonWriter.flush();
if (recordsWritten % 100000 == 0) {
System.out.println("Written Records Count Queue 2 " + recordsWritten);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (FileNotFoundException e1) {
e1.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
} finally {
if (jarOutPutStreamBSON != null) {
try {
jarOutPutStreamBSON.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
public class Producer implements Runnable {
private BlockingQueue<String> queue1;
private BlockingQueue<DBObject> queue2;
private String url;
private String user;
private String pass;
private String driver;
private String strQuery;
public Producer(BlockingQueue<String> queue1, BlockingQueue<DBObject> queue2, String url, String user,
String pass, String driver, String strQuery) {
this.queue1 = queue1;
this.queue2 = queue2;
this.url = url;
this.pass = pass;
this.driver = driver;
this.strQuery = strQuery;
}
@Override
public void run() {
Connection con = null;
Statement st = null;
ResultSet rs = null;
try {
Class.forName(driver);
con = DriverManager.getConnection(url, user, pass);
con.setAutoCommit(false);
Map<String, Object> mapper = new HashMap<String, Object>();
try {
st = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
st.setFetchSize(20000);
System.out.println(
"Attempting to execute statement: " + System.getProperty("line.separator") + strQuery);
strQuery = strQuery.replace(System.getProperty("line.separator"), " ");
rs = st.executeQuery(strQuery);
ResultSetMetaData md = rs.getMetaData();
try {
int col = md.getColumnCount();
int resultSetCounter = 0;
while (rs.next()) {
String strQueryOutput = "";
resultSetCounter++;
for (int x = 1; x <= col; ++x) {
String outPut = "";
if (md.getColumnTypeName(x).equals("DECIMAL")) {
StringAlignUtils util = new StringAlignUtils(md.getColumnDisplaySize(x),
Alignment.RIGHT);
outPut = util.format(rs.getString(x));
strQueryOutput = strQueryOutput + outPut;
mapper.put(md.getColumnName(x), outPut);
} else if (md.getColumnTypeName(x).equals("NUMERIC")) {
StringAlignUtils util = new StringAlignUtils(md.getColumnDisplaySize(x),
Alignment.RIGHT);
outPut = util.format(rs.getString(x));
strQueryOutput = strQueryOutput + outPut;
mapper.put(md.getColumnName(x), outPut);
} else if (md.getColumnTypeName(x).equals("CHAR() FOR BIT DATA")) {
char charData = rs.getString(x).charAt(0);
outPut = charData + "";
StringAlignUtils util = new StringAlignUtils(md.getColumnDisplaySize(x),
Alignment.RIGHT);
outPut = util.format(outPut);
strQueryOutput = strQueryOutput + outPut;
mapper.put(md.getColumnName(x), outPut);
} else {
StringAlignUtils util = new StringAlignUtils(md.getColumnDisplaySize(x),
Alignment.RIGHT);
outPut = util.format(rs.getString(x));
strQueryOutput = strQueryOutput + outPut;
mapper.put(md.getColumnName(x), outPut);
}
}
if (resultSetCounter % 100000 == 0) {
System.out.println(" The counter is " + resultSetCounter);
}
strQueryOutput = strQueryOutput + '\n';
queue1.put(strQueryOutput);
queue2.put(new BasicDBObject(mapper));
}
} catch (Exception e) {
e.printStackTrace();
System.err.println("Error: " + e.getMessage());
System.err.println("Exiting!");
System.exit(1);
}
System.out.println("Query results successfully returned...");
} catch (SQLException s) {
System.err.println("SQL statement is not executed!");
System.err.println("Error: " + s.getMessage());
} finally {
System.out.println("Trying to Close ResultSet and Statement...");
if (rs != null) {
System.out.println("Closing ResultSet..");
rs.close();
}
if (st != null) {
System.out.println("Closing Statement..");
st.close();
}
}
} catch (Exception exception) {
exception.printStackTrace();
} finally {
try {
System.out.println("Trying to Close database connection..");
if (con != null) {
System.out.println("Closing database connection..");
con.close();
}
} catch (SQLException exception) {
exception.printStackTrace();
}
}
}
}
코드를 읽을 수 있도록 모든 빈 줄을 제거하십시오. – EJP