netty4源码之netty 服务端端如何处理请求

扫码下载官方App
学习过该课程的人还学习过:
其他联系方式
所属系列课程
& 网易公司 版权所有
关注我们:
Netty4底层源码剖析,夜行侠1988,本课程主要是针对netty4.16版本做个深入的源码剖析,从整个netty启动的整个流程,如何接收客户端请求数据,如何处理请求,以及对读写半包如何处理,还有netty里面哪些地方做了nio的优化处理,对内存如何管理,以及在多线程异步通信时,如何交互数据等等,最后还写了一套Springmvc框架,讲了https等等应用。
适用人群:想自己写高性能服务器,替换Tomcat
想完全随心所欲的掌控Netty的每一个细节
想成为一名高级Netty研发工程师博客分类:
之前写过一篇和本文类似的博客,不过原博客是基于netty3.x实现的,今天整理了一份基于4.0的完整系统分享给大家,希望能对大家有所帮助。
架构细节原博客都有,请参照
propholder.xml
&?xml version="1.0" encoding="UTF-8"?&
&beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd"&
&bean id="propertyConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"&
&property name="locations"&
&value&config/properties/settings.properties&/value&
&/property&
&bean id="serverInitializer" class="com.cp.netty.ServerInitializer"
init-method="init"&
&property name="timeout" value="${app.channel.readtimeout}" /&
&property name="handlerDispatcher" ref="handlerDispatcher" /&
&property name="requestType" value="${app.requestType}" /&
&bean id="handlerDispatcher" class="com.cp.dispatcher.HandlerDispatcher"&
&property name="messageExecutor"&
&bean class="com.cp.domain.FiexThreadPoolExecutor"
destroy-method="shutdown"&
&constructor-arg value="${app.handler.pool.corePoolSize}" /&
&constructor-arg value="${app.handler.pool.maximumPoolSize}" /&
&constructor-arg value="${app.handler.pool.keepAliveSecond}" /&
&constructor-arg value="${app.handler.pool.name}" /&
&/property&
&property name="sleepTime" value="${app.handler.sleepTime}" /&
&property name="handlerMap" ref="gameHandlerMap" /&
&bean id="gameHandlerMap" class="java.util.HashMap"&
&constructor-arg&
&entry key="999"&
&bean class="com.cp.handler.InitHandler"&
&/constructor-arg&
settings.properties
app.handler.pool.corePoolSize=16
app.handler.pool.maximumPoolSize=32
app.handler.pool.keepAliveSecond=300
app.handler.pool.name=gamework
app.handler.sleepTime=10
app.channel.readtimeout = 3600
websocket_text
websocket_binary
app.requestType=socket
测试类
TestClient
package com.cp.
import io.netty.bootstrap.B
import io.netty.buffer.ByteB
import io.netty.buffer.U
import io.netty.channel.ChannelF
import io.netty.channel.ChannelI
import io.netty.channel.ChannelO
import io.netty.channel.EventLoopG
import io.netty.channel.nio.NioEventLoopG
import io.netty.channel.socket.SocketC
import io.netty.channel.socket.nio.NioSocketC
import io.netty.handler.codec.LengthFieldBasedFrameD
import io.netty.handler.codec.LengthFieldP
import io.netty.handler.codec.http.DefaultFullHttpR
import io.netty.handler.codec.http.HttpH
import io.netty.handler.codec.http.HttpM
import io.netty.handler.codec.http.HttpRequestE
import io.netty.handler.codec.http.HttpResponseD
import io.netty.handler.codec.http.HttpV
import java.net.URI;
import com.cp.domain.ERequestT
public class TestClient {
public void connect(String host, int port, final ERequestType requestType)
throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
String msg = "Are you ok?";
if (ERequestType.SOCKET.equals(requestType)) {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class).option(
ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
b.handler(new ChannelInitializer&SocketChannel&() {
protected void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new LengthFieldBasedFrameDecoder(
Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast("decode",
new LengthFieldPrepender(4));
ch.pipeline().addLast("handler",
new ClientInboundHandler());
ChannelFuture f = b.connect(host, port).sync();
ByteBuf messageData = Unpooled.buffer();
messageData.writeInt(999);
messageData.writeInt(msg.length());
messageData.writeBytes(msg.getBytes());
f.channel().writeAndFlush(messageData).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} else if (ERequestType.HTTP.equals(requestType)) {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer&SocketChannel&() {
public void initChannel(SocketChannel ch) throws Exception {
// 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
ch.pipeline().addLast(new HttpResponseDecoder());
// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new ClientInboundHandler());
ChannelFuture f = b.connect(host, port).sync();
b.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true);
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
URI uri = new URI("http://" + host + ":" + port);
DefaultFullHttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, uri.toASCIIString(),
Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));
// 构建http请求
request.headers().set(HttpHeaders.Names.HOST, host);
request.headers().set(HttpHeaders.Names.CONNECTION,
HttpHeaders.Values.KEEP_ALIVE);
request.headers().set(HttpHeaders.Names.CONTENT_LENGTH,
request.content().readableBytes());
// 发送http请求
f.channel().write(request);
f.channel().flush();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
public static void main(String[] args) throws Exception {
TestClient client = new TestClient();
client.connect("127.0.0.1", 8080, ERequestType.SOCKET);
ClientInboundHandler
package com.cp.
import io.netty.buffer.ByteB
import io.netty.channel.ChannelHandlerC
import io.netty.channel.ChannelInboundHandlerA
import io.netty.handler.codec.http.HttpC
import io.netty.handler.codec.http.HttpH
import io.netty.handler.codec.http.HttpR
public class ClientInboundHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse)
System.out.println("CONTENT_TYPE:"
+ response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent)
ByteBuf buf = content.content();
System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
buf.release();
if (msg instanceof ByteBuf) {
ByteBuf messageData = (ByteBuf)
int commandId = messageData.readInt();
int length = messageData.readInt();
byte[] c = new byte[length];
messageData.readBytes(c);
System.out.println("commandId:"+commandId+"\tmessage:"+new String(c));
本测试代码已经过http、socket、websocket测试。
鉴于很多朋友想深入交流,特提供源码demo项目下载地址:
https://github.com/pofuchenzhou/netty-spring-game.git
下载次数: 1121
浏览 38140
没原码啊,,,https://github.com/pofuchenzhou/netty-spring-game
求源码,谢谢!!!https://github.com/pofuchenzhou/netty-spring-game
谢谢反编译自己看吧
浏览: 276907 次
来自: 北京
netty等视频java.5d6b.com教程
学习了~~
不错,学习了~~
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'netty源代码解析(1)——服务端流程 - netty源码分析 netty源码剖析 netty 源码 网络通信 - Java - ITeye论坛
netty源代码解析(1)——服务端流程
锁定老帖子
精华帖 (1) :: 良好帖 (1) :: 新手帖 (1) :: 隐藏帖 (0)
fengfeng925
来自: 北京
发表时间:&&
最后修改:
今天看了下netty代码,对代码做了个流程分析,netty的代码写的真是漂亮。
netty服务端启动代码如下
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
ChannelPipeline pipleline = pipeline();
//默认最大传输帧大小为16M
pipleline.addLast("encode", new ObjectEncoder(1048576 * 16));
pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));
pipleline.addLast("handler", handler);
//设置缓冲区为64M
bootstrap.setOption("receiveBufferSize", 1048576 * 64);
bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法
//tcp定期发送心跳包 比如IM里边定期探测对方是否下线
//只有tcp长连接下才有意义
bootstrap.setOption("child.keepAlive", true);
bootstrap.bind(new InetSocketAddress(port));
服务端事件处理顺序如下:
UpStream.ChannelState.OPEN—–&DownStream.ChannelState.BOUND(需要绑定)
——–&UpStream.ChannelState.BOUND(已经绑定)——&DownStream.CONNECTED(需要连接)——-&UpStream.CONNECTED(连接成功)
在bind的时候做了如下处理
public Channel bind(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
final BlockingQueue&ChannelFuture& futureQueue =
new LinkedBlockingQueue&ChannelFuture&();
ChannelHandler binder = new Binder(localAddress, futureQueue);
ChannelHandler parentHandler = getParentHandler();
这里创建了一个Binder,它继承了SimpleChannelUpstreamHandler。先说说UpStreamHandler和DownStreamHandler,一般来说,UpStream类型的事件主要是由网络底层反馈给Netty的,比如messageReceived,channelConnected等事件,而DownStream类型的事件是由框架自己发起的,比如bind,write,connect,close等事件。
接着
ChannelPipeline bossPipeline = pipeline();
bossPipeline.addLast("binder", binder);
if (parentHandler != null) {
bossPipeline.addLast("userHandler", parentHandler);
Channel channel = getFactory().newChannel(bossPipeline);
这里创建出一个channel,每一个channel都是由一个tcp四元组组成。channel由ChannelFactory创建而成。在创建完NioServerSocketChannel后,会调用
fireChannelOpen(this);这是发出一个ChannelState.OPEN事件,前面注册的BinderHandler会处理这个事件。我们来看看Binder的代码
public void channelOpen(
ChannelHandlerContext ctx,
ChannelStateEvent evt) {
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
// Split options into two categories: parent and child.
Map&String, Object& allOptions = getOptions();
Map&String, Object& parentOptions = new HashMap&String, Object&();
for (Entry&String, Object& e: allOptions.entrySet()) {
if (e.getKey().startsWith("child.")) {
childOptions.put(
e.getKey().substring(6),
e.getValue());
} else if (!e.getKey().equals("pipelineFactory")) {
parentOptions.put(e.getKey(), e.getValue());
// Apply parent options.
evt.getChannel().getConfig().setOptions(parentOptions);
} finally {
ctx.sendUpstream(evt);
boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); //这里发出bind事件,return Channels.bind(this, localAddress)
bind就触发了一个DownStream的ChannelState.BOUND事件。表明需要将该Channel绑定至指定的地址。
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
getSink().eventSunk(this, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
sendDownstream(tail, e);
接着就要看NioServerSocketPipelineSink了,这个主要关注于具体传输数据的处理,同时也包括其他方面的内容,比如异常处理等等。执行eventSunk方法。
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
handleAcceptedSocket(e);
nio方式ChannelSink一般会有1个boss实例(implements Runnable),以及若干个worker实例(不设置默认为cpu cores*2),它将channel分为 ServerSocketChannel和SocketChannel分开处理。这主要原因是boss线程accept()一个新的连接生成一个 SocketChannel交给worker进行数据接收。
看下ServerSocketChannel的处理
private void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
ChannelStateEvent event = (ChannelStateEvent)
NioServerSocketChannel channel =
(NioServerSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
close(channel, future);
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
close(channel, future);
主要是处理bind事件,
private void bind(
NioServerSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound =
boolean bossStarted =
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
//取出一个boss线程,然后交给Boss类去处理。
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossE
DeadLockProofWorker.start(
bossExecutor,
new ThreadRenamingRunnable(
new Boss(channel),
"New I/O server boss #" + id + " (" + channel + ')'));
bossStarted =
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!bossStarted && bound) {
close(channel, future);
看下Boss类,它实现了Runnable接口
private final S
private final NioServerSocketC
Boss(NioServerSocketChannel channel) throws IOException {
this.channel =
selector = Selector.open();
boolean registered =
channel.socket.register(selector, SelectionKey.OP_ACCEPT);
registered =
} finally {
if (!registered) {
closeSelector();
channel.selector =
代码是不是有点熟悉,没错,是nio里的代码,需要注意的是,ServerSocketChannel只注册OP_ACCEPT事件。
再看下Boss类的run方法
public void run() {
final Thread currentThread = Thread.currentThread();
channel.shutdownLock.lock();
for (;;) {
if (selector.select(1000) & 0) {
selector.selectedKeys().clear();
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket != null) {
registerAcceptedChannel(acceptedSocket, currentThread);
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedSelectorException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedChannelException e) {
// Closed as requested.
} catch (Throwable e) {
logger.warn(
"Failed to accept a connection.", e);
Thread.sleep(1000);
} catch (InterruptedException e1) {
} finally {
channel.shutdownLock.unlock();
closeSelector();
这里会调用registerAcceptedChannel(acceptedSocket, currentThread);方法
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = nextWorker(); //获取一个NioWorker
//将Channel注册到NioWorker上去
worker.register(new NioAcceptedSocketChannel(
channel.getFactory(), pipeline, channel,
NioServerSocketPipelineSink.this, acceptedSocket,
worker, currentThread), null);
} catch (Exception e) {
logger.warn(
"Failed to initialize an accepted socket.", e);
acceptedSocket.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially accepted socket.",
当有新的连接建立,会交给NioWorker的线程池去处理,boss只负责accept到新的连接,新的SocketChannel会被注册到一个work线程中去。
等级: 初级会员
来自: 厦门--&北京
发表时间:&&
谢谢了,最近正好正在用netty,不过我想的问的是为什么接收消息是叫UpStreamHandler,而发送消息是叫DownStreamHandler,不是和我们正常的理解相反么?
请登录后投票
fengfeng925
来自: 北京
发表时间:&&
finallygo 写道谢谢了,最近正好正在用netty,不过我想的问的是为什么接收消息是叫UpStreamHandler,而发送消息是叫DownStreamHandler,不是和我们正常的理解相反么?
从iso网络七层模型来理解
请登录后投票
等级: 初级会员
来自: 深圳
发表时间:&&
fengfeng925 写道finallygo 写道谢谢了,最近正好正在用netty,不过我想的问的是为什么接收消息是叫UpStreamHandler,而发送消息是叫DownStreamHandler,不是和我们正常的理解相反么?
从iso网络七层模型来理解
不用那么弦乎吧。字面上理解也行啊,down就是下发的意思啊,反过来接收就是up啦 ,嘿嘿
请登录后投票
等级: 初级会员
来自: 长沙
发表时间:&&
ljl_ss 写道fengfeng925 写道finallygo 写道谢谢了,最近正好正在用netty,不过我想的问的是为什么接收消息是叫UpStreamHandler,而发送消息是叫DownStreamHandler,不是和我们正常的理解相反么?
从iso网络七层模型来理解
不用那么弦乎吧。字面上理解也行啊,down就是下发的意思啊,反过来接收就是up啦 ,嘿嘿
我也觉得就是把思维角度换一下去理解。和我们自身的习惯有所不同而已.呵呵
请登录后投票
来自: 大连
发表时间:&&
bootstrap.setOption("receiveBufferSize", 1048576 * 64);& 这么漂亮的代码为什么还要用魔数呢
请登录后投票
等级: 初级会员
来自: 深圳
发表时间:&&
引用:if you want to learn
something, you might read up
on it and then write down some notes.
请登录后投票
来自: 北京
发表时间:&&
yuanq_20 写道
引用:if you want to learn something, you might read up on it and then write down some notes.
这个图很漂亮,我也写了netty的学习系列文章,希望大家批评指正:
请登录后投票
跳转论坛:移动开发技术
Web前端技术
Java企业应用
编程语言技术91 条评论分享收藏感谢收起赞同 2添加评论分享收藏感谢收起写回答博客分类:
一、生成serversocketchannel
ServerBootstrap设置channel类型 bootstrap.channel(NioServerSocketChannel.class)时,ServerBootstrap的父类AbstractBootstrap的初始ChannelFactory的对象,ChannelFactory的作用是生成ServerSocketChannel对象,channel方法代码:
public B channel(Class&? extends C& channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
return channelFactory(new BootstrapChannelFactory&C&(channelClass));
BootstrapChannelFactory为AbstractBootstrap的内部类。
二、将serverchannel注册到selector
ServerBootstrap在绑定端口(bootstrap.bind(8080))时,调用AbstractBootstrap的initAndRegister方法:
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();//(1)
init(channel);//(2)
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
ChannelPromise regPromise = channel.newPromise();//(3)
group().register(channel, regPromise);//(4)
if (regPromise.cause() != null) {
if (channel.isRegistered()) {
channel.close();
channel.unsafe().closeForcibly();
return regP
1、调用ChannelFactory对象生成ServerBootstrap 的channel(Class&? extends C& channelClass)方法中设置的channelClass对象即NioServerSocketChannel对象,也就是Netty重新封装过的ServerSocketChannel对象,至于Netty为什么要封装ServerSocketChannel后面章节再写。
2、 初化NioServerSocketChannel对象,将我们在创建ServerBootstrap对象中设置的option 和attr值设置到NioServerSocketChannel对象中的config和arrs属性中。
3、生成ChannelPromise对象,这个对象主要是对channel的注册状态进行监听
4、 取eventLoop设置到channel中,并调用AbstractChannel$AbstractUnsafe的register0方法奖channel注册到eventLoop中的selector,并将ChannelPromise状态设置为成功:
private void register0(ChannelPromise promise) {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!ensureOpen(promise)) {
doRegister();
//将serverchannel注册到selector中
registered =
promise.setSuccess();
pipeline.fireChannelRegistered();
if (isActive()) {
pipeline.fireChannelActive();
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
if (!promise.tryFailure(t)) {
logger.warn(
"Tried to fail the registration promise, but it is complete already. " +
"Swallowing the cause of the registration failure:", t);
三、绑定端口
通过initAndRegister方法将serverchannel注册到selector后调用doBind0方法注册端口
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered.
Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() { //(1)
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
promise.setFailure(regFuture.cause());
浏览: 3505 次
来自: 桂林
[list][list][*][list][*][*][lis ...
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'}

我要回帖

更多关于 android netty服务端 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信