一.什么是netty?為什么要用netty
netty是jboss提供的一個(gè)java開(kāi)源框架,netty提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開(kāi)發(fā)高性能、高可用性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。也就是說(shuō)netty是一個(gè)基于nio的編程框架,使用netty可以快速的開(kāi)發(fā)出一個(gè)網(wǎng)絡(luò)應(yīng)用。
由于java 自帶的nio api使用起來(lái)非常復(fù)雜,并且還可能出現(xiàn) Epoll Bug,這使得我們使用原生的nio來(lái)進(jìn)行網(wǎng)絡(luò)編程存在很大的難度且非常耗時(shí)。但是netty良好的設(shè)計(jì)可以使開(kāi)發(fā)人員快速高效的進(jìn)行網(wǎng)絡(luò)應(yīng)用開(kāi)發(fā)。
Netty是什么?
1)本質(zhì):JBoss做的一個(gè)Jar包
2)目的:快速開(kāi)發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序
3)優(yōu)點(diǎn):提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具
通俗的說(shuō):一個(gè)好使的處理Socket的東東
二.netty的功能特性和架構(gòu)思想
如下圖所示:netty的核心是支持零拷貝的bytebuf緩沖對(duì)象、通用通信api和可擴(kuò)展的事件模型;它支持多種傳輸服務(wù)并且支持HTTP、Protobuf、二進(jìn)制、文本、WebSocket 等一系列常見(jiàn)協(xié)議,也支持自定義協(xié)議。
netty的模型是基于reactor多線程模型,其中mainReactor用于接收客戶端請(qǐng)求并轉(zhuǎn)發(fā)給subReactor。SubReactor負(fù)責(zé)通道的讀寫(xiě)請(qǐng)求,非 IO 請(qǐng)求(具體邏輯處理)的任務(wù)則會(huì)直接寫(xiě)入隊(duì)列,等待 worker threads 進(jìn)行處理。
三.netty中的一些核心的概念
1.bootstrap、serverBootstrap:bootstrap的意思是引導(dǎo),其主要作用是配置整個(gè)netty程序,將各個(gè)組件整合起來(lái)。serverBootstrap是服務(wù)器端的引導(dǎo)類(lèi)。bootstrap用于連接遠(yuǎn)程主機(jī)它有一個(gè)EventLoopGroup ;serverBootstrap用于監(jiān)聽(tīng)本地端口有兩個(gè)EventLoopGroup。
2.eventLoop:eventLoop維護(hù)了一個(gè)線程和任務(wù)隊(duì)列,支持異步提交執(zhí)行任務(wù)。
3.eventLoopGroup:eventLoopGroup 主要是管理eventLoop的生命周期,可以將其看作是一個(gè)線程池,其內(nèi)部維護(hù)了一組eventLoop,每個(gè)eventLoop對(duì)應(yīng)處理多個(gè)Channel,而一個(gè)Channel只能對(duì)應(yīng)一個(gè)eventLoop。
4.channelPipeLine:是一個(gè)包含channelHandler的list,用來(lái)設(shè)置channelHandler的執(zhí)行順序。
5.Channel:Channel代表一個(gè)實(shí)體(如一個(gè)硬件設(shè)備、一個(gè)文件、一個(gè)網(wǎng)絡(luò)套接字或者一個(gè)能夠執(zhí)行一個(gè)或者多個(gè)不同的IO操作的程序組件)的開(kāi)放鏈接,如讀操作和寫(xiě)操作。
6.Futrue、ChannelFuture :Future提供了另一種在操作完成時(shí)通知應(yīng)用程序的方式。這個(gè)對(duì)象可以看作是一個(gè)異步操作結(jié)果的占位符;它將在未來(lái)的某個(gè)時(shí)刻完成,并提供對(duì)其結(jié)果的訪問(wèn)。netty的每一個(gè)出站操作都會(huì)返回一個(gè)ChannelFuture。future上面可以注冊(cè)一個(gè)監(jiān)聽(tīng)器,當(dāng)對(duì)應(yīng)的事件發(fā)生后會(huì)出發(fā)該監(jiān)聽(tīng)器。
7.ChannelInitializer:它是一個(gè)特殊的ChannelInboundHandler,當(dāng)channel注冊(cè)到eventLoop上面時(shí),對(duì)channel進(jìn)行初始化
8.ChannelHandler:用來(lái)處理業(yè)務(wù)邏輯的代碼,ChannelHandler是一個(gè)父接口,ChannelnboundHandler和ChannelOutboundHandler都繼承了該接口,它們分別用來(lái)處理入站和出站。
9.ChannelHandlerContext:允許與其關(guān)聯(lián)的ChannelHandler與它相關(guān)聯(lián)的ChannlePipeline和其它ChannelHandler來(lái)進(jìn)行交互。它可以通知相同ChannelPipeline中的下一個(gè)ChannelHandler,也可以對(duì)其所屬的ChannelPipeline進(jìn)行動(dòng)態(tài)修改。
四.netty中常用的自帶解碼器和編碼器(編解碼器名字對(duì)應(yīng)的只列舉一個(gè))
1.DelimiterBasedFrameDecoder :分隔符解碼器,以設(shè)定的符號(hào)作為消息的結(jié)束解決粘包問(wèn)題
2.FixedLengthFrameDecoder?。憾ㄩL(zhǎng)解碼器,作用于定長(zhǎng)的消息
3.LineBasedFrameDecoder :按照每一行進(jìn)行分割,也就是特殊的分隔符解碼器,它的分割符為\n或者\(yùn)r\n。
4.LengthFieldBasedFrameDecoder ?。和ㄟ^(guò)消息中設(shè)置的長(zhǎng)度字段來(lái)進(jìn)行粘包處理。該解碼器總共有5個(gè)參數(shù)
5.LengthFieldBasedFrameDecoder(int maxFrameLength,單個(gè)包的最大大小
int lengthFieldOffset, 定義長(zhǎng)度的字段的相對(duì)包開(kāi)始的偏移量
int lengthFieldLength, 定義長(zhǎng)度字段所占字節(jié)數(shù)
int lengthAdjustment, lengthAdjustment = 數(shù)據(jù)長(zhǎng)度字段之后剩下包的字節(jié)數(shù) - 數(shù)據(jù)長(zhǎng)度取值(也就是長(zhǎng)度字段之后的所有非數(shù)據(jù)的其他信息)
int initialBytesToStrip) 從包頭開(kāi)始,要忽略的字節(jié)數(shù)
6.HttpRequestDecoder :將字節(jié)解碼為HttpRequest、HttpContent和LastHttpContent消息
7.HttpResponseDecoder ?。簩⒆止?jié)解碼為HttpResponse、HttpContent和LastHttpContent消息
8.ReplayingDecoder ?。阂粋€(gè)特殊的ByteToMessageDecoder ,可以在阻塞的i/o模式下實(shí)現(xiàn)非阻塞的解碼。 ReplayingDecoder 和ByteToMessageDecoder 最大的不同就是ReplayingDecoder 允許你實(shí)現(xiàn)decode()和decodeLast()就像所有的字節(jié)已經(jīng)接收到一樣,不需要判斷可用的字節(jié)
9.Base64Decoder ?。築ase64編碼器
10.StringDecoder :將接收到的ByteBuf轉(zhuǎn)化為String
11.ByteArrayDecoder ?。簩⒔邮盏降腂yteBuf轉(zhuǎn)化為byte 數(shù)組
12.DatagramPacketDecoder :運(yùn)用指定解碼器來(lái)對(duì)接收到的DatagramPacket進(jìn)行解碼
13.MsgpackDecoder ?。河糜贛sgpack序列化的解碼器
14.ProtobufDecoder ?。河糜赑rotobuf協(xié)議傳輸?shù)慕獯a器
15.HttpObjectAggregator :將http消息的多個(gè)部分聚合起來(lái)形成一個(gè)FullHttpRequest或者FullHttpResponse消息。
LengthFieldPrepender :將消息的長(zhǎng)度添加到消息的前端的編碼器,一般是和LengthFieldBasedFrameDecoder搭配使用
HttpServerCodec:相當(dāng)于HttpRequestDecoder和HttpResponseEncoder
HttpClientCodec:相當(dāng)于HttpRequestEncoder和HttpResponseDecoder
ChunkedWriteHandler:在進(jìn)行大文件傳輸?shù)臅r(shí)候,一次將文件的全部?jī)?nèi)容映射到內(nèi)存中,很有可能導(dǎo)致內(nèi)存溢出,ChunkedWriteHandler可以解決大文件或者碼流傳輸過(guò)程中可能發(fā)生的內(nèi)存溢出問(wèn)題.
五.netty的簡(jiǎn)單使用
public class MyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,4,0,4))
.addLast(new LengthFieldPrepender(4))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+": "+msg);
ctx.writeAndFlush("來(lái)自客戶端的信息");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0;i<10;i++){
ctx.writeAndFlush("客戶端第"+i+"條消息");
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
});
}
});
ChannelFuture future = bootstrap.connect("localhost", 9999).sync();
future.channel().closeFuture().sync();
}finally{
nioEventLoopGroup.shutdownGracefully().sync();
}
}
}
public class MyServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4,0,4))
.addLast(new LengthFieldPrepender(4))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+":"+msg);
ctx.writeAndFlush("from server: "+UUID.randomUUID());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
});
}
});
ChannelFuture future = serverBootstrap.bind(9999).sync();
future.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
}
}
}
原文鏈接: https://blog.csdn.net/weixin_42408447/article/details/117110363