疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

Java Nio 异步操作channel


 

 
摘要: Java NIO 的核心组成部分: 1.Channels 2.Buffers 3.Selectors   我们首先来学习Channels(java.nio.channels): 通道   1)通道基础   通道(Channel)是java.nio的第二个主要创新。它们既不是一个扩展也不是一项增强,而是全新、极好的Java I/O示例,提供与I/O服务的直接连接。Channel
 
Java NIO 的核心组成部分:
 
1.Channels
 
2.Buffers
 
3.Selectors
 
  我们首先来学习Channels(java.nio.channels):
 
通道
 
  1)通道基础
 
  通道(Channel)是java.nio的第二个主要创新。它们既不是一个扩展也不是一项增强,而是全新、极好的Java I/O示例,提供与I/O服务的直接连接。Channel用于在字节缓冲区和位于通道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。
 
  channel的jdk源码:
 
package java.nio.channels;
    public interface Channel;
    {
        public boolean isOpen();
        public void close() throws IOException;
    }
  与缓冲区不同,通道API主要由接口指定。不同的操作系统上通道实现(Channel Implementation)会有根本性的差异,所以通道API仅仅描述了可以做什么。因此很自然地,通道实现经常使用操作系统的本地代码。通道接口允许您以一种受控且可移植的方式来访问底层的I/O服务。
 
  Channel是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流。所有数据都通过 Buffer 对象来处理。您永远不会将字节直接写入通道中,相反,您是将数据写入包含一个或者多个字节的缓冲区。同样,您不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。
 
Java NIO 的通道类似流,但又有些不同:
 
既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
通道可以异步地读写。
通道中的数据总是要先读到一个 Buffer,或者总是要从一个 Buffer 中写入。
基本上,所有的 IO 在NIO 中都从一个Channel 开始。Channel 有点象流。 数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中。这里有个图示: 
                                                      
 
下面是JAVA NIO中的一些主要Channel的实现: java.nio.channels包中的Channel接口的已知实现类
 
FileChannel:从文件中读写数据。
DatagramChannel:能通过UDP读写网络中的数据。
SocketChannel:能通过TCP读写网络中的数据。
ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
正如你所看到的,这些通道涵盖了UDP 和 TCP 网络IO,以及文件IO。 
 
  2)通道API
  1.文件通道API
  FileChannel类可以实现常用的read,write以及scatter/gather操作,同时它也提供了很多专用于文件的新方法。这些方法中的许多都是我们所熟悉的文件操作。
  FileChannel类的JDK源码:
 
package java.nio.channels;
    public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel
    {
        // This is a partial API listing
        // All methods listed here can throw java.io.IOException
        public abstract int read (ByteBuffer dst, long position);
        public abstract int write (ByteBuffer src, long position);
        public abstract long size();
        public abstract long position();
        public abstract void position (long newPosition);
        public abstract void truncate (long size);
        public abstract void force (boolean metaData);
        public final FileLock lock();
        public abstract FileLock lock (long position, long size, boolean shared);
        public final FileLock tryLock();
        public abstract FileLock tryLock (long position, long size, boolean shared);
        public abstract MappedByteBuffer map (MapMode mode, long position, long size);
        public static class MapMode;
        public static final MapMode READ_ONLY;
        public static final MapMode READ_WRITE;
        public static final MapMode PRIVATE;
        public abstract long transferTo (long position, long count, WritableByteChannel target);
        public abstract long transferFrom (ReadableByteChannel src, long position, long count);
    } 
  文件通道总是阻塞式的,因此不能被置于非阻塞模式。现代操作系统都有复杂的缓存和预取机制,使得本地磁盘I/O操作延迟很少。网络文件系统一般而言延迟会多些,不过却也因该优化而受益。面向流的I/O的非阻塞范例对于面向文件的操作并无多大意义,这是由文件I/O本质上的不同性质造成的。对于文件I/O,最强大之处在于异步I/O(asynchronous I/O),它允许一个进程可以从操作系统请求一个或多个I/O操作而不必等待这些操作的完成。发起请求的进程之后会收到它请求的I/O操作已完成的通知。
 
  FileChannel对象是线程安全(thread-safe)的。多个进程可以在同一个实例上并发调用方法而不会引起任何问题,不过并非所有的操作都是多线程的(multithreaded)。影响通道位置或者影响文件大小的操作都是单线程的(single-threaded)。如果有一个线程已经在执行会影响通道位置或文件大小的操作,那么其他尝试进行此类操作之一的线程必须等待。并发行为也会受到底层的操作系统或文件系统影响。
  每个FileChannel对象都同一个文件描述符(file descriptor)有一对一的关系,所以上面列出的API方法与在您最喜欢的POSIX(可移植操作系统接口)兼容的操作系统上的常用文件I/O系统调用紧密对应也就不足为怪了。本质上讲,RandomAccessFile类提供的是同样的抽象内容。在通道出现之前,底层的文件操作都是通过RandomAccessFile类的方法来实现的。FileChannel模拟同样的I/O服务,因此它的API自然也是很相似的。
 
  三者之间的方法对比:
 
  
