Dubbo SPI机制分析【一】

/ 服务发现SPIDUBBO源码 / 没有评论 / 33浏览

Dubbo 的扩展点加载从 JDK 标准的 SPI (Service Provider Interface) 扩展点发现机制加强而来。 Dubbo 改进了 JDK 标准的 SPI 的以下问题:

源码分析

dubbo的Extension基本架构如图所示: 1539264553280.png

这里最重要的类就是ExtensionLoader.

获取动态自适应拓展实现类

首先分析

ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()

getAdaptiveExtension()

首先分析此方法:

    public T getAdaptiveExtension() {
        //1、先从自适应实例缓存中获取实例
        Object instance = cachedAdaptiveInstance.get();
        //2、双重锁检查,如果实例不存在,则生成实例
        if (instance == null) {
            if (createAdaptiveInstanceError == null) {
                //cachedAdaptiveInstance存放Adaptive修饰类的实例
                synchronized (cachedAdaptiveInstance) {
                    instance = cachedAdaptiveInstance.get();
                    if (instance == null) {
                        try {
                            //3、生成实例
                            instance = createAdaptiveExtension();
                            cachedAdaptiveInstance.set(instance);
                        } catch (Throwable t) {
                            createAdaptiveInstanceError = t;
                            throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                        }
                    }
                }
            } else {
                throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
            }
        }

        return (T) instance;
    }

createAdaptiveExtension

    private T createAdaptiveExtension() {
        try {
            //1、先获取自适应拓展点实现类,然后实例化
            //2、注入
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can not create ad" +
                    "aptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }

getAdaptiveExtensionClass

    private Class<?> getAdaptiveExtensionClass() {
        //加载当前拓展点的所有实现,如果有被@Adaptive修饰的实现类,则缓存在cachedAdaptiveClass
        getExtensionClasses();
        //判断是否有被@Adaptive修饰的实现类,有的话则返回
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        //没有@Adaptive修饰的实现类,创建动态自适应拓展点实现
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }

getExtensionClasses

首先分析getExtensionClasses

    private Map<String, Class<?>> getExtensionClasses() {
        //判断拓展点实现类缓存cachedClasses是否为空,如果为空,通过双重所检查模式进行拓展点实现类加载
        Map<String, Class<?>> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    //加载拓展点实现类
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
    }

判断拓展点实现类缓存cachedClasses是否为空,如果为空,通过双重锁检查模式进行拓展点实现类加载loadExtensionClasses

loadExtensionClasses

    private Map<String, Class<?>> loadExtensionClasses() {
        //判断当前类型是否是可拓展点且获取默认拓展实现类
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if (defaultAnnotation != null) {
            String value = defaultAnnotation.value();
            if ((value = value.trim()).length() > 0) {
                //每个拓展接口只能有一个拓展实现默认名,如Protocol的默认实现是“dubbo”
                String[] names = NAME_SEPARATOR.split(value);
                if (names.length > 1) {
                    throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                            + ": " + Arrays.toString(names));
                }
                //设置默认名,如Protocol的为dubbo
                if (names.length == 1) cachedDefaultName = names[0];
            }
        }

        //根据传入类型,对【META-INF/dubbo/internal/】 【META-INF/dubbo/】【 META-INF/services/】
        //进行拓展点实现类的加载
        Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
        loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
        loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
        loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
        loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
        loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
        loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
        return extensionClasses;
    }

loadExtensionClasses() 做了这么几件事:

loadDirectory

    private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type) {
        //拼接文件名,如META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
        String fileName = dir + type;
        try {
            Enumeration<java.net.URL> urls;
            ClassLoader classLoader = findClassLoader();
            //加载classpath下所有对应type的文件,如META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol,然后进行合并加载
            if (classLoader != null) {
                urls = classLoader.getResources(fileName);
            } else {
                urls = ClassLoader.getSystemResources(fileName);
            }
            if (urls != null) {
                //遍历所有对应type的文件,如org.apache.dubbo.rpc.Protocol,然后加载class
                while (urls.hasMoreElements()) {
                    //获取绝对路径
                    java.net.URL resourceURL = urls.nextElement();
                    //获取jar下对应文件的class
                    loadResource(extensionClasses, classLoader, resourceURL);
                }
            }
        } catch (Throwable t) {
            logger.error("Exception when load extension class(interface: " +
                    type + ", description file: " + fileName + ").", t);
        }
    }

