Nio : Selector, SelectionKey

Avant l'introduction du paquetage nio, l'API java.net ne permettait pas vraiment la gestion non bloquante des sockets : désormais, java.nio la permet non seulement sur les sockets mais aussi sur les "pipes".

entrées non bloquantes en lecture

Source de ServerSocketChannelR.java
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;

/**
 * Single-threaded, selector-driven TCP server that demonstrates non-blocking reads.
 *
 * <p>The server accepts connections on the configured port, registers every accepted
 * channel for {@code OP_READ} and prints whatever the clients send to standard output.
 * A failure on one client connection closes that connection only; the server keeps
 * serving the other clients.
 */
public class ServerSocketChannelR {
  private int port;
  /** Scratch buffer shared by all connections (safe because everything runs on one thread). */
  private ByteBuffer buffer;

  /**
   * Program entry point.
   *
   * @param args exactly one element: the TCP port to listen on
   * @throws Exception if the argument is not a number or the server cannot be started
   */
  public static void main (String [] args)
    throws Exception {
    if (args.length != 1)
      throw new IllegalArgumentException("Usage: java ServerSocketChannelR port");
    int port = Integer.parseInt(args[0]);
    new ServerSocketChannelR(port).go ();
  }

  /**
   * Creates a server for the given port; nothing is bound until {@link #go()} is called.
   *
   * @param port TCP port to listen on
   */
  public  ServerSocketChannelR(int port) {
    this.port = port;
    buffer = ByteBuffer.allocateDirect (1024);
  }

  /**
   * Binds the listening socket and runs the selection loop forever.
   *
   * @throws Exception if the socket cannot be bound or the selector fails
   */
  public void go ()
    throws Exception {
    System.out.println ("Listening on port " + port);
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    ServerSocket serverSocket = serverChannel.socket();
    Selector selector = Selector.open();
    serverSocket.bind (new InetSocketAddress (port));
    serverChannel.configureBlocking (false);
    serverChannel.register (selector, SelectionKey.OP_ACCEPT);
    while (true) {
      int n = selector.select();
      System.out.println ("selector.select() = " + n);
      if (n == 0)
        continue;
      Iterator<SelectionKey> it = selector.selectedKeys().iterator();
      while (it.hasNext()) {
        SelectionKey key = it.next();
        System.out.println ("SelectionKey = "+key);
        if (key.isValid() && key.isAcceptable()) {
          System.out.println ("key.isAcceptable()");
          ServerSocketChannel server =
            (ServerSocketChannel) key.channel();
          SocketChannel channel = server.accept();
          // In non-blocking mode accept() returns null when the pending
          // connection has vanished between select() and accept().
          if (channel != null) {
            System.out.println ("-- SocketChannel = "+channel
                     +" register SelectionKey.OP_READ");
            channel.configureBlocking (false);
            channel.register (selector, SelectionKey.OP_READ);
          }
        }
        // Querying the ready set of a cancelled key throws, hence the validity check.
        if (key.isValid() && key.isReadable()) {
          System.out.println ("key.isReadable()");
          readDataFromSocket (key);
        }
        // The selector never clears the selected-key set itself.
        it.remove();
      }
    }
  }

  /**
   * Drains everything currently readable from the channel attached to {@code key}
   * and prints it. The channel is closed on end-of-stream or when reading fails.
   *
   * @param key selection key whose channel is a {@link SocketChannel}
   * @throws Exception if closing the channel fails
   */
  protected void readDataFromSocket (SelectionKey key)
    throws Exception {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    System.out.println ("-- SocketChannel = "+socketChannel);
    int count;
    buffer.clear();
    try {
      while ((count = socketChannel.read (buffer)) > 0) {
        System.out.println ("read count = "+count+ " contenu :");
        buffer.flip();
        // Decode the whole chunk at once: decoding byte by byte would
        // mangle characters encoded on several bytes.
        byte[] chunk = new byte [buffer.remaining()];
        buffer.get(chunk);
        System.out.print (new String(chunk));
        buffer.clear();
      }
    } catch (java.io.IOException ioe) {
      // A broken connection must not stop the server: treat it like end-of-stream.
      System.out.println ("\nerreur de lecture : "+ioe);
      count = -1;
    }
    System.out.println ("\nread count = "+count);
    if (count < 0) {
      System.out.println ("socketChannel.close() = "+socketChannel);
      // Closing the channel also cancels its selection key.
      socketChannel.close();
    }
  }
}