FILECHANNEL RANDOMACCESSFILE POSIX SYSTEM CALL
read( ) read( ) read( )
write( ) write( ) write( )
size( ) length( ) fstat( )
position( ) getFilePointer( ) lseek( )
position (long newPosition) seek( ) lseek( )
truncate( ) setLength( ) ftruncate( )
force( ) getFD().sync( ) fsync( )
 
 
  2.Socket通道
  新的socket通道类可以运行非阻塞模式并且是可选择的。这两个性能可以激活大程序(如网络服务器和中间件组件)巨大的可伸缩性和灵活性。本节中我们会看到,再也没有为每个socket连接使用一个线程的必要了,也避免了管理大量线程所需的上下文交换总开销。借助新的NIO类,一个或几个线程就可以管理成百上千的活动socket连接了并且只有很少甚至可能没有性能损失。所有的socket通道类(DatagramChannel、SocketChannel和ServerSocketChannel)都继承了位于java.nio.channels.spi包中的AbstractSelectableChannel。这意味着我们可以用一个Selector对象来执行socket通道的就绪选择(readiness selection)。
 
  请注意DatagramChannel和SocketChannel实现定义读和写功能的接口而ServerSocketChannel不实现。ServerSocketChannel负责监听传入的连接和创建新的SocketChannel对象,它本身从不传输数据。
 
  在我们具体讨论每一种socket通道前,您应该了解socket和socket通道之间的关系。之前的章节中有写道,通道是一个连接I/O服务导管并提供与该服务交互的方法。就某个socket而言,它不会再次实现与之对应的socket通道类中的socket协议API,而java.net中已经存在的socket通道都可以被大多数协议操作重复使用。
 
  全部socket通道类(DatagramChannel、SocketChannel和ServerSocketChannel)在被实例化时都会创建一个对等socket对象。这些是我们所熟悉的来自java.net的类(Socket、ServerSocket和DatagramSocket),它们已经被更新以识别通道。对等socket可以通过调用socket( )方法从一个通道上获取。此外,这三个java.net类现在都有getChannel( )方法。
 
  Socket通道将与通信协议相关的操作委托给相应的socket对象。socket的方法看起来好像在通道类中重复了一遍,但实际上通道类上的方法会有一些新的或者不同的行为。
 
  要把一个socket通道置于非阻塞模式,我们要依靠所有socket通道类的公有超级类:SelectableChannel。就绪选择(readiness selection)是一种可以用来查询通道的机制,该查询可以判断通道是否准备好执行一个目标操作,如读或写。非阻塞I/O和可选择性是紧密相连的,那也正是管理阻塞模式的API代码要在SelectableChannel超级类中定义的原因。
 
  设置或重新设置一个通道的阻塞模式是很简单的,只要调用configureBlocking( )方法即可,传递参数值为true则设为阻塞模式,参数值为false值设为非阻塞模式。真的,就这么简单!您可以通过调用isBlocking( )方法来判断某个socket通道当前处于哪种模式。
 
  非阻塞socket通常被认为是服务端使用的,因为它们使同时管理很多socket通道变得更容易。但是,在客户端使用一个或几个非阻塞模式的socket通道也是有益处的,例如,借助非阻塞socket通道,GUI程序可以专注于用户请求并且同时维护与一个或多个服务器的会话。在很多程序上,非阻塞模式都是有用的。
 
  偶尔地,我们也会需要防止socket通道的阻塞模式被更改。API中有一个blockingLock( )方法,该方法会返回一个非透明的对象引用。返回的对象是通道实现修改阻塞模式时内部使用的。只有拥有此对象的锁的线程才能更改通道的阻塞模式。
 