loadResource

     private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), "utf-8"));
            try {
                String line;
                //轮流读取行数据,获取拓展接口实现类
                while ((line = reader.readLine()) != null) {
                    final int ci = line.indexOf('#');
                    if (ci >= 0) line = line.substring(0, ci);
                    line = line.trim();
                    if (line.length() > 0) {
                        try {
                            String name = null;
                            int i = line.indexOf('=');
                            if (i > 0) {
                                name = line.substring(0, i).trim();
                                line = line.substring(i + 1).trim();
                            }
                            if (line.length() > 0) {
                                //循环加载拓展接口实现类
                                loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
                            }
                        } catch (Throwable t) {
                            IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
                            exceptions.put(line, e);
                        }
                    }
                }
            } finally {
                reader.close();
            }
        } catch (Throwable t) {
            logger.error("Exception when load extension class(interface: " +
                    type + ", class file: " + resourceURL + ") in " + resourceURL, t);
        }
    }

这个方法做的事情很简单:

loadClass

    private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
        //判断是否实现了类型接口
        if (!type.isAssignableFrom(clazz)) {
            throw new IllegalStateException("Error when load extension class(interface: " +
                    type + ", class line: " + clazz.getName() + "), class "
                    + clazz.getName() + "is not subtype of interface.");
        }
        //如果是自定义的适配拓展实现类,则设置cachedAdaptiveClass,并返回
        if (clazz.isAnnotationPresent(Adaptive.class)) {
            if (cachedAdaptiveClass == null) {
                cachedAdaptiveClass = clazz;
            } else if (!cachedAdaptiveClass.equals(clazz)) {
                throw new IllegalStateException("More than 1 adaptive class found: "
                        + cachedAdaptiveClass.getClass().getName()
                        + ", " + clazz.getClass().getName());
            }
        //如果是包装类的话,走包装类的逻辑,即加载存储在cachedWrapperClasses:ProtocolFilterWrapper,ProtocolListenerWrapper
        } else if (isWrapperClass(clazz)) {
            Set<Class<?>> wrappers = cachedWrapperClasses;
            if (wrappers == null) {
                cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                wrappers = cachedWrapperClasses;
            }
            wrappers.add(clazz);
        } else {
            //如果没有指定自定义的适配拓展实现类,且没有包装类,说明是普通的拓展点实现类
            clazz.getConstructor();
            if (name == null || name.length() == 0) {
                name = findAnnotationName(clazz);
                if (name.length() == 0) {
                    throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
                }
            }
            //分割name,获取name数组
            String[] names = NAME_SEPARATOR.split(name);
            if (names != null && names.length > 0) {
                //判断是否被@Active修饰的拓展点实现类,如果是,则使用cachedActivates缓存
                Activate activate = clazz.getAnnotation(Activate.class);
                if (activate != null) {
                    cachedActivates.put(names[0], activate);
                } else {
                    // support com.alibaba.dubbo.common.extension.Activate
                    com.alibaba.dubbo.common.extension.Activate oldActivate = clazz.getAnnotation(com.alibaba.dubbo.common.extension.Activate.class);
                    if (oldActivate != null) {
                        cachedActivates.put(names[0], oldActivate);
                    }
                }
                //同一个实现类对应多个name,以name=>class形式存放于extensionClasses,adaptive修饰的不会存放,包装类不会存放
                for (String n : names) {
                    if (!cachedNames.containsKey(clazz)) {
                        cachedNames.put(clazz, n);
                    }
                    Class<?> c = extensionClasses.get(n);
                    if (c == null) {
                        extensionClasses.put(n, clazz);
                    } else if (c != clazz) {
                        throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                    }
                }
            }
        }
    }

