<>NIO
<>什么是NIO
jdk1.4
新的原始IO抽象
实现了netty框架
<>缓冲区
IO
类字节/字符
BufferedInputStream字节
BufferedOutputStream字节
BufferedReader字符
BufferedWriter字符
NIO
Buffer
<>4个核心技术点
0 <= mark <=position <= limit <= capacity
capacity 缓冲区容量 代表缓冲区的大小 也就是byte[]的长度
byte[] data = new byte[]{1,2,3}; ByteBuffer buffer = ByteBuffer.wrap(data);
System.out.println(buffer.capacity()); //通过源码查看可以知道 byteBuffer 其实内部就是这个byte[]数组
limit 缓冲区限制
超出limit 会抛出 java.lang.IndexOutOfBoundsException 异常
如果不设置limit 则 limit = capacity
<>代码实例
public static void main(String[] args){ System.out.println(
"----------charBuffer------"); char[] array = new char[26]; for(char i = 'A';i<=
'Z';i++){ array[i-'A'] = i; } printlnFun(array); CharBuffer buff = CharBuffer.
wrap(array); System.out.println("capacity " + buff.capacity()); //buff.limit(3);
buff.put(0,'?'); buff.put(1,'《'); buff.put(2,'》'); //buff.limit(5); buff.put(3,
'}'); buff.put(4,'{'); System.out.println("limit " + buff.limit()); printlnFun(
array); } public static void printlnFun(char[] array){ for(char c : array){
System.out.print(c + "\t"); } System.out.println(); }
输出实例
----------charBuffer------ A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
capacity 26 limit 26 ? 《 》 } { F G H I J K L M N O P Q R S T U V W X Y Z
position 当前位置
当put()输入元素的时候 位置会自动增长,但是不会超过 limit
public static void main(String[] args){ System.out.println(
"-------position-------test--------"); char[] array = new char[26]; for(char i=
'A';i<='Z';i++){ array[i - 'A'] = i; } for(char c : array){ System.out.print(c +
"\t"); } System.out.println(); CharBuffer buff = CharBuffer.wrap(array); System.
out.println("capacity " + buff.capacity()); System.out.println("limit " + buff.
limit()); System.out.println("position " + buff.position()); buff.position(5);
buff.put('&'); System.out.println("capacity " + buff.capacity()); System.out.
println("limit " + buff.limit()); System.out.println("position " + buff.position
()); buff.put('*'); System.out.println("capacity " + buff.capacity()); System.
out.println("limit " + buff.limit()); System.out.println("position " + buff.
position()); for(char c : array){ System.out.print(c + "\t"); } System.out.
println(); }
输出
-------position-------test-------- A B C D E F G H I J K L M N O P Q R S T U V
W X Y Z capacity 26 limit 26 position 0 capacity 26 limit 26 position 6
capacity 26 limit 26 position 7 A B C D E & * H I J K L M N O P Q R S T U V W X
Y Z
remaining() 返回 position 和 limit 之间的元素数
mark() 标记当前位置 ,通过reset()返回该位置
mark 标记的是 position的位置 所以不能大于 position
未定义 mark 会抛出 java.nio.InvalidMarkException 异常
<>注意事项
*
缓冲区的 capacity 不 能为负数,缓冲区的 l imit 不能为负 数,缓 冲 区 的 position 不能
为负数 。
*
position 不能大于其 limit 。
*
limit 不能大于其 capac ity 。
*
如果定义了 mark ,则在将 pos ition 或 l imit 调整为小于该 mark 的值时,该 mark 被丢弃 。
*
如果未定义 mark ,那么调用 reset() 方法将导致抛出 InvalidMarkException 异 常 。
*
如果 position 大于新的 limit ,则 position 的值就是新 limit 的值 。
*
当 limit 和 position 值一 样时,在指定的 pos ition 写入 数据时会 出现异常,因为 此位
置是被限制的
isReadOnly() 判断缓冲区是否只读
<>直接缓冲区
非直接缓冲区就是jvm缓冲区,
优缺点
* 降低软件对数据的吞吐量
* 提高内存占有率
* 造成软件运行效率低
直接缓冲区直接在内核空间中进行,无需jvm缓冲区,提高程序运行效率
public static void main(String[] args){ System.out.println(
"-------------直接缓冲区----------"); byte[] array = new byte[10]; ByteBuffer buff =
ByteBuffer.wrap(array); boolean isDirect = buff.isDirect();//是否是直接缓冲区 System.out
.println( "是否是直接缓冲区 " + isDirect); buff = ByteBuffer.allocateDirect(100);
//创建直接缓冲区 System.out.println( "是否是直接缓冲区 " + buff.isDirect()); }
<>还原缓冲区
clear()
public final Buffer clear () position = O; limit = capacity; mark = - 1 ;
return this; }
通过源码可以看出 并不是真正的清除数据
<>缓冲区反转
flip()
通过改变position 和 limit的值实现先写后读
public static void main(String[] args){ CharBuffer buff = CharBuffer.allocate(
20); buff.put("北国风光千里冰封万里雪飘"); System.out.println("capacity " + buff.capacity()
+ " limit " + buff.limit() + " position " + buff.position()); buff.flip();
//反转之后 就可以读取 position 和limit 之间的数据 System.out.println("capacity " + buff.
capacity() + " limit " + buff.limit() + " position " + buff.position()); for(int
i=0;i<buff.limit();i++){ System.out.print(buff.get()); } }
输出
capacity 20 limit 20 position 12 capacity 20 limit 12 position 0 北国风光千里冰封万里雪飘
final boolean hasArray 是否可访问地层数组
final boolean hasRemaining 判断 position 和limit 之间是否有剩余元素
<>rewind 重绕此缓冲区
position = 0 mark = -1
注意
这里要区分 clear flip rewind 三个方法的区别
* clear 一切为默认 position = 0 mark = -1 limit = capaclity
* flip limit = position position = 0 mark = -1
* rewind position = 0 mark = -1
* rewind()方法的侧重点在“重新
* clear()方法的侧重点在“还原一切状态
* flip () 方法的侧重点在 sub string 截取
<>slice 相对于数组的偏移量
byte[] source = new byte[]{1,2,3,4,5,6,7,8,9,0}; ByteBuffer buff1 = ByteBuffer.
wrap(source); buff1.position(5); ByteBuffer buff2 = buff1.slice(); log.info(
"buff1 position {} capacity {} limit {}",buff1.position(),buff1.capacity(),buff1
.limit()); log.info("buff2 position {} capacity {} limit {}",buff2.position(),
buff2.capacity(),buff2.limit()); buff2.put(1,(byte)100); for(int i=0;i<source.
length;i++){ System.out.print(source[i] + " "); } System.out.println(); log.info
("buff2 偏移 {}",buff2.arrayOffset());
输出
10:56:58.371 [main] INFO com.bai.niolearn.split.SplitTest - buff1 position 5
capacity10 limit 10 10:56:58.381 [main] INFO com.bai.niolearn.split.SplitTest -
buff2 position0 capacity 5 limit 5 1 2 3 4 5 6 100 8 9 0 10:56:58.386 [main]
INFOcom.bai.niolearn.split.SplitTest - buff2 偏移 5
<>CharBuffer 中文输出
getBytes(“utf-16BE”);
charBuff.capacity();
buff.asCharBuffer(); 当对byte[] 进行更改的时候会更改charbuffer的值
Charset.forName(“UTF-8”).decode(buff); 则不会改变
byte[] arrays = "三声四声天下白,退去星辰与晓月".getBytes("utf-16BE"); log.info(Charset.
defaultCharset().name()); ByteBuffer buff = ByteBuffer.wrap(arrays); log.info(
buff.getClass().getName()); CharBuffer charBuff = buff.asCharBuffer(); log.info(
charBuff.getClass().getName()); log.info("byteBuffer position {} capacity {}
limit {}",buff.position(),buff.capacity(),buff.limit()); log.info("charBuffer
position {} capacity {} limit {}",charBuff.position(),charBuff.capacity(),
charBuff.limit()); for(int i=0;i<charBuff.capacity();i++){ System.out.print(
charBuff.get()); } System.out.println();
输出
15:14:04.830 [main] INFO com.bai.niolearn.charbuffer.CharBufferTest - UTF-8 15:
14:04.833 [main] INFO com.bai.niolearn.charbuffer.CharBufferTest - java.nio.
HeapByteBuffer 15:14:04.834 [main] INFO com.bai.niolearn.charbuffer.
CharBufferTest - java.nio.ByteBufferAsCharBufferB 15:14:04.834 [main] INFO com.
bai.niolearn.charbuffer.CharBufferTest - byteBuffer position 0 capacity 30 limit
30 15:14:04.837 [main] INFO com.bai.niolearn.charbuffer.CharBufferTest -
charBuffer position0 capacity 15 limit 15 三声四声天下白,退去星辰与晓月
Charset.forName(“UTF-8”).decode(buff);
charBuff.limit();
byte[] arrays = "三声四声天下白,退去星辰与晓月".getBytes("UTF-8"); log.info(Charset.
defaultCharset().name()); ByteBuffer buff = ByteBuffer.wrap(arrays); log.info(
buff.getClass().getName()); CharBuffer charBuff = Charset.forName("UTF-8").
decode(buff); log.info(charBuff.getClass().getName()); log.info("byteBuffer
position {} capacity {} limit {}",buff.position(),buff.capacity(),buff.limit());
log.info("charBuffer position {} capacity {} limit {}",charBuff.position(),
charBuff.capacity(),charBuff.limit()); for(int i=0;i<charBuff.limit();i++){
System.out.print(charBuff.get()); } System.out.println();
输出
15:17:51.841 [main] INFO com.bai.niolearn.charbuffer.CharBufferTest - UTF-8 15:
17:51.845 [main] INFO com.bai.niolearn.charbuffer.CharBufferTest - java.nio.
HeapByteBuffer 15:17:51.846 [main] INFO com.bai.niolearn.charbuffer.
CharBufferTest - java.nio.HeapCharBuffer 15:17:51.846 [main] INFO com.bai.
niolearn.charbuffer.CharBufferTest - byteBuffer position 45 capacity 45 limit 45
15:17:51.849 [main] INFO com.bai.niolearn.charbuffer.CharBufferTest -
charBuffer position0 capacity 45 limit 15 三声四声天下白,退去星辰与晓月
<>其他转换
buff.asDoubleBuffer(); buff.asFloatBuffer(); buff.asIntBuffer();
<>其他方法
duplicate 复制缓冲区 会改变源数组
equals compareTo 比较缓冲区
compact 压缩缓冲区
扩容缓冲区
<>charBuffer 相关api
append
wrap
sunSequence
remaining 获取字符缓冲区长度
<>甬道
从目标缓冲区到文件或者缓冲区的通道
<>FileChannel
永远是阻塞的操作
使用示例
<>写入文件
FileOutputStream out = new FileOutputStream("/home/baijun/java/text.txt");
FileChannel fileChannel = out.getChannel(); ByteBuffer buff = ByteBuffer.wrap(
"abcdefg".getBytes()); fileChannel.write(buff); buff.rewind(); fileChannel.
position(3); fileChannel.write(buff); fileChannel.close(); out.close();
<>remaining 写入
FileOutputStream out = new FileOutputStream("/home/baijun/java/text.txt");
ByteBuffer buff1 = ByteBuffer.wrap("abcdefg".getBytes()); ByteBuffer buff2 =
ByteBuffer.wrap("1234567890".getBytes()); FileChannel fileChannel = out.
getChannel(); fileChannel.write(buff1); buff2.position(4); buff2.limit(8);
fileChannel.write(buff2); fileChannel.close(); out.close();
<>多线程写入
FileOutputStream out = new FileOutputStream("/home/baijun/java/text.txt");
FileChannel channel = out.getChannel(); Thread t1 = new Thread(() -> { try { for
(int i = 0; i < 10; i++) { ByteBuffer buff = ByteBuffer.wrap(
"三声鸡鸣天下白,退去星辰和晓月。\n".getBytes()); channel.write(buff); Thread.sleep(1000); } }
catch (IOException e) { throw new RuntimeException(e); } catch (
InterruptedException e) { throw new RuntimeException(e); } }); Thread t2 = new
Thread(() -> { try { for (int i = 0; i < 10; i++) { ByteBuffer buff = ByteBuffer
.wrap("abcdefghighlmn\n".getBytes()); channel.write(buff); Thread.sleep(1000); }
} catch (IOException e) { throw new RuntimeException(e); } catch (
InterruptedException e) { throw new RuntimeException(e); } }); t1.start(); t2.
start(); t1.join(); t2.join(); channel.close(); out.close();
<>读取文件
int read(ByteBuffer buff)
0 没有读取到
大于0 读取到的字节长度
-1 文档末尾
FileInputStream out = new FileInputStream("/home/baijun/java/text.txt");
FileChannel channel = out.getChannel(); ByteBuffer buff = ByteBuffer.allocate(10
); while(channel.read(buff) > -1){ buff.flip(); for(int i=0;i<buff.limit();i++){
log.info("字节: {}",buff.get()); } buff.clear(); } channel.close(); out.close();
channel.position(5) 设置当前位置 读取的时候就从当前位置开始读取
<>示例
@Test public void test3() throws IOException{ FileInputStream in = new
FileInputStream("text"); FileChannel channel = in.getChannel(); ByteBuffer buff
= ByteBuffer.allocate(10); int len = -1; while((len = channel.read(buff)) != -1)
{ log.info("当前长度 {}",len); buff.clear(); } log.info("当前长度 {}",len); buff.clear()
; channel.close(); in.close(); }
<>设置甬道位置和缓冲区位置进行读取
//text abcdefg @Test public void test4()throws IOException{ FileInputStream in
= new FileInputStream("text"); FileChannel channel = in.getChannel(); ByteBuffer
buffer= ByteBuffer.allocate(5); channel.position(5); channel.read(buffer); byte
[] array = buffer.array(); for(int i=0;i<array.length;i++){ log.info("{}",(char)
array[i]); } channel.close(); in.close(); } //fg "" "" "" @Test public void
test5()throws IOException{ FileInputStream in = new FileInputStream("text");
FileChannel channel = in.getChannel(); ByteBuffer buff = ByteBuffer.allocate(5);
channel.position(2); buff.position(2); channel.read(buff); byte[] array = buff.
array(); for(int i=0;i<array.length;i++){ log.info("{}",(char)array[i]); }
channel.close(); in.close(); } // "" "" cde
<>多线程读取但是不互相影响,甬道读取是同步的
//多线程读取 @Test public void test6() throws IOException { FileInputStream in = new
FileInputStream("test"); FileChannel channel = in.getChannel(); Runnable run =
new Runnable() { @Override public void run() { try { ByteBuffer buff =
ByteBuffer.allocate(5); int len = -1; while (((len = channel.read(buff)) > -1))
{ log.info("{}:{}",Thread.currentThread().getName(),new String(buff.array(),0,
len)); buff.clear(); } } catch (IOException e) { throw new RuntimeException(e);
} } }; Thread t1 = new Thread(run,"t1"); Thread t2 = new Thread(run,"t2"); t1.
start(); t2.start(); try { Thread.sleep(5000); } catch (InterruptedException e)
{ throw new RuntimeException(e); } channel.close(); in.close();
<>甬道中的数据大于缓冲区的数据,缓冲区有多少就读多少
@Test public void test7()throws IOException{ FileInputStream in = new
FileInputStream("test"); FileChannel channel = in.getChannel(); ByteBuffer buff
= ByteBuffer.allocate(3); log.info("{}", channel.position()); channel.read(buff)
; log.info("{}", channel.position()); channel.close(); in.close(); log.info("{}"
,new String(buff.array())); }
<>从甬道中读取字节到缓冲区中
//从甬道中读取字节放入缓冲区中 @Test public void test8()throws IOException{ FileInputStream
in= new FileInputStream("test"); FileChannel channel = in.getChannel();
ByteBuffer buff = ByteBuffer.allocate(100); buff.position(1); buff.limit(3);
channel.read(buff); channel.close(); in.close(); byte[] array = buff.array();
log.info("{}",new String(array)); } //0 3 abc
<>批量写入
//批量写入 @Test public void test9()throws IOException{ FileOutputStream out = new
FileOutputStream("text"); FileChannel channel = out.getChannel(); ByteBuffer b1
= ByteBuffer.wrap("bbbbbbb".getBytes()); ByteBuffer b2 = ByteBuffer.wrap(
"qqqqqqq".getBytes()); ByteBuffer b3 = ByteBuffer.wrap("aaaaaaa".getBytes());
channel.write(b1); channel.write(b2); channel.write(b3); channel.close(); out.
close(); }
<>批量remaining 写入
//remaining 写入甬道 @Test public void test10()throws IOException{ FileOutputStream
out= new FileOutputStream("text"); FileChannel channel = out.getChannel();
ByteBuffer b1 = ByteBuffer.wrap("abcde".getBytes()); ByteBuffer b2 = ByteBuffer.
wrap("12345".getBytes()); ByteBuffer[] buff = new ByteBuffer[]{b1,b2}; b1.
position(1); b1.limit(3); b2.position(2); b2.limit(4); channel.write(buff);
channel.close(); out.close(); }
<>批量同步写入 批量写入具有同步性
public void test11() throws IOException, InterruptedException {
FileOutputStream out = new FileOutputStream("bachText"); FileChannel channel =
out.getChannel(); for(int i=0;i<10;i++){ Thread t1 = new Thread(){ @Override
public void run() { ByteBuffer b1 = ByteBuffer.wrap("aaaa1\n".getBytes());
ByteBuffer b2 = ByteBuffer.wrap("bbbb1\n".getBytes()); ByteBuffer[] buff = new
ByteBuffer[]{b1,b2}; try { channel.write(buff); } catch (IOException e) { throw
new RuntimeException(e); } } }; Thread t2 = new Thread(){ @Override public void
run() { ByteBuffer b1 = ByteBuffer.wrap("cccc2\n".getBytes()); ByteBuffer b2 =
ByteBuffer.wrap("dddd2\n".getBytes()); ByteBuffer[] buff = new ByteBuffer[]{b1,
b2}; try { channel.write(buff); } catch (IOException e) { throw new
RuntimeException(e); } } }; t1.start(); t2.start(); } Thread.sleep(5000);
channel.close(); out.close(); } /** aaaa1 bbbb1 cccc2 dddd2 aaaa1 bbbb1 aaaa1
bbbb1 cccc2 dddd2 **/
<>批量读
@Test public void test12()throws IOException{ FileInputStream in = new
FileInputStream("text"); FileChannel channel = in.getChannel(); ByteBuffer b1 =
ByteBuffer.allocate(2); ByteBuffer b2 = ByteBuffer.allocate(2); ByteBuffer[]
buff= new ByteBuffer[]{b1,b2}; long len = channel.read(buff); log.info("{}",len)
; b1.clear(); b2.clear(); len = channel.read(buff); log.info("{}",len); b1.clear
(); b2.clear(); len = channel.read(buff); log.info("{}",len); b1.clear(); b2.
clear(); channel.close(); in.close(); } /** 4 1 -1 **/
<>从甬道当前位置开始读
//从甬道当前位置读取 @Test public void test13()throws IOException{ FileInputStream in =
new FileInputStream("text"); FileChannel channel = in.getChannel(); channel.
position(2); ByteBuffer b1 = ByteBuffer.allocate(2); ByteBuffer b2 = ByteBuffer.
allocate(2); ByteBuffer[] array = new ByteBuffer[]{b1,b2}; channel.read(array);
for(int i=0;i<array.length;i++){ byte[] buff = array[i].array(); log.info("{}",
new String(buff)); } channel.close(); in.close(); }
<>将字节放入当前位置
//将字节放在当前位置 @Test public void test14()throws IOException{ FileInputStream in =
new FileInputStream("text"); FileChannel channel = in.getChannel(); ByteBuffer
b1= ByteBuffer.allocate(2); ByteBuffer b2 = ByteBuffer.allocate(2); ByteBuffer[]
buffs= new ByteBuffer[]{b1,b2}; b1.position(1); channel.read(buffs); for(int i=
0;i<buffs.length;i++){ log.info("{} {}",buffs[i].array().length,new String(buffs
[i].array())); } channel.close(); in.close(); }
<>同步性
//同步性 @Test public void test15() throws IOException, InterruptedException {
FileInputStream in = new FileInputStream("bachText"); FileChannel channel = in.
getChannel(); Runnable run = ()->{ try{ ByteBuffer b1 = ByteBuffer.allocate(6);
ByteBuffer b2 = ByteBuffer.allocate(6); ByteBuffer[] arr = new ByteBuffer[]{b1,
b2}; long len = -1; while((len = channel.read(arr))!=-1){ synchronized (this){
//保证输出有序性 for(int j=0;j<arr.length;j++){ log.info("{} {}",Thread.currentThread()
.getName(),new String(arr[j].array())); } } b1.clear(); b2.clear(); } }catch (
Exception e){ log.error(e.getMessage()); } }; for(int i=0;i<10;i++){ Thread t1 =
new Thread(run,"t1"); Thread t2 =new Thread(run,"t2"); t1.start(); t2.start(); }
Thread.sleep(5000); channel.close(); in.close(); }
<>甬道中的数据大于缓冲区的数据,缓冲区有多少就读多少(与上面相同,不做示例了)
<>偏移量写入 long write(Bytebuffer[] srcs int offset,int length)
//注意,这里的length 指的是数组,不是元素 //偏移量写入 @Test public void test16()throws IOException{
FileOutputStream out = new FileOutputStream("text"); FileChannel channel = out.
getChannel(); ByteBuffer b1 = ByteBuffer.wrap("abcde".getBytes()); ByteBuffer b2
= ByteBuffer.wrap("12345".getBytes()); ByteBuffer[] arr = new ByteBuffer[]{b1,b2
}; channel.write(ByteBuffer.wrap("hello".getBytes())); channel.write(arr,0,1);
channel.close(); out.close(); }
<>将ByteBuffer 的 remaining写入通道,同步性(与上面相同不做演示)
<>读的操作相同,不做描述