EXECUTION
console 1:
$ java ServerSocketChannelR 4444
Listening on port 4444

console 2:
$ java ClientSocketChannelR localhost 4444
abc
Client envoie : abc
defg
Client envoie : defg
hijkl
Client envoie : hijkl

console 1:
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@1f33675
key.isAcceptable()
-- SocketChannel = java.nio.channels.SocketChannel
   [connected local=/127.0.0.1:4444 remote=/127.0.0.1:36392]
register SelectionKey.OP_READ
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isReadable()
-- SocketChannel = java.nio.channels.SocketChannel[...
read count = 3 contenu :
abc
read count = 0
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isReadable()
-- SocketChannel = java.nio.channels.SocketChannel[...
read count = 4 contenu :
defg
read count = 0
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isReadable()
-- SocketChannel = java.nio.channels.SocketChannel[...
read count = 5 contenu :
hijkl
read count = 0

console 3:
$ java ClientSocketChannelR localhost 4444
hello
Client envoie : hello
Ciao
fin des requetes du Client

console 1:
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@1f33675
key.isAcceptable()
-- SocketChannel = java.nio.channels.SocketChannel
[connected local=/127.0.0.1:4444 remote=/127.0.0.1:36396]
register SelectionKey.OP_READ
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@22c95b
key.isReadable()
-- SocketChannel = java.nio.channels.SocketChannel[...
read count = 5 contenu :
hello
read count = 0
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@22c95b
key.isReadable()
-- SocketChannel = java.nio.channels.SocketChannel[...
read count = -1
socketChannel.close() = java.nio.channels.SocketChannel[...

console 2:
mnopqr
Client envoie : mnopqr

console 1:
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isReadable()
-- SocketChannel = java.nio.channels.SocketChannel[...
read count = 6 contenu :
mnopqr
read count = 0

Les lignes de la classe :

Le client est tout à fait classique :
Source de ClientSocketChannelR.java
import java.io.*;
import java.net.*;
/**
 * Blocking console client: every line typed by the user is sent to the server,
 * until the user types "Ciao" (case-insensitive) or the connection breaks.
 */
public class ClientSocketChannelR {
  /**
   * Program entry point.
   *
   * @param args two elements: the server host name and the server port
   * @throws Exception if the port is not a number or closing the streams fails
   */
  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      throw new IllegalArgumentException("Usage: java ClientSocketChannelR hostname port");
    }
    final String hostName = args[0];
    final int port = Integer.parseInt(args[1]);

    Socket socket = null;
    OutputStream sockOut = null;
    BufferedReader sockIn = null;
    try {
      socket = new Socket(hostName, port);
      sockOut = socket.getOutputStream();
      InputStreamReader rawIn = new InputStreamReader(socket.getInputStream());
      sockIn = new BufferedReader(rawIn);
    } catch (UnknownHostException e) {
      System.out.println("host inconnu ou non atteignable : "+ hostName);
      System.exit(3);
    } catch (IOException e) {
      System.out.println("connection impossible au serveur");
      System.exit(4);
    }

    // Forward keyboard lines until the user quits or the server goes away.
    for (;;) {
      String ligne = Clavier.lireLigne().trim();
      if (ligne.equalsIgnoreCase("Ciao")) {
        System.out.println("fin des requetes du Client  ");
        break;
      }
      System.out.println("Client envoie : " + ligne);
      try {
        sockOut.write(ligne.getBytes());
        sockOut.flush();
      } catch (IOException e) {
        System.out.println("fermeture connection par le serveur ");
        break;
      }
    }

    sockOut.close();
    sockIn.close();
    socket.close();
  }
}

Les lignes revisitées de la classe du serveur :

écritures non bloquantes

Source de ServerSocketChannelW.java
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
/**
 * Single-threaded, selector-driven TCP server that demonstrates non-blocking writes.
 *
 * <p>Every accepted channel is registered for {@code OP_WRITE}; each time the selector
 * reports a channel as writable the server tries to push 1000 copies of "pong" to it.
 * Bytes the socket does not accept in one call are not retried: the buffer is refilled
 * from scratch on the next call.
 */
public class ServerSocketChannelW {
  private int port;
  private ByteBuffer buffer;
  /** Running total of bytes written, across all connections. */
  private int count = 0;
  
  /**
   * Program entry point.
   *
   * @param args exactly one element: the TCP port to listen on
   * @throws Exception if the argument is not a number or the server cannot be started
   */
  public static void main (String [] args)
  throws Exception {
    if (args.length != 1)
      throw new IllegalArgumentException("Usage: java ServerSocketChannelW port");
    int port = Integer.parseInt(args[0]);
    new ServerSocketChannelW(port).go ();
  }
  
  /**
   * Creates a server for the given port; nothing is bound until {@link #go()} is called.
   *
   * @param port TCP port to listen on
   */
  public  ServerSocketChannelW(int port) {
    this.port = port;
    this.buffer = ByteBuffer.allocateDirect (8192);
    this.count = 0;
  }
  
  /**
   * Binds the listening socket and runs the selection loop forever.
   *
   * @throws Exception if the socket cannot be bound or the selector fails
   */
  public void go ()
  throws Exception {
    System.out.println ("Listening on port " + port);
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    ServerSocket serverSocket = serverChannel.socket();
    Selector selector = Selector.open();
    serverSocket.bind (new InetSocketAddress (port));
    serverChannel.configureBlocking (false);
    serverChannel.register (selector, SelectionKey.OP_ACCEPT);
    
    while (true) {
      int n = selector.select();
      System.out.println ("selector.select() = " + n);
      if (n == 0) 
        continue;  
      Iterator<SelectionKey> it = selector.selectedKeys().iterator();
      while (it.hasNext()) {
        SelectionKey key = it.next();
        System.out.println ("SelectionKey = "+key);
        if (key.isValid() && key.isAcceptable()) {
          System.out.println ("key.isAcceptable()");
          ServerSocketChannel server =
          (ServerSocketChannel) key.channel();
          SocketChannel channel = server.accept();
          // In non-blocking mode accept() returns null when the pending
          // connection has vanished between select() and accept().
          if (channel != null) {
            System.out.println ("-- SocketChannel = "+channel
                  +" register SelectionKey.OP_WRITE");
            channel.configureBlocking (false);
            channel.register (selector, SelectionKey.OP_WRITE);
          }
        }
        // Querying the ready set of a cancelled key throws, hence the validity check.
        if (key.isValid() && key.isWritable()) {
          System.out.println ("key.isWritable()");
          WriteDataToSocket (key);
        }
        // The selector never clears the selected-key set itself.
        it.remove();
      }
    }
  }
  
  /**
   * Attempts one non-blocking write of 1000 "pong" tokens to the channel attached
   * to {@code key} and adds the number of bytes actually written to the running total.
   * If the write fails, the channel is closed.
   *
   * @param key selection key whose channel is a {@link SocketChannel}
   * @throws Exception declared for subclasses; I/O failures are handled here
   */
  protected void WriteDataToSocket (SelectionKey key)
  throws Exception {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    System.out.println ("-- SocketChannel write :"+socketChannel);
    byte[] pong = "pong".getBytes();
    buffer.clear();
    for (int i = 0; i < 1000; i++)
      buffer.put(pong);
    buffer.flip();
    long byteEcrit = 0; 
    int aEcrire = 1000*("pong".length());
    try {
      byteEcrit = socketChannel.write( buffer );
      // The total is printed before this write is added to it.
      System.out.println ("\nwrite count = "+count);
      count += (int)byteEcrit;
    } catch (IOException ioe) {
      // write() threw before returning, so nothing is added to the total.
      System.out.println (aEcrire+ " a ecrire mais "
                          +byteEcrit +" ecrit !");
      try {
        // Closing the channel also cancels its selection key.
        socketChannel.close();
        System.out.println ("socketChannel close  ");
      } catch (IOException e) {
        System.out.println ("socketChannel close impossible : "+e);
      }
    }
  }
   
}

EXECUTION
console serveur :
$ java ServerSocketChannelW 6666
Listening on port 6666
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@1f33675
key.isAcceptable()
-- SocketChannel = java.nio.channels.SocketChannel
     [connected local=/127.0.0.1:6666 remote=/127.0.0.1:50815] 
register SelectionKey.OP_WRITE
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isWritable()
-- SocketChannel write :java.nio.channels.SocketChannel
     [connected local=/127.0.0.1:6666 remote=/127.0.0.1:50815]
write count = 0
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isWritable()
-- SocketChannel write :java.nio.channels.SocketChannel[...
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isWritable()
-- SocketChannel write :java.nio.channels.SocketChannel[...
write count = 8000
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isWritable()
-- SocketChannel write :java.nio.channels.SocketChannel[...
write count = 12000
.....
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isWritable()
-- SocketChannel write :java.nio.channels.SocketChannel[...
write count = 100000
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isWritable()
-- SocketChannel write :java.nio.channels.SocketChannel[...
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isWritable()
-- SocketChannel write :java.nio.channels.SocketChannel[...

console client : 
$java ClientSocketChannelW localhost 6666
100
lu : 100
1000
lu : 1100
5000
lu : 6100

console serveur :
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isWritable()
-- SocketChannel write :java.nio.channels.SocketChannel[...
.....
write count = 152000
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isWritable()
-- SocketChannel write :java.nio.channels.SocketChannel[...
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@c2ea3f
key.isWritable()
-- SocketChannel write :java.nio.channels.SocketChannel[...
4000 a ecrire mais 0 ecrit ! << fermeture du client
socketChannel close

Les lignes de la classe :

Le client est tout à fait classique :
Source de ClientSocketChannelW.java
import java.io.*;
import java.net.*;
/**
 * Blocking console client: the user types how many characters to read next,
 * the client reads that many from the server and prints the running total.
 * A negative number ends the session.
 */
public class ClientSocketChannelW {
  /**
   * Program entry point.
   *
   * @param args two elements: the server host name and the server port
   * @throws Exception if the port is not a number or closing the streams fails
   */
  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      throw new IllegalArgumentException("Usage: java ClientSocketChannelW hostname port");
    }
    final String hostName = args[0];
    final int port = Integer.parseInt(args[1]);

    Socket socket = null;
    BufferedReader sockIn = null;
    try {
      socket = new Socket(hostName, port);
      InputStreamReader rawIn = new InputStreamReader(socket.getInputStream());
      sockIn = new BufferedReader(rawIn);
    } catch (UnknownHostException e) {
      System.out.println("host inconnu ou non atteignable : "+ hostName);
      System.exit(3);
    } catch (IOException e) {
      System.out.println("connection impossible au serveur");
      System.exit(4);
    }

    // Total number of characters read since the connection was opened.
    int total = 0;
    try {
      int requested;
      do {
        requested = Clavier.lireInt();
        int remaining = requested;
        while (remaining > 0) {
          if (sockIn.read() == -1) {
            // End of stream: give up on this request, the outer loop goes on.
            System.out.println("sockIn.getChar() == -1");
            break;
          }
          total++;
          remaining--;
        }
        System.out.println("lu : "+total);
      } while (requested >= 0);
    } catch (IOException ioe) {
      System.out.println ("erreur lecture socket : "+ioe.getMessage());
    }

    sockIn.close();
    socket.close();
  }
}

les classes : Selector, SelectionKey et SelectableChannel

La technique des "callback"

Source de ServerPlusHandler.java
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;
/**
 * Single-threaded, selector-driven TCP server using the "callback" technique:
 * a {@link SuiviConnectionHandler} is attached to the selection key of every client
 * connection and is invoked each time that connection becomes readable.
 *
 * <p>A failure on one client connection closes that connection only.
 */
public class ServerPlusHandler {
  private int port;
  
  /**
   * Program entry point.
   *
   * @param args exactly one element: the TCP port to listen on
   * @throws Exception if the argument is not a number or the server cannot be started
   */
  public static void main (String [] args)
  throws Exception {
    if (args.length != 1)
      throw new IllegalArgumentException("Usage: java ServerPlusHandler port");
    int port = Integer.parseInt(args[0]);
    new ServerPlusHandler(port).go ();
  }

  /**
   * Creates a server for the given port; nothing is bound until {@link #go()} is called.
   *
   * @param port TCP port to listen on
   */
  public  ServerPlusHandler(int port) {
    this.port = port;
  }
  
  /**
   * Binds the listening socket and runs the selection loop forever.
   *
   * @throws Exception if the socket cannot be bound or the selector fails
   */
  public void go ()
  throws Exception {
    System.out.println ("Listening on port " + port);
    ServerSocketChannel servSockChann = ServerSocketChannel.open();
    ServerSocket serverSocket = servSockChann.socket();
    Selector selector = Selector.open();
    serverSocket.bind (new InetSocketAddress (port));
    servSockChann.configureBlocking (false);
    SelectionKey keyServer = 
         servSockChann.register(selector, SelectionKey.OP_ACCEPT);
    
    while (true) {
      int n = selector.select();
      System.out.println ("selector.select() = " + n);
      if (n == 0) 
        continue;  
      Iterator<SelectionKey> it = selector.selectedKeys().iterator();
      while (it.hasNext()) {
        SelectionKey key = it.next();
        System.out.println ("SelectionKey = "+key);
        // The selector never clears the selected-key set itself.
        it.remove();
        // Querying the ready set of a cancelled key throws.
        if (!key.isValid())
          continue;
        if (key.isAcceptable() && (key == keyServer)) {
          System.out.println ("key.isAcceptable()");
          ServerSocketChannel server =
          (ServerSocketChannel) key.channel();
          SocketChannel clientSockChann = server.accept();
          // In non-blocking mode accept() returns null when the pending
          // connection has vanished between select() and accept().
          if (clientSockChann != null) {
            System.out.println ("-- SocketChannel = "+clientSockChann
                                +" register SelectionKey.OP_READ");
            clientSockChann.configureBlocking (false);
            SelectionKey keyClient = 
               clientSockChann.register(selector,SelectionKey.OP_READ);
            keyClient.attach(new SuiviConnectionHandler());
          }
        } else if (key.isReadable() && (key != keyServer)) {
          System.out.println ("key.isReadable()");
          try {
            SuiviConnectionHandler handler = 
                        (SuiviConnectionHandler)key.attachment();
            System.out.println ("handler = "+handler);
            handler.readDataFromSocket(key);
          } catch (Exception e) {
            System.out.println ("erreur de lecture "+e);
            e.printStackTrace();
            // Drop the faulty connection so it is neither leaked nor selected again.
            closeConnection (key);
          }
        } else {
          System.out.println ("SelectionKey non traite ! ");          
        }
      }
    }
  }

  /** Cancels {@code key} and closes its channel, reporting (not throwing) close failures. */
  private void closeConnection (SelectionKey key) {
    key.cancel();
    try {
      key.channel().close();
    } catch (java.io.IOException ioe) {
      System.out.println ("erreur de fermeture "+ioe);
    }
  }
  
  /**
   * Per-connection callback: counts the requests received on one connection
   * and answers each of them.
   */
  class SuiviConnectionHandler {
    private int nombreRequete = 0;
    /** Allocated once per connection instead of once per request. */
    private final ByteBuffer buffer = ByteBuffer.allocateDirect (1024);
    private final Charset charset = Charset.forName("ISO-8859-1");

    /**
     * Reads one request from the channel attached to {@code key} and answers it.
     * The connection is closed on end-of-stream, on a read failure, or after the
     * request "ciao" (case-insensitive), which is answered with "bye bye".
     *
     * @param key selection key whose channel is a {@link SocketChannel}
     * @throws Exception if answering or closing the channel fails
     */
    protected void readDataFromSocket (SelectionKey key)
    throws Exception {
      nombreRequete++;
      CharsetEncoder encoder = charset.newEncoder();
      CharsetDecoder decoder = charset.newDecoder();
      SocketChannel clientSockChann = (SocketChannel) key.channel();
      System.out.println ("-- SocketChannel = "+clientSockChann 
                          + " nombre requete = " +nombreRequete);
      buffer.clear();
      int nbreLu;
      try {
        // Assumes the whole request arrives in a single read.
        nbreLu = clientSockChann.read (buffer);
      } catch (java.io.IOException ioe) {
        // A broken connection is handled like end-of-stream.
        System.out.println ("erreur de lecture "+ioe);
        nbreLu = -1;
      }
      if (nbreLu == -1) {
        System.out.println ("socketChannel.close() = "+clientSockChann);
        key.cancel();
        clientSockChann.close();
      } else {
        buffer.flip();    
        String requete = decoder.decode(buffer).toString();
        buffer.clear();
        // NOTE(review): the writes below are non-blocking and are not retried;
        // presumably fine for such short answers, but a partial write would be lost.
        if (requete.trim().equalsIgnoreCase("ciao")) {
          clientSockChann.write(encoder.encode(CharBuffer.wrap("bye bye")));
          System.out.println ("socketChannel.close() = "+clientSockChann);
          key.cancel();
          clientSockChann.close();
        } else {
          String reponse = "bien recu "+nbreLu
                     +" bytes lors de la requete : "+nombreRequete+"\n" ;
          clientSockChann.write(encoder.encode(CharBuffer.wrap(reponse)));
        }
      }
    }
  }    
}

EXECUTION
console 1:
$ java ServerPlusHandler 6666
Listening on port 6666

console 2:
$ telnet localhost 6666
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
a
bien recu 3 bytes lors de la requete : 1
bien recu 2 bytes lors de la requete : 2b
bien recu 3 bytes lors de la requete : 3cc
bien recu 4 bytes lors de la requete : 4

console 1:
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@1888759
key.isAcceptable()
-- SocketChannel = java.nio.channels.SocketChannel
[connected local=/127.0.0.1:6666 remote=/127.0.0.1:50762]
 register SelectionKey.OP_READ
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@13c5982
key.isReadable()
handler = ServerPlusHandler$SuiviConnectionHandler@14b7453
-- SocketChannel = java.nio.channels.SocketChannel
[connected local=/127.0.0.1:6666 remote=/127.0.0.1:50762]
 nombre requete = 1
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@13c5982
key.isReadable()
handler = ServerPlusHandler$SuiviConnectionHandler@14b7453
-- SocketChannel = java.nio.channels.SocketChannel
[connected local=/127.0.0.1:6666 remote=/127.0.0.1:50762]
 nombre requete = 2
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@13c5982
key.isReadable()
handler = ServerPlusHandler$SuiviConnectionHandler@14b7453
-- SocketChannel = java.nio.channels.SocketChannel
[connected local=/127.0.0.1:6666 remote=/127.0.0.1:50762]
 nombre requete = 3
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@13c5982
key.isReadable()
handler = ServerPlusHandler$SuiviConnectionHandler@14b7453
-- SocketChannel = java.nio.channels.SocketChannel
[connected local=/127.0.0.1:6666 remote=/127.0.0.1:50762]
 nombre requete = 4

console 3:
$ telnet localhost 6666
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
bonjour
bien recu 9 bytes lors de la requete : 1

console 1:
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@1888759
key.isAcceptable()
-- SocketChannel = java.nio.channels.SocketChannel
[connected local=/127.0.0.1:6666 remote=/127.0.0.1:50766]
 register SelectionKey.OP_READ
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@1503a3
key.isReadable()
handler = ServerPlusHandler$SuiviConnectionHandler@1a1c887
-- SocketChannel = java.nio.channels.SocketChannel
[connected local=/127.0.0.1:6666 remote=/127.0.0.1:50766]
 nombre requete = 1

console 2:
d
bien recu 3 bytes lors de la requete : 5

console 1:
selector.select() = 1
SelectionKey = sun.nio.ch.SelectionKeyImpl@13c5982
key.isReadable()
handler = ServerPlusHandler$SuiviConnectionHandler@14b7453
-- SocketChannel = java.nio.channels.SocketChannel
[connected local=/127.0.0.1:6666 remote=/127.0.0.1:50762]
 nombre requete = 5

Les lignes de la classe :

exercices