نمایش نتایج 1 تا 14 از 14

نام تاپیک: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

  1. #1
    کاربر دائمی آواتار MSHService
    تاریخ عضویت
    آذر 1387
    محل زندگی
    اکنون توی EJB
    پست
    207

    کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    با درود و ادب

    دوستان من چند وقت داشتم روی یه سرور سوکت برای ادارمون کار میکردم که بتونم یه سرور خوب برای یه قسمتش بنویسم.

    حالا بعد از اینکه خیلی از کارا تموم شده ، دیدم جاوا ServerSocketChannel هم داره و خیلی قابلیت هاش بیشترم از serverSocket هستن و ...

    حالا به نظره شما من برناممو با ServerSocketChannel اصلاح کنم یا نه؟

    با سپاس

  2. #2
    کاربر دائمی آواتار ahmad.mo74
    تاریخ عضویت
    مرداد 1393
    محل زندگی
    تهران
    پست
    437

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    سلام، بله ارزشش رو داره، چون با Server/Socket Channel میشه به صورت non-blocking کانکشن برقرار کرد که فوق العاده سرعت عملکرد رو میشه بالاتر برد و از همه مهمتر میشه بار روی سرور رو به میزان قابل توجهی پایین آورد!
    درباره non-blocking io تو این لینکا مفصل توضیح داده شده:

    http://en.wikipedia.org/wiki/Asynchronous_I/O
    http://stackoverflow.com/questions/1...i-o-in-node-js
    http://en.wikipedia.org/wiki/Non-blocking_I/O_(Java)
    http://tutorials.jenkov.com/java-nio/nio-vs-io.html

    نحوه استفاده از Selector و Server/Socket Channel با هم رو توی این مثالا ببینید :

    http://tutorials.jenkov.com/java-nio/selectors.html
    http://rox-xmlrpc.sourceforge.net/niotut/

    اینا مثال های ساده بودن و مالتی ترد نبودن، برای اینکه به صورت مالتی ترد بشه از Selector ها استفاده کرد خب کار یه خورده پیچیده تر میشه...

    تو این pdf آقای Doug Lea خیلی خوب توضیح داده : http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

    اینم یه مثال کامل که با استفاده از نکته هایی که تو همین pdf بود نوشتم :

    Server :


    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicReference;


    /**
    * @author avb
    */
    public class Server implements Runnable {


    private static final short DEFAULT_BUFFER_SIZE = 512;
    private static final long TIMEOUT = 10000;


    private final int port;
    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);


    private Selector mainSelector;
    private ServerSocketChannel server;
    private ExecutorService handlers;


    private enum State {STOPPED, STOPPING, RUNNING}


    public Server(int port) {
    this.port = port;
    }


    private void openConnection() throws IOException {
    mainSelector = Selector.open();
    server = ServerSocketChannel.open();
    server.bind(new InetSocketAddress(port));
    server.configureBlocking(false);
    server.register(mainSelector, SelectionKey.OP_ACCEPT).attach(new Acceptor());
    }


    private boolean start() {
    return state.compareAndSet(State.STOPPED, State.RUNNING);
    }


    private void started(boolean alreadyStarted) {
    if (!alreadyStarted) {
    System.out.println("server started...");
    } else {
    if (isRunning()) {
    System.out.println("server already running...");
    } else if (isStopping()) {
    System.out.println("server is stopping...");
    }
    }
    }


    public boolean isRunning() {
    return state.get() == State.RUNNING;
    }


    public boolean stop() {
    handlers.shutdown();
    return state.compareAndSet(State.RUNNING, State.STOPPING);
    }


    public boolean isStopping() {
    return state.get() == State.STOPPING;
    }


    private void stopped() {
    handlers.shutdownNow();
    state.set(State.STOPPED);
    }


    public boolean isStopped() {
    return state.get() == State.STOPPED;
    }


    @Override
    public void run() {
    if (!start()) {
    started(true);
    return;
    }
    handlers = Executors.newCachedThreadPool();
    started(false);
    try {
    openConnection();
    while (isRunning()) {
    if (mainSelector.select(TIMEOUT) == 0) {
    continue;
    }
    Set<SelectionKey> selectedKeys = mainSelector.selectedKeys();
    Iterator<SelectionKey> it = selectedKeys.iterator();
    while (it.hasNext()) {
    SelectionKey key = it.next();
    it.remove();
    try {
    if (key.isConnectable()) {
    ((SocketChannel) key.channel()).finishConnect();
    }
    if (key.isAcceptable()) {
    dispatch(key);
    }
    } catch (Throwable t) {
    resetKey(key);
    }
    }
    selectedKeys.clear();
    }
    } catch (Throwable e) {
    e.printStackTrace();
    } finally {
    closeConnection();
    stopped();
    }
    }


    private void closeConnection() {
    try {
    mainSelector.close();
    server.socket().close();
    server.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }


    private void resetKey(SelectionKey key) {
    key.cancel();
    try {
    key.channel().close();
    } catch (IOException ignored) {
    }
    }


    private void dispatch(SelectionKey key) {
    Runnable runnable = (Runnable) key.attachment();
    if (runnable != null) {
    runnable.run();
    }
    }


    private class Acceptor implements Runnable {


    @Override
    public synchronized void run() {
    try {
    SocketChannel connection = server.accept();
    handlers.submit(new Handler(connection));
    } catch (IOException e) {
    e.printStackTrace();
    }
    }


    }


    private final class Handler implements Runnable {


    private final Selector selector;
    private final SocketChannel socketChannel;
    private final ConcurrentHashMap<SocketChannel, byte[]> dataTracking = new ConcurrentHashMap<>();


    private Handler(SocketChannel channel) throws IOException {
    selector = Selector.open();
    socketChannel = channel;
    socketChannel.configureBlocking(false);
    socketChannel.socket().setTcpNoDelay(true);
    socketChannel.register(selector, SelectionKey.OP_READ);
    }


    @Override
    public void run() {
    while (!Thread.currentThread().isInterrupted()) {
    try {
    if (selector.select(TIMEOUT) == 0) {
    continue;
    }
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> it = selectedKeys.iterator();
    while (it.hasNext()) {
    SelectionKey key = it.next();
    it.remove();
    if (!key.isValid()) {
    continue;
    }
    try {
    if (key.isWritable()) {
    write(key);
    }
    if (key.isReadable()) {
    read(key);
    }
    } catch (Throwable t) {
    resetKey(key);
    return;
    }
    }
    selectedKeys.clear();
    } catch (Throwable t) {
    return;
    }
    }
    }


    private void write(SelectionKey key) {
    SocketChannel channel = (SocketChannel) key.channel();
    byte[] data = dataTracking.remove(channel);
    try {
    ByteBuffer wrap = ByteBuffer.wrap(data);
    channel.write(wrap);
    key.interestOps(SelectionKey.OP_READ);
    } catch (IOException e) {
    resetKey(key);
    }
    }


    private void read(SelectionKey key) {
    SocketChannel channel = (SocketChannel) key.channel();
    StringBuilder sb = new StringBuilder();
    ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
    CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    CharBuffer charBuffer;
    int read;
    try {
    do {
    read = channel.read(readBuffer);
    readBuffer.flip();
    charBuffer = decoder.decode(readBuffer);
    readBuffer.clear();
    sb.append(String.valueOf(charBuffer));
    } while (read > 0);
    } catch (IOException e) {
    resetKey(key);
    return;
    }
    if (read == -1) {
    resetKey(key);
    return;
    }
    System.out.println("received : " + sb);
    //process your data
    echo(key, String.valueOf(sb).getBytes());
    }


    private void echo(SelectionKey key, byte[] data) {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    dataTracking.put(socketChannel, data);
    key.interestOps(SelectionKey.OP_WRITE);
    }


    }


    public static void main(String[] args) {
    Server server = new Server(11025);
    new Thread(server).start();
    }


    }


    Client :


    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.concurrent.atomic.AtomicReference;


    /**
    * @author avb
    */
    public class Client implements Runnable {


    public static void main(String[] args) {
    Client client = new Client(new InetSocketAddress("127.0.0.1", 11025));
    new Thread(client).start();
    for (int i = 1; i <= 100; i++) {
    String response = client.getResponse("message " + i);
    System.out.println("Server said : " + response);
    }
    }


    private static final short DEFAULT_BUFFER_SIZE = 512;
    private static final long TIMEOUT = 10000;


    private final Object lock = new Object();
    private final InetSocketAddress address;
    private final AtomicReference<State> state = new AtomicReference<>(State.DISCONNECTED);


    private String request;
    private String response;
    private SocketChannel channel;
    private Selector selector;
    private boolean readyToWrite = false;
    private boolean requestPending = false;


    private enum State {DISCONNECTED, DISCONNECTING, CONNECTED}


    public Client(InetSocketAddress address) {
    this.address = address;
    }


    private boolean connect() {
    if (!state.compareAndSet(State.DISCONNECTED, State.CONNECTED)) {
    return false;
    }
    try {
    selector = Selector.open();
    channel = SocketChannel.open();
    channel.configureBlocking(false);
    channel.register(selector, SelectionKey.OP_CONNECT);
    channel.connect(address);
    return true;
    } catch (IOException e) {
    disconnected();
    return false;
    }
    }


    private void connected(boolean alreadyConnected) {
    if (!alreadyConnected) {
    System.out.println("connected to server...");
    } else {
    if (isDisconnected()) {
    System.out.println("disconnected from server...");
    } else if (isDisconnecting()) {
    System.out.println("disconnecting from server...");
    } else if (isConnected()) {
    System.out.println("already connected...");
    }
    }
    }


    public boolean isConnected() {
    return state.get() == State.CONNECTED;
    }


    public boolean disconnect() {
    unlock();
    return state.compareAndSet(State.CONNECTED, State.DISCONNECTING);
    }


    public boolean isDisconnecting() {
    return state.get() == State.DISCONNECTING;
    }


    private void disconnected() {
    state.set(State.DISCONNECTED);
    }


    public boolean isDisconnected() {
    return state.get() == State.DISCONNECTED;
    }


    @Override
    public void run() {
    if (!connect()) {
    connected(true);
    return;
    }
    try {
    while (isConnected()) {
    if (selector.select(TIMEOUT) == 0) {
    continue;
    }
    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
    while (keys.hasNext()) {
    SelectionKey key = keys.next();
    keys.remove();
    try {
    if (!key.isValid()) {
    continue;
    }
    if (key.isConnectable()) {
    connect(selector, key);
    }
    if (key.isWritable()) {
    readyToWrite = true;
    if (requestPending) {
    unlock();
    }
    lock();
    write(key);
    readyToWrite = false;
    }
    if (key.isReadable()) {
    read(key);
    }
    } catch (Throwable t) {
    resetKey(key);
    disconnect();
    break;
    }
    }
    }
    } catch (IOException e) {
    e.printStackTrace();
    } finally {
    closeConnection();
    disconnected();
    }
    }


    private void closeConnection() {
    if (isDisconnected()) {
    return;
    }
    try {
    selector.close();
    channel.socket().close();
    channel.close();
    } catch (IOException ignored) {
    }
    }


    private void connect(Selector selector, SelectionKey key) {
    SocketChannel channel = (SocketChannel) key.channel();
    try {
    if (channel.isConnectionPending()) {
    channel.finishConnect();
    channel.configureBlocking(false);
    channel.register(selector, SelectionKey.OP_WRITE);
    connected(false);
    }
    } catch (IOException e) {
    resetKey(key);
    }
    }


    private void write(SelectionKey key) {
    SocketChannel channel = (SocketChannel) key.channel();
    try {
    channel.write(ByteBuffer.wrap(request.getBytes())) ;
    key.interestOps(SelectionKey.OP_READ);
    } catch (IOException e) {
    resetKey(key);
    }
    }


    private void read(SelectionKey key) {
    SocketChannel channel = (SocketChannel) key.channel();
    StringBuilder builder = new StringBuilder();
    ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
    CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    CharBuffer charBuffer;
    int read;
    try {
    do {
    read = channel.read(readBuffer);
    readBuffer.flip();
    charBuffer = decoder.decode(readBuffer);
    readBuffer.clear();
    builder.append(String.valueOf(charBuffer));
    } while (read > 0);
    } catch (IOException e) {
    resetKey(key);
    return;
    }
    if (read == -1) {
    resetKey(key);
    return;
    }
    response = String.valueOf(builder);
    unlock();
    key.interestOps(SelectionKey.OP_WRITE);
    }


    private void resetKey(SelectionKey key) {
    key.cancel();
    try {
    key.channel().close();
    } catch (IOException ignored) {
    }
    }


    public synchronized String getResponse(String request) {
    ensureConnect();
    if (!readyToWrite) {
    requestPending = true;
    lock();
    requestPending = false;
    }
    this.request = request;
    unlock();
    lock(TIMEOUT);
    return response;
    }


    private void ensureConnect() {
    if (isDisconnected()) {
    new Thread(this).start();
    }
    }


    private void lock() {
    synchronized (lock) {
    try {
    lock.wait();
    } catch (InterruptedException ignored) {
    }
    }
    }


    private void lock(long timeout) {
    synchronized (lock) {
    try {
    lock.wait(timeout);
    } catch (InterruptedException ignored) {
    }
    }
    }


    private void unlock() {
    synchronized (lock) {
    lock.notify();
    }
    }


    }


    بازم ممکنه باگ داشته باشه ولی تست کردم اوکی بود.

  3. #3
    کاربر دائمی آواتار MSHService
    تاریخ عضویت
    آذر 1387
    محل زندگی
    اکنون توی EJB
    پست
    207

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    متشکرم ، یا جا شکل زیر رو دیدم :

    10-20-2014 06-13-51 ب-ظ.png

    آقا دمت گرم. خیلی لطف کردی :)

    حالا برای اینکه بتونم یه Dispatcher داشته باشم و و بتونم بدونم کیا بهم وصل شده ، چکار کنم ؟!

    حذف رو باید تو خط 234 بنویسم :) اما اضافه رو کجا بنویسم؟

  4. #4
    کاربر دائمی آواتار ahmad.mo74
    تاریخ عضویت
    مرداد 1393
    محل زندگی
    تهران
    پست
    437

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    سلام، ممنونم

    خیلی راحت میشه فهمید چه موقعی کلاینت وصل شده یا نه، من چند روزی نیستم و فعلا به لپ تاپ دسترسی ندارم، ایشالا تا چند روز دیگه کدشو قرار میدم.

  5. #5
    کاربر دائمی آواتار ahmad.mo74
    تاریخ عضویت
    مرداد 1393
    محل زندگی
    تهران
    پست
    437

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    سلام،

    کدارو تغییر دادم، الان دیگه هر وقت کلاینت کانکت یا دیسکانکت بشه، بهت خبر میده...

    Server :


    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicReference;


    /**
    * @author avb
    */
    public final class Server implements Runnable {


    private static final int DEFAULT_BUFFER_SIZE = 512;
    private static final long TIMEOUT = 10000;


    private static enum State {STOPPED, STOPPING, RUNNING}


    private final int port;
    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
    private final List<ServerListener> listeners = new ArrayList<>();


    private Selector mainSelector;
    private ServerSocketChannel server;
    private ExecutorService handlers;


    public Server(int port) {
    this.port = port;
    }


    private void openConnection() throws IOException {
    mainSelector = Selector.open();
    server = ServerSocketChannel.open();
    server.bind(new InetSocketAddress(port));
    server.configureBlocking(false);
    server.register(mainSelector, SelectionKey.OP_ACCEPT, new Acceptor());
    }


    private boolean start() {
    return state.compareAndSet(State.STOPPED, State.RUNNING);
    }


    private void started() {
    fireServerStarted();
    }


    public boolean isRunning() {
    return state.get() == State.RUNNING;
    }


    public boolean stop() {
    if (state.compareAndSet(State.RUNNING, State.STOPPING)) {
    fireServerStopping();
    handlers.shutdown();
    mainSelector.wakeup();
    return true;
    }
    return false;
    }


    public boolean isStopping() {
    return state.get() == State.STOPPING;
    }


    private void stopped() {
    fireServerStopped();
    handlers.shutdownNow();
    state.set(State.STOPPED);
    }


    public boolean isStopped() {
    return state.get() == State.STOPPED;
    }


    private void clientConnected() {
    fireClientConnected();
    }


    private void clientDisconnected() {
    fireClientDisconnected();
    }


    public void addServerListener(final ServerListener listener) {
    listeners.add(listener);
    }


    public void removeServerListener(final ServerListener listener) {
    listeners.remove(listener);
    }


    public void fireServerStarted() {
    listeners.forEach(ServerListener::serverStarted);
    }


    public void fireServerStopping() {
    listeners.forEach(ServerListener::serverStopping);
    }


    public void fireServerStopped() {
    listeners.forEach(ServerListener::serverStopped);
    }


    public void fireClientConnected() {
    listeners.forEach(ServerListener::clientConnected) ;
    }


    public void fireClientDisconnected() {
    listeners.forEach(ServerListener::clientDisconnect ed);
    }


    @Override
    public void run() {
    if (!start()) {
    return;
    }
    started();
    handlers = Executors.newCachedThreadPool();
    try {
    openConnection();
    while (isRunning()) {
    if (mainSelector.select(TIMEOUT) == 0) {
    continue;
    }
    Set<SelectionKey> selectedKeys = mainSelector.selectedKeys();
    Iterator<SelectionKey> it = selectedKeys.iterator();
    while (it.hasNext() && isRunning()) {
    SelectionKey key = it.next();
    it.remove();
    try {
    if (key.isConnectable()) {
    ((SocketChannel) key.channel()).finishConnect();
    }
    if (key.isAcceptable()) {
    dispatch(key);
    }
    } catch (Throwable t) {
    resetKey(key);
    }
    }
    selectedKeys.clear();
    }
    } catch (Throwable e) {
    e.printStackTrace();
    } finally {
    closeConnection();
    stopped();
    }
    }


    private void closeConnection() {
    try {
    mainSelector.close();
    server.socket().close();
    server.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }


    private void resetKey(SelectionKey key) {
    key.cancel();
    try {
    key.channel().close();
    } catch (IOException ignored) {
    }
    }


    private void dispatch(SelectionKey key) {
    Runnable acceptor = (Runnable) key.attachment();
    if (acceptor != null) {
    acceptor.run();
    }
    }


    private class Acceptor implements Runnable {


    @Override
    public synchronized void run() {
    try {
    SocketChannel connection = server.accept();
    handlers.submit(new Handler(connection));
    clientConnected();
    } catch (IOException ignored) {
    }
    }


    }


    private final class Handler implements Runnable {


    private final Selector selector;
    private final SocketChannel socketChannel;
    private final ConcurrentHashMap<SocketChannel, byte[]> dataTracking = new ConcurrentHashMap<>();


    private Handler(SocketChannel channel) throws IOException {
    selector = Selector.open();
    socketChannel = channel;
    socketChannel.configureBlocking(false);
    socketChannel.socket().setTcpNoDelay(true);
    socketChannel.register(selector, SelectionKey.OP_READ);
    }


    @Override
    public void run() {
    try {
    while (isRunning()) {
    if (selector.select(TIMEOUT) == 0) {
    continue;
    }
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> it = selectedKeys.iterator();
    while (it.hasNext() && isRunning()) {
    SelectionKey key = it.next();
    it.remove();
    if (!key.isValid()) {
    continue;
    }
    try {
    if (key.isWritable()) {
    write(key);
    }
    if (key.isReadable()) {
    String data = read(key);
    sendResponse(key, data);
    }
    } catch (Throwable t) {
    resetKey(key);
    clientDisconnected();
    return;
    }
    }
    selectedKeys.clear();
    }
    } catch (Throwable t) {
    clientDisconnected();
    } finally {
    closeConnection();
    }
    }


    private void closeConnection() {
    try {
    selector.close();
    socketChannel.socket().close();
    socketChannel.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }


    private void write(SelectionKey key) {
    SocketChannel channel = (SocketChannel) key.channel();
    byte[] data = dataTracking.remove(channel);
    try {
    ByteBuffer writeBuffer = ByteBuffer.wrap(data);
    while (writeBuffer.hasRemaining()) {
    channel.write(writeBuffer);
    }
    key.interestOps(SelectionKey.OP_READ);
    } catch (IOException e) {
    resetKey(key);
    }
    }


    private String read(SelectionKey key) {
    SocketChannel channel = (SocketChannel) key.channel();
    StringBuilder sb = new StringBuilder();
    ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
    CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    CharBuffer charBuffer;
    int read;
    try {
    while (true) {
    read = channel.read(readBuffer);
    if (read <= 0) {
    break;
    }
    readBuffer.flip();
    charBuffer = decoder.decode(readBuffer);
    readBuffer.clear();
    sb.append(String.valueOf(charBuffer));
    }
    } catch (IOException e) {
    resetKey(key);
    return null;
    }
    if (read == -1) {
    resetKey(key);
    return null;
    }
    return String.valueOf(sb);
    }


    private void sendResponse(SelectionKey key, String data) {
    String response = process(data);
    echo(key, response.getBytes());
    }


    private void echo(SelectionKey key, byte[] data) {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    dataTracking.put(socketChannel, data);
    key.interestOps(SelectionKey.OP_WRITE);
    }


    }


    }


    ServerListener :


    /**
    * @author avb
    */
    public interface ServerListener {


    public void serverStarted();


    public void serverStopping();


    public void serverStopped();


    public void clientConnected();


    public void clientDisconnected();


    }


    ServerAdapter :


    /**
    * @author avb
    */
    public abstract class ServerAdapter implements ServerListener {


    @Override
    public void serverStarted() {
    }


    @Override
    public void serverStopping() {
    }


    @Override
    public void serverStopped() {
    }


    @Override
    public void clientConnected() {
    }


    @Override
    public void clientDisconnected() {
    }


    }


    سمت کلاینت :

    Client :


    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicReference;


    /**
    * @author avb
    */
    public final class Client implements Runnable {


    private static final int DEFAULT_BUFFER_SIZE = 512;
    private static final long TIMEOUT = 10000;
    private static final long REQUEST_TIMEOUT = 10000;


    private static enum State {DISCONNECTED, DISCONNECTING, CONNECTING, CONNECTED}


    private final Locker writer = new Locker<>(null);
    private final InetSocketAddress address;
    private final AtomicReference<State> state = new AtomicReference<>(State.DISCONNECTED);
    private final ConcurrentLinkedQueue<Locker<String, String>> queue = new ConcurrentLinkedQueue<>();
    private final ConcurrentHashMap<SelectionKey, Locker<String, String>> requests = new ConcurrentHashMap<>();
    private final List<ClientListener> listeners = new ArrayList<>();


    private SocketChannel channel;
    private Selector selector;


    public Client(InetSocketAddress address) {
    this.address = address;
    }


    private boolean connect() {
    if (!state.compareAndSet(State.DISCONNECTED, State.CONNECTING)) {
    return false;
    }
    fireClientConnecting();
    try {
    selector = Selector.open();
    channel = SocketChannel.open();
    channel.configureBlocking(false);
    channel.register(selector, SelectionKey.OP_CONNECT);
    channel.connect(address);
    return true;
    } catch (IOException e) {
    disconnected();
    return false;
    }
    }


    public boolean isConnecting() {
    return state.get() == State.CONNECTING;
    }


    private void connected() {
    fireClientConnected();
    state.set(State.CONNECTED);
    }


    public boolean isConnected() {
    return state.get() == State.CONNECTED;
    }


    public boolean disconnect() {
    if (state.compareAndSet(State.CONNECTED, State.DISCONNECTING)) {
    fireClientDisconnecting();
    requests.forEach((key, locker) -> {
    resetKey(key);
    locker.unlock();
    });
    requests.clear();
    queue.clear();
    writer.unlock();
    selector.wakeup();
    return true;
    }
    return false;
    }


    public boolean isDisconnecting() {
    return state.get() == State.DISCONNECTING;
    }


    private void disconnected() {
    fireClientDisconnected();
    state.set(State.DISCONNECTED);
    }


    public boolean isDisconnected() {
    return state.get() == State.DISCONNECTED;
    }


    public void addClientListener(final ClientListener listener) {
    listeners.add(listener);
    }


    public void removeClientListener(final ClientListener listener) {
    listeners.remove(listener);
    }


    public void fireClientConnecting() {
    listeners.forEach(ClientListener::clientConnecting );
    }


    public void fireClientConnected() {
    listeners.forEach(ClientListener::clientConnected) ;
    }


    public void fireClientDisconnecting() {
    listeners.forEach(ClientListener::clientDisconnect ing);
    }


    public void fireClientDisconnected() {
    listeners.forEach(ClientListener::clientDisconnect ed);
    }


    @Override
    public void run() {
    if (!connect()) {
    return;
    }
    try {
    while (isConnected() || isConnecting()) {
    if (selector.select(TIMEOUT) == 0) {
    continue;
    }
    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
    while (keys.hasNext() && (isConnected() || isConnecting())) {
    SelectionKey key = keys.next();
    keys.remove();
    if (!key.isValid()) {
    continue;
    }
    try {
    if (key.isConnectable()) {
    connect(key);
    }
    if (key.isWritable()) {
    write(key);
    }
    if (key.isReadable()) {
    read(key);
    }
    } catch (Throwable t) {
    resetKeyAndDisconnect(key);
    return;
    }
    }
    }
    } catch (IOException e) {
    e.printStackTrace();
    } finally {
    closeConnection();
    disconnected();
    }
    }


    public String getResponse(String request) {
    if (!isConnected()) {
    return null;
    }
    Locker<String, String> locker = new Locker<>(request);
    addToQueue(locker);
    locker.lock(REQUEST_TIMEOUT);
    String response = locker.getValue();
    if (isNull(response)) {
    return null;
    }
    return response;
    }


    private void closeConnection() {
    if (isDisconnected()) {
    return;
    }
    try {
    selector.close();
    channel.socket().close();
    channel.close();
    } catch (IOException ignored) {
    }
    }


    private void connect(SelectionKey key) {
    SocketChannel channel = (SocketChannel) key.channel();
    try {
    if (channel.isConnectionPending()) {
    channel.finishConnect();
    channel.configureBlocking(false);
    channel.register(selector, SelectionKey.OP_WRITE);
    connected();
    }
    } catch (IOException e) {
    resetKeyAndDisconnect(key);
    }
    }


    private void write(SelectionKey key) {
    if (!isConnected()) {
    return;
    }
    if (queue.isEmpty()) {
    writer.lock();
    }
    Locker<String, String> locker;
    if ((locker = poll()) != null) {
    SocketChannel channel = (SocketChannel) key.channel();
    try {
    ByteBuffer writeBuffer = ByteBuffer.wrap(locker.getKey().getBytes());
    while (writeBuffer.hasRemaining()) {
    channel.write(writeBuffer);
    }
    requests.put(key, locker);
    key.interestOps(SelectionKey.OP_READ);
    } catch (IOException e) {
    resetKeyAndDisconnect(key);
    }
    }
    }


    private void read(SelectionKey key) {
    if (!isConnected()) {
    return;
    }
    SocketChannel channel = (SocketChannel) key.channel();
    StringBuilder sb = new StringBuilder();
    ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
    CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    CharBuffer charBuffer;
    int read;
    try {
    while (true) {
    read = channel.read(readBuffer);
    if (read <= 0) {
    break;
    }
    readBuffer.flip();
    charBuffer = decoder.decode(readBuffer);
    readBuffer.clear();
    sb.append(String.valueOf(charBuffer));
    }
    } catch (IOException e) {
    resetKeyAndDisconnect(key);
    return;
    }
    if (read == -1) {
    resetKeyAndDisconnect(key);
    return;
    }
    Locker<String, String> locker = requests.remove(key);
    if (locker != null) {
    locker.setValue(String.valueOf(sb));
    locker.unlock();
    }
    writer.unlock();
    key.interestOps(SelectionKey.OP_WRITE);
    }


    private void resetKey(SelectionKey key) {
    key.cancel();
    try {
    key.channel().close();
    } catch (IOException ignored) {
    }
    }


    private void resetKeyAndDisconnect(SelectionKey key) {
    resetKey(key);
    disconnect();
    }


    private void addToQueue(Locker<String, String> locker) {
    queue.add(locker);
    writer.unlock();
    }


    private Locker<String, String> poll() {
    return queue.poll();
    }


    private static boolean isNull(String s) {
    return s == null || s.isEmpty() || s.trim().isEmpty();
    }


    }


    Locker :


    /**
    * @author avb
    */
    public final class Locker<K, V> {


    private final Object o = new Object();
    private final K key;
    private V value;


    public Locker(K key) {
    this.key = key;
    }


    public K getKey() {
    return key;
    }


    public V getValue() {
    return value;
    }


    public void setValue(V value) {
    this.value = value;
    }


    public void lock() {
    synchronized (o) {
    try {
    o.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }


    public void lock(long timeout) {
    synchronized (o) {
    try {
    o.wait(timeout);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }


    public void unlock() {
    synchronized (o) {
    o.notify();
    }
    }


    }


    ClientListener :


    /**
    * @author avb
    */
    public interface ClientListener {


    public void clientConnecting();


    public void clientConnected();


    public void clientDisconnecting();


    public void clientDisconnected();


    }


    ClientAdapter :


    /**
    * @author avb
    */
    public abstract class ClientAdapter implements ClientListener {


    @Override
    public void clientConnecting() {
    }


    @Override
    public void clientConnected() {
    }


    @Override
    public void clientDisconnecting() {
    }


    @Override
    public void clientDisconnected() {
    }


    }


    بازم امری بود در خدمتم :)
    آخرین ویرایش به وسیله ahmad.mo74 : سه شنبه 06 آبان 1393 در 17:53 عصر

  6. #6
    کاربر دائمی آواتار MSHService
    تاریخ عضویت
    آذر 1387
    محل زندگی
    اکنون توی EJB
    پست
    207

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    دهنم باز مونده بابا دمت گرم

    ببین چطوری تونستی این کد رو اینقدر زود پیاده کنی؟ لطفا منو راهنمایی کن؟

    در ضمن اون نموداره بالا رو دیدی ؟ نظرت دربارش چیه؟

    برای کانکشن شکسته هم نظری نداری؟

  7. #7
    کاربر دائمی آواتار ahmad.mo74
    تاریخ عضویت
    مرداد 1393
    محل زندگی
    تهران
    پست
    437

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    سلام

    اون نمودار نمیدونم منظورش مصرف رمه یا انتقال دیتا تو هر ثانیه رو نشون میده، احتمالا همون مصرف رمو نشون میده که کاملا مشخصه nio server کم مصرف تره، برای تشخیص قطع کانکشن هم راهش همونایی بود که تو اون پست گفته شد، شما با nio server هم تست کن ببین کار میکنه یا نه...

    کدارم یه خورده تغییر دادم

  8. #8
    کاربر دائمی آواتار MSHService
    تاریخ عضویت
    آذر 1387
    محل زندگی
    اکنون توی EJB
    پست
    207

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    دوست عزیز او عکس ماله PDFیی که اینجا آپلود کردم

    نگفتی چطوری اینقدر زود تونستی اینو بنویسی؟

    توی این PDF به این صفحه هم اشاره شده :
    http://www.theserverside.com/discuss...hread_id=26700

    Netbeans من برای

     listeners.forEach(ServerListener::serverStarted);


    خطا میگیره میگه Method refrence not Expected here

    method refrence not support in source 1.7
    use Source 8 or higher to enable mehod refrences

    با سپاس از زحماتتون

    آخرین ویرایش به وسیله MSHService : سه شنبه 06 آبان 1393 در 07:06 صبح

  9. #9
    کاربر دائمی آواتار ahmad.mo74
    تاریخ عضویت
    مرداد 1393
    محل زندگی
    تهران
    پست
    437

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    سلام ، زود نوشتم؟؟؟
    شما بگو کدوم قسمت رو توضیح بدم یکم بری تو بحرش میبینی خیلیم سخت نیست، اگر با Selector مشکل داری حتما اون pdf رو بخون خودش خیلی خوب توضیح داده...

    اون ارور هم بخاطر اینه که jdk 8 نداری به جاش اینو بنویس :


    for (ServerListener listener : listeners) {
    listener.serverStarted();
    }

  10. #10
    کاربر دائمی آواتار MSHService
    تاریخ عضویت
    آذر 1387
    محل زندگی
    اکنون توی EJB
    پست
    207

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    آره دیدم نوشته بود ، ولی JDK 8 رو هم نصب کردم نشد ، البته با نت بینز هستم :(

    آره خیلی زود نوشتی. هم زیبا و هم خیلی سریع نوشتی.

    هنوز دارم مطالبه Jenkov و اون pdfه رو میخونم.

    متشکرم عزیزم
    آخرین ویرایش به وسیله MSHService : سه شنبه 06 آبان 1393 در 16:08 عصر

  11. #11
    کاربر دائمی آواتار MSHService
    تاریخ عضویت
    آذر 1387
    محل زندگی
    اکنون توی EJB
    پست
    207

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    عزیز این هم خطا داره :

    private void sendResponse(SelectionKey key, String data) {
    String response = Process(data);
    echo(key, response.getBytes());
    }



  12. #12
    کاربر دائمی آواتار ahmad.mo74
    تاریخ عضویت
    مرداد 1393
    محل زندگی
    تهران
    پست
    437

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    سلام، خب این خط رو عمدا اینطوری نوشتم، یعنی که اینجا میتونی درخواست کاربر رو بررسی کنی و بهش جواب بدی، متد process رو خودت باید بنویسی :)

  13. #13
    کاربر دائمی آواتار MSHService
    تاریخ عضویت
    آذر 1387
    محل زندگی
    اکنون توی EJB
    پست
    207

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    ببن نگفتی چطور اینقدر سریع و اینقدر عالی تونستید این کد رو بنویسید؟!!

    من چطور میتونم مثل شما بشم ؟

  14. #14
    کاربر دائمی آواتار ahmad.mo74
    تاریخ عضویت
    مرداد 1393
    محل زندگی
    تهران
    پست
    437

    نقل قول: کجا باید از ServerSocketChannel و کجا از serverSocket استفاده کنیم

    سلام، شما لطف داری :)

    گفتم که بیشترشو از اون pdf کمک گرفتم...

    فقط یه نکته مهمی رو حواست باشه وقتی در حالت معمولی از socket چیزی میخونیم تا زمانی که دیتا کامل به دستمون نرسیده متد ()read عددی بزرگتر از 0 بهمون برمیگردونه و فقط موقعی 0 برمیگردونه که اطلاعات کامل بهمون برسه
    اما وقتی داریم از non-blocking io استفاده میکنیم قضیه فرق میکنه، چون کار read و write میتونن به طور همزمان انجام بشن ممکنه متد ()read عدد 0 بهمون برگردونه در حالی که هنوز اطلاعات کامل به دستمون نرسیده، برای داده های کم حجم کم اتفاق میفته که قبل از اتمام 0 برگردونه ولی اگر دیتا از سایز بافر بیشتر باشه این احتمال هست، روشی های مختلفی برای جلوگیری از این مشکل هست مثلا اینکه از قبل سایز رو به طرف مقابل بفرستیم تا بدونه تا کجا باید read کنه یا اینکه انتهای فایل رو مشخص کنیم (بیشتر زمانی که String میفرستیم کاربرد داره) مثلا من خودم اینکارو میکنم، یعنی وقتی چیزی میفرستم آخرش
    \n\r
    اضافه میکنم :

            private static final String EOF = "\n\r";



    private String read(SelectionKey key) {
    SocketChannel channel = (SocketChannel) key.channel();
    StringBuilder sb = new StringBuilder();
    ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
    CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    CharBuffer charBuffer;
    int read;
    try {
    do {
    read = channel.read(readBuffer);
    if (read == -1) {
    resetKey(key);
    return null;
    }
    readBuffer.flip();
    charBuffer = decoder.decode(readBuffer);
    sb.append(charBuffer.array());
    readBuffer.clear();
    } while (sb.lastIndexOf(EOF) == -1);
    } catch (IOException e) {
    resetKey(key);
    return null;
    }
    sb.delete(sb.lastIndexOf(EOF), sb.length());
    return String.valueOf(sb);
    }
    آخرین ویرایش به وسیله ahmad.mo74 : پنج شنبه 08 آبان 1393 در 11:47 صبح

تاپیک های مشابه

  1. برای سوکت نویسی از کجا باید شروع کرد؟؟؟؟؟
    نوشته شده توسط AlirezaBahredar در بخش برنامه نویسی در Delphi
    پاسخ: 4
    آخرین پست: پنج شنبه 06 دی 1386, 10:05 صبح
  2. از کجا باید شروع کرد؟
    نوشته شده توسط Majid76 در بخش PHP
    پاسخ: 13
    آخرین پست: شنبه 26 خرداد 1386, 20:09 عصر
  3. من از کجا باید شروع کنم ؟
    نوشته شده توسط shab navard در بخش ASP.NET Web Forms
    پاسخ: 7
    آخرین پست: دوشنبه 17 اسفند 1383, 11:40 صبح
  4. این سوال رو کجا باید بپرسم ؟
    نوشته شده توسط mehdi_moosavi در بخش گفتگو با مسئولین سایت، درخواست و پیشنهاد
    پاسخ: 1
    آخرین پست: چهارشنبه 30 دی 1383, 10:10 صبح
  5. MDAC رو از کجا باید پیدا کنم
    نوشته شده توسط archi در بخش برنامه نویسی در 6 VB
    پاسخ: 1
    آخرین پست: جمعه 03 بهمن 1382, 12:05 عصر

برچسب های این تاپیک

قوانین ایجاد تاپیک در تالار

  • شما نمی توانید تاپیک جدید ایجاد کنید
  • شما نمی توانید به تاپیک ها پاسخ دهید
  • شما نمی توانید ضمیمه ارسال کنید
  • شما نمی توانید پاسخ هایتان را ویرایش کنید
  •