<>NIO Basics
<> one , Three components
<>1. Channel
A channel is a bidirectional conduit for reading and writing data.
Common channels:
* FileChannel
* DatagramChannel: used for UDP
* SocketChannel: used for TCP; usable on both the client and the server
* ServerSocketChannel: used for TCP; server side only
<>2. Buffer
An in-memory buffer that temporarily holds data being read from or written to a channel.
Commonly used buffers:
* ByteBuffer
* MappedByteBuffer
* DirectByteBuffer
* HeapByteBuffer
* ShortBuffer
* IntBuffer
* ......
<>2.1 ByteBuffer structure
Important attribute :
* capacity: capacity
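* position:
* Write mode: the index at which the next byte will be written
* Read mode: the index from which the next byte will be read (reset to 0 when switching to read mode)
* limit:
* Write mode: the write limit (equal to capacity)
* Read mode: the read limit (the end of the data that was written)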
<>3. Selector
selector
<>3.1 Multithreaded version design
<>3.2 Drawbacks
* High memory usage
* High cost of thread context switching
* Only suitable for scenarios with a small number of connections
<>3.3 Thread pool version
<>3.4 Drawbacks
* In blocking mode, a thread can handle only one socket connection at a time
* Only suitable for short-lived connection scenarios
<>3.5 Selector edition
A single thread manages multiple channels and receives the events that occur on them; these channels operate in non-blocking mode. Calling the selector's select()
blocks until a read/write-ready event occurs on one of the channels; select() then returns and hands those events to the thread for processing. Suitable for scenarios with many connections but low traffic.
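<> two , Sticky packets / half packets
Several messages are sent to the server over the network, separated by \n. However, TCP is a byte stream, so the data may be regrouped on arrival: several messages can arrive glued together in one read (sticky packet), or a single message can be split across reads (half packet).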
public static void main(String[] args) throws IOException { //test1();
//writer(); ByteBuffer encode = StandardCharsets.UTF_8.encode("hello,world\ni
am zhangsan\nwe are family"); split(encode); encode = StandardCharsets.UTF_8.
encode(" together . \n 55555"); split(encode); encode = StandardCharsets.UTF_8.
encode(" nonononono . \n"); split(encode); encode = StandardCharsets.UTF_8.
encode(" 222222 . \n"); split(encode); encode = StandardCharsets.UTF_8.encode("
33333333333 . \n"); split(encode); encode = StandardCharsets.UTF_8.encode("
44444444444444 . \n123456\n"); split(encode); } /** * Sticky bag / Half package , through the use of /n division
Simulate packets sent by the network * 1, query ThreadLocal Is there any data that was not read last time * 1.1 Merge the last incomplete data with the current data * 1.2
Enable read mode * 1.3 verification position Location of Follow limit Is the location of the same , If different proof Data not fetched * 2, Read data */ private
static void split(ByteBuffer byteBuffer){ // Switch read mode Data lastData =
dataThreadLocal.get(); if (Objects.nonNull(lastData)) { ByteBuffer
lastDataBuffer= lastData.getData(); ByteBuffer newByteBuffer = ByteBuffer.
allocate(byteBuffer.limit() + (lastDataBuffer.limit() - lastDataBuffer.position(
))); while (lastDataBuffer.hasRemaining()){ newByteBuffer.put(lastDataBuffer.get
()); } newByteBuffer.put(byteBuffer); newByteBuffer.flip(); // Read new incoming , Merge previous data
for (int i = 0; i < newByteBuffer.limit(); i++) { byte b = newByteBuffer.get(i);
if ('\n' == b) { int length = i + 1 - newByteBuffer.position(); ByteBuffer
allocate= ByteBuffer.allocate(length); for (int i1 = 0; i1 < allocate.limit();
i1++) { allocate.put(newByteBuffer.get()); } allocate.flip(); System.out.println
(" Subsequent packet data :"+StandardCharsets.UTF_8.decode(allocate)); } } if (newByteBuffer.
position() != newByteBuffer.limit()) { // No intercepted data , Put the following data into Data data = new Data(
Thread.currentThread().getName(), newByteBuffer); dataThreadLocal.set(data); } }
else { // Split data according to escape characters for (int i = 0; i < byteBuffer.limit() ; i++) { // Intercept data
byte b = byteBuffer.get(i); if ('\n' == b) { int length = i + 1 - byteBuffer.
position(); ByteBuffer allocate = ByteBuffer.allocate(length); for (int i1 = 0;
i1< allocate.limit(); i1++) { allocate.put(byteBuffer.get()); } allocate.flip();
System.out.println(" print data :"+StandardCharsets.UTF_8.decode(allocate)); } } if (
byteBuffer.position() != byteBuffer.limit()) { // No intercepted data , Put the following data into Data data =
new Data(Thread.currentThread().getName(), byteBuffer); dataThreadLocal.set(data
); } } }
<> three , Hands-on practice
<> time server
public class TimeServer { public static void main(String[] args) throws
InterruptedException { int port = 8080; if (args != null && args.length > 0) {
try { port = Integer.parseInt(args[0]); } catch (NumberFormatException e) { // }
} new Thread(new MultiplexerTimeServer(port)).start(); Thread.currentThread().
join(); } public static class MultiplexerTimeServer implements Runnable {
// Create multiplexer private Selector selector; // Resume server pipeline private ServerSocketChannel
serverSocketChannel; // Used to control whether the server is stopped private volatile boolean stop; public
MultiplexerTimeServer(int port) { try { selector = Selector.open();
serverSocketChannel= ServerSocketChannel.open(); // Set to non blocking mode serverSocketChannel.
configureBlocking(false); // Bind socket serverSocketChannel.socket().bind(new
InetSocketAddress(port), 1024); // Register a received listener bit event serverSocketChannel.register(
selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start
int port :" + port); } catch (IOException e) { System.exit(1); } } public void
stop() { this.stop = true; } @Override public void run() { try { while (!stop) {
// Polling registered at selector It's ready channel selector.select(1000); Set<SelectionKey>
selectionKeys= selector.selectedKeys(); Iterator<SelectionKey> keyIterator =
selectionKeys.iterator(); SelectionKey key = null; while (keyIterator.hasNext())
{ key = keyIterator.next(); keyIterator.remove(); this.handleInput(key); } } }
catch (IOException e) { e.printStackTrace(); } } /** * Handling events * @param
selectionKey * @throws IOException */ private void handleInput(SelectionKey
selectionKey) throws IOException { if (selectionKey.isValid()) {
// If the event is received, process the received event first , Then register a read operation bit on the pipeline if (selectionKey.isAcceptable()) { System.
out.println(" Connection event received ...."); ServerSocketChannel channel = (ServerSocketChannel)
selectionKey.channel(); SocketChannel sc = channel.accept(); sc.
configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); }
// If it is a read event if (selectionKey.isReadable()) { System.out.println(" Read event received ....");
SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer
byteBuffer= ByteBuffer.allocate(1024); int read = channel.read(byteBuffer);
byteBuffer.flip(); if (read > 0) { byte[] bytes = new byte[byteBuffer.remaining(
)]; byteBuffer.get(bytes); String body = new String(bytes, StandardCharsets.
UTF_8); System.out.println("The time server receive order :" + body); // Return client time
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System
.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(channel, currentTime); }
else if (read < 0) { selectionKey.cancel(); channel.close(); } else { ; // read 0
byte ignore } } } } private void doWrite(SocketChannel socketChannel, String response)
throws IOException { if (StringUtils.hasText(response)) { byte[] bytes =
response.getBytes(StandardCharsets.UTF_8); ByteBuffer wrap = ByteBuffer.allocate
(bytes.length); wrap.put(bytes); wrap.flip(); socketChannel.write(wrap); } } } }
<> Time client
/**
 * NIO time client: connects to the time server on 127.0.0.1, sends
 * {@code QUERY TIME ORDER}, prints the reply and terminates.
 */
public class TimeClient {

    /**
     * Starts the client against the port given as the first argument (default 8080)
     * and waits until the client thread has finished.
     */
    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException ignored) {
                // Not a valid number: keep the default port (same policy as the server).
            }
        }
        Thread clientThread = new Thread(new TimeClientHandle("127.0.0.1", port));
        clientThread.start();
        // Join the client thread (not the current one) so the JVM can exit once the reply arrives.
        clientThread.join();
    }

    /** Selector loop that connects, sends the query and reads the reply. */
    public static class TimeClientHandle implements Runnable {

        private String host;
        private int port;
        private Selector selector;
        private SocketChannel socketChannel;
        private volatile boolean stop;

        /**
         * Opens the selector and a non-blocking socket channel. Exits the JVM on failure.
         *
         * @param host server host; {@code null} means 127.0.0.1
         * @param port server port
         */
        public TimeClientHandle(String host, int port) {
            this.host = host == null ? "127.0.0.1" : host;
            this.port = port;
            try {
                selector = Selector.open();
                socketChannel = SocketChannel.open();
                socketChannel.configureBlocking(false);
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        @Override
        public void run() {
            try {
                doConnect();
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
            while (!stop) {
                try {
                    selector.select(1000);
                    Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
                    while (selectionKeyIterator.hasNext()) {
                        SelectionKey key = selectionKeyIterator.next();
                        selectionKeyIterator.remove();
                        try {
                            handleInput(key);
                        } catch (IOException e) {
                            e.printStackTrace();
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                            // The only connection is gone; nothing left to wait for.
                            this.stop = true;
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.exit(1);
                }
            }
            closeResources();
        }

        /**
         * Creates the connection.
         * 1. If the connection completes immediately, register for read events and send the query.
         * 2. Otherwise register for connect events and finish in {@link #handleInput}.
         *
         * @throws IOException if connecting or registering fails
         */
        private void doConnect() throws IOException {
            if (socketChannel.connect(new InetSocketAddress(host, port))) {
                socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }
        }

        /**
         * Sends the query to the server. Success is only reported when the whole
         * request was written in this single write call.
         *
         * @param socketChannel connected channel to write to
         * @throws IOException if the write fails
         */
        private void doWrite(SocketChannel socketChannel) throws IOException {
            byte[] bytes = "QUERY TIME ORDER".getBytes(StandardCharsets.UTF_8);
            ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
            socketChannel.write(byteBuffer);
            if (!byteBuffer.hasRemaining()) {
                System.out.println("Send order 2 server succeed .");
            }
        }

        /**
         * Handles one ready key: on a connect event, finishes the connection, registers for
         * read events and sends the query; on a read event, prints the reply and stops.
         *
         * @param selectionKey the ready key
         * @throws IOException if finishing the connection, reading or writing fails
         */
        private void handleInput(SelectionKey selectionKey) throws IOException {
            if (!selectionKey.isValid()) {
                return;
            }
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            // Connect event
            if (selectionKey.isConnectable()) {
                System.out.println(" Server connection event received ....");
                // After the connection is complete, register for read events
                if (socketChannel.finishConnect()) {
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    doWrite(socketChannel);
                }
            }
            // Read event
            if (selectionKey.isReadable()) {
                System.out.println(" Server read event received ....");
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int read = socketChannel.read(byteBuffer);
                if (read > 0) {
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String body = new String(bytes, StandardCharsets.UTF_8);
                    System.out.println("Now is :" + body);
                    this.stop = true;
                } else if (read < 0) {
                    // Server closed the connection: release it and end the loop.
                    selectionKey.cancel();
                    socketChannel.close();
                    this.stop = true;
                }
                // read == 0: nothing available, ignore
            }
        }

        /** Closes the socket channel and the selector once the loop has ended. */
        private void closeResources() {
            try {
                socketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Technology