在上一篇有介绍,ReferenceBean refer的源码再贴一下
Class RegistryProtocol
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//通过type也就是dubbo service的接口类型,和url zk的地址构造一个目录服务
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
//设置其registry,默认是个ZookeeperRegistry
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
//ZookeeperRegistry对指定的zk目录进行监听,主要是监听 配置 路由 和消费者url变更的信息
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
//目录服务对指定的zk地址进行订阅和监听,调用之后,directory内部的所有信息
//都会刷新一遍
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
//通过cluster将directory的集群调用封装成一个Invoker
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
可以看到如上的源码里面,基本都是针对new出来的directory进行操作。
我们先看下directory的类的依赖关系图

如上图,RegistryDirectory不仅是个目录服务,而且实现了NotifyListener,所以当zk的某些目录发生变更的时候,RegistryDirectory会实时的刷新其内部的路由缓存信息,保证了其路由信息的实时的更新。
我们从Directory —>RegistryDirectory 进行分析
Class Directory
public interface Directory<T> extends Node {
/**
* get service type. 返回支持的service类型也就是我们的dubbo的接口类型,
*等于说每个dubbo服务都会new一个Directory出来
*
* @return service type.
*/
Class<T> getInterface();
/**
* list invokers. 根据invocation返回所有的Invokers,为什么会有这个方法,由于
*dubbo支持tag和group等标签,所以不同的invocation(可以认为是dubbo服务的某个
*方法)返回不同的invokers
* @return invokers
*/
List<Invoker<T>> list(Invocation invocation) throws RpcException;
}
Directory的中间抽象类AbstractDirectory
里面有几个属性
private final URL url; //可以认为是当前的目录服务的url的地址,一般来说是对zk地址的封装
private volatile List<Router> routers;//路由规则,如果我们配置了路由规则,这些路由规则会对Invokers进行过滤,只有满足条件的Invokers才会返回
在其构造函数里面,如下
public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
if (url == null)
throw new IllegalArgumentException("url == null");
this.url = url;
this.consumerUrl = consumerUrl;
setRouters(routers);
}
其中setRouters(routers)会自动的加入两个路由规则,如下
protected void setRouters(List<Router> routers) {
// copy list
routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
// append url router
String routerkey = url.getParameter(Constants.ROUTER_KEY);
if (routerkey != null && routerkey.length() > 0) {
RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);
routers.add(routerFactory.getRouter(url));
}
// append mock invoker selector
//mock路由
routers.add(new MockInvokersSelector());
//标签路由
routers.add(new TagRouter());
Collections.sort(routers);
this.routers = routers;
}
可以认为这些routers就是invokers的过滤器,根据url的规则过滤出满足条件的invokers返回,routers我们在后面进行分析。
我们在源码里面可以窥探一二,如下
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
//如何根据invocation返回invokers留给子类实现
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
//根据router对invokers进行过滤,不满足条件的invokers过滤掉
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
而其最重大的doList方法再RegistryDirectory方法里面进行了实现,如下
Class RegistryDirectory
public List<Invoker<T>> doList(Invocation invocation) {
List<Invoker<T>> invokers = null;
//拿到本地缓存的method与List<Invoker<T>的缓存map,根据invocation的信息进行
//查找
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
上面的代码不难理解,最重大的就是localMethodInvokerMap,直接的在这个里面拿信息就可以了,但是这里面的信息是怎么来的,以及是怎么变更的呢。
由于RegistryDirectory实现了NotifyListener,不难想到localMethodInvokerMap的维护和变更是在 void notify(List<URL> urls)里面实现的。源码如下
我们先看下NotifyListener的源码备注
public interface NotifyListener {
/**
* Triggered when a service change notification is received.
* <p>
* Notify needs to support the contract: <br>
* 1. Always notifications on the service interface and the dimension of the data type. that is, won t notify part of the same type data belonging to one service. Users do not need to compare the results of the previous notification.<br> 每次必须是针对某个 type data 的全量通知
* 2. The first notification at a subscription must be a full notification of all types of data of a service.<br> //第一次必须是all types of data的通知(providers, consumers, routers, overrides.)
* 3. At the time of change, different types of data are allowed to be notified separately, e.g.: providers, consumers, routers, overrides. It allows only one of these types to be notified, but the data of this type must be full, not incremental.<br>
//后面可以是针对某个type的全量通知
* 4. If a data type is empty, need to notify a empty protocol with category parameter identification of url data.<br>
* 5. The order of notifications to be guaranteed by the notifications(That is, the implementation of the registry). Such as: single thread push, queue serialization, and version comparison.<br> //顺序要保证,dubbo里面使用synchronized来做的同步
*
* @param urls The list of registered information , is always not empty. The meaning is the same as the return value of {@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}. //不能通知个null结果
*/
void notify(List<URL> urls);
}
如上对notify里面的参数urls做了规定。
Class RegistryDirectory
public synchronized void notify(List<URL> urls) {
//缓存所有providers的url
List<URL> invokerUrls = new ArrayList<URL>();
//缓存所有router的url
List<URL> routerUrls = new ArrayList<URL>();
//缓存所有的配置 configurator的url
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
//判断url的protocol,根据protocol判断通知的type的类型
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
//如果是route变更
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
//如果是配置变更
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
//如果是providers url的变更
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators 处理变更的configuratorUrls
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers 处理变更的routerUrls
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers 刷新
refreshInvoker(invokerUrls);
}
可以看到实则上面就是针对providers、 router、configurator配置的变更做了监听,然后实时的更新本地methodInvokerMap。
我们重点的看下refreshInvoker(invokerUrls),源码主要就是如下的三句
//将invokerUrls转化成Invokesr
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
//根据方法拆分缓存
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
//将原来老的不要的Invoker销毁掉
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
而在 toInvokers(invokerUrls)方法中,最重大的就一句,如下
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
而通过debug可以发现
protocol.refer(serviceType, url)被封装成了一个DubboInvoker<T>,并继续的代理成一个InvokerDelegate,关于protocol的作用我们后面进行分析。
而toMethodInvokers(newUrlInvokerMap);就是在newUrlInvokerMap的基础上根据method做进一步的缓存。有兴趣可以看下
最后的destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
将oldUrlInvokerMap里面不在newUrlInvokerMap里面的invoke进行销毁。
经过一次监听变更,newMethodInvokerMap达到了最新。
既然在Directory使用Router进行了invokes的过滤,我们接着分析下Router的原理,先看下Router的类继承结构

整个Router的话继承结构还是很扁平化的。
public interface Router extends Comparable<Router>{
/**
* get the router url.
*
* @return url
*/
URL getUrl();
/**
* route.
*
* @param invokers
* @param url refer url
* @param invocation
* @return routed invokers
* @throws RpcException
*/
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
/**
* Router s priority, used to sort routers.
*
* @return router s priority
*/
int getPriority();
}
三个方法也很简单,不难,我们选取TagRouter做代码跟踪,
Class TagRouter
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
// filter
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
// Dynamic param 判断Attachment里面是否有dubbo.tag属性
String tag = RpcContext.getContext().getAttachment(Constants.TAG_KEY);
// Tag request
if (!StringUtils.isEmpty(tag)) {
// Select tag invokers first
for (Invoker<T> invoker : invokers) {
//根据invoker的url是否有tag标签并匹配上,如果匹配上了,加入
if (tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
result.add(invoker);
}
}
}
//如果过滤后没有满足条件的invokers
//且Attachment没有dubbo.force.tag,那么将所有没有dubbo.tag标签的返回
if (result.isEmpty()) {
// Only forceTag = true force match, otherwise downgrade
String forceTag = RpcContext.getContext().getAttachment(Constants.FORCE_USE_TAG);
if (StringUtils.isEmpty(forceTag) || "false".equals(forceTag)) {
for (Invoker<T> invoker : invokers) {
if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
result.add(invoker);
}
}
}
}
return result;
}
当然我们也可以自动以Router,根据实际条件做过滤
通过上面的分析我们可知道Directory可以对zk上的配置信息进行监听,并进行路由信息的动态更新,但是我们通过cluster做进一步的封装后才返回,如下
Invoker invoker = cluster.join(directory);
而在cluster的内部也封装了LoadBalance,为什么要这样设计呢。
由于在rpc调用的过程中,如何选择一个invoke进行调用呢,我们可以通过LoadBalance算法来选择合适的invoke进行调用,如果invoke调用失败了,我们可以重试,也可以快速报错也可以将异常吃掉,那封装这些异常的场景的就是cluster的工作了,下一章会对cluster和LoadBalance进行分析,我们可以看下,在集群情况下,dubbo是如何选择一个invoke进行调用的,以及针对异常的场景如何的应付。




















暂无评论内容