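This article analyzes the source code of netty-4.1.5.Final.
Bootstrap is the helper class for creating socket clients. With it, a user can easily create a Netty client and initiate an asynchronous TCP connection. Below we look at how a client Bootstrap is started and then walk through the source code of the startup process.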
Starting a Bootstrap client

    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.TCP_NODELAY, true)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 p.addLast(new EchoClientHandler());
             }
         });
        // Connect, then block until the channel is closed.
        ChannelFuture f = b.connect(HOST, PORT).sync();
        f.channel().closeFuture().sync();
    } finally {
        group.shutdownGracefully();
    }

Create a Bootstrap instance, bind the EventLoopGroup (group) that handles I/O events and tasks, set any other custom options, and finally initiate a connection to the given host and port. That is all the code needed to start a Netty client.
Bootstrap client startup: source code analysis

Creating the Bootstrap instance

    Bootstrap b = new Bootstrap();

Configuring the Bootstrap

Binding the Reactor thread pool

    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return (B) this;
    }

The group is the thread group used mainly for I/O reads and writes and for running tasks.
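The `(B) this` cast at the end of group() works because AbstractBootstrap is declared with a self-referencing type parameter, so chained calls keep returning the concrete subclass. The following is a minimal sketch of that idiom in plain Java. FluentConfig, ClientConfig and remoteHost are hypothetical names used only for illustration; they are not Netty classes.

```java
// Simplified model of the self-typed builder idiom; NOT Netty's actual classes.
abstract class FluentConfig<B extends FluentConfig<B>> {
    private String group;

    @SuppressWarnings("unchecked")
    public B group(String group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return (B) this; // safe as long as every subclass passes itself as B
    }

    public String group() {
        return group;
    }
}

// Hypothetical subclass that adds a setter of its own.
final class ClientConfig extends FluentConfig<ClientConfig> {
    private String remoteHost;

    public ClientConfig remoteHost(String remoteHost) {
        this.remoteHost = remoteHost;
        return this;
    }

    public String remoteHost() {
        return remoteHost;
    }
}

final class SelfTypedBuilderDemo {
    static ClientConfig build() {
        // group(...) returns ClientConfig, so the subclass-only setter still chains.
        return new ClientConfig().group("io-threads").remoteHost("127.0.0.1");
    }

    static boolean rejectsSecondGroup() {
        try {
            new ClientConfig().group("a").group("b");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ClientConfig c = build();
        System.out.println(c.group() + " -> " + c.remoteHost());
        System.out.println("second group rejected: " + rejectsSecondGroup());
    }
}
```

Without the self type, group() would return the base class and the chain could not continue with methods that only the subclass defines.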
Setting the channel type, which is used to create the corresponding channel reflectively from its class

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }
        this.channelFactory = channelFactory;
        return (B) this;
    }

    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
        private final Class<? extends T> clazz;

        public ReflectiveChannelFactory(Class<? extends T> clazz) {
            if (clazz == null) {
                throw new NullPointerException("clazz");
            }
            this.clazz = clazz;
        }

        @Override
        public T newChannel() {
            try {
                // Requires a public no-argument constructor on the channel class.
                return clazz.newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }

        @Override
        public String toString() {
            return StringUtil.simpleClassName(clazz) + ".class";
        }
    }

Based on the channel type, a factory is created that instantiates the corresponding channel by reflection. A client normally passes NioSocketChannel.class, as in the example above; NioServerSocketChannel.class is what a server would typically use.
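The reflective factory idea can be tried out without Netty. The sketch below is an assumption-laden stand-in: Factory and ReflectiveFactory are hypothetical names, and it uses `getDeclaredConstructor().newInstance()` instead of `Class.newInstance()`, which has been deprecated since Java 9.

```java
// Stand-in for the reflective factory idea; names here are hypothetical, not Netty's.
interface Factory<T> {
    T newInstance();
}

final class ReflectiveFactory<T> implements Factory<T> {
    private final Class<? extends T> clazz;

    ReflectiveFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newInstance() {
        try {
            // Needs an accessible no-argument constructor on clazz.
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance of " + clazz, e);
        }
    }
}

final class ReflectiveFactoryDemo {
    static CharSequence create() {
        // The class object is fixed once; every call builds a fresh instance.
        Factory<CharSequence> factory = new ReflectiveFactory<CharSequence>(StringBuilder.class);
        return factory.newInstance();
    }

    public static void main(String[] args) {
        System.out.println(create().getClass().getName());
    }
}
```

Storing a factory rather than an instance matters because each connect call needs a new channel object.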
Setting channel options

    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            // A null value removes a previously set option.
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return (B) this;
    }

Setting the handler

    public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return (B) this;
    }

This sets the ChannelHandler for the NioSocketChannel. ChannelHandler is the key interface Netty offers for customization and extension: users implement their own ChannelHandler to add business logic.
Core startup logic of Bootstrap: source code analysis

To start a client Bootstrap, a connection request has to be made to the given host and port. Before that, the Channel must be initialized and registered with an eventLoop.
The core startup logic consists of three steps:
Initialization
Registration
Connecting
Let us first look at the main flow of the startup process, and then analyze each step in detail:

    public ChannelFuture connect(SocketAddress remoteAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        validate();
        return doResolveAndConnect(remoteAddress, config.localAddress());
    }

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.isDone()) {
            // Registration already finished: fail fast or connect right away.
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration still pending: connect from the listener callback.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

The NioSocketChannel is first initialized and registered with an eventLoop of the group. If registration has already completed, the connect is initiated directly; otherwise a Listener is added, and the connect is initiated from that Listener's callback once registration completes.
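The same "already done, or call me back later" structure can be modeled with the JDK's CompletableFuture. This is only a sketch under simplifying assumptions: registerAndConnect and connect are hypothetical stand-ins, and the "connect" merely completes a promise with a string.

```java
import java.util.concurrent.CompletableFuture;

final class RegisterThenConnectDemo {
    // Hypothetical stand-in for the connect step: completes the promise immediately.
    static void connect(String remote, CompletableFuture<String> promise) {
        promise.complete("connected to " + remote);
    }

    static CompletableFuture<String> registerAndConnect(CompletableFuture<Void> regFuture, String remote) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Fast path: registration already succeeded, connect on the spot.
            connect(remote, promise);
        } else {
            // Pending (or failed) registration: decide inside the completion callback.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    connect(remote, promise);
                }
            });
        }
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> result = registerAndConnect(pending, "127.0.0.1:8007");
        System.out.println("before registration: done=" + result.isDone());
        pending.complete(null); // registration finishes later
        System.out.println("after registration: " + result.join());
    }
}
```

Either way the caller receives a promise at once and never blocks waiting for registration.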
Initializing the NioSocketChannel and registering it with an eventLoop of the group

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
            }
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

This creates and initializes the NioSocketChannel, registers it with an eventLoop of the group, and returns the future of the registration.
Initialization

init adds the configured handler to the channel's pipeline, then applies the options and attributes that were set on the Bootstrap.

    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(config.handler());

        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
                try {
                    if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + channel, t);
                }
            }
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }
Registration

To register the NioSocketChannel, an eventLoop thread is chosen from the group and the NioSocketChannel is registered with that eventLoop's selector. The channel's unsafe object is used to reach the underlying NIO API that performs the registration.

    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
The register method of unsafe checks whether the current thread is the eventLoop thread of the channel. If it is, register0 is executed directly; otherwise register0 is wrapped in a task and handed to that eventLoop for execution.

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }
        AbstractChannel.this.eventLoop = eventLoop;
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

    private void register0(ChannelPromise promise) {
        try {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;
            pipeline.invokeHandlerAddedIfNeeded();
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
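The "run inline if already on the loop thread, otherwise enqueue" decision can be reproduced with a single-threaded JDK executor. This is a toy model rather than Netty's event loop: SingleThreadLoop and InEventLoopDemo are hypothetical classes, and inEventLoop() is implemented here as a plain thread identity check.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy event loop: one worker thread plus an identity check (hypothetical class).
final class SingleThreadLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "demo-event-loop");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void execute(Runnable task) {
        executor.execute(task);
    }

    void shutdown() {
        executor.shutdown();
    }
}

final class InEventLoopDemo {
    static void register(SingleThreadLoop loop, List<String> log) {
        if (loop.inEventLoop()) {
            register0("inline", log);                      // already on the loop thread
        } else {
            loop.execute(() -> register0("queued", log));  // hand off to the loop thread
        }
    }

    static void register0(String how, List<String> log) {
        log.add(how + " on " + Thread.currentThread().getName());
    }

    static List<String> run() {
        SingleThreadLoop loop = new SingleThreadLoop();
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            register(loop, log);        // called from the caller's thread
            loop.execute(() -> {        // called from inside the loop thread
                register(loop, log);
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop did not run");
            }
            return log;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In both cases register0 runs on the loop thread, so channel state is only ever touched by one thread and needs no locking.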
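The registration itself is performed through the underlying NIO API. The channel is registered with an interest set of 0, and the Netty channel itself is passed as the attachment.

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now, because the "canceled"
                    // SelectionKey may still be cached and not yet removed.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }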
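Registering with an interest set of 0 can be observed with the JDK alone. The sketch below uses only java.nio; the class name and the "owner" attachment string are assumptions standing in for the Netty channel object.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class RegisterWithZeroOpsDemo {
    /** Returns the interest set right after registration and after enabling OP_CONNECT. */
    static int[] registerThenEnableConnect() {
        try (Selector selector = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            // A selectable channel must be non-blocking before it can be registered.
            ch.configureBlocking(false);

            Object attachment = "owner"; // placeholder for the owning channel object
            SelectionKey key = ch.register(selector, 0, attachment);
            int before = key.interestOps();           // nothing is being listened for yet

            key.interestOps(SelectionKey.OP_CONNECT); // interest is switched on later
            int after = key.interestOps();

            if (key.attachment() != attachment) {
                throw new IllegalStateException("attachment lost");
            }
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenEnableConnect();
        System.out.println("interestOps after register: " + ops[0]);
        System.out.println("interestOps after enabling OP_CONNECT: " + ops[1]);
    }
}
```

Registration only binds the channel to the selector; the events of interest are added afterwards, when a connect or read is actually requested.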
Connecting

Once registration is done, control returns to the connect method. If the NioSocketChannel has already been registered, the connect is initiated directly; otherwise a Listener is added, and the connect is initiated from its callback after registration completes. Because the connect operates on the channel, it is wrapped in a task and handed to the channel's eventLoop thread, which calls the channel's connect method.

    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise) {
        try {
            final EventLoop eventLoop = channel.eventLoop();
            final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
            if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                // Resolver cannot handle the address, or it is already resolved.
                doConnect(remoteAddress, localAddress, promise);
                return promise;
            }
            final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
            if (resolveFuture.isDone()) {
                final Throwable resolveFailureCause = resolveFuture.cause();
                if (resolveFailureCause != null) {
                    // Failed to resolve immediately.
                    channel.close();
                    promise.setFailure(resolveFailureCause);
                } else {
                    // Resolved immediately (cached or already resolved).
                    doConnect(resolveFuture.getNow(), localAddress, promise);
                }
                return promise;
            }
            // Wait until the name resolution finishes.
            resolveFuture.addListener(new FutureListener<SocketAddress>() {
                @Override
                public void operationComplete(Future<SocketAddress> future) throws Exception {
                    if (future.cause() != null) {
                        channel.close();
                        promise.setFailure(future.cause());
                    } else {
                        doConnect(future.getNow(), localAddress, promise);
                    }
                }
            });
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }
The connect is wrapped in a task and run on the channel's eventLoop thread, which prevents concurrent operations on the channel.

    private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (localAddress == null) {
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }
The next ChannelOutboundHandler is looked up in the chain of responsibility (the pipeline) and its connect method is invoked.

    public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        if (!validatePromise(promise, false)) {
            // The promise was cancelled.
            return promise;
        }
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

    private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            connect(remoteAddress, localAddress, promise);
        }
    }
The ChannelOutboundHandler that finally handles the connect is the ChannelHandler at the head of the chain: HeadContext

    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }
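How an outbound call travels from the tail of the pipeline back to the head can be shown with a stripped-down linked list. This is a deliberately tiny model with hypothetical types (Ctx, OutboundHandler): a context whose handler is null stands for an inbound-only handler, and the head simply records that it would call unsafe.connect.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniPipelineDemo {
    interface OutboundHandler {
        void connect(Ctx ctx, String remote, List<String> trace);
    }

    static final class Ctx {
        final OutboundHandler handler; // null marks an inbound-only context
        Ctx prev;

        Ctx(OutboundHandler handler) {
            this.handler = handler;
        }

        // Outbound operations are passed to the nearest outbound context towards the head.
        void connect(String remote, List<String> trace) {
            Ctx next = findContextOutbound();
            next.handler.connect(next, remote, trace);
        }

        private Ctx findContextOutbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (ctx.handler == null); // the head is always outbound, so this terminates
            return ctx;
        }
    }

    static List<String> connect(String remote) {
        List<String> trace = new ArrayList<>();
        Ctx head = new Ctx((ctx, r, t) -> t.add("head: unsafe.connect(" + r + ")"));
        Ctx logging = new Ctx((ctx, r, t) -> {
            t.add("logging: connect " + r);
            ctx.connect(r, t); // keep propagating towards the head
        });
        Ctx business = new Ctx(null); // inbound-only, skipped by outbound calls
        Ctx tail = new Ctx(null);

        logging.prev = head;
        business.prev = logging;
        tail.prev = business;

        tail.connect(remote, trace); // channel.connect(...) starts at the tail
        return trace;
    }

    public static void main(String[] args) {
        connect("example.org:80").forEach(System.out::println);
    }
}
```

In the client example from the beginning of the article, the pipeline contains no user-defined outbound handler, so the connect call goes straight from the tail to HeadContext.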
This ends in the connect method of unsafe, which holds the core logic that actually performs the connection.

    public final void connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        try {
            if (connectPromise != null) {
                // A connection attempt is already in progress.
                throw new ConnectionPendingException();
            }
            boolean wasActive = isActive();
            if (doConnect(remoteAddress, localAddress)) {
                fulfillConnectPromise(promise, wasActive);
            } else {
                connectPromise = promise;
                requestedRemoteAddress = remoteAddress;

                // Schedule the connect timeout.
                int connectTimeoutMillis = config().getConnectTimeoutMillis();
                if (connectTimeoutMillis > 0) {
                    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                            ConnectTimeoutException cause =
                                    new ConnectTimeoutException("connection timed out: " + remoteAddress);
                            if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                close(voidPromise());
                            }
                        }
                    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                }

                // If the connect is cancelled, drop the timeout task and close.
                promise.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isCancelled()) {
                            if (connectTimeoutFuture != null) {
                                connectTimeoutFuture.cancel(false);
                            }
                            connectPromise = null;
                            close(voidPromise());
                        }
                    }
                });
            }
        } catch (Throwable t) {
            promise.tryFailure(annotateConnectException(t, remoteAddress));
            closeIfClosed();
        }
    }

    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }
        boolean success = false;
        try {
            boolean connected = javaChannel().connect(remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }
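doConnect obtains the underlying native NIO SocketChannel, initiates the connection on it, and returns the result. Because this SocketChannel is in non-blocking mode, the connection may not be established immediately. If it is, fulfillConnectPromise is called, which in turn fires the fireChannelActive event. If it is not, the SelectionKey.OP_CONNECT interest bit is set so that connection readiness is awaited asynchronously; in addition, a scheduled task that handles the connect timeout is added when a positive timeout is configured, and a Listener is attached that cancels this no longer needed task if the connect is cancelled.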
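The interplay between a pending connect and its timeout task can be modeled with a ScheduledExecutorService. This is a simulation under stated assumptions, not Netty code: the "connect" is just a second scheduled task, attempt is a hypothetical helper, and the millisecond values are arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ConnectTimeoutDemo {
    /** Simulates a connect that completes after connectDelayMillis, guarded by a timeout. */
    static String attempt(long timeoutMillis, long connectDelayMillis) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> promise = new CompletableFuture<>();

            // Timeout task: fails the promise unless the connect has finished by then.
            ScheduledFuture<?> timeoutTask = loop.schedule(() -> {
                promise.completeExceptionally(new TimeoutException("connection timed out"));
            }, timeoutMillis, TimeUnit.MILLISECONDS);

            // Simulated "finish connect": completes the promise and drops the timeout task.
            loop.schedule(() -> {
                if (promise.complete("connected")) {
                    timeoutTask.cancel(false);
                }
            }, connectDelayMillis, TimeUnit.MILLISECONDS);

            return promise.get();
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(attempt(2000, 10)); // connect wins the race
        System.out.println(attempt(20, 2000)); // timeout wins the race
    }
}
```

Whichever task completes the promise first wins; the other one finds the promise already completed and has no effect.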
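If the connection succeeds immediately, fulfillConnectPromise is called, which in turn fires the fireChannelActive event.

    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
        if (promise == null) {
            // Closed via cancellation; the promise has already been notified.
            return;
        }
        // Read isActive() before trySuccess(), because a listener may close the channel.
        boolean active = isActive();
        // trySuccess() returns false if the user cancelled the connection attempt.
        boolean promiseSet = promise.trySuccess();
        if (!wasActive && active) {
            pipeline().fireChannelActive();
        }
        if (!promiseSet) {
            close(voidPromise());
        }
    }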
If the connection does not complete immediately, the SelectionKey.OP_CONNECT interest bit is set and connection readiness is awaited asynchronously. That logic lives in the NioEventLoop method that processes ready SelectedKeys.

    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        // Remove OP_CONNECT, otherwise Selector.select(..) will always return without blocking.
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
        unsafe.finishConnect();
    }

    public final void finishConnect() {
        assert eventLoop().inEventLoop();
        try {
            boolean wasActive = isActive();
            doFinishConnect();
            fulfillConnectPromise(connectPromise, wasActive);
        } catch (Throwable t) {
            fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
        } finally {
            // Cancel the timeout task, since the connect attempt is over either way.
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false);
            }
            connectPromise = null;
        }
    }

    protected void doFinishConnect() throws Exception {
        if (!javaChannel().finishConnect()) {
            throw new Error();
        }
    }
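The whole non-blocking connect sequence (connect, OP_CONNECT, select, finishConnect) can be exercised with java.nio alone. The sketch below connects to a throwaway ServerSocketChannel on the loopback interface; the class name and the 5 second select timeout are arbitrary choices, and it assumes loopback networking is available.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonBlockingConnectDemo {
    static boolean connectToLoopback() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // Port 0 lets the OS pick a free port; the listen backlog accepts the handshake.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            SelectionKey key = client.register(selector, 0);

            // In non-blocking mode connect() may return false: the handshake is still running.
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                key.interestOps(SelectionKey.OP_CONNECT);
                if (selector.select(5000) == 0) {
                    throw new IOException("connect not ready within 5s");
                }
                if (key.isConnectable()) {
                    // Clear OP_CONNECT so later selects do not keep reporting it.
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
                    connected = client.finishConnect();
                }
            }
            return connected && client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectToLoopback());
    }
}
```

On a loopback address connect() may well return true straight away, which corresponds to the immediate-success branch described earlier; the select path is taken only when the handshake is still pending.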
This concludes the source code analysis of the Bootstrap client startup.