这个方法做了这么几件事:

至此,getExtensionClasses的逻辑走完,梳理下它的逻辑: 1、判断拓展点实现类的缓存cachedClasses是否为空,如果为空,进行加载操作 2、根据传入类型,获取SPI注解的值,然后设置默认名cachedDefaultNam 3、在META-INF/dubbo/internal/ META-INF/dubbo/ META-INF/services/这三个目录下进行拓展点加载 4、遍历文件(文件名为传入类型type的全路径名称)内容,然后根据name->className加载class

到这里,已经完成了相关拓展点实现类、@Adaptive修饰的拓展点实现类、@Active修饰的拓展点实现类、包装Wrapper拓展点实现类的加载,回到getAdaptiveExtensionClass,当前缓存cachedAdaptiveClass如果不为空,则说明有@Adaptive修饰的拓展点实现类,直接返回,如果没有,则开始创建动态自适应拓展点createAdaptiveExtensionClass.

createAdaptiveExtensionClass

    private Class<?> createAdaptiveExtensionClass() {
        String code = createAdaptiveExtensionClassCode();
        ClassLoader classLoader = findClassLoader();
        //动态编译Extension,名字为typeName+"$Adaptive",如Protocol$Adaptive
        //Protocol$Adaptive的主要功能
        //1. 从url或扩展接口获取扩展接口实现类的名称;
        //2.根据名称,获取实现类ExtensionLoader.getExtensionLoader(扩展接口类).getExtension(扩展接口实现类名称),然后调用实现类的方法。
        //需要明白一点dubbo的内部传参基本上都是基于Url来实现的,也就是说Dubbo是基于URL驱动的技术
        //所以,适配器类的目的是在运行期获取扩展的真正实现来调用,解耦接口和实现,这样的话要不我们自己实现适配器类,要不dubbo帮我们生成,而这些都是通过Adpative来实现。
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }

上面这个方法,通过动态代理直接生成class,名为typeName+$Adaptive,debug得到Protocol$Adaptive,源码内容为

package com.wl.dubbo;

/**
 * @Author: liumenglong
 * @Date: 2018/9/29 22:48
 * @Description:
 */

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        //根据URL获取到对应的拓展名,如果没有指定,则默认取“dubbo”
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        //根据拓展名获取对应的拓展点实现类
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        //调用实际拓展点的refer方法,如DubboProtocol
        return extension.refer(arg0, arg1);
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        //根据URL获取到对应的拓展名,如果没有指定,则默认取“dubbo”
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        //根据拓展名获取对应的拓展点实现类
        com.alibaba.dubbo.rpc.Protocol extension = 
                (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.
                        getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        //调用实际拓展点的export方法,如DubboProtocol
        return extension.export(arg0);
    }
}

何为动态自适应拓展点,就是通过传入的Url对象,获取到相应的拓展点名称,根据拓展名获取到具体的拓展点实现类,从而调用该实现类的方法,这个方法很好的体现了对修改关闭对拓展开放的原则,用户如果需要实现新的拓展点实现类,只需要按规则配置好相应的类和文件即可,无需修改核心代码。

再回到createAdaptiveExtension,

 return injectExtension((T) getAdaptiveExtensionClass().newInstance());

上面的getAdaptiveExtensionClass已经完成相应class的加载,且通过newInstance完成了实例化,接下来进行注入injectExtension

injectExtension

