Dubbo source code network communication

This article introduces Dubbo's communication process by following along in a source-code debugger. If anything here is wrong, please point it out.

What will the service exposure do?

  1. Register with ZooKeeper and listen on the dynamic configuration node
  2. Open the server
  3. Create proxy service
  4. Exporter -> Invoker -> proxyService
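The Exporter -> Invoker -> proxyService direction can be pictured as wrapping the real service object in an object that accepts generic invocations. Below is a minimal, hypothetical sketch (GreetingServiceImpl and ReflectiveInvoker are illustrative names, not Dubbo classes; Dubbo itself generates a Wrapper class rather than using raw reflection):

```java
import java.lang.reflect.Method;

public class ExportSketch {
    // Hypothetical service implementation used for illustration
    public static class GreetingServiceImpl {
        public String greet(String name) {
            return "hi " + name;
        }
    }

    // Minimal stand-in for a provider-side Invoker: it holds the real service
    // object and dispatches generic (methodName, args) calls to it via reflection
    static class ReflectiveInvoker {
        private final Object target;

        ReflectiveInvoker(Object target) {
            this.target = target;
        }

        Object invoke(String methodName, Class<?>[] types, Object[] args) throws Exception {
            Method method = target.getClass().getMethod(methodName, types);
            return method.invoke(target, args);
        }
    }

    public static void main(String[] args) throws Exception {
        ReflectiveInvoker invoker = new ReflectiveInvoker(new GreetingServiceImpl());
        // A decoded remote request arrives as (methodName, types, args) and is
        // dispatched onto the real service object
        Object result = invoker.invoke("greet", new Class<?>[]{String.class}, new Object[]{"dubbo"});
        System.out.println(result);
    }
}
```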

What will the service reference do?

  1. Register with ZooKeeper and listen on the dynamic configuration, provider, and routing nodes
  2. Open the client
  3. Create proxy service
  4. proxyService -> Invoker
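Conversely, the proxyService -> Invoker direction on the consumer side can be sketched with a JDK dynamic proxy: every method call on the proxy funnels into one generic entry point, which is roughly the role of Dubbo's InvokerInvocationHandler. GreetingService and SimpleInvoker below are hypothetical names for illustration:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxySketch {
    // Hypothetical service interface used for illustration
    interface GreetingService {
        String greet(String name);
    }

    // Minimal stand-in for a consumer-side Invoker: it receives the generic
    // (method, args) call that the proxy funnels into it
    interface SimpleInvoker {
        Object invoke(Method method, Object[] args);
    }

    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> type, SimpleInvoker invoker) {
        // Every method call on the proxy is routed through this single handler
        InvocationHandler handler = (proxy, method, methodArgs) -> invoker.invoke(method, methodArgs);
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }

    public static void main(String[] args) {
        // A real Invoker would serialize the call and send it over the network;
        // this one answers locally so the sketch is runnable
        SimpleInvoker invoker = (method, a) -> "hello " + a[0];
        GreetingService service = createProxy(GreetingService.class, invoker);
        System.out.println(service.greet("dubbo"));
    }
}
```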

Client request

ConsumerProxyService -> Invoker (DubboInvoker) -> Exchanger (HeaderExchangeClient) -> Transporter (NettyClient) -> send to server (a DefaultFuture tracks the pending Request)
 

Server response

-> Transporter (NettyServer) -> handler chain -> Exporter#getInvoker -> Invoker#invoke -> ProviderProxyService -> callback
 

Exchanger

Exchangers

Facade class that provides convenience methods: it first obtains an Exchanger via SPI, then calls the Exchanger's methods to create an ExchangeServer or ExchangeClient

Exchanger

SPI interface; the default implementation is HeaderExchanger. It provides two shortcut methods for creating an ExchangeServer and an ExchangeClient

@SPI(HeaderExchanger.NAME)
public interface Exchanger {
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}

public class HeaderExchanger implements Exchanger {
    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}
 

ExchangeServer

Used on the server side; the default implementation is HeaderExchangeServer, which internally calls the Transporter to start the Server

public interface ExchangeServer extends Server {
    Collection<ExchangeChannel> getExchangeChannels();

    ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);
}
 

ExchangeClient

Used on the client side; the default implementation is HeaderExchangeClient. Its core request method internally calls the Transporter to send the request

public interface ExchangeClient extends Client, ExchangeChannel {
}
 

ExchangeChannel

The default implementation is HeaderExchangeChannel, which is held as an attribute of HeaderExchangeClient

Transporter

Transporters

Facade class that provides convenience methods: it first obtains a Transporter via SPI, then calls the Transporter's methods to create a Server or Client

Transporter

SPI interface; the default implementation is NettyTransporter. It provides two shortcut methods for creating a Server and a Client

@SPI("netty")
public interface Transporter {
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

public class NettyTransporter implements Transporter {
    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}
 

Server

Used on the server side; the default implementation is NettyServer, whose core doOpen method starts the Server

public class NettyServer extends AbstractServer implements Server {
}
 

Client

Used on the client side; the default implementation is NettyClient. Its core request method sends requests, and doOpen establishes the connection to the server

public class NettyClient extends AbstractClient {
}
 

Server start service

DubboProtocol#export =>
DubboProtocol#openServer => 
DubboProtocol#createServer =>
Exchangers#bind => 
NettyServer#doOpen
 

In the end, a server is opened through Netty inside NettyServer#doOpen

DubboProtocol#createServer
    => Exchangers#bind(url, requestHandler)
        => HeaderExchanger#bind(url, requestHandler)
            => return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))))
                
//Transporters#bind   
Transporters#bind
    => NettyTransporter#bind(url, handler)
        => return new NettyServer(url, handler)
            => NettyServer#doOpen (open the server through Netty)
 

That is, the handler attribute inside NettyServer ultimately points to new DecodeHandler(new HeaderExchangeHandler(handler)). The server side finally returns a HeaderExchangeServer, and the NettyServer constructor actually wraps the handler further

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
   //the handler will be wrapped: MultiMessageHandler -> HeartbeatHandler -> handler
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

public class ChannelHandlers {
    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {}

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }
    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}
 

So the handler attribute inside NettyServer ultimately points to MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler (created by AllDispatcher) -> DecodeHandler -> HeaderExchangeHandler
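The wrapping above is a plain decorator chain: each handler holds the next one and adds its own concern before delegating. A minimal sketch with illustrative handler names (not the real Dubbo classes):

```java
import java.util.ArrayList;
import java.util.List;

public class HandlerChainSketch {
    // Simplified stand-in for Dubbo's ChannelHandler.received
    interface Handler {
        void received(String message);
    }

    // Each wrapper records that it ran, then delegates to the next handler,
    // mirroring MultiMessageHandler -> HeartbeatHandler -> ... -> DecodeHandler
    static Handler wrap(String name, Handler next, List<String> trace) {
        return message -> {
            trace.add(name);
            next.received(message);
        };
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Handler terminal = message -> trace.add("business:" + message);
        Handler chain = wrap("multi", wrap("heartbeat", wrap("decode", terminal, trace), trace), trace);
        chain.received("req");
        System.out.println(trace);
    }
}
```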

Client connection service

The call chain is long and well hidden, so only the key steps are shown; the client connection is created when the Invoker is generated for the Reference object at application startup

RegistryProtocol#doRefer =>
RegistryDirectory#subscribe =>
RegistryDirectory#toInvokers => 
ProtocolFilterWrapper#refer =>
AbstractProtocol#refer =>
DubboProtocol#protocolBindingRefer =>
DubboProtocol#getClients =>
DubboProtocol#getSharedClient =>
DubboProtocol#buildReferenceCountExchangeClientList =>
DubboProtocol#buildReferenceCountExchangeClient =>
DubboProtocol#initClient =>
Exchangers#connect =>
HeaderExchanger#connect =>
Transporters#connect =>
NettyTransporter#connect =>
NettyClient#<init> =>
NettyClient#doOpen
 

Finally, a connection to the server is established through Netty inside NettyClient#doOpen

Exchangers#connect
    => HeaderExchanger#connect(url, handler)
        => return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true)
                
//Transporters#connect   
Transporters#connect
    => NettyTransporter#connect(url, handler)
        => return new NettyClient(url, handler)
            => NettyClient#doOpen (connect to the server through Netty)
 

That is, the handler attribute inside NettyClient ultimately points to new DecodeHandler(new HeaderExchangeHandler(handler)). The client side finally returns a HeaderExchangeClient, whose client attribute wraps the NettyClient

However, DubboProtocol#buildReferenceCountExchangeClient adds one more layer of wrapping around HeaderExchangeClient, so the client type held by the final Invoker is ReferenceCountExchangeClient

private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
    ExchangeClient exchangeClient = initClient(url);

    return new ReferenceCountExchangeClient(exchangeClient);
}
 

ReferenceCountExchangeClient is essentially no different from HeaderExchangeClient apart from the extra layer of wrapping; its one important addition is the referenceCount attribute, which records how many consumers share the client
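The idea can be sketched as a wrapper that counts references and only closes the underlying client when the last reference is released; the class below is a simplified stand-in, not the real ReferenceCountExchangeClient:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountSketch {
    interface Client {
        void close();
    }

    // Simplified stand-in: close() only reaches the underlying client
    // when the last shared reference is released
    static class RefCountedClient implements Client {
        private final Client delegate;
        private final AtomicInteger referenceCount = new AtomicInteger(0);

        RefCountedClient(Client delegate) {
            this.delegate = delegate;
            referenceCount.incrementAndGet();
        }

        // called when another consumer starts sharing this client
        void incrementAndGetCount() {
            referenceCount.incrementAndGet();
        }

        @Override
        public void close() {
            if (referenceCount.decrementAndGet() <= 0) {
                delegate.close();
            }
        }
    }

    public static void main(String[] args) {
        boolean[] closed = {false};
        RefCountedClient client = new RefCountedClient(() -> closed[0] = true);
        client.incrementAndGetCount();   // a second consumer shares the client
        client.close();                  // first release: still in use
        System.out.println(closed[0]);
        client.close();                  // last release: underlying client closes
        System.out.println(closed[0]);
    }
}
```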

Client sends request

InvokerInvocationHandler#invoke ->
MockClusterInvoker#invoke ->
AbstractClusterInvoker#invoke (LoadBalance selects an Invoker) ->
FailoverClusterInvoker#doInvoke ->
ProtocolFilterWrapper#invoke (Filter chain) ->
AbstractInvoker#invoke (set Attachments) ->
DubboInvoker#doInvoke (enter the Exchange layer) ->
ReferenceCountExchangeClient#request ->
HeaderExchangeClient#request ->
HeaderExchangeChannel#request (returns a CompletableFuture) ->
AbstractPeer#send ->
AbstractClient#send ->
NettyChannel#send ->
Channel#writeAndFlush
 

Interaction with the Exchange layer begins in DubboInvoker#doInvoke; the core code is as follows

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
    if (isOneway) {
        // oneway: fire-and-forget, no Future is exposed
        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
        currentClient.send(inv, isSent);
        RpcContext.getContext().setFuture(null);
        return AsyncRpcResult.newDefaultAsyncResult(invocation);
    } else {
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
        CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
        asyncRpcResult.subscribeTo(responseFuture);
        RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
        return asyncRpcResult;
    }
}
 
ReferenceCountExchangeClient#request => 
HeaderExchangeClient#request =>  
HeaderExchangeChannel#request
 
//HeaderExchangeChannel.java
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
   //create request.
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
 

In this method, there are several points to note:

  1. Inside the Request constructor, an incrementally unique ID is generated to identify the request
  2. The channel#send call path involves NettyChannel#getOrAddChannel; NettyChannel keeps a ConcurrentMap<Channel, NettyChannel> CHANNEL_MAP cache that maintains the mapping between io.netty.channel.Channel and NettyChannel
  3. The channel#send call path eventually reaches NettyChannel#send, which actually sends the message to the server
  4. The returned DefaultFuture is a CompletableFuture
//NettyChannel.java
public void send(Object message, boolean sent) throws RemotingException {
    boolean success = true;
    int timeout = 0;
    try {
        // write the message and flush it out to the server
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            // sent=true: wait for the write to finish, up to the timeout
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}
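Points 1 and 4 of the notes above — a unique request ID plus a pending future stored per ID — can be sketched as a tiny registry. This is a simplified model of DefaultFuture's static FUTURES map, not the real class:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class FutureRegistrySketch {
    // Each outgoing request gets an incrementally unique id and a pending future
    static final AtomicLong ID = new AtomicLong(0);
    static final Map<Long, CompletableFuture<String>> FUTURES = new ConcurrentHashMap<>();

    static long newRequest() {
        long id = ID.incrementAndGet();
        FUTURES.put(id, new CompletableFuture<>());
        return id;
    }

    // When a response arrives, look up the pending future by id and complete it
    static void received(long id, String result) {
        CompletableFuture<String> future = FUTURES.remove(id);
        if (future != null) {
            future.complete(result);
        }
    }

    public static void main(String[] args) throws Exception {
        long id = newRequest();                 // "send" a request
        CompletableFuture<String> future = FUTURES.get(id);
        received(id, "pong");                   // the "response" comes back
        System.out.println(future.get());
        System.out.println(FUTURES.isEmpty());  // entry removed on receipt
    }
}
```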
 

The sending flow above never seems to encode the message; that is because the codec was already installed when the Netty client was initialized.

//NettyClient.java 
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .channel(NioSocketChannel.class);
    if (getConnectTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    .addLast("handler", nettyClientHandler);
            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
            if(socksProxyHost != null) {
                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}
 

An outbound message first passes through the encoder, i.e. the InternalEncoder#encode method. InternalEncoder extends MessageToByteEncoder and internally calls Codec2, an SPI interface whose default implementation is DubboCodec

NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
protected static Codec2 getChannelCodec(URL url) {
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
    }
}
 

The server responds to the request

As mentioned above, the handler attribute of NettyServer points to MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler, and the code where NettyServer opens the server is as follows

protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
 
An inbound message first passes through the decoder, i.e. the InternalDecoder#decode method. InternalDecoder extends ByteToMessageDecoder and internally calls Codec2, an SPI interface whose default implementation is DubboCodec
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
protected static Codec2 getChannelCodec(URL url) {
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
    }
}
 
  1. MultiMessageHandler handles batched messages: if the message is a MultiMessage (which implements Iterable), it iterates over it and calls the next handler's received method for each element; otherwise it calls the next handler's received method directly
  2. When AllChannelHandler receives a message, it wraps the channel, handler, and message into a ChannelEventRunnable with state ChannelState.RECEIVED and submits it to the thread pool
  3. In ChannelEventRunnable#run, when the state is ChannelState.RECEIVED, the next handler's received method, i.e. DecodeHandler's, is executed directly; this step runs on the thread pool
  4. In DecodeHandler#received, if the message is Decodeable, the whole message is decoded; if it is a Request, Request.getData() is decoded; if it is a Response, Response.getResult() is decoded
  5. HeaderExchangeHandler#received -> HeaderExchangeHandler#handleRequest -> requestHandler#reply; requestHandler is a property of DubboProtocol of type ExchangeHandlerAdapter
  6. HeaderExchangeHandler#handleRequest creates a Response object whose ID is the ID of the Request, associating the response with the request
  7. In the requestHandler#reply method, the corresponding DubboExporter is fetched from the exporterMap cache, the Invoker is obtained from the DubboExporter, Invoker#invoke is executed, and a CompletableFuture is returned
  8. HeaderExchangeHandler#handleRequest receives the returned CompletableFuture, registers a callback on it, wraps the result into the Response object inside the callback, and then sends the Response out through the channel
//ChannelEventRunnable.java
public void run() {
    if (state == ChannelState.RECEIVED) {
        try {
            // RECEIVED: call the next handler's received method, i.e. DecodeHandler's
            handler.received(channel, message);
        } catch (Exception e) {}
    } else {
        switch (state) {
        case CONNECTED:
            try {
                handler.connected(channel);
            } catch (Exception e) {}
            break;
        case DISCONNECTED:
            try {
                handler.disconnected(channel);
            } catch (Exception e) {}
            break;
        case SENT:
            try {
                handler.sent(channel, message);
            } catch (Exception e) {}
            break;
        case CAUGHT:
            try {
                handler.caught(channel, exception);
            } catch (Exception e) {}
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
        }
    }
}
 
InternalDecoder#decode
    => NettyServerHandler#channelRead
        => AbstractPeer#received
            => MultiMessageHandler#received
                => HeartbeatHandler#received
                    => AllChannelHandler#received

                    ---------- executed on the thread pool ----------
                    => ChannelEventRunnable#run
                        => DecodeHandler#received
                            => DecodeHandler#decode
                                => DecodeableRpcInvocation#decode
                        => HeaderExchangeHandler#received
                            => HeaderExchangeHandler#handleRequest
                                => DubboProtocol.requestHandler#reply
                    -------------------------------------------------

                                    ---------- Filter chain ----------
                                    => ProtocolFilterWrapper.invoke
                                    => EchoFilter.invoke
                                        => ClassLoaderFilter.invoke
                                        => GenericFilter.invoke
                                            => TraceFilter.invoke
                                            => MonitorFilter.invoke
                                                => TimeoutFilter.invoke
                                                => ExceptionFilter.invoke
                                                    => InvokerWrapper.invoke
                                    ----------------------------------
                                                        => AbstractProxyInvoker#invoke
                                                            => JavassistProxyFactory.AbstractProxyInvoker#doInvoke
                                                                => Wrapper#invokeMethod
                                                                    => invoke the actual service implementation


                            // the response is sent back to the consumer in future#whenComplete
                            => channel.send(response)
                                => HeaderExchangeChannel
                                    => NettyChannel.send
                                        => NioSocketChannel#writeAndFlush(message)
 

The server sends the result

HeaderExchangeChannel#send =>
NettyChannel#send => 
NioSocketChannel#writeAndFlush(message) 
 

Client response result

When the client starts, the handler passed in is the same requestHandler as on the server side

//DubboProtocol#initClient
Exchangers.connect(url, requestHandler);

//HeaderExchanger#connect
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

Transporters#connect =>
    NettyTransporter#connect
        return NettyClient
 

In the NettyClient constructor, the handler is wrapped

ChannelHandlers.wrap(handler, url)

 

Therefore, the handler attribute inside NettyClient ultimately points to MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> requestHandler, the same processing chain as on the server side

  1. A message is received, passes through MultiMessageHandler and HeartbeatHandler, and arrives at the AllDispatcher's AllChannelHandler
  2. AllChannelHandler wraps the message into a new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message) and hands it to the thread pool for execution
  3. The thread pool executes the task, which passes through DecodeHandler and arrives at HeaderExchangeHandler
  4. HeaderExchangeHandler#received -> HeaderExchangeHandler#handleResponse -> DefaultFuture#received; DefaultFuture maintains a map from request ID to DefaultFuture, so a Request and its Response can be matched one-to-one through the request ID
public static void received(Channel channel, Response response, boolean timeout) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            Timeout t = future.timeoutCheckTask;
            if (!timeout) {
                // the response arrived in time: cancel the timeout check task
                t.cancel();
            }
            future.doReceived(response);
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}
 
  1. Obtain the DefaultFuture by response.getId()
  2. Executing CompletableFuture#complete lets the user thread blocked in CompletableFuture#get obtain the response and return the result. At this point the whole call flow is complete
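The hand-off in point 2 can be seen in a minimal two-thread sketch: the caller blocks in get until another thread (standing in for the Netty IO thread) calls complete:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompleteUnblocksGet {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> response = new CompletableFuture<>();

        // Stand-in for the IO thread that receives the response from the wire
        Thread io = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50); // simulate network latency
            } catch (InterruptedException ignored) {
            }
            // completing the future unblocks the waiting user thread
            response.complete("result");
        });
        io.start();

        // The user thread blocks here until complete() is called
        String result = response.get(5, TimeUnit.SECONDS);
        System.out.println(result);
        io.join();
    }
}
```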

Synchronous to asynchronous

However, our code usually makes synchronous calls and rarely calls the CompletableFuture#get method itself. Where is that handled? The DubboInvoker#doInvoke method returns an AsyncRpcResult

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // oneway: fire-and-forget, no Future is exposed
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            // when responseFuture completes, it completes asyncRpcResult as well
            asyncRpcResult.subscribeTo(responseFuture);

            RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
 

AsyncToSyncInvoker

The AsyncToSyncInvoker#invoke method checks whether the call is synchronous or asynchronous. For a synchronous call, it calls AsyncRpcResult#get to block the user thread and achieve the synchronous effect.

public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);
    try {
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            // synchronous call: block on asyncResult#get until the response arrives
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return!  method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}