dubbo技术内幕四 Directory + Router

在上一篇有介绍,ReferenceBean refer的源码再贴一下

Class  RegistryProtocol
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //通过type也就是dubbo service的接口类型,和url zk的地址构造一个目录服务
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        //设置其registry,默认是个ZookeeperRegistry
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
           //ZookeeperRegistry对指定的zk目录进行监听,主要是监听 配置 路由 和消费者url变更的信息
            registry.register(registeredConsumerUrl);
            directory.setRegisteredConsumerUrl(registeredConsumerUrl);
        }
        //目录服务对指定的zk地址进行订阅和监听,调用之后,directory内部的所有信息
        //都会刷新一遍
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
       //通过cluster将directory的集群调用封装成一个Invoker
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

可以看到如上的源码里面,基本都是针对new出来的directory进行操作。
我们先看下directory的类的依赖关系图

dubbo技术内幕四 Directory + Router

如上图,RegistryDirectory不仅是个目录服务,而且实现了NotifyListener,所以当zk的某些目录发生变更的时候,RegistryDirectory会实时的刷新其内部的路由缓存信息,保证了其路由信息的实时的更新。
我们从Directory —>RegistryDirectory 进行分析

Class  Directory
public interface Directory<T> extends Node {

    /**
     * get service type.  返回支持的service类型也就是我们的dubbo的接口类型,
     *等于说每个dubbo服务都会new一个Directory出来
     *
     * @return service type.
     */
    Class<T> getInterface();

    /**
     * list invokers.  根据invocation返回所有的Invokers,为什么会有这个方法,由于
     *dubbo支持tag和group等标签,所以不同的invocation(可以认为是dubbo服务的某个 
     *方法)返回不同的invokers
     * @return invokers
     */
    List<Invoker<T>> list(Invocation invocation) throws RpcException;

}

Directory的中间抽象类AbstractDirectory
里面有几个属性

private final URL url; //可以认为是当前的目录服务的url的地址,一般来说是对zk地址的封装
private volatile List<Router> routers;//路由规则,如果我们配置了路由规则,这些路由规则会对Invokers进行过滤,只有满足条件的Invokers才会返回

在其构造函数里面,如下

 public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
        if (url == null)
            throw new IllegalArgumentException("url == null");
        this.url = url;
        this.consumerUrl = consumerUrl;
        setRouters(routers);
    }

其中setRouters(routers)会自动的加入两个路由规则,如下

 protected void setRouters(List<Router> routers) {
        // copy list
        routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
        // append url router
        String routerkey = url.getParameter(Constants.ROUTER_KEY);
        if (routerkey != null && routerkey.length() > 0) {
            RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);
            routers.add(routerFactory.getRouter(url));
        }
        // append mock invoker selector
        //mock路由
        routers.add(new MockInvokersSelector());
       //标签路由
        routers.add(new TagRouter());
        Collections.sort(routers);
        this.routers = routers;
    }