private T injectExtension(T instance) {
    try {
        //如果当前objectFactory不为空,说明传入type不是ExtensionFactory类型(具体原因见当前类的构造方法),则可以注入
        if (objectFactory != null) {
            for (Method method : instance.getClass().getMethods()) {
                //判断方法是否以set为前缀,且方法参数个数是1个,且方法是public的,如果是,进入注入逻辑
                if (method.getName().startsWith("set")
                        && method.getParameterTypes().length == 1
                        && Modifier.isPublic(method.getModifiers())) {
                    //获取该方法的class类型
                    Class<?> pt = method.getParameterTypes()[0];
                    try {
                        //获取属性名称
                        String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                        // 根据类型和属性名称,获取实例
                        Object object = objectFactory.getExtension(pt, property);
                        if (object != null) {
                            //进行注入
                            method.invoke(instance, object);
                        }
                    } catch (Exception e) {
                        logger.error("fail to inject via method " + method.getName()
                                + " of interface " + type.getName() + ": " + e.getMessage(), e);
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}

这里首先分析objectFactory,因此需回到ExtensionLoader.getExtensionLoader,本来应该先分析这个方法,但由于getExtensionLoader不影响前面加载拓展点实现类的逻辑,故先不讲解,且前面加载拓展点实现类的逻辑是理解ExtensionLoader.getExtensionLoader的基础,因此,直到注入这里,再对ExtensionLoader.getExtensionLoader进行分析.

getExtensionLoader

  //工厂方法,根据传入类型获取对应ExtensionLoader
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        //传入的类型必须是接口且不为空,且必须是SPI修饰的可拓展点
        if (type == null)
            throw new IllegalArgumentException("Extension type == null");
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        }
        if (!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type(" + type +
                    ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
        }

        //ExtensionLoader会存放在缓存之中,EXTENSION_LOADERS:type=>ExtensionLoader
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            //将ExtensionLoader放到缓存EXTENSION_LOADERS之中
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }

这里主要是对非SPI修饰的类进行拦截,然后根据传入类型typeEXTENSION_LOADERS缓存中获取实例,如果为空,则进行实例创建,然后添加到缓存中。 对实例创建的过程new ExtensionLoader<T>(type)进行解析: 进入该构造函数:

   private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }

由于当前typeExtensionFactory,故调用ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension(),由上面可得,当调用ExtensionLoader.getAdaptiveExtension的时候,如果该拓展点的实现类有被@Adaptive修饰的,则返回该实例,而查看ExtensionFactory接口的实现类可得,有符合该条件的实现类,如图 1539342675108.png

因此,objectFactory不为空,且是AdaptiveExtensionFactory.

public class AdaptiveExtensionFactory implements ExtensionFactory {

    private final List<ExtensionFactory> factories;

    public AdaptiveExtensionFactory() {
        ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
        //获取该拓展点的所有实现类的名称,并进行遍历
        for (String name : loader.getSupportedExtensions()) {
            //往list里面添加根据name获取到的拓展点实例
            list.add(loader.getExtension(name));
        }
        factories = Collections.unmodifiableList(list);
    }

    @Override
    public <T> T getExtension(Class<T> type, String name) {
        //遍历所有的ExtensionFactory,当获取到的extension不为空的时候,则返回结果;
        //如果是SPI拓展点,这里最后调用SpiExtensionFactory.getExtension,然后调用ExtensionLoader.getAdaptiveExtension方法返回拓展点实例
        //如果非SPI拓展点,这里通过SpringExtensionFactory获取
        // SpringExtensionFactory不支持SPI拓展点实例的获取,详见SpringExtensionFactory的getExtension方法
        for (ExtensionFactory factory : factories) {
            T extension = factory.getExtension(type, name);
            if (extension != null) {
                return extension;
            }
        }
        return null;
    }

}

首先看AdaptiveExtensionFactory的构造方法,获取ExtensionFactory的所有拓展点实现类,然后使用list列表factories存储,这里list的内容为SpiExtensionFactorySpringExtensionFactory,如图 1539343310182.png

再看AdaptiveExtensionFactory.getExtension(Class<T> type, String name):

到这里,注入的逻辑也分析结束,总结下注入的逻辑:

总结

走读了以上代码,对此过程做个总结:

参考

扩展点加载