ورود

View Full Version : کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم



MSHService
یک شنبه 27 مهر 1393, 07:11 صبح
با درود و ادب

دوستان من چند وقت داشتم روی یه سرور سوکت برای ادارمون کار میکردم که بتونم یه سرور خوب برای یه قسمتش بنویسم.

حالا بعد از اینکه خیلی از کارا تموم شده ، دیدم جاوا ServerSocketChannel هم داره و خیلی قابلیت هاش بیشترم از serverSocket هستن و ...

حالا به نظره شما من برناممو با ServerSocketChannel اصلاح کنم یا نه؟

با سپاس

ahmad.mo74
یک شنبه 27 مهر 1393, 20:14 عصر
سلام، بله ارزشش رو داره، چون با Server/Socket Channel میشه به صورت non-blocking کانکشن برقرار کرد که فوق العاده سرعت عملکرد رو میشه بالاتر برد و از همه مهمتر میشه بار روی سرور رو به میزان قابل توجهی پایین آورد!
درباره non-blocking io تو این لینکا مفصل توضیح داده شده:

http://en.wikipedia.org/wiki/Asynchronous_I/O
http://stackoverflow.com/questions/10570246/what-is-non-blocking-or-asynchronous-i-o-in-node-js
http://en.wikipedia.org/wiki/Non-blocking_I/O_(Java)
http://tutorials.jenkov.com/java-nio/nio-vs-io.html

نحوه استفاده از Selector و Server/Socket Channel با هم رو توی این مثالا ببینید :

http://tutorials.jenkov.com/java-nio/selectors.html
http://rox-xmlrpc.sourceforge.net/niotut/

اینا مثال های ساده بودن و مالتی ترد نبودن، برای اینکه به صورت مالتی ترد بشه از Selector ها استفاده کرد خب کار یه خورده پیچیده تر میشه...

تو این pdf آقای Doug Lea خیلی خوب توضیح داده : http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

اینم یه مثال کامل که با استفاده از نکته هایی که تو همین pdf بود نوشتم :

Server :


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;


/**
* @author avb
*/
public class Server implements Runnable {


private static final short DEFAULT_BUFFER_SIZE = 512;
private static final long TIMEOUT = 10000;


private final int port;
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);


private Selector mainSelector;
private ServerSocketChannel server;
private ExecutorService handlers;


private enum State {STOPPED, STOPPING, RUNNING}


public Server(int port) {
this.port = port;
}


private void openConnection() throws IOException {
mainSelector = Selector.open();
server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(port));
server.configureBlocking(false);
server.register(mainSelector, SelectionKey.OP_ACCEPT).attach(new Acceptor());
}


private boolean start() {
return state.compareAndSet(State.STOPPED, State.RUNNING);
}


private void started(boolean alreadyStarted) {
if (!alreadyStarted) {
System.out.println("server started...");
} else {
if (isRunning()) {
System.out.println("server already running...");
} else if (isStopping()) {
System.out.println("server is stopping...");
}
}
}


public boolean isRunning() {
return state.get() == State.RUNNING;
}


public boolean stop() {
handlers.shutdown();
return state.compareAndSet(State.RUNNING, State.STOPPING);
}


public boolean isStopping() {
return state.get() == State.STOPPING;
}


private void stopped() {
handlers.shutdownNow();
state.set(State.STOPPED);
}


public boolean isStopped() {
return state.get() == State.STOPPED;
}


@Override
public void run() {
if (!start()) {
started(true);
return;
}
handlers = Executors.newCachedThreadPool();
started(false);
try {
openConnection();
while (isRunning()) {
if (mainSelector.select(TIMEOUT) == 0) {
continue;
}
Set<SelectionKey> selectedKeys = mainSelector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
try {
if (key.isConnectable()) {
((SocketChannel) key.channel()).finishConnect();
}
if (key.isAcceptable()) {
dispatch(key);
}
} catch (Throwable t) {
resetKey(key);
}
}
selectedKeys.clear();
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
closeConnection();
stopped();
}
}