可以认为这些routers就是invokers的过滤器,根据url的规则过滤出满足条件的invokers返回,routers我们在后面进行分析。
我们在源码里面可以窥探一二,如下

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
        //如何根据invocation返回invokers留给子类实现
        List<Invoker<T>> invokers = doList(invocation);
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && !localRouters.isEmpty()) {
            for (Router router : localRouters) {
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                       //根据router对invokers进行过滤,不满足条件的invokers过滤掉
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }

而其最重大的doList方法再RegistryDirectory方法里面进行了实现,如下

Class  RegistryDirectory
public List<Invoker<T>> doList(Invocation invocation) {
        List<Invoker<T>> invokers = null;
        //拿到本地缓存的method与List<Invoker<T>的缓存map,根据invocation的信息进行
       //查找
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if (args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if (invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }

上面的代码不难理解,最重大的就是localMethodInvokerMap,直接的在这个里面拿信息就可以了,但是这里面的信息是怎么来的,以及是怎么变更的呢。
由于RegistryDirectory实现了NotifyListener,不难想到localMethodInvokerMap的维护和变更是在 void notify(List<URL> urls)里面实现的。源码如下
我们先看下NotifyListener的源码备注

public interface NotifyListener {

    /**
     * Triggered when a service change notification is received.
     * <p>
     * Notify needs to support the contract: <br>
     * 1. Always notifications on the service interface and the dimension of the data type. that is, won t notify part of the same type data belonging to one service. Users do not need to compare the results of the previous notification.<br> 每次必须是针对某个 type data 的全量通知
     * 2. The first notification at a subscription must be a full notification of all types of data of a service.<br>  //第一次必须是all types of data的通知(providers, consumers, routers, overrides.)
     * 3. At the time of change, different types of data are allowed to be notified separately, e.g.: providers, consumers, routers, overrides. It allows only one of these types to be notified, but the data of this type must be full, not incremental.<br>
      //后面可以是针对某个type的全量通知
     * 4. If a data type is empty, need to notify a empty protocol with category parameter identification of url data.<br>
     * 5. The order of notifications to be guaranteed by the notifications(That is, the implementation of the registry). Such as: single thread push, queue serialization, and version comparison.<br>  //顺序要保证,dubbo里面使用synchronized来做的同步
     *
     * @param urls The list of registered information , is always not empty. The meaning is the same as the return value of {@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}. //不能通知个null结果
     */
    void notify(List<URL> urls);

}

如上对notify里面的参数urls做了规定。

Class  RegistryDirectory
public synchronized void notify(List<URL> urls) {
        //缓存所有providers的url
        List<URL> invokerUrls = new ArrayList<URL>();
       //缓存所有router的url
        List<URL> routerUrls = new ArrayList<URL>();
        //缓存所有的配置 configurator的url
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
             //判断url的protocol,根据protocol判断通知的type的类型
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            //如果是route变更
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            //如果是配置变更
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
           //如果是providers url的变更
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        // configurators  处理变更的configuratorUrls
        if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
            this.configurators = toConfigurators(configuratorUrls);
        }
        // routers 处理变更的routerUrls
        if (routerUrls != null && !routerUrls.isEmpty()) {
            List<Router> routers = toRouters(routerUrls);
            if (routers != null) { // null - do nothing
                setRouters(routers);
            }
        }
        List<Configurator> localConfigurators = this.configurators; // local reference
        // merge override parameters
        this.overrideDirectoryUrl = directoryUrl;
        if (localConfigurators != null && !localConfigurators.isEmpty()) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        // providers 刷新
        refreshInvoker(invokerUrls);
    }

可以看到实则上面就是针对providers、 router、configurator配置的变更做了监听,然后实时的更新本地methodInvokerMap。
我们重点的看下refreshInvoker(invokerUrls),源码主要就是如下的三句

//将invokerUrls转化成Invokesr
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
//根据方法拆分缓存
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
//将原来老的不要的Invoker销毁掉
 destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker

而在 toInvokers(invokerUrls)方法中,最重大的就一句,如下

invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);

而通过debug可以发现
protocol.refer(serviceType, url)被封装成了一个DubboInvoker<T>,并继续的代理成一个InvokerDelegate,关于protocol的作用我们后面进行分析。
而toMethodInvokers(newUrlInvokerMap);就是在newUrlInvokerMap的基础上根据method做进一步的缓存。有兴趣可以看下
最后的destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
将oldUrlInvokerMap里面不在newUrlInvokerMap里面的invoke进行销毁。

经过一次监听变更,newMethodInvokerMap达到了最新。

既然在Directory使用Router进行了invokes的过滤,我们接着分析下Router的原理,先看下Router的类继承结构

dubbo技术内幕四 Directory + Router

整个Router的话继承结构还是很扁平化的。

public interface Router extends Comparable<Router>{

    /**
     * get the router url.
     *
     * @return url
     */
    URL getUrl();

    /**
     * route.
     *
     * @param invokers
     * @param url        refer url
     * @param invocation
     * @return routed invokers
     * @throws RpcException
     */
    <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

    /**
     * Router s priority, used to sort routers.
     *
     * @return router s priority
     */
    int getPriority();

}

三个方法也很简单,不难,我们选取TagRouter做代码跟踪,

Class TagRouter
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        // filter
        List<Invoker<T>> result = new ArrayList<Invoker<T>>();
        // Dynamic param 判断Attachment里面是否有dubbo.tag属性
        String tag = RpcContext.getContext().getAttachment(Constants.TAG_KEY);
        // Tag request
        if (!StringUtils.isEmpty(tag)) {
            // Select tag invokers first
            for (Invoker<T> invoker : invokers) {
                //根据invoker的url是否有tag标签并匹配上,如果匹配上了,加入
                if (tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
                    result.add(invoker);
                }
            }
        }
        //如果过滤后没有满足条件的invokers
        //且Attachment没有dubbo.force.tag,那么将所有没有dubbo.tag标签的返回
        if (result.isEmpty()) {
            // Only forceTag = true force match, otherwise downgrade
            String forceTag = RpcContext.getContext().getAttachment(Constants.FORCE_USE_TAG);
            if (StringUtils.isEmpty(forceTag) || "false".equals(forceTag)) {
                for (Invoker<T> invoker : invokers) {
                    if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
                        result.add(invoker);
                    }
                }
            }
        }
        return result;
    }

当然我们也可以自动以Router,根据实际条件做过滤
通过上面的分析我们可知道Directory可以对zk上的配置信息进行监听,并进行路由信息的动态更新,但是我们通过cluster做进一步的封装后才返回,如下
Invoker invoker = cluster.join(directory);
而在cluster的内部也封装了LoadBalance,为什么要这样设计呢。
由于在rpc调用的过程中,如何选择一个invoke进行调用呢,我们可以通过LoadBalance算法来选择合适的invoke进行调用,如果invoke调用失败了,我们可以重试,也可以快速报错也可以将异常吃掉,那封装这些异常的场景的就是cluster的工作了,下一章会对cluster和LoadBalance进行分析,我们可以看下,在集群情况下,dubbo是如何选择一个invoke进行调用的,以及针对异常的场景如何的应付。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容