2.2.1 ServerSocketChannel
让我们从最简单的ServerSocketChannel来开始对socket通道类的讨论。以下是ServerSocketChannel的完整API:
 
public abstract class ServerSocketChannel extends AbstractSelectableChannel
  {
      public static ServerSocketChannel open() throws IOException;
      public abstract ServerSocket socket();
      public abstract ServerSocket accept()throws IOException;
      public final int validOps();
  }  
ServerSocketChannel是一个基于通道的socket监听器。它同我们所熟悉的java.net.ServerSocket执行相同的基本任务,不过它增加了通道语义,因此能够在非阻塞模式下运行。
 
由于ServerSocketChannel没有bind( )方法,因此有必要取出对等的socket并使用它来绑定到一个端口以开始监听连接。我们也是使用对等ServerSocket的API来根据需要设置其他的socket选项。
 
同它的对等体java.net.ServerSocket一样,ServerSocketChannel也有accept( )方法。一旦您创建了一个ServerSocketChannel并用对等socket绑定了它,然后您就可以在其中一个上调用accept( )。如果您选择在ServerSocket上调用accept( )方法,那么它会同任何其他的ServerSocket表现一样的行为:总是阻塞并返回一个java.net.Socket对象。如果您选择在ServerSocketChannel上调用accept( )方法则会返回SocketChannel类型的对象,返回的对象能够在非阻塞模式下运行。
 
如果以非阻塞模式被调用,当没有传入连接在等待时,ServerSocketChannel.accept( )会立即返回null。正是这种检查连接而不阻塞的能力实现了可伸缩性并降低了复杂性。可选择性也因此得到实现。我们可以使用一个选择器实例来注册一个ServerSocketChannel对象以实现新连接到达时自动通知的功能。以下代码演示了如何使用一个非阻塞的accept( )方法:
 
package com.ronsoft.books.nio.channels;
   import java.nio.ByteBuffer;
   import java.nio.channels.ServerSocketChannel;
   import java.nio.channels.SocketChannel;
   import java.net.InetSocketAddress;
 
   public class ChannelAccept
   {
       public static final String GREETING = "Hello I must be going. ";
       public static void main (String [] argv) throws Exception
       {
           int port = 1234; // default
           if (argv.length > 0) {
               port = Integer.parseInt (argv [0]);
           }
           ByteBuffer buffer = ByteBuffer.wrap (GREETING.getBytes());
           ServerSocketChannel ssc = ServerSocketChannel.open();
           ssc.socket().bind (new InetSocketAddress (port));
           ssc.configureBlocking (false);
           while (true) {
               System.out.println ("Waiting for connections");
               SocketChannel sc = ssc.accept();
               if (sc == null) {
                   Thread.sleep (2000);
               } else {
                   System.out.println ("Incoming connection from: " + sc.socket().getRemoteSocketAddress());
                   buffer.rewind();
                   sc.write (buffer);
                   sc.close();
               }
           }
       }
   }
2.2.2 SocketChannel
 
