현재 서버에 스트리밍을 계속하기 위해 일대 다 연결을 열어주는 응용 프로그램에 java.nio.channel.Selectors & SocketChannels를 사용하고 있습니다. StreamWriteWorker - SocketChannel에 쓰기 작업을 수행하고, StreamReadWorker - 버퍼에서 바이트를 읽고 내용을 파싱하고, StreamTaskDispatcher - readyOps에 대한 Selector의 선택을 수행하고 작업자 스레드에 대한 새로운 실행 파일을 전달합니다.java.nio Selectors and SocketChannel for continue 스트리밍
문제점 - 선택기의 선택 메소드에 대한 호출은 첫 번째 호출에서 값> 0 (유효한 readyOps) 만 리턴합니다. 한 번에 모든 준비 채널에서 데이터를 쓰고 보낼 수 있지만 Selector의 선택 메소드 호출은 모두 0을 반환합니다.
질문 : 모든 읽기 후에는 SocketChannel에서 close를 호출해야합니까?/쓴다 (나는 희망하지 않는다!)? 그렇지 않은 경우 SocketChannels이 읽기/쓰기 작업에 사용할 수없는 원인이 무엇입니까?
죄송합니다. 코드를 게시 할 수 없지만 도움이 될만한 명확한 설명이 되었기를 바랍니다. 나는 답변을 찾았지만 닫은 후에 SocketChannel 연결을 재사용 할 수는 없다는 것을 알았지 만, 내 채널이 가까이 있으면 안되며 서버는 EOF 스트림 결과를받지 못합니다.
나는 약간의 진전을 이루었고 json 구문 분석 오류로 인해 서버 응용 프로그램에서 쓰기 작업이 발생하지 않는다는 것을 알아 냈습니다. 이제 클라이언트 응용 프로그램 코드의 SocketChannel이 읽기 작업을 처리 한 후에 다른 쓰기 작업을 수행 할 준비가되었습니다. 나는 이것이 SocketChannels의 TCP 성질이라고 생각한다. 그러나 SocketChannel을 서버 응용 프로그램에서 다른 읽기 작업에 사용할 수 없게됩니다. SocketChannels에 대한 정상적인 동작입니까? 읽기 작업 후에 클라이언트 측에서 연결을 닫고 새 연결을 설정해야합니까? 여기
는 내가 뭘하려고 오전의 코드 샘플입니다 :package org.stream.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.RandomStringUtils;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.stream.JsonToken;
public class ClientServerTest {
private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>();
private ExecutorService executor = Executors.newFixedThreadPool(1);
private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>();
private class StreamWriteTask implements Runnable {
private ByteBuffer buffer;
private SelectionKey key;
private Selector selector;
private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
this.buffer = buffer;
this.key = key;
this.selector = selector;
}
@Override
public void run() {
SocketChannel sc = (SocketChannel) key.channel();
byte[] data = (byte[]) key.attachment();
buffer.clear();
buffer.put(data);
buffer.flip();
int results = 0;
while (buffer.hasRemaining()) {
try {
results = sc.write(buffer);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (results == 0) {
buffer.compact();
buffer.flip();
data = new byte[buffer.remaining()];
buffer.get(data);
key.interestOps(SelectionKey.OP_WRITE);
key.attach(data);
selector.wakeup();
return;
}
}
key.interestOps(SelectionKey.OP_READ);
key.attach(null);
selector.wakeup();
}
}
private class StreamReadTask implements Runnable {
private ByteBuffer buffer;
private SelectionKey key;
private Selector selector;
private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
this.buffer = buffer;
this.key = key;
this.selector = selector;
}
private boolean checkUUID(byte[] data) {
return uuidToSize.containsKey(new String(data));
}
@Override
public void run() {
SocketChannel sc = (SocketChannel) key.channel();
buffer.clear();
byte[] data = (byte[]) key.attachment();
if (data != null) {
buffer.put(data);
}
int count = 0;
int readAttempts = 0;
try {
while ((count = sc.read(buffer)) > 0) {
readAttempts++;
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (count == 0) {
buffer.flip();
data = new byte[buffer.limit()];
buffer.get(data);
if (checkUUID(data)) {
key.interestOps(SelectionKey.OP_READ);
key.attach(data);
} else {
System.out.println("Clinet Read - uuid ~~~~ " + new String(data));
key.interestOps(SelectionKey.OP_WRITE);
key.attach(null);
}
}
if (count == -1) {
try {
sc.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
selector.wakeup();
}
}
private class ClientWorker implements Runnable {
@Override
public void run() {
try {
Selector selector = Selector.open();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(new InetSocketAddress("127.0.0.1", 9001));
sc.register(selector, SelectionKey.OP_CONNECT);
ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
while (selector.isOpen()) {
int count = selector.select(10);
if (count == 0) {
continue;
}
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
final SelectionKey key = it.next();
it.remove();
if (!key.isValid()) {
continue;
}
if (key.isConnectable()) {
sc = (SocketChannel) key.channel();
if (!sc.finishConnect()) {
continue;
}
sc.register(selector, SelectionKey.OP_WRITE);
}
if (key.isReadable()) {
key.interestOps(0);
executor.execute(new StreamReadTask(buffer, key, selector));
}
if (key.isWritable()) {
key.interestOps(0);
if(key.attachment() == null){
key.attach(dataQueue.take());
}
executor.execute(new StreamWriteTask(buffer, key, selector));
}
}
}
} catch (IOException ex) {
// Handle Exception
}catch(InterruptedException ex){
}
}
}
private class ServerWorker implements Runnable {
@Override
public void run() {
try {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ServerSocket socket = ssc.socket();
socket.bind(new InetSocketAddress(9001));
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
DataHandler handler = new DataHandler();
while (selector.isOpen()) {
int count = selector.select(10);
if (count == 0) {
continue;
}
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
final SelectionKey key = it.next();
it.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
handler.readSocket(buffer, key);
}
if (key.isWritable()) {
handler.writeToSocket(buffer, key);
}
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private class DataHandler {
private JsonObject parseData(StringBuilder builder) {
if (!builder.toString().endsWith("}")) {
return null;
}
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(builder.toString());
return obj;
}
private void readSocket(ByteBuffer buffer, SelectionKey key)
throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
buffer.clear();
int count = Integer.MAX_VALUE;
int readAttempts = 0;
try {
while ((count = sc.read(buffer)) > 0) {
readAttempts++;
}
} catch (IOException e) {
e.printStackTrace();
}
if (count == 0) {
buffer.flip();
StringBuilder builder = key.attachment() instanceof StringBuilder ? (StringBuilder) key
.attachment() : new StringBuilder();
Charset charset = Charset.forName("UTF-8");
CharsetDecoder decoder = charset.newDecoder();
decoder.onMalformedInput(CodingErrorAction.IGNORE);
System.out.println(buffer);
CharBuffer charBuffer = decoder.decode(buffer);
String content = charBuffer.toString();
charBuffer = null;
builder.append(content);
System.out.println(content);
JsonObject obj = parseData(builder);
if (obj == null) {
key.attach(builder);
key.interestOps(SelectionKey.OP_READ);
} else {
System.out.println("data ~~~~~~~ " + builder.toString());
JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive();
key.attach(uuid.toString().getBytes());
key.interestOps(SelectionKey.OP_WRITE);
}
}
if (count == -1) {
key.attach(null);
sc.close();
}
}
private void writeToSocket(ByteBuffer buffer, SelectionKey key)
throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
byte[] data = (byte[]) key.attachment();
buffer.clear();
buffer.put(data);
buffer.flip();
int writeAttempts = 0;
while (buffer.hasRemaining()) {
int results = sc.write(buffer);
writeAttempts++;
System.out.println("Write Attempt #" + writeAttempts);
if (results == 0) {
buffer.compact();
buffer.flip();
data = new byte[buffer.remaining()];
buffer.get(data);
key.attach(data);
key.interestOps(SelectionKey.OP_WRITE);
break;
}
}
key.interestOps(SelectionKey.OP_READ);
key.attach(null);
}
}
public ClientServerTest() {
for (int index = 0; index < 1000; index++) {
JsonObject obj = new JsonObject();
String uuid = UUID.randomUUID().toString();
uuidToSize.put(uuid, uuid.length());
obj.addProperty("uuid", uuid);
String data = RandomStringUtils.randomAlphanumeric(10000);
obj.addProperty("event", data);
dataQueue.add(obj.toString().getBytes());
}
Thread serverWorker = new Thread(new ServerWorker());
serverWorker.start();
Thread clientWorker = new Thread(new ClientWorker());
clientWorker.start();
}
/**
* @param args
*/
public static void main(String[] args) {
ClientServerTest test = new ClientServerTest();
for(;;){
}
}
}
왜이 세 개의 스레드가 필요하다고 생각하십니까? 쓰는 쓰레드는 필요 없으며, NIO가 의도 한 선을 따라 약간의 재구성을하면 읽기 쓰레드도 제거 할 수 있습니다. 멀티 스레딩과 NIO는 실제로 섞이지 않습니다. 멀티 스레딩을 원하면 java.net을 사용하고 입출력을 차단하십시오. – EJP
의견을 보내 주셔서 감사합니다. 시기 적절성이 중요한 요소이므로 3 가지 스레드로 시작했습니다. 나는 Write on Write 또는 그 반대로 쓰기를 원하지 않았고, 나는 많은 양의 데이터로 작업 할 것이다. 내가 진술 한 문제에 관해서 당신은 대응하고 있습니까? –
3 개의 스레드에 동기를 부여하는 문제가 있는지 확실하지 않습니다. 당신은 * non-blocking 모드에있다. * select() 호출을 제외하고 아무것도 대기하지 않을 것이다. 그러나 select()가 0을 반환하면 아무 것도 준비되지 않습니다. 무작위로 채널을 닫으면 변경되지 않습니다. – EJP