博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Dubbo集群调用模式之Mergeable实现
阅读量:7079 次
发布时间:2019-06-28

本文共 4758 字,大约阅读时间需要 15 分钟。

hot3.png

注: Dubbo版本是2.5.7

                        

                                                        图1 MergeableClusterInvoker的类继承图

1.Mergeable的含义

    Mergeable,即对结果集进行合并。

2.Dubbo中是怎么实现

    核心代码在MergeableClusterInvoker的doInvoke(Invocation,List<Invoker<T>>,LoadBalance)中,源码如下。代码看似有点多,但是分析主要逻辑的话,不复杂。

@Overridepublic Result invoke(final Invocation invocation) throws RpcException {    List
> invokers = directory.list(invocation); String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY); Class
returnType; try { returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType(); } catch (NoSuchMethodException e) { returnType = null; } Map
> results = new HashMap
>(); for (final Invoker
invoker : invokers) { Future
future = executor.submit(new Callable
() { public Result call() throws Exception { return invoker.invoke(new RpcInvocation(invocation, invoker)); } }); results.put(invoker.getUrl().getServiceKey(), future); } Object result = null; List
resultList = new ArrayList
(results.size()); int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); for (Map.Entry
> entry : results.entrySet()) { Future
future = entry.getValue(); try { Result r = future.get(timeout, TimeUnit.MILLISECONDS); if (r.hasException()) { log.error(new StringBuilder(32).append("Invoke ") .append(getGroupDescFromServiceKey(entry.getKey())) .append(" failed: ") .append(r.getException().getMessage()).toString(), r.getException()); } else { resultList.add(r); } } catch (Exception e) { throw new RpcException(new StringBuilder(32) .append("Failed to invoke service ") .append(entry.getKey()) .append(": ") .append(e.getMessage()).toString(), e); } } if (resultList.size() == 0) { return new RpcResult((Object) null); } else if (resultList.size() == 1) { return resultList.iterator().next(); } if (returnType == void.class) { return new RpcResult((Object) null); } if (merger.startsWith(".")) { merger = merger.substring(1); Method method; try { method = returnType.getMethod(merger, returnType); } catch (NoSuchMethodException e) { throw new RpcException(new StringBuilder(32) .append("Can not merge result because missing method [ ") .append(merger) .append(" ] in class [ ") .append(returnType.getClass().getName()) .append(" ]") .toString()); } if (method != null) { if (!Modifier.isPublic(method.getModifiers())) { method.setAccessible(true); } result = resultList.remove(0).getValue(); try { if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) { for (Result r : resultList) { result = method.invoke(result, r.getValue()); } } else { for (Result r : resultList) { method.invoke(result, r.getValue()); } } } catch (Exception e) { throw new RpcException( new StringBuilder(32) .append("Can not merge result: ") .append(e.getMessage()).toString(), e); } } else { throw new RpcException( new StringBuilder(32) .append("Can not merge result because missing method [ ") .append(merger) .append(" ] in class [ ") .append(returnType.getClass().getName()) .append(" ]") .toString()); } } else { Merger resultMerger; if (ConfigUtils.isDefault(merger)) { resultMerger = MergerFactory.getMerger(returnType); } else { resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger); } if (resultMerger != null) { List
rets = new ArrayList(resultList.size()); for (Result r : resultList) { rets.add(r.getValue()); } result = resultMerger.merge( rets.toArray((Object[]) Array.newInstance(returnType, 0))); } else { throw new RpcException("There is no merger to merge result."); } } return new RpcResult(result);}

    1.首先得到服务提供者列表,遍历服务提供者,对每个服务提供者调用服务,这个调用过程是封装在Callable中的,放在线程池中执行的。

    2.步骤1中得到的是个Future集合,即上面代码段中的results。

    3.对results中的Future,进行Future.get(),即阻塞等待线程执行完成。这样得到所有的结果集,即上面代码段中的resultList。

    4.为简化的目的,我们注重分析下面代码段中的源码。不考虑Merger是怎么来的话,代码比较简单,取出集合resultList中Result的value,结果也是个集合,即下面代码段中的rets,最后用Merger对rets集合合并操作。

} else {    Merger resultMerger;    if (ConfigUtils.isDefault(merger)) {        resultMerger = MergerFactory.getMerger(returnType);    } else {        resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);    }    if (resultMerger != null) {        List rets = new ArrayList(resultList.size());        for (Result r : resultList) {            rets.add(r.getValue());        }        result = resultMerger.merge(                rets.toArray((Object[]) Array.newInstance(returnType, 0)));    } else {        throw new RpcException("There is no merger to merge result.");    }}

 

    重点是使用多线程,调用服务提供者,最后将得到的结果集,用Merger进行合并。

转载于:https://my.oschina.net/u/2518341/blog/1815077

你可能感兴趣的文章
浅析CentOS和RedHat Linux的区别
查看>>
Linux gcc版本如何升级
查看>>
Lubuntu Next 18.10将默认采用Calamares
查看>>
思达报表工具Style Report基础教程—简单列表
查看>>
一个灵活、好用、扩展性好的WCM系统应该包含哪些功能
查看>>
JAVA IO - 压缩流
查看>>
网络客户端的几种模式
查看>>
hive 新加字段 插入数据 注意事项
查看>>
Gstreamer学习笔记----第一个helloworld程序
查看>>
unix编程之多进程编程
查看>>
R语言学习之聚类
查看>>
我的友情链接
查看>>
斯坦佛编程教程-Unix编程工具(三)
查看>>
DHCP和TFTP配置以及CentOS 7上的服务控制
查看>>
Python 5.5 使用枚举类
查看>>
cookie禁用后session id传值的问题
查看>>
android 动画AnimationSet 和 AnimatorSet
查看>>
Dharma勒索软件继续大肆传播,据称已有100多家希腊网站沦陷
查看>>
成为JavaGC专家(1)—深入浅出Java垃圾回收机制
查看>>
Linux学习笔记(十七) vim
查看>>