سلام،
کدها رو تغییر دادم؛ الان دیگه هر وقت یه کلاینت کانکت یا دیسکانکت بشه، سرور بهت خبر می‌ده...
Server :
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author avb
*/
public final class Server implements Runnable {
private static final int DEFAULT_BUFFER_SIZE = 512;
private static final long TIMEOUT = 10000;
private static enum State {STOPPED, STOPPING, RUNNING}
private final int port;
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
private final List<ServerListener> listeners = new ArrayList<>();
private Selector mainSelector;
private ServerSocketChannel server;
private ExecutorService handlers;
public Server(int port) {
this.port = port;
}
private void openConnection() throws IOException {
mainSelector = Selector.open();
server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(port));
server.configureBlocking(false);
server.register(mainSelector, SelectionKey.OP_ACCEPT, new Acceptor());
}
private boolean start() {
return state.compareAndSet(State.STOPPED, State.RUNNING);
}
private void started() {
fireServerStarted();
}
public boolean isRunning() {
return state.get() == State.RUNNING;
}
public boolean stop() {
if (state.compareAndSet(State.RUNNING, State.STOPPING)) {
fireServerStopping();
handlers.shutdown();
mainSelector.wakeup();
return true;
}
return false;
}
public boolean isStopping() {
return state.get() == State.STOPPING;
}
private void stopped() {
fireServerStopped();
handlers.shutdownNow();
state.set(State.STOPPED);
}
public boolean isStopped() {
return state.get() == State.STOPPED;
}
private void clientConnected() {
fireClientConnected();
}
private void clientDisconnected() {
fireClientDisconnected();
}
public void addServerListener(final ServerListener listener) {
listeners.add(listener);
}
public void removeServerListener(final ServerListener listener) {
listeners.remove(listener);
}
public void fireServerStarted() {
listeners.forEach(ServerListener::serverStarted);
}
public void fireServerStopping() {
listeners.forEach(ServerListener::serverStopping);
}
public void fireServerStopped() {
listeners.forEach(ServerListener::serverStopped);
}
public void fireClientConnected() {
listeners.forEach(ServerListener::clientConnected) ;
}
public void fireClientDisconnected() {
listeners.forEach(ServerListener::clientDisconnect ed);
}
@Override
public void run() {
if (!start()) {
return;
}
started();
handlers = Executors.newCachedThreadPool();
try {
openConnection();
while (isRunning()) {
if (mainSelector.select(TIMEOUT) == 0) {
continue;
}
Set<SelectionKey> selectedKeys = mainSelector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext() && isRunning()) {
SelectionKey key = it.next();
it.remove();
try {
if (key.isConnectable()) {
((SocketChannel) key.channel()).finishConnect();
}
if (key.isAcceptable()) {
dispatch(key);
}
} catch (Throwable t) {
resetKey(key);
}
}
selectedKeys.clear();
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
closeConnection();
stopped();
}
}
private void closeConnection() {
try {
mainSelector.close();
server.socket().close();
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void resetKey(SelectionKey key) {
key.cancel();
try {
key.channel().close();
} catch (IOException ignored) {
}
}
private void dispatch(SelectionKey key) {
Runnable acceptor = (Runnable) key.attachment();
if (acceptor != null) {
acceptor.run();
}
}
private class Acceptor implements Runnable {
@Override
public synchronized void run() {
try {
SocketChannel connection = server.accept();
handlers.submit(new Handler(connection));
clientConnected();
} catch (IOException ignored) {
}
}
}
private final class Handler implements Runnable {
private final Selector selector;
private final SocketChannel socketChannel;
private final ConcurrentHashMap<SocketChannel, byte[]> dataTracking = new ConcurrentHashMap<>();
private Handler(SocketChannel channel) throws IOException {
selector = Selector.open();
socketChannel = channel;
socketChannel.configureBlocking(false);
socketChannel.socket().setTcpNoDelay(true);
socketChannel.register(selector, SelectionKey.OP_READ);
}
@Override
public void run() {
try {
while (isRunning()) {
if (selector.select(TIMEOUT) == 0) {
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext() && isRunning()) {
SelectionKey key = it.next();
it.remove();
if (!key.isValid()) {
continue;
}
try {
if (key.isWritable()) {
write(key);
}
if (key.isReadable()) {
String data = read(key);
sendResponse(key, data);
}
} catch (Throwable t) {
resetKey(key);
clientDisconnected();
return;
}
}
selectedKeys.clear();
}
} catch (Throwable t) {
clientDisconnected();
} finally {
closeConnection();
}
}
private void closeConnection() {
try {
selector.close();
socketChannel.socket().close();
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void write(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
byte[] data = dataTracking.remove(channel);
try {
ByteBuffer writeBuffer = ByteBuffer.wrap(data);
while (writeBuffer.hasRemaining()) {
channel.write(writeBuffer);
}
key.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
resetKey(key);
}
}
private String read(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
StringBuilder sb = new StringBuilder();
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
CharBuffer charBuffer;
int read;
try {
while (true) {
read = channel.read(readBuffer);
if (read <= 0) {
break;
}
readBuffer.flip();
charBuffer = decoder.decode(readBuffer);
readBuffer.clear();
sb.append(String.valueOf(charBuffer));
}
} catch (IOException e) {
resetKey(key);
return null;
}
if (read == -1) {
resetKey(key);
return null;
}
return String.valueOf(sb);
}
private void sendResponse(SelectionKey key, String data) {
String response = process(data);
echo(key, response.getBytes());
}
private void echo(SelectionKey key, byte[] data) {
SocketChannel socketChannel = (SocketChannel) key.channel();
dataTracking.put(socketChannel, data);
key.interestOps(SelectionKey.OP_WRITE);
}
}
}
ServerListener :
/**
 * Callback interface for the events published by {@code Server}.
 *
 * @author avb
 */
public interface ServerListener {

    /** Invoked when the server reports that it has started. */
    void serverStarted();

    /** Invoked when a shutdown of the server has been requested. */
    void serverStopping();

    /** Invoked when the server reports that it has stopped. */
    void serverStopped();

    /** Invoked when the server reports a newly connected client. */
    void clientConnected();

    /** Invoked when the server reports that a client has disconnected. */
    void clientDisconnected();
}
ServerAdapter :
/**
 * Convenience base class for {@link ServerListener} implementations: every callback is an
 * empty method, so subclasses override only the events they are interested in.
 *
 * @author avb
 */
public abstract class ServerAdapter implements ServerListener {

    /** Does nothing. */
    @Override
    public void serverStarted() {
        // intentionally empty
    }

    /** Does nothing. */
    @Override
    public void serverStopping() {
        // intentionally empty
    }

    /** Does nothing. */
    @Override
    public void serverStopped() {
        // intentionally empty
    }

    /** Does nothing. */
    @Override
    public void clientConnected() {
        // intentionally empty
    }

    /** Does nothing. */
    @Override
    public void clientDisconnected() {
        // intentionally empty
    }
}
سمت کلاینت :
Client :
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author avb
*/
public final class Client implements Runnable {
private static final int DEFAULT_BUFFER_SIZE = 512;
private static final long TIMEOUT = 10000;
private static final long REQUEST_TIMEOUT = 10000;
private static enum State {DISCONNECTED, DISCONNECTING, CONNECTING, CONNECTED}
private final Locker writer = new Locker<>(null);
private final InetSocketAddress address;
private final AtomicReference<State> state = new AtomicReference<>(State.DISCONNECTED);
private final ConcurrentLinkedQueue<Locker<String, String>> queue = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<SelectionKey, Locker<String, String>> requests = new ConcurrentHashMap<>();
private final List<ClientListener> listeners = new ArrayList<>();
private SocketChannel channel;
private Selector selector;
public Client(InetSocketAddress address) {
this.address = address;
}
private boolean connect() {
if (!state.compareAndSet(State.DISCONNECTED, State.CONNECTING)) {
return false;
}
fireClientConnecting();
try {
selector = Selector.open();
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_CONNECT);
channel.connect(address);
return true;
} catch (IOException e) {
disconnected();
return false;
}
}
public boolean isConnecting() {
return state.get() == State.CONNECTING;
}
private void connected() {
fireClientConnected();
state.set(State.CONNECTED);
}
public boolean isConnected() {
return state.get() == State.CONNECTED;
}
public boolean disconnect() {
if (state.compareAndSet(State.CONNECTED, State.DISCONNECTING)) {
fireClientDisconnecting();
requests.forEach((key, locker) -> {
resetKey(key);
locker.unlock();
});
requests.clear();
queue.clear();
writer.unlock();
selector.wakeup();
return true;
}
return false;
}
public boolean isDisconnecting() {
return state.get() == State.DISCONNECTING;
}
private void disconnected() {
fireClientDisconnected();
state.set(State.DISCONNECTED);
}
public boolean isDisconnected() {
return state.get() == State.DISCONNECTED;
}
public void addClientListener(final ClientListener listener) {
listeners.add(listener);
}
public void removeClientListener(final ClientListener listener) {
listeners.remove(listener);
}
public void fireClientConnecting() {
listeners.forEach(ClientListener::clientConnecting );
}
public void fireClientConnected() {
listeners.forEach(ClientListener::clientConnected) ;
}
public void fireClientDisconnecting() {
listeners.forEach(ClientListener::clientDisconnect ing);
}
public void fireClientDisconnected() {
listeners.forEach(ClientListener::clientDisconnect ed);
}
@Override
public void run() {
if (!connect()) {
return;
}
try {
while (isConnected() || isConnecting()) {
if (selector.select(TIMEOUT) == 0) {
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext() && (isConnected() || isConnecting())) {
SelectionKey key = keys.next();
keys.remove();
if (!key.isValid()) {
continue;
}
try {
if (key.isConnectable()) {
connect(key);
}
if (key.isWritable()) {
write(key);
}
if (key.isReadable()) {
read(key);
}
} catch (Throwable t) {
resetKeyAndDisconnect(key);
return;
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
closeConnection();
disconnected();
}
}
public String getResponse(String request) {
if (!isConnected()) {
return null;
}
Locker<String, String> locker = new Locker<>(request);
addToQueue(locker);
locker.lock(REQUEST_TIMEOUT);
String response = locker.getValue();
if (isNull(response)) {
return null;
}
return response;
}
private void closeConnection() {
if (isDisconnected()) {
return;
}
try {
selector.close();
channel.socket().close();
channel.close();
} catch (IOException ignored) {
}
}
private void connect(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
try {
if (channel.isConnectionPending()) {
channel.finishConnect();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_WRITE);
connected();
}
} catch (IOException e) {
resetKeyAndDisconnect(key);
}
}
private void write(SelectionKey key) {
if (!isConnected()) {
return;
}
if (queue.isEmpty()) {
writer.lock();
}
Locker<String, String> locker;
if ((locker = poll()) != null) {
SocketChannel channel = (SocketChannel) key.channel();
try {
ByteBuffer writeBuffer = ByteBuffer.wrap(locker.getKey().getBytes());
while (writeBuffer.hasRemaining()) {
channel.write(writeBuffer);
}
requests.put(key, locker);
key.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
resetKeyAndDisconnect(key);
}
}
}
private void read(SelectionKey key) {
if (!isConnected()) {
return;
}
SocketChannel channel = (SocketChannel) key.channel();
StringBuilder sb = new StringBuilder();
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
CharBuffer charBuffer;
int read;
try {
while (true) {
read = channel.read(readBuffer);
if (read <= 0) {
break;
}
readBuffer.flip();
charBuffer = decoder.decode(readBuffer);
readBuffer.clear();
sb.append(String.valueOf(charBuffer));
}
} catch (IOException e) {
resetKeyAndDisconnect(key);
return;
}
if (read == -1) {
resetKeyAndDisconnect(key);
return;
}
Locker<String, String> locker = requests.remove(key);
if (locker != null) {
locker.setValue(String.valueOf(sb));
locker.unlock();
}
writer.unlock();
key.interestOps(SelectionKey.OP_WRITE);
}
private void resetKey(SelectionKey key) {
key.cancel();
try {
key.channel().close();
} catch (IOException ignored) {
}
}
private void resetKeyAndDisconnect(SelectionKey key) {
resetKey(key);
disconnect();
}
private void addToQueue(Locker<String, String> locker) {
queue.add(locker);
writer.unlock();
}
private Locker<String, String> poll() {
return queue.poll();
}
private static boolean isNull(String s) {
return s == null || s.isEmpty() || s.trim().isEmpty();
}
}
Locker :
/**
 * A key/value holder combined with a one-shot signal.
 *
 * <p>A thread calls {@link #lock()} or {@link #lock(long)} to wait for the signal and another
 * thread calls {@link #unlock()} to raise it. The signal is remembered: an {@code unlock()}
 * issued before the waiter arrives makes the next {@code lock} call return immediately instead
 * of being lost. Each successful {@code lock} call consumes the signal.
 *
 * @param <K> type of the immutable key
 * @param <V> type of the value
 * @author avb
 */
public final class Locker<K, V> {
    private final Object o = new Object();
    private final K key;
    // volatile: written by one thread and read by another, possibly after a timed-out wait.
    private volatile V value;
    /** {@code true} once unlock() was called and no lock call has consumed it; guarded by {@code o}. */
    private boolean signalled;

    /**
     * @param key the key returned by {@link #getKey()}; may be {@code null}
     */
    public Locker(K key) {
        this.key = key;
    }

    /** @return the key given at construction time */
    public K getKey() {
        return key;
    }

    /** @return the most recently set value, or {@code null} if none was set */
    public V getValue() {
        return value;
    }

    /** Stores {@code value}; it does not raise the signal by itself. */
    public void setValue(V value) {
        this.value = value;
    }

    /**
     * Blocks until {@link #unlock()} has been called, then consumes the signal. If the waiting
     * thread is interrupted the method returns early with the interrupt status set.
     */
    public void lock() {
        synchronized (o) {
            try {
                // Loop guards against spurious wakeups.
                while (!signalled) {
                    o.wait();
                }
                signalled = false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks until {@link #unlock()} has been called or {@code timeout} milliseconds have
     * elapsed, whichever comes first. A pending signal is consumed. If the waiting thread is
     * interrupted the method returns early with the interrupt status set.
     *
     * @param timeout maximum time to wait in milliseconds; {@code 0} waits without a time limit
     * @throws IllegalArgumentException if {@code timeout} is negative
     */
    public void lock(long timeout) {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
        if (timeout == 0) {
            lock();
            return;
        }
        synchronized (o) {
            try {
                long start = System.nanoTime();
                long remaining = timeout;
                // Re-check the flag and the remaining time after every wakeup.
                while (!signalled && remaining > 0) {
                    o.wait(remaining);
                    remaining = timeout - (System.nanoTime() - start) / 1_000_000L;
                }
                signalled = false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Raises the signal and wakes up any thread waiting in {@code lock}. */
    public void unlock() {
        synchronized (o) {
            signalled = true;
            o.notifyAll();
        }
    }
}
ClientListener :
/**
 * Callback interface for the connection events published by {@code Client}.
 *
 * @author avb
 */
public interface ClientListener {

    /** Invoked when the client reports that it is starting to connect. */
    void clientConnecting();

    /** Invoked when the client reports that it is connected. */
    void clientConnected();

    /** Invoked when the client reports that it is starting to disconnect. */
    void clientDisconnecting();

    /** Invoked when the client reports that it is disconnected. */
    void clientDisconnected();
}
ClientAdapter :
/**
 * Convenience base class for {@link ClientListener} implementations: every callback is an
 * empty method, so subclasses override only the events they are interested in.
 *
 * @author avb
 */
public abstract class ClientAdapter implements ClientListener {

    /** Does nothing. */
    @Override
    public void clientConnecting() {
        // intentionally empty
    }

    /** Does nothing. */
    @Override
    public void clientConnected() {
        // intentionally empty
    }

    /** Does nothing. */
    @Override
    public void clientDisconnecting() {
        // intentionally empty
    }

    /** Does nothing. */
    @Override
    public void clientDisconnected() {
        // intentionally empty
    }
}
بازم امری بود در خدمتم :)