下面开始学习SocketChannel,它是使用最多的socket通道类:
 
Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道。可以通过以下2种方式创建SocketChannel:
 
打开一个SocketChannel并连接到互联网上的某台服务器。
一个新连接到达ServerSocketChannel时,会创建一个SocketChannel。
打开 SocketChannel
下面是SocketChannel的打开方式:
 
 
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
关闭 SocketChannel
 
当用完SocketChannel之后调用SocketChannel.close()关闭SocketChannel:
socketChannel.close();  
从 SocketChannel 读取数据
要从SocketChannel中读取数据,调用一个read()的方法之一。以下是例子:
 
1
2
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);
  首先,分配一个Buffer。从SocketChannel读取到的数据将会放到这个Buffer中。然后,调用SocketChannel.read()。该方法将数据从SocketChannel 读到Buffer中。read()方法返回的int值表示读了多少字节进Buffer里。如果返回的是-1,表示已经读到了流的末尾(连接关闭了)。
 
写入 SocketChannel
写数据到SocketChannel用的是SocketChannel.write()方法,该方法以一个Buffer作为参数。示例如下:
 
String newData = "New String to write to file..." + System.currentTimeMillis();
 
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
 
buf.flip();
 
while(buf.hasRemaining()) {
    channel.write(buf);
}
  注意SocketChannel.write()方法的调用是在一个while循环中的。Write()方法无法保证能写多少字节到SocketChannel。所以,我们重复调用write()直到Buffer没有要写的字节为止。
 
非阻塞模式
可以设置 SocketChannel 为非阻塞模式(non-blocking mode).设置之后,就可以在异步模式下调用connect(), read() 和write()了。
 
connect()
 
如果SocketChannel在非阻塞模式下,此时调用connect(),该方法可能在连接建立之前就返回了。为了确定连接是否建立,可以调用finishConnect()的方法。像这样:
 
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
 
while(! socketChannel.finishConnect() ){
    //wait, or do something else...
}
write()
 
非阻塞模式下,write()方法在尚未写出任何内容时可能就返回了。所以需要在循环中调用write()。前面已经有例子了,这里就不赘述了。
 
read()
 
非阻塞模式下,read()方法在尚未读取到任何数据时可能就返回了。所以需要关注它的int返回值,它会告诉你读取了多少字节。
 
非阻塞模式与选择器
非阻塞模式与选择器搭配会工作的更好,通过将一或多个SocketChannel注册到Selector,可以询问选择器哪个通道已经准备好了读取,写入等。Selector与SocketChannel的搭配使用会在后面详讲。
 
2.2.3 DatagramChannel
最后一个socket通道是DatagramChannel。正如SocketChannel对应Socket,ServerSocketChannel对应ServerSocket,每一个DatagramChannel对象也有一个关联的DatagramSocket对象。不过原命名模式在此并未适用:“DatagramSocketChannel”显得有点笨拙,因此采用了简洁的“DatagramChannel”名称。
 
正如SocketChannel模拟连接导向的流协议(如TCP/IP),DatagramChannel则模拟包导向的无连接协议(如UDP/IP)。
 
DatagramChannel是无连接的。每个数据报(datagram)都是一个自包含的实体,拥有它自己的目的地址及不依赖其他数据报的数据负载。与面向流的的socket不同,DatagramChannel可以发送单独的数据报给不同的目的地址。同样,DatagramChannel对象也可以接收来自任意地址的数据包。每个到达的数据报都含有关于它来自何处的信息(源地址)。
 
最后给出一个基本的channel实例:
 
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);
//读取的字节数,可能为零,如果该通道已到达流的末尾,则返回 -1
while (bytesRead != -1) { System.out.println("Read " + bytesRead);
buf.flip();
//反转缓冲区
while(buf.hasRemaining()){
System.out.print((char) buf.get());
//读取此缓冲区当前位置的字节,然后该位置递增。
} buf.clear();
bytesRead = inChannel.read(buf);
//从缓冲区读取数据
} aFile.close();