private void closeConnection() {
try {
mainSelector.close();
server.socket().close();
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}


private void resetKey(SelectionKey key) {
key.cancel();
try {
key.channel().close();
} catch (IOException ignored) {
}
}


private void dispatch(SelectionKey key) {
Runnable runnable = (Runnable) key.attachment();
if (runnable != null) {
runnable.run();
}
}


private class Acceptor implements Runnable {


@Override
public synchronized void run() {
try {
SocketChannel connection = server.accept();
handlers.submit(new Handler(connection));
} catch (IOException e) {
e.printStackTrace();
}
}


}


private final class Handler implements Runnable {


private final Selector selector;
private final SocketChannel socketChannel;
private final ConcurrentHashMap<SocketChannel, byte[]> dataTracking = new ConcurrentHashMap<>();


private Handler(SocketChannel channel) throws IOException {
selector = Selector.open();
socketChannel = channel;
socketChannel.configureBlocking(false);
socketChannel.socket().setTcpNoDelay(true);
socketChannel.register(selector, SelectionKey.OP_READ);
}


@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
if (selector.select(TIMEOUT) == 0) {
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if (!key.isValid()) {
continue;
}
try {
if (key.isWritable()) {
write(key);
}
if (key.isReadable()) {
read(key);
}
} catch (Throwable t) {
resetKey(key);
return;
}
}
selectedKeys.clear();
} catch (Throwable t) {
return;
}
}
}


private void write(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
byte[] data = dataTracking.remove(channel);
try {
ByteBuffer wrap = ByteBuffer.wrap(data);
channel.write(wrap);
key.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
resetKey(key);
}
}


private void read(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
StringBuilder sb = new StringBuilder();
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
CharBuffer charBuffer;
int read;
try {
do {
read = channel.read(readBuffer);
readBuffer.flip();
charBuffer = decoder.decode(readBuffer);
readBuffer.clear();
sb.append(String.valueOf(charBuffer));
} while (read > 0);
} catch (IOException e) {
resetKey(key);
return;
}
if (read == -1) {
resetKey(key);
return;
}
System.out.println("received : " + sb);
//process your data
echo(key, String.valueOf(sb).getBytes());
}


private void echo(SelectionKey key, byte[] data) {
SocketChannel socketChannel = (SocketChannel) key.channel();
dataTracking.put(socketChannel, data);
key.interestOps(SelectionKey.OP_WRITE);
}


}


public static void main(String[] args) {
Server server = new Server(11025);
new Thread(server).start();
}


}


Client :


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;


/**
* @author avb
*/
public class Client implements Runnable {


public static void main(String[] args) {
Client client = new Client(new InetSocketAddress("127.0.0.1", 11025));
new Thread(client).start();
for (int i = 1; i <= 100; i++) {
String response = client.getResponse("message " + i);
System.out.println("Server said : " + response);
}
}


private static final short DEFAULT_BUFFER_SIZE = 512;
private static final long TIMEOUT = 10000;


private final Object lock = new Object();
private final InetSocketAddress address;
private final AtomicReference<State> state = new AtomicReference<>(State.DISCONNECTED);


private String request;
private String response;
private SocketChannel channel;
private Selector selector;
private boolean readyToWrite = false;
private boolean requestPending = false;


private enum State {DISCONNECTED, DISCONNECTING, CONNECTED}


public Client(InetSocketAddress address) {
this.address = address;
}


private boolean connect() {
if (!state.compareAndSet(State.DISCONNECTED, State.CONNECTED)) {
return false;
}
try {
selector = Selector.open();
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_CONNECT);
channel.connect(address);
return true;
} catch (IOException e) {
disconnected();
return false;
}
}


private void connected(boolean alreadyConnected) {
if (!alreadyConnected) {
System.out.println("connected to server...");
} else {
if (isDisconnected()) {
System.out.println("disconnected from server...");
} else if (isDisconnecting()) {
System.out.println("disconnecting from server...");
} else if (isConnected()) {
System.out.println("already connected...");
}
}
}


public boolean isConnected() {
return state.get() == State.CONNECTED;
}


public boolean disconnect() {
unlock();
return state.compareAndSet(State.CONNECTED, State.DISCONNECTING);
}


public boolean isDisconnecting() {
return state.get() == State.DISCONNECTING;
}


private void disconnected() {
state.set(State.DISCONNECTED);
}


public boolean isDisconnected() {
return state.get() == State.DISCONNECTED;
}


@Override
public void run() {
if (!connect()) {
connected(true);
return;
}
try {
while (isConnected()) {
if (selector.select(TIMEOUT) == 0) {
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
try {
if (!key.isValid()) {
continue;
}
if (key.isConnectable()) {
connect(selector, key);
}
if (key.isWritable()) {
readyToWrite = true;
if (requestPending) {
unlock();
}
lock();
write(key);
readyToWrite = false;
}
if (key.isReadable()) {
read(key);
}
} catch (Throwable t) {
resetKey(key);
disconnect();
break;
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
closeConnection();
disconnected();
}
}


private void closeConnection() {
if (isDisconnected()) {
return;
}
try {
selector.close();
channel.socket().close();
channel.close();
} catch (IOException ignored) {
}
}


private void connect(Selector selector, SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
try {
if (channel.isConnectionPending()) {
channel.finishConnect();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_WRITE);
connected(false);
}
} catch (IOException e) {
resetKey(key);
}
}


private void write(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
try {
channel.write(ByteBuffer.wrap(request.getBytes())) ;
key.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
resetKey(key);
}
}


private void read(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
StringBuilder builder = new StringBuilder();
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
CharBuffer charBuffer;
int read;
try {
do {
read = channel.read(readBuffer);
readBuffer.flip();
charBuffer = decoder.decode(readBuffer);
readBuffer.clear();
builder.append(String.valueOf(charBuffer));
} while (read > 0);
} catch (IOException e) {
resetKey(key);
return;
}
if (read == -1) {
resetKey(key);
return;
}
response = String.valueOf(builder);
unlock();
key.interestOps(SelectionKey.OP_WRITE);
}


private void resetKey(SelectionKey key) {
key.cancel();
try {
key.channel().close();
} catch (IOException ignored) {
}
}


public synchronized String getResponse(String request) {
ensureConnect();
if (!readyToWrite) {
requestPending = true;
lock();
requestPending = false;
}
this.request = request;
unlock();
lock(TIMEOUT);
return response;
}


private void ensureConnect() {
if (isDisconnected()) {
new Thread(this).start();
}
}


private void lock() {
synchronized (lock) {
try {
lock.wait();
} catch (InterruptedException ignored) {
}
}
}


private void lock(long timeout) {
synchronized (lock) {
try {
lock.wait(timeout);
} catch (InterruptedException ignored) {
}
}
}


private void unlock() {
synchronized (lock) {
lock.notify();
}
}


}


بازم ممکنه باگ داشته باشه ولی تست کردم اوکی بود.

MSHService
دوشنبه 28 مهر 1393, 17:39 عصر
متشکرم ، یا جا شکل زیر رو دیدم :

124759

آقا دمت گرم. خیلی لطف کردی :)

حالا برای اینکه بتونم یه Dispatcher داشته باشم و و بتونم بدونم کیا بهم وصل شده ، چکار کنم ؟!

حذف رو باید تو خط 234 بنویسم :) اما اضافه رو کجا بنویسم؟

ahmad.mo74
پنج شنبه 01 آبان 1393, 21:36 عصر
سلام، ممنونم

خیلی راحت میشه فهمید چه موقعی کلاینت وصل شده یا نه، من چند روزی نیستم و فعلا به لپ تاپ دسترسی ندارم، ایشالا تا چند روز دیگه کدشو قرار میدم.

ahmad.mo74
یک شنبه 04 آبان 1393, 19:30 عصر
سلام،

کدارو تغییر دادم، الان دیگه هر وقت کلاینت کانکت یا دیسکانکت بشه، بهت خبر میده...

Server :


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;


/**
* @author avb
*/
public final class Server implements Runnable {


private static final int DEFAULT_BUFFER_SIZE = 512;
private static final long TIMEOUT = 10000;


private static enum State {STOPPED, STOPPING, RUNNING}


private final int port;
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
private final List<ServerListener> listeners = new ArrayList<>();


private Selector mainSelector;
private ServerSocketChannel server;
private ExecutorService handlers;


public Server(int port) {
this.port = port;
}


private void openConnection() throws IOException {
mainSelector = Selector.open();
server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(port));
server.configureBlocking(false);
server.register(mainSelector, SelectionKey.OP_ACCEPT, new Acceptor());
}


private boolean start() {
return state.compareAndSet(State.STOPPED, State.RUNNING);
}


private void started() {
fireServerStarted();
}


public boolean isRunning() {
return state.get() == State.RUNNING;
}


public boolean stop() {
if (state.compareAndSet(State.RUNNING, State.STOPPING)) {
fireServerStopping();
handlers.shutdown();
mainSelector.wakeup();
return true;
}
return false;
}


public boolean isStopping() {
return state.get() == State.STOPPING;
}


private void stopped() {
fireServerStopped();
handlers.shutdownNow();
state.set(State.STOPPED);
}


public boolean isStopped() {
return state.get() == State.STOPPED;
}


private void clientConnected() {
fireClientConnected();
}


private void clientDisconnected() {
fireClientDisconnected();
}


public void addServerListener(final ServerListener listener) {
listeners.add(listener);
}


public void removeServerListener(final ServerListener listener) {
listeners.remove(listener);
}


public void fireServerStarted() {
listeners.forEach(ServerListener::serverStarted);
}


public void fireServerStopping() {
listeners.forEach(ServerListener::serverStopping);
}


public void fireServerStopped() {
listeners.forEach(ServerListener::serverStopped);
}


public void fireClientConnected() {
listeners.forEach(ServerListener::clientConnected) ;
}


public void fireClientDisconnected() {
listeners.forEach(ServerListener::clientDisconnect ed);
}


@Override
public void run() {
if (!start()) {
return;
}
started();
handlers = Executors.newCachedThreadPool();
try {
openConnection();
while (isRunning()) {
if (mainSelector.select(TIMEOUT) == 0) {
continue;
}
Set<SelectionKey> selectedKeys = mainSelector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext() && isRunning()) {
SelectionKey key = it.next();
it.remove();
try {
if (key.isConnectable()) {
((SocketChannel) key.channel()).finishConnect();
}
if (key.isAcceptable()) {
dispatch(key);
}
} catch (Throwable t) {
resetKey(key);
}
}
selectedKeys.clear();
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
closeConnection();
stopped();
}
}


private void closeConnection() {
try {
mainSelector.close();
server.socket().close();
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}


private void resetKey(SelectionKey key) {
key.cancel();
try {
key.channel().close();
} catch (IOException ignored) {
}
}


private void dispatch(SelectionKey key) {
Runnable acceptor = (Runnable) key.attachment();
if (acceptor != null) {
acceptor.run();
}
}


private class Acceptor implements Runnable {


@Override
public synchronized void run() {
try {
SocketChannel connection = server.accept();
handlers.submit(new Handler(connection));
clientConnected();
} catch (IOException ignored) {
}
}


}


private final class Handler implements Runnable {


private final Selector selector;
private final SocketChannel socketChannel;
private final ConcurrentHashMap<SocketChannel, byte[]> dataTracking = new ConcurrentHashMap<>();


private Handler(SocketChannel channel) throws IOException {
selector = Selector.open();
socketChannel = channel;
socketChannel.configureBlocking(false);
socketChannel.socket().setTcpNoDelay(true);
socketChannel.register(selector, SelectionKey.OP_READ);
}


@Override
public void run() {
try {
while (isRunning()) {
if (selector.select(TIMEOUT) == 0) {
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext() && isRunning()) {
SelectionKey key = it.next();
it.remove();
if (!key.isValid()) {
continue;
}
try {
if (key.isWritable()) {
write(key);
}
if (key.isReadable()) {
String data = read(key);
sendResponse(key, data);
}
} catch (Throwable t) {
resetKey(key);
clientDisconnected();
return;
}
}
selectedKeys.clear();
}
} catch (Throwable t) {
clientDisconnected();
} finally {
closeConnection();
}
}


private void closeConnection() {
try {
selector.close();
socketChannel.socket().close();
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}


private void write(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
byte[] data = dataTracking.remove(channel);
try {
ByteBuffer writeBuffer = ByteBuffer.wrap(data);
while (writeBuffer.hasRemaining()) {
channel.write(writeBuffer);
}
key.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
resetKey(key);
}
}


private String read(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
StringBuilder sb = new StringBuilder();
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
CharBuffer charBuffer;
int read;
try {
while (true) {
read = channel.read(readBuffer);
if (read <= 0) {
break;
}
readBuffer.flip();
charBuffer = decoder.decode(readBuffer);
readBuffer.clear();
sb.append(String.valueOf(charBuffer));
}
} catch (IOException e) {
resetKey(key);
return null;
}
if (read == -1) {
resetKey(key);
return null;
}
return String.valueOf(sb);
}


private void sendResponse(SelectionKey key, String data) {
String response = process(data);
echo(key, response.getBytes());
}


private void echo(SelectionKey key, byte[] data) {
SocketChannel socketChannel = (SocketChannel) key.channel();
dataTracking.put(socketChannel, data);
key.interestOps(SelectionKey.OP_WRITE);
}


}


}


ServerListener :


/**
* @author avb
*/
public interface ServerListener {


public void serverStarted();


public void serverStopping();


public void serverStopped();


public void clientConnected();


public void clientDisconnected();


}


ServerAdapter :


/**
* @author avb
*/
public abstract class ServerAdapter implements ServerListener {


@Override
public void serverStarted() {
}


@Override
public void serverStopping() {
}


@Override
public void serverStopped() {
}


@Override
public void clientConnected() {
}


@Override
public void clientDisconnected() {
}


}


سمت کلاینت :

Client :


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;


/**
* @author avb
*/
public final class Client implements Runnable {


private static final int DEFAULT_BUFFER_SIZE = 512;
private static final long TIMEOUT = 10000;
private static final long REQUEST_TIMEOUT = 10000;


private static enum State {DISCONNECTED, DISCONNECTING, CONNECTING, CONNECTED}


private final Locker writer = new Locker<>(null);
private final InetSocketAddress address;
private final AtomicReference<State> state = new AtomicReference<>(State.DISCONNECTED);
private final ConcurrentLinkedQueue<Locker<String, String>> queue = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<SelectionKey, Locker<String, String>> requests = new ConcurrentHashMap<>();
private final List<ClientListener> listeners = new ArrayList<>();


private SocketChannel channel;
private Selector selector;


public Client(InetSocketAddress address) {
this.address = address;
}


private boolean connect() {
if (!state.compareAndSet(State.DISCONNECTED, State.CONNECTING)) {
return false;
}
fireClientConnecting();
try {
selector = Selector.open();
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_CONNECT);
channel.connect(address);
return true;
} catch (IOException e) {
disconnected();
return false;
}
}


public boolean isConnecting() {
return state.get() == State.CONNECTING;
}


private void connected() {
fireClientConnected();
state.set(State.CONNECTED);
}


public boolean isConnected() {
return state.get() == State.CONNECTED;
}


public boolean disconnect() {
if (state.compareAndSet(State.CONNECTED, State.DISCONNECTING)) {
fireClientDisconnecting();
requests.forEach((key, locker) -> {
resetKey(key);
locker.unlock();
});
requests.clear();
queue.clear();
writer.unlock();
selector.wakeup();
return true;
}
return false;
}


public boolean isDisconnecting() {
return state.get() == State.DISCONNECTING;
}


private void disconnected() {
fireClientDisconnected();
state.set(State.DISCONNECTED);
}


public boolean isDisconnected() {
return state.get() == State.DISCONNECTED;
}


public void addClientListener(final ClientListener listener) {
listeners.add(listener);
}


public void removeClientListener(final ClientListener listener) {
listeners.remove(listener);
}


public void fireClientConnecting() {
listeners.forEach(ClientListener::clientConnecting );
}


public void fireClientConnected() {
listeners.forEach(ClientListener::clientConnected) ;
}


public void fireClientDisconnecting() {
listeners.forEach(ClientListener::clientDisconnect ing);
}


public void fireClientDisconnected() {
listeners.forEach(ClientListener::clientDisconnect ed);
}


@Override
public void run() {
if (!connect()) {
return;
}
try {
while (isConnected() || isConnecting()) {
if (selector.select(TIMEOUT) == 0) {
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext() && (isConnected() || isConnecting())) {
SelectionKey key = keys.next();
keys.remove();
if (!key.isValid()) {
continue;
}
try {
if (key.isConnectable()) {
connect(key);
}
if (key.isWritable()) {
write(key);
}
if (key.isReadable()) {
read(key);
}
} catch (Throwable t) {
resetKeyAndDisconnect(key);
return;
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
closeConnection();
disconnected();
}
}


public String getResponse(String request) {
if (!isConnected()) {
return null;
}
Locker<String, String> locker = new Locker<>(request);
addToQueue(locker);
locker.lock(REQUEST_TIMEOUT);
String response = locker.getValue();
if (isNull(response)) {
return null;
}
return response;
}


private void closeConnection() {
if (isDisconnected()) {
return;
}
try {
selector.close();
channel.socket().close();
channel.close();
} catch (IOException ignored) {
}
}


private void connect(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
try {
if (channel.isConnectionPending()) {
channel.finishConnect();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_WRITE);
connected();
}
} catch (IOException e) {
resetKeyAndDisconnect(key);
}
}


private void write(SelectionKey key) {
if (!isConnected()) {
return;
}
if (queue.isEmpty()) {
writer.lock();
}
Locker<String, String> locker;
if ((locker = poll()) != null) {
SocketChannel channel = (SocketChannel) key.channel();
try {
ByteBuffer writeBuffer = ByteBuffer.wrap(locker.getKey().getBytes());
while (writeBuffer.hasRemaining()) {
channel.write(writeBuffer);
}
requests.put(key, locker);
key.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
resetKeyAndDisconnect(key);
}
}
}


private void read(SelectionKey key) {
if (!isConnected()) {
return;
}
SocketChannel channel = (SocketChannel) key.channel();
StringBuilder sb = new StringBuilder();
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
CharBuffer charBuffer;
int read;
try {
while (true) {
read = channel.read(readBuffer);
if (read <= 0) {
break;
}
readBuffer.flip();
charBuffer = decoder.decode(readBuffer);
readBuffer.clear();
sb.append(String.valueOf(charBuffer));
}
} catch (IOException e) {
resetKeyAndDisconnect(key);
return;
}
if (read == -1) {
resetKeyAndDisconnect(key);
return;
}
Locker<String, String> locker = requests.remove(key);
if (locker != null) {
locker.setValue(String.valueOf(sb));
locker.unlock();
}
writer.unlock();
key.interestOps(SelectionKey.OP_WRITE);
}


private void resetKey(SelectionKey key) {
key.cancel();
try {
key.channel().close();
} catch (IOException ignored) {
}
}


private void resetKeyAndDisconnect(SelectionKey key) {
resetKey(key);
disconnect();
}


private void addToQueue(Locker<String, String> locker) {
queue.add(locker);
writer.unlock();
}


private Locker<String, String> poll() {
return queue.poll();
}


private static boolean isNull(String s) {
return s == null || s.isEmpty() || s.trim().isEmpty();
}


}


Locker :


/**
* @author avb
*/
public final class Locker<K, V> {


private final Object o = new Object();
private final K key;
private V value;


public Locker(K key) {
this.key = key;
}


public K getKey() {
return key;
}


public V getValue() {
return value;
}


public void setValue(V value) {
this.value = value;
}


public void lock() {
synchronized (o) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


public void lock(long timeout) {
synchronized (o) {
try {
o.wait(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


public void unlock() {
synchronized (o) {
o.notify();
}
}


}


ClientListener :


/**
* @author avb
*/
public interface ClientListener {


public void clientConnecting();


public void clientConnected();


public void clientDisconnecting();


public void clientDisconnected();


}


ClientAdapter :


/**
* @author avb
*/
public abstract class ClientAdapter implements ClientListener {


@Override
public void clientConnecting() {
}


@Override
public void clientConnected() {
}


@Override
public void clientDisconnecting() {
}


@Override
public void clientDisconnected() {
}


}


بازم امری بود در خدمتم :)

MSHService
یک شنبه 04 آبان 1393, 22:55 عصر
دهنم باز مونده :متعجب: بابا دمت گرم

ببین چطوری تونستی این کد رو اینقدر زود پیاده کنی؟ لطفا منو راهنمایی کن؟

در ضمن اون نموداره بالا رو دیدی ؟ نظرت دربارش چیه؟

برای کانکشن شکسته هم نظری نداری؟

ahmad.mo74
دوشنبه 05 آبان 1393, 18:32 عصر
سلام :قلب:

اون نمودار نمیدونم منظورش مصرف رمه یا انتقال دیتا تو هر ثانیه رو نشون میده، احتمالا همون مصرف رمو نشون میده که کاملا مشخصه nio server کم مصرف تره، برای تشخیص قطع کانکشن هم راهش همونایی بود که تو اون پست گفته شد، شما با nio server هم تست کن ببین کار میکنه یا نه...

کدارم یه خورده تغییر دادم

MSHService
سه شنبه 06 آبان 1393, 06:55 صبح
دوست عزیز او عکس ماله PDFیی که اینجا (http://www.4shared.com/rar/W3zMPGI9ba/tymaPaulMultithreaded.html)آپلود کردم

نگفتی چطوری اینقدر زود تونستی اینو بنویسی؟ :خجالت:

توی این PDF به این صفحه هم اشاره شده :
http://www.theserverside.com/discussions/thread.tss?thread_id=26700

Netbeans من برای

listeners.forEach(ServerListener::serverStarted);

خطا میگیره میگه Method refrence not Expected here

method refrence not support in source 1.7
use Source 8 or higher to enable mehod refrences

با سپاس از زحماتتون

ahmad.mo74
سه شنبه 06 آبان 1393, 14:37 عصر
سلام ، زود نوشتم؟؟؟
شما بگو کدوم قسمت رو توضیح بدم یکم بری تو بحرش میبینی خیلیم سخت نیست، اگر با Selector مشکل داری حتما اون pdf رو بخون خودش خیلی خوب توضیح داده...

اون ارور هم بخاطر اینه که jdk 8 نداری به جاش اینو بنویس :


for (ServerListener listener : listeners) {
listener.serverStarted();
}

MSHService
سه شنبه 06 آبان 1393, 14:51 عصر
آره دیدم نوشته بود ، ولی JDK 8 رو هم نصب کردم نشد ، البته با نت بینز هستم :(

آره خیلی زود نوشتی. هم زیبا و هم خیلی سریع نوشتی.

هنوز دارم مطالبه Jenkov و اون pdfه رو میخونم.

متشکرم عزیزم :قلب:

MSHService
سه شنبه 06 آبان 1393, 16:08 عصر
عزیز این هم خطا داره :

private void sendResponse(SelectionKey key, String data) {
String response = Process(data);
echo(key, response.getBytes());
}

:ناراحت:

ahmad.mo74
سه شنبه 06 آبان 1393, 17:52 عصر
سلام، خب این خط رو عمدا اینطوری نوشتم، یعنی که اینجا میتونی درخواست کاربر رو بررسی کنی و بهش جواب بدی، متد process رو خودت باید بنویسی :)

MSHService
سه شنبه 06 آبان 1393, 19:16 عصر
ببن نگفتی چطور اینقدر سریع و اینقدر عالی تونستید این کد رو بنویسید؟!!

من چطور میتونم مثل شما بشم ؟ :خجالت:

ahmad.mo74
پنج شنبه 08 آبان 1393, 11:03 صبح
سلام، شما لطف داری :)

گفتم که بیشترشو از اون pdf کمک گرفتم...

فقط یه نکته مهمی رو حواست باشه وقتی در حالت معمولی از socket چیزی میخونیم تا زمانی که دیتا کامل به دستمون نرسیده متد ()read عددی بزرگتر از 0 بهمون برمیگردونه و فقط موقعی 0 برمیگردونه که اطلاعات کامل بهمون برسه
اما وقتی داریم از non-blocking io استفاده میکنیم قضیه فرق میکنه، چون کار read و write میتونن به طور همزمان انجام بشن ممکنه متد ()read عدد 0 بهمون برگردونه در حالی که هنوز اطلاعات کامل به دستمون نرسیده، برای داده های کم حجم کم اتفاق میفته که قبل از اتمام 0 برگردونه ولی اگر دیتا از سایز بافر بیشتر باشه این احتمال هست، روشی های مختلفی برای جلوگیری از این مشکل هست مثلا اینکه از قبل سایز رو به طرف مقابل بفرستیم تا بدونه تا کجا باید read کنه یا اینکه انتهای فایل رو مشخص کنیم (بیشتر زمانی که String میفرستیم کاربرد داره) مثلا من خودم اینکارو میکنم، یعنی وقتی چیزی میفرستم آخرش \n\r اضافه میکنم :

private static final String EOF = "\n\r";



private String read(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
StringBuilder sb = new StringBuilder();
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
CharBuffer charBuffer;
int read;
try {
do {
read = channel.read(readBuffer);
if (read == -1) {
resetKey(key);
return null;
}
readBuffer.flip();
charBuffer = decoder.decode(readBuffer);
sb.append(charBuffer.array());
readBuffer.clear();
} while (sb.lastIndexOf(EOF) == -1);
} catch (IOException e) {
resetKey(key);
return null;
}
sb.delete(sb.lastIndexOf(EOF), sb.length());
return String.valueOf(sb);
}