摘要: 原创出处 https://www.jianshu.com/p/47816d0f1317 「鋒Nic」欢迎转载,保留摘要,谢谢!
简介摘要
SOFARPC服务发布创建服务运行容器配置ServerConfig,设置基础配置并且通过配置文件加载服务端默认配置;创建服务发布配置ProviderConfig,设置接口名称、接口实现类引用以及指定服务端配置;通过服务发布启动类ProviderBootstrap发布服务。SOFARPC服务引用按照编程界面分为两种使用SOFARPC的方式:
1.通过SOFARPC使用:
服务引用过程涉及到RegistryConfig注册中心配置类以及ConsumerConfig服务引用配置类。
(1)RegistryConfig注册中心配置类
RegistryConfig registryConfig = new RegistryConfig() .setProtocol("zookeeper" ) .setAddress("127.0.0.1:2181" )
RegistryConfig表示注册中心,如上声明服务注册中心的地址和端口是127.0.0.1:2181,协议是Zookeeper。
(2) ConsumerConfig服务引用配置类
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) .setRegistry(registryConfig); HelloService helloService = consumerConfig.refer();
ConsumerConfig表示服务引用,如上声明所引用服务的接口和服务注册中心。最终通过refer方法将此服务引用,获取到该服务的远程调用的代理。
SOFARPC服务引用支持如下特性:
(1)同一服务注册多个注册中心,构建多个RegistryConfig设置给 ConsumerConfig:
List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>(); registryConfigs.add(registryA); registryConfigs.add(registryB); consumerConfig.setRegistry(registryConfigs);
(2)提供MethodConfig进行方法级别参数设置,API方式使用相应的对象set法即可为其设置参数:
MethodConfig methodConfigA = new MethodConfig(); MethodConfig methodConfigB = new MethodConfig(); List<MethodConfig> methodConfigs = new ArrayList<MethodConfig>(); methodConfigs.add(methodConfigA); methodConfigs.add(methodConfigB); consumerConfig.setMethods(methodConfigs);
2.通过SOFABoot使用:
(1)服务引用使用XML配置通过sofa:reference元素表示引用服务,XML配置如下所示就能够引用SOFARPC服务:
<sofa:reference id ="helloSyncServiceReference" interface ="com.alipay.sofa.rpc.samples.invoke.HelloSyncService" > <sofa:binding.bolt /> </sofa:reference >
如上通过sofa:reference元素引用了一个服务,其中id属性表示该服务引用在Spring上下文中的唯一标识,interface表示该服务的接口。sofa:binding.bolt表示该服务引用调用时使用的协议为bolt。
当引用服务的SOFARPC应用启动的时候从服务注册中心订阅到相应服务的元数据信息。服务注册中心收到订阅请求将发布方的元数据列表实时推送给服务引用方。当服务引用方拿到发布方的地址从中选取地址发起服务调用。SOFARPC应用启动为每一个服务引用生成一个远程调用的代理,Spring获取Bean通过id获取到服务引用进行使用:
HelloSyncService helloSyncServiceReference = (HelloSyncService) applicationContext .getBean("helloSyncServiceReference" ); String result = helloSyncServiceReference.saySync("sync" );
每一个服务引用对应sofa:binding元素,也就是说如果想对同一个服务发起不同协议的调用,需要如下配置:
<sofa:reference id ="boltHelloSyncServiceReference" interface ="com.alipay.sofa.rpc.samples.invoke.HelloSyncService" > <sofa:binding.bolt /> </sofa:reference > <sofa:reference id ="restHelloSyncServiceReference" interface ="com.alipay.sofa.rpc.samples.invoke.HelloSyncService" > <sofa:binding.rest /> </sofa:reference > <sofa:reference id ="dubboHelloSyncServiceReference" interface ="com.alipay.sofa.rpc.samples.invoke.HelloSyncService" > <sofa:binding.dubbo /> </sofa:reference >
声明服务引用的同时设置需要的参数,global-attrs元素设置调用超时,地址等待时间等参数; target-url元素能够设置直连调用的地址;method标签能够设置方法级别参数:
<sofa:reference id ="personReferenceBolt" interface ="com.alipay.sofa.boot.examples.demo.rpc.bean.PersonService" > <sofa:binding.bolt > <sofa:global-attrs timeout ="3000" address-wait-time ="2000" /> <sofa:route target-url ="127.0.0.1:22000" /> <sofa:method name ="sayName" timeout ="3000" /> </sofa:binding.bolt > </sofa:reference > <sofa:reference id ="personReferenceBolt" interface ="com.alipay.sofa.boot.examples.demo.rpc.bean.PersonService" > <sofa:binding.bolt /> </sofa:reference > <sofa:reference id ="personReferenceRest" interface ="com.alipay.sofa.boot.examples.demo.rpc.bean.PersonService" > <sofa:binding.rest /> </sofa:reference > <sofa:reference id ="personReferenceDubbo" interface ="com.alipay.sofa.boot.examples.demo.rpc.bean.PersonService" > <sofa:binding.dubbo /> </sofa:reference >
其中sofa:reference元素表示引用该服务,sofa:binding元素声明该服务引用的调用的协议。如上Spring上下文构建三个服务的远程代理类,名字分别为personReferenceBolt、personReferenceRest以及personReferenceDubbo。
(2)服务引用使用注解方式通过@SofaReference注解表示引用服务,interfaceType元素指定服务接口,bindings元素指定协议类型(多协议场景使用@SofaReferenceBinding注解bindingType元素指定协议类型,暂时只支持bolt协议):
@Component public class AnnotationClientImpl { @SofaReference (interfaceType = AnnotationService.class, binding = @SofaReferenceBinding (bindingType = "bolt" )) private AnnotationService annotationService; public String sayClientAnnotation (String str) { String result = annotationService.sayAnnotation(str); return result; } }
客户端模块包含集群管理、路由、地址管理器、连接管理器、负载均衡器,以及与代理、注册中心等模块交互:
客户端调用流程
源码解析
搭建环境服务发布示例:
package org.alipay.sofa.rpc;import com.alipay.sofa.rpc.config.ConsumerConfig;public class RpcClient { public static void main (String[] args) { ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) .setProtocol("bolt" ) .setDirectUrl("bolt://127.0.0.1:12200" ) .setConnectTimeout(10 * 1000 ); HelloService helloService = consumerConfig.refer(); while (true ) { try { System.out.println(helloService.sayHello("world" )); } catch (Exception e) { e.printStackTrace(); } try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } }
参考SOFARPC Example示例模块(com.alipay.sofa.rpc.quickstart.QuickStartClient ):
package com.alipay.sofa.rpc.quickstart;import com.alipay.sofa.rpc.config.ConsumerConfig;public class QuickStartClient { public static void main (String[] args) { ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) .setProtocol("bolt" ) .setDirectUrl("bolt://127.0.0.1:12200" ) .setConnectTimeout(10 * 1000 ); HelloService helloService = consumerConfig.refer(); while (true ) { try { System.out.println(helloService.sayHello("world" )); } catch (Exception e) { e.printStackTrace(); } try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行服务引用端示例类QuickStartClient查看消费端运行效果,服务消费者输出日志如下:
2018 -05 -18 01 :51 :20 ,472 main INFO [com.alipay.sofa.rpc.context.RpcRuntimeContext:info:102 ] - Welcome! Loading SOFA RPC Framework : 5.4 .0_20180427231325, PID is:1424 2018 -05 -18 01 :51 :20 ,555 main INFO [com.alipay.sofa.rpc.module .ModuleFactory:info:102 ] - Install Module: fault-tolerance2018 -05 -18 01 :51 :20 ,658 main INFO [com.alipay.sofa.rpc.bootstrap.DefaultConsumerBootstrap:infoWithApp:122 ] - Refer consumer config : bolt:2018 -05 -18 01 :51 :21 ,119 main INFO [com.alipay.sofa.rpc.client.AllConnectConnectionHolder:infoWithApp:122 ] - Add provider of com.alipay.sofa.rpc.quickstart.HelloService, size is : 1 Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.remoting Log4j ] 2018 -05 -18 01 :51 :21 ,322 SOFA-CLI-CONN-com.alipay.sofa.rpc.quickstart.HelloService-3 -T1 INFO [com.alipay.sofa.common.log:report:30 ] - Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.remoting Log4j ]log4j:ERROR Failed to rename [C:\Users\Administrator\logs/bolt/connection-event.log] to [C:\Users\Administrator\logs/bolt/connection-event.log.2018-05 -12 ]. 2018 -05 -18 01 :51 :25 ,236 SOFA-CLI-CONN-com.alipay.sofa.rpc.quickstart.HelloService-3 -T1 INFO [com.alipay.sofa.rpc.client.AllConnectConnectionHolder:infoWithApp:122 ] - Connect to com.alipay.sofa.rpc.quickstart.HelloService provider:bolt:hello world ! hello world !
参考sofa-rpc-boot-projects范例模块(com.alipay.sofa.rpc.samples.annotation ):
package com.alipay.sofa.rpc.samples.annotation;public interface AnnotationService { String sayAnnotation (String stirng) ; } package com.alipay.sofa.rpc.samples.annotation;import com.alipay.sofa.runtime.api.annotation.SofaReference;import com.alipay.sofa.runtime.api.annotation.SofaReferenceBinding;import org.springframework.stereotype.Component;@Component public class AnnotationClientImpl { @SofaReference (interfaceType = AnnotationService.class, binding = @SofaReferenceBinding (bindingType = "bolt" )) private AnnotationService annotationService; public String sayClientAnnotation (String str) { String result = annotationService.sayAnnotation(str); return result; } } package com.alipay.sofa.rpc.samples.annotation;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.ApplicationContext;@SpringBootApplication public class AnotationClientApplication { public static void main (String[] args) { System.setProperty("server.port" , "8081" ); SpringApplication springApplication = new SpringApplication(AnotationClientApplication.class); ApplicationContext applicationContext = springApplication.run(args); AnnotationClientImpl annotationService = applicationContext.getBean(AnnotationClientImpl.class); String result = annotationService.sayClientAnnotation("annotation" ); System.out.println("invoke result:" + result); if ("annotation" .equalsIgnoreCase(result)) { System.out.println("annotation invoke success" ); } else { System.out.println("annotation invoke fail" ); } } }
运行服务引用端范例类AnnotationClientApplication查看消费端运行效果,服务消费者输出日志如下:
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | ' _ | '_| | ' _ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v1.4.2.RELEASE) Sofa-Middleware-Log SLF4J : Actual logging.path is [ ./logs ] 2018-06-06 00:35:18.357 INFO 10108 --- [ main] com.alipay.sofa.common.log : Sofa-Middleware-Log SLF4J : Actual logging.path is [ ./logs ] Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.sofa.infra Logback ] 2018-06-06 00:35:18.384 INFO 10108 --- [ main] com.alipay.sofa.common.log : Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.sofa.infra Logback ] Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.sofa.healthcheck Logback ] 2018-06-06 00:35:18.687 INFO 10108 --- [ main] com.alipay.sofa.common.log : Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.sofa.healthcheck Logback ] 2018-06-06 00:35:18.809 INFO 10108 --- [ main] c.a.s.r.s.a.AnotationClientApplication : Starting AnotationClientApplication on Program with PID 10108 (D:\Program\Github\sofa-rpc-boot-projects\sofa-boot-samples\target\classes started by Administrator in D:\Program\Github\sofa-rpc-boot-projects) 2018-06-06 00:35:18.811 INFO 10108 --- [ main] c.a.s.r.s.a.AnotationClientApplication : No active profile set, falling back to default profiles: default 2018-06-06 00:35:19.146 INFO 10108 --- [ main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4c12331b: startup date [Wed Jun 06 00:35:19 CST 2018]; root of context hierarchy 2018-06-06 00:35:24.647 INFO 10108 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean ' com.alipay.sofa.runtime.spring.configuration.SofaRuntimeAutoConfiguration' of type [class com.alipay.sofa.runtime.spring.configuration.SofaRuntimeAutoConfiguration$$EnhancerBySpringCGLIB$$2f86d19a] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2018-06-06 00:35:25.119 INFO 10108 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean ' bindingConverterFactory' of type [class com.alipay.sofa.runtime.service.impl.BindingConverterFactoryImpl] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2018-06-06 00:35:25.231 INFO 10108 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean ' bindingAdapterFactory' of type [class com.alipay.sofa.runtime.service.impl.BindingAdapterFactoryImpl] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2018-06-06 00:35:25.235 INFO 10108 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean ' sofaRuntimeProperties' of type [class com.alipay.sofa.runtime.spring.config.SofaRuntimeProperties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2018-06-06 00:35:25.247 INFO 10108 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean ' sofaRuntimeContext' of type [class com.alipay.sofa.runtime.spi.component.SofaRuntimeContext] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2018-06-06 00:35:26.374 INFO 10108 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat initialized with port(s): 8081 (http) 2018-06-06 00:35:26.400 INFO 10108 --- [ main] o.apache.catalina.core.StandardService : Starting service Tomcat 2018-06-06 00:35:26.402 INFO 10108 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet Engine: Apache Tomcat/8.5.6 2018-06-06 00:35:26.897 INFO 10108 --- [ost-startStop-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext 2018-06-06 00:35:26.898 INFO 10108 --- [ost-startStop-1] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 7752 ms 2018-06-06 00:35:27.573 INFO 10108 --- [ost-startStop-1] o.s.b.w.servlet.ServletRegistrationBean : Mapping servlet: ' dispatcherServlet' to [/] 2018-06-06 00:35:27.582 INFO 10108 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: ' metricsFilter' to: [/*] 2018-06-06 00:35:27.583 INFO 10108 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: ' characterEncodingFilter' to: [/*] 2018-06-06 00:35:27.583 INFO 10108 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: ' hiddenHttpMethodFilter' to: [/*] 2018-06-06 00:35:27.584 INFO 10108 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: ' httpPutFormContentFilter' to: [/*] 2018-06-06 00:35:27.584 INFO 10108 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: ' requestContextFilter' to: [/*] 2018-06-06 00:35:27.584 INFO 10108 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: ' webRequestLoggingFilter' to: [/*] 2018-06-06 00:35:27.584 INFO 10108 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: ' applicationContextIdFilter' to: [/*] Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.sofa.runtime Logback ] 2018-06-06 00:35:29.847 INFO 10108 --- [ main] com.alipay.sofa.common.log : Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.sofa.runtime Logback ] Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.sofa.rpc Logback ] 2018-06-06 00:35:34.481 INFO 10108 --- [ main] com.alipay.sofa.common.log : Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.sofa.rpc Logback ] 2018-06-06 00:35:35.080 INFO 10108 --- [ main] o.a.c.f.imps.CuratorFrameworkImpl : Starting 2018-06-06 00:35:35.110 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT 2018-06-06 00:35:35.114 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:host.name=Program 2018-06-06 00:35:35.114 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:java.version=1.8.0_141 2018-06-06 00:35:35.114 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:java.vendor=Oracle Corporation 2018-06-06 00:35:35.115 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:java.home=C:\Program Files\Java\jdk1.8.0_141\jre 2018-06-06 00:35:35.138 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:java.class.path=C:\Program Files\Java\jdk1.8.0_141\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_141\jre\lib\rt.jar;D:\Program\Github\sofa-rpc-boot-projects\sofa-boot-samples\target\classes;D:\Program\Github\sofa-rpc-boot-projects\sofa-boot-starter\target\classes;C:\Users\Administrator\.m2\repository\com\alipay\sofa\sofa-rpc-all\5.4.0\sofa-rpc-all-5.4.0.jar;C:\Users\Administrator\.m2\repository\com\alipay\sofa\bolt\1.4.1\bolt-1.4.1.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.21\slf4j-api-1.7.21.jar;C:\Users\Administrator\.m2\repository\io\netty\netty-all\4.1.25.Final\netty-all-4.1.25.Final.jar;C:\Users\Administrator\.m2\repository\com\alipay\sofa\hessian\3.3.0\hessian-3.3.0.jar;C:\Users\Administrator\.m2\repository\com\alipay\sofa\tracer-core\2.1.1\tracer-core-2.1.1.jar;C:\Users\Administrator\.m2\repository\io\opentracing\opentracing-api\0.22.0\opentracing-api-0.22.0.jar;C:\Users\Administrator\.m2\repository\io\opentracing\opentracing-noop\0.22.0\opentracing-noop-0.22.0.jar;C:\Users\Administrator\.m2\repository\io\opentracing\opentracing-mock\0.22.0\opentracing-mock-0.22.0.jar;C:\Users\Administrator\.m2\repository\io\opentracing\opentracing-util\0.22.0\opentracing-util-0.22.0.jar;C:\Users\Administrator\.m2\repository\com\alipay\sofa\lookout\lookout-api\1.4.0\lookout-api-1.4.0.jar;C:\Users\Administrator\.m2\repository\com\alipay\sofa\runtime-sofa-boot-starter\2.4.0\runtime-sofa-boot-starter-2.4.0.jar;C:\Users\Administrator\.m2\repository\org\apache\curator\curator-client\2.9.1\curator-client-2.9.1.jar;C:\Users\Administrator\.m2\repository\org\apache\curator\curator-framework\2.9.1\curator-framework-2.9.1.jar;C:\Users\Administrator\.m2\repository\org\apache\curator\curator-recipes\2.9.1\curator-recipes-2.9.1.jar;C:\Users\Administrator\.m2\repository\org\jboss\resteasy\resteasy-jaxrs\3.0.12.Final\resteasy-jaxrs-3.0.12.Final.jar;C:\Users\Administrator\.m2\repository\org\jboss\spec\javax\annotation\jboss-annotations-api_1.1_spec\1.0.1.Final\jboss-annotations-api_1.1_spec-1.0.1.Final.jar;C:\Users\Administrator\.m2\repository\javax\activation\activation\1.1.1\activation-1.1.1.jar;C:\Users\Administrator\.m2\repository\org\apache\httpcomponents\httpclient\4.3.6\httpclient-4.3.6.jar;C:\Users\Administrator\.m2\repository\org\apache\httpcomponents\httpcore\4.3.3\httpcore-4.3.3.jar;C:\Users\Administrator\.m2\repository\commons-logging\commons-logging\1.1.3\commons-logging-1.1.3.jar;C:\Users\Administrator\.m2\repository\commons-codec\commons-codec\1.6\commons-codec-1.6.jar;C:\Users\Administrator\.m2\repository\commons-io\commons-io\2.1\commons-io-2.1.jar;C:\Users\Administrator\.m2\repository\net\jcip\jcip-annotations\1.0\jcip-annotations-1.0.jar;C:\Users\Administrator\.m2\repository\org\jboss\resteasy\resteasy-client\3.0.12.Final\resteasy-client-3.0.12.Final.jar;C:\Users\Administrator\.m2\repository\org\jboss\resteasy\resteasy-jackson-provider\3.0.12.Final\resteasy-jackson-provider-3.0.12.Final.jar;C:\Users\Administrator\.m2\repository\org\codehaus\jackson\jackson-core-asl\1.9.12\jackson-core-asl-1.9.12.jar;C:\Users\Administrator\.m2\repository\org\codehaus\jackson\jackson-mapper-asl\1.9.12\jackson-mapper-asl-1.9.12.jar;C:\Users\Administrator\.m2\repository\org\codehaus\jackson\jackson-jaxrs\1.9.12\jackson-jaxrs-1.9.12.jar;C:\Users\Administrator\.m2\repository\org\codehaus\jackson\jackson-xc\1.9.12\jackson-xc-1.9.12.jar;C:\Users\Administrator\.m2\repository\org\jboss\resteasy\resteasy-netty4\3.0.12.Final\resteasy-netty4-3.0.12.Final.jar;C:\Users\Administrator\.m2\repository\org\jboss\resteasy\resteasy-validator-provider-11\3.0.12.Final\resteasy-validator-provider-11-3.0.12.Final.jar;C:\Users\Administrator\.m2\repository\org\hibernate\hibernate-validator\5.0.1.Final\hibernate-validator-5.0.1.Final.jar;C:\Users\Administrator\.m2\repository\javax\validation\validation-api\1.1.0.Final\validation-api-1.1.0.Final.jar;C:\Users\Administrator\.m2\repository\org\jboss\logging\jboss-logging\3.1.1.GA\jboss-logging-3.1.1.GA.jar;C:\Users\Administrator\.m2\repository\com\fasterxml\classmate\0.8.0\classmate-0.8.0.jar;C:\Users\Administrator\.m2\repository\org\jboss\resteasy\jaxrs-api\3.0.12.Final\jaxrs-api-3.0.12.Final.jar;C:\Users\Administrator\.m2\repository\org\jboss\resteasy\resteasy-multipart-provider\3.0.12.Final\resteasy-multipart-provider-3.0.12.Final.jar;C:\Users\Administrator\.m2\repository\org\jboss\resteasy\resteasy-jaxb-provider\3.0.12.Final\resteasy-jaxb-provider-3.0.12.Final.jar;C:\Users\Administrator\.m2\repository\com\sun\xml\bind\jaxb-impl\2.2.7\jaxb-impl-2.2.7.jar;C:\Users\Administrator\.m2\repository\com\sun\xml\bind\jaxb-core\2.2.7\jaxb-core-2.2.7.jar;C:\Users\Administrator\.m2\repository\javax\xml\bind\jaxb-api\2.2.7\jaxb-api-2.2.7.jar;C:\Users\Administrator\.m2\repository\com\sun\istack\istack-commons-runtime\2.16\istack-commons-runtime-2.16.jar;C:\Users\Administrator\.m2\repository\com\sun\xml\fastinfoset\FastInfoset\1.2.12\FastInfoset-1.2.12.jar;C:\Users\Administrator\.m2\repository\javax\xml\bind\jsr173_api\1.0\jsr173_api-1.0.jar;C:\Users\Administrator\.m2\repository\javax\mail\mail\1.5.0-b01\mail-1.5.0-b01.jar;C:\Users\Administrator\.m2\repository\org\apache\james\apache-mime4j\0.6\apache-mime4j-0.6.jar;C:\Users\Administrator\.m2\repository\javax\el\javax.el-api\2.2.5\javax.el-api-2.2.5.jar;C:\Users\Administrator\.m2\repository\org\glassfish\web\javax.el\2.2.6\javax.el-2.2.6.jar;C:\Users\Administrator\.m2\repository\com\alibaba\dubbo\2.4.10\dubbo-2.4.10.jar;C:\Users\Administrator\.m2\repository\org\jboss\netty\netty\3.2.5.Final\netty-3.2.5.Final.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-beans\4.3.4.RELEASE\spring-beans-4.3.4.RELEASE.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-core\4.3.4.RELEASE\spring-core-4.3.4.RELEASE.jar;C:\Users\Administrator\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\1.4.2.RELEASE\spring-boot-autoconfigure-1.4.2.RELEASE.jar;C:\Users\Administrator\.m2\repository\com\alipay\sofa\test-sofa-boot-starter\2.4.0\test-sofa-boot-starter-2.4.0.jar;C:\Users\Administrator\.m2\repository\com\alipay\sofa\healthcheck-sofa-boot-starter\2.4.0\healthcheck-sofa-boot-starter-2.4.0.jar;C:\Users\Administrator\.m2\repository\com\alipay\sofa\infra-sofa-boot-starter\2.4.0\infra-sofa-boot-starter-2.4.0.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-context\4.3.4.RELEASE\spring-context-4.3.4.RELEASE.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-aop\4.3.4.RELEASE\spring-aop-4.3.4.RELEASE.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-expression\4.3.4.RELEASE\spring-expression-4.3.4.RELEASE.jar;C:\Users\Administrator\.m2\repository\org\springframework\boot\spring-boot\1.4.2.RELEASE\spring-boot-1.4.2.RELEASE.jar;C:\Users\Administrator\.m2\repository\org\springframework\boot\spring-boot-starter-actuator\1.4.2.RELEASE\spring-boot-starter-actuator-1.4.2.RELEASE.jar;C:\Users\Administrator\.m2\repository\org\springframework\boot\spring-boot-starter\1.4.2.RELEASE\spring-boot-starter-1.4.2.RELEASE.jar;C:\Users\Administrator\.m2\repository\org\springframework\boot\spring-boot-starter-logging\1.4.2.RELEASE\spring-boot-starter-logging-1.4.2.RELEASE.jar;C:\Users\Administrator\.m2\repository\ch\qos\logback\logback-classic\1.1.7\logback-classic-1.1.7.jar;C:\Users\Administrator\.m2\repository\ch\qos\logback\logback-core\1.1.7\logback-core-1.1.7.jar;C:\Users\Administrator\.m2\repository\org\slf4j\jcl-over-slf4j\1.7.21\jcl-over-slf4j-1.7.21.jar;C:\Users\Administrator\.m2\repository\org\slf4j\jul-to-slf4j\1.7.21\jul-to-slf4j-1.7.21.jar;C:\Users\Administrator\.m2\repository\org\slf4j\log4j-over-slf4j\1.7.21\log4j-over-slf4j-1.7.21.jar;C:\Users\Administrator\.m2\repository\org\yaml\snakeyaml\1.17\snakeyaml-1.17.jar;C:\Users\Administrator\.m2\repository\org\springframework\boot\spring-boot-actuator\1.4.2.RELEASE\spring-boot-actuator-1.4.2.RELEASE.jar;C:\Users\Administrator\.m2\repository\org\springframework\boot\spring-boot-starter-web\1.4.2.RELEASE\spring-boot-starter-web-1.4.2.RELEASE.jar;C:\Users\Administrator\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\1.4.2.RELEASE\spring-boot-starter-tomcat-1.4.2.RELEASE.jar;C:\Users\Administrator\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.6\tomcat-embed-core-8.5.6.jar;C:\Users\Administrator\.m2\repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.6\tomcat-embed-el-8.5.6.jar;C:\Users\Administrator\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.6\tomcat-embed-websocket-8.5.6.jar;C:\Users\Administrator\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.8.4\jackson-databind-2.8.4.jar;C:\Users\Administrator\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.8.0\jackson-annotations-2.8.0.jar;C:\Users\Administrator\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.8.4\jackson-core-2.8.4.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-web\4.3.4.RELEASE\spring-web-4.3.4.RELEASE.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-webmvc\4.3.4.RELEASE\spring-webmvc-4.3.4.RELEASE.jar;C:\Users\Administrator\.m2\repository\com\alibaba\fastjson\1.2.47\fastjson-1.2.47.jar;C:\Users\Administrator\.m2\repository\com\alipay\sofa\common\sofa-common-tools\1.0.12\sofa-common-tools-1.0.12.jar;C:\Users\Administrator\.m2\repository\org\apache\curator\curator-test\2.9.1\curator-test-2.9.1.jar;C:\Users\Administrator\.m2\repository\org\apache\zookeeper\zookeeper\3.4.6\zookeeper-3.4.6.jar;C:\Users\Administrator\.m2\repository\log4j\log4j\1.2.16\log4j-1.2.16.jar;C:\Users\Administrator\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\Administrator\.m2\repository\io\netty\netty\3.7.0.Final\netty-3.7.0.Final.jar;C:\Users\Administrator\.m2\repository\org\javassist\javassist\3.18.1-GA\javassist-3.18.1-GA.jar;C:\Users\Administrator\.m2\repository\org\apache\commons\commons-math\2.2\commons-math-2.2.jar;C:\Users\Administrator\.m2\repository\com\google\guava\guava\16.0.1\guava-16.0.1.jar;C:\Users\Administrator\.m2\repository\com\101tec\zkclient\0.10\zkclient-0.10.jar;D:\Program\JetBrains\IntelliJ\lib\idea_rt.jar 2018-06-06 00:35:35.148 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:java.library.path=C:\Program Files\Java\jdk1.8.0_141\bin;C:\WINDOWS\Sun\Java\bin;C:\WINDOWS\system32;C:\WINDOWS;C:\ProgramData\Oracle\Java\javapath;C:\WINDOWS\system32;C:\WINDOWS;C:\WINDOWS\System32\Wbem;C:\WINDOWS\System32\WindowsPowerShell\v1.0\;C:\Program Files\Java\jdk1.8.0_141\bin;C:\Program Files\Java\jdk1.8.0_141\jre\bin;C:\Program Files\Maven\bin;C:\Program Files (x86)\Git\cmd;C:\Program Files\MySQL\MySQL Utilities 1.6\;C:\Users\Administrator\AppData\Local\Microsoft\WindowsApps;;C:\Program Files\Mercurial;. 2018-06-06 00:35:35.148 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:java.io.tmpdir=C:\Users\ADMINI~1\AppData\Local\Temp\ 2018-06-06 00:35:35.148 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:java.compiler=<NA> 2018-06-06 00:35:35.149 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:os.name=Windows 10 2018-06-06 00:35:35.156 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:os.arch=amd64 2018-06-06 00:35:35.156 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:os.version=10.0 2018-06-06 00:35:35.157 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:user.name=Administrator 2018-06-06 00:35:35.157 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:user.home=C:\Users\Administrator 2018-06-06 00:35:35.157 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Client environment:user.dir=D:\Program\Github\sofa-rpc-boot-projects 2018-06-06 00:35:35.164 INFO 10108 --- [ main] org.apache.zookeeper.ZooKeeper : Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@572e6fd9 2018-06-06 00:35:35.347 INFO 10108 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-06-06 00:35:35.351 INFO 10108 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session 2018-06-06 00:35:35.497 INFO 10108 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x10005c434e80001, negotiated timeout = 40000 2018-06-06 00:35:35.507 INFO 10108 --- [ain-EventThread] o.a.c.f.state.ConnectionStateManager : State change: CONNECTED Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.remoting Logback ] 2018-06-06 00:35:40.802 INFO 10108 --- [ionService-3-T1] com.alipay.sofa.common.log : Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.remoting Logback ] Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.sofa.rpc.boot Logback ] 2018-06-06 00:35:44.158 INFO 10108 --- [ main] com.alipay.sofa.common.log : Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.sofa.rpc.boot Logback ] 2018-06-06 00:35:51.863 INFO 10108 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : Looking for @ControllerAdvice: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4c12331b: startup date [Wed Jun 06 00:35:19 CST 2018]; root of context hierarchy 2018-06-06 00:35:52.090 INFO 10108 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/error]}" onto public org.springframework.http.ResponseEntity<java.util.Map<java.lang.String, java.lang.Object>> org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest) 2018-06-06 00:35:52.092 INFO 10108 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/error],produces=[text/html]}" onto public org.springframework.web.servlet.ModelAndView org.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse) 2018-06-06 00:35:52.172 INFO 10108 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Mapped URL path [/webjars/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-06 00:35:52.173 INFO 10108 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Mapped URL path [/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-06 00:35:52.274 INFO 10108 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Mapped URL path [/**/favicon.ico] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-06 00:35:59.017 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/heapdump || /heapdump.json],methods=[GET],produces=[application/octet-stream]}" onto public void org.springframework.boot.actuate.endpoint.mvc.HeapdumpMvcEndpoint.invoke(boolean,javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse) throws java.io.IOException,javax.servlet.ServletException 2018-06-06 00:35:59.019 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/sofaboot/versions || /sofaboot/versions.json],produces=[application/json]}" onto public java.lang.Object com.alipay.sofa.infra.endpoint.SofaBootVersionEndpointMvcAdapter.invoke() 2018-06-06 00:35:59.026 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/info || /info.json],methods=[GET],produces=[application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke() 2018-06-06 00:35:59.029 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/dump || /dump.json],methods=[GET],produces=[application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke() 2018-06-06 00:35:59.032 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/env/{name:.*}],methods=[GET],produces=[application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EnvironmentMvcEndpoint.value(java.lang.String) 2018-06-06 00:35:59.033 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/env || /env.json],methods=[GET],produces=[application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke() 2018-06-06 00:35:59.035 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/health || /health.json],produces=[application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.HealthMvcEndpoint.invoke(java.security.Principal) 2018-06-06 00:35:59.038 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/trace || /trace.json],methods=[GET],produces=[application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke() 2018-06-06 00:35:59.041 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/health/readiness || /health/readiness.json],produces=[application/json]}" onto public java.lang.Object com.alipay.sofa.healthcheck.service.SofaBootReadinessCheckMvcEndpoint.invoke(java.security.Principal) 2018-06-06 00:35:59.043 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/logfile || /logfile.json],methods=[GET || HEAD]}" onto public void org.springframework.boot.actuate.endpoint.mvc.LogFileMvcEndpoint.invoke(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse) throws javax.servlet.ServletException,java.io.IOException 2018-06-06 00:35:59.049 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/configprops || /configprops.json],methods=[GET],produces=[application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke() 2018-06-06 00:35:59.052 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/mappings || /mappings.json],methods=[GET],produces=[application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke() 2018-06-06 00:35:59.061 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/metrics/{name:.*}],methods=[GET],produces=[application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.MetricsMvcEndpoint.value(java.lang.String) 2018-06-06 00:35:59.062 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/metrics || /metrics.json],methods=[GET],produces=[application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke() 2018-06-06 00:35:59.063 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/autoconfig || /autoconfig.json],methods=[GET],produces=[application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke() 2018-06-06 00:35:59.066 INFO 10108 --- [ main] o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/beans || /beans.json],methods=[GET],produces=[application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter.invoke() 2018-06-06 00:35:59.731 INFO 10108 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup 2018-06-06 00:35:59.764 INFO 10108 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0 2018-06-06 00:36:00.855 INFO 10108 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8081 (http) 2018-06-06 00:36:00.864 INFO 10108 --- [ main] c.a.s.r.s.a.AnotationClientApplication : Started AnotationClientApplication in 44.609 seconds (JVM running for 46.813) invoke result:annotation annotation invoke success
SOFARPC服务引用流程:
(1)创建服务引用配置类ConsumerConfig,设置ConsumerConfig实例接口名称(服务接口:做为服务唯一标识的组成部分)、调用协议、直连调用地址以及连接超时时间:
public class ConsumerConfig <T > extends AbstractInterfaceConfig <T , ConsumerConfig <T >> implements Serializable
服务引用配置类ConsumerConfig继承接口级公共配置类AbstractInterfaceConfig,能够通过集成的注册中心动态调整服务引用接口&IP级别配置例如超时时间、权重等。
(2)服务引用配置类ConsumerConfig负责加载调整服务引用接口&IP级配置,根据服务消费配置获取代理对象引用,单一职责原则绑定服务消费者启动类ConsumerBootstrap实施引用服务:
public T refer () { if (consumerBootstrap == null ) { consumerBootstrap = Bootstraps.from(this ); } return consumerBootstrap.refer(); }
首先判断服务消费者启动类ConsumerBootstrap是否为空,通过引用服务辅助工具类Bootstraps按照绑定服务引用配置扩展加载工厂ExtensionLoaderFactory加载初始化ConsumerBootstrap实例,默认服务消费者启动类ConsumerBootstrap与调用协议启动类实例相同:
public static <T> ConsumerBootstrap<T> from (ConsumerConfig<T> consumerConfig) { String bootstrap = consumerConfig.getBootstrap(); ConsumerBootstrap consumerBootstrap; if (StringUtils.isNotEmpty(bootstrap)) { consumerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class) .getExtension(bootstrap, new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig }); } else { bootstrap = consumerConfig.getProtocol(); ExtensionLoader extensionLoader = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class); ExtensionClass<ConsumerBootstrap> extensionClass = extensionLoader.getExtensionClass(bootstrap); if (extensionClass == null ) { bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_CONSUMER_BOOTSTRAP); consumerConfig.setBootstrap(bootstrap); consumerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class) .getExtension(bootstrap, new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig }); } else { consumerConfig.setBootstrap(bootstrap); consumerBootstrap = extensionClass.getExtInstance( new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig }); } } return (ConsumerBootstrap<T>) consumerBootstrap; }
接着引用服务包装类ConsumerBootstrap通过refer()方法调用服务,ConsumerBootstrap基于Bolt、Rest、Dubbo网络通信协议提供三种协议服务引用实现类:BoltConsumerBootstrap、RestConsumerBootstrap以及DubboConsumerBootstrap。默认服务消费者启动器DefaultConsumerBootstrap调用服务refer()方法首先同步双重检查服务引用代理实现类实例是否不为空,不为空则返回代理实现引用,接着执行SOFARPC引用服务逻辑:
public T refer () { if (proxyIns != null ) { return proxyIns; } synchronized (this ) { if (proxyIns != null ) { return proxyIns; } String key = consumerConfig.buildKey(); String appName = consumerConfig.getAppName(); checkParameters(); if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "Refer consumer config : {} with bean id {}" , key, consumerConfig.getId()); } AtomicInteger cnt = REFERRED_KEYS.get(key); if (cnt == null ) { cnt = CommonUtils.putToConcurrentMap(REFERRED_KEYS, key, new AtomicInteger(0 )); } int c = cnt.incrementAndGet(); int maxProxyCount = consumerConfig.getRepeatedReferLimit(); if (maxProxyCount > 0 ) { if (c > maxProxyCount) { cnt.decrementAndGet(); throw new SofaRpcRuntimeException("Duplicate consumer config with key " + key + " has been referred more than " + maxProxyCount + " times!" + " Maybe it's wrong config, please check it." + " Ignore this if you did that on purpose!" ); } else if (c > 1 ) { if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "Duplicate consumer config with key {} has been referred!" + " Maybe it's wrong config, please check it." + " Ignore this if you did that on purpose!" , key); } } } try { cluster = ClusterFactory.getCluster(this ); consumerConfig.setConfigListener(buildConfigListener(this )); consumerConfig.setProviderInfoListener(buildProviderInfoListener(this )); cluster.init(); proxyInvoker = buildClientProxyInvoker(this ); proxyIns = (T) ProxyFactory.buildProxy(consumerConfig.getProxy(), consumerConfig.getProxyClass(), proxyInvoker); } catch (Exception e) { if (cluster != null ) { cluster.destroy(); cluster = null ; } consumerConfig.setConfigListener(null ); consumerConfig.setProviderInfoListener(null ); cnt.decrementAndGet(); if (e instanceof SofaRpcRuntimeException) { throw (SofaRpcRuntimeException) e; } else { throw new SofaRpcRuntimeException("Build consumer proxy error!" , e); } } if (consumerConfig.getOnAvailable() != null && cluster != null ) { cluster.checkStateChange(false ); } RpcRuntimeContext.cacheConsumerConfig(this ); return proxyIns; } }
默认服务消费者启动器DefaultConsumerBootstrap引用服务方法执行逻辑:
(1)根据服务引用配置ConsumerConfig 获取调用协议、服务接口以及服务标签构建key以及获取应用名称appName;
(2)调用checkParameters()方法检查服务消费者配置字段和参数:
protected void checkParameters () {}
(3)检验同一个服务(调用协议&服务接口&服务标签相同)的发布次数是否超过服务引用配置的最大次数,超过最大数量直接抛出异常;
(4)客户端集群工厂ClusterFactory通过服务消费者启动器构造客户端集群Cluster,根据服务消费者启动器服务消费者配置扩展加载工厂ExtensionLoaderFactory加载初始化Cluster实例:
public static Cluster getCluster (ConsumerBootstrap consumerBootstrap) { try { ConsumerConfig consumerConfig = consumerBootstrap.getConsumerConfig(); ExtensionClass<Cluster> ext = ExtensionLoaderFactory.getExtensionLoader(Cluster.class) .getExtensionClass(consumerConfig.getCluster()); if (ext == null ) { throw ExceptionUtils.buildRuntime("consumer.cluster" , consumerConfig.getCluster(), "Unsupported cluster of client!" ); } return ext.getExtInstance(new Class[] { ConsumerBootstrap.class }, new Object[] { consumerBootstrap }); } catch (SofaRpcRuntimeException e) { throw e; } catch (Throwable e) { throw new SofaRpcRuntimeException(e.getMessage(), e); } }
客户端集群Cluster封装集群模式、长连接管理、服务路由、负载均衡等抽象类,实现调用器Invoker、服务发布信息监听器ProviderInfoListener、可初始化接口Initializable, 可销毁接口Destroyable,涵盖两种客户端集群实现类:快速失败客户端集群FailFastCluster和故障转移(支持重试和指定地址调用)客户端集群FailoverCluster:
@Extensible (singleton = false )@ThreadSafe public abstract class Cluster implements Invoker , ProviderInfoListener , Initializable , Destroyable {...}@Extension ("failfast" )public class FailFastCluster extends AbstractCluster {...}@Extension ("failover" )public class FailoverCluster extends AbstractCluster {...}
(5)服务端订阅者配置实例设置根据服务消费者启动器构建的服务引用配置发生变化监听器、集群服务端地址监听器;
(6)初始化客户端集群Cluster实例包括构建路由链routerChain,加载负载均衡策略loadBalancer,获取地址保持器addressHolder,构造调用端过滤链filterChain,连接管理器启动重连线程,服务消费启动器订阅服务发布者列表,初始化服务端连接建立长连接:
public synchronized void init () { if (initialized) { return ; } routerChain = RouterChain.buildConsumerChain(consumerBootstrap); loadBalancer = LoadBalancerFactory.getLoadBalancer(consumerBootstrap); addressHolder = AddressHolderFactory.getAddressHolder(consumerBootstrap); connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap); this .filterChain = FilterChain.buildConsumerChain(this .consumerConfig, new ConsumerInvoker(consumerBootstrap)); if (consumerConfig.isLazy()) { if (LOGGER.isInfoEnabled(consumerConfig.getAppName())) { LOGGER.infoWithApp(consumerConfig.getAppName(), "Connection will be initialized when first invoke." ); } } connectionHolder.init(); try { List<ProviderGroup> all = consumerBootstrap.subscribe(); if (CommonUtils.isNotEmpty(all)) { updateAllProviders(all); } } catch (SofaRpcRuntimeException e) { throw e; } catch (Throwable e) { throw new SofaRpcRuntimeException("Init provider's transport error!" , e); } initialized = true ; if (consumerConfig.isCheck() && !isAvailable()) { throw new SofaRpcRuntimeException("The consumer is depend on alive provider " + "and there is no alive provider, you can ignore it " + "by ConsumerConfig.setCheck(boolean) (default is false)" ); } }
路由链RouterChain根据服务端订阅者配置构建服务消费路由链:1.获取服务引用配置路由规则引用;2.解析路由规则判断是否需要排除系统过滤器;3.解析通过别名方式注入的路由准备数据;4.解析自动加载的路由添加到扩展路由集合;5.根据扩展路由和服务端订阅者启动器创建路由链RouteChain:
public static RouterChain buildConsumerChain (ConsumerBootstrap consumerBootstrap) { ConsumerConfig<?> consumerConfig = consumerBootstrap.getConsumerConfig(); List<Router> customRouters = consumerConfig.getRouterRef() == null ? new ArrayList<Router>() : new CopyOnWriteArrayList<Router>(consumerConfig.getRouterRef()); HashSet<String> excludes = parseExcludeRouter(customRouters); List<ExtensionClass<Router>> extensionRouters = new ArrayList<ExtensionClass<Router>>(); List<String> routerAliases = consumerConfig.getRouter(); if (CommonUtils.isNotEmpty(routerAliases)) { for (String routerAlias : routerAliases) { if (startsWithExcludePrefix(routerAlias)) { excludes.add(routerAlias.substring(1 )); } else { extensionRouters.add(EXTENSION_LOADER.getExtensionClass(routerAlias)); } } } if (!excludes.contains(StringUtils.ALL) && !excludes.contains(StringUtils.DEFAULT)) { for (Map.Entry<String, ExtensionClass<Router>> entry : CONSUMER_AUTO_ACTIVES.entrySet()) { if (!excludes.contains(entry.getKey())) { extensionRouters.add(entry.getValue()); } } } excludes = null ; if (extensionRouters.size() > 1 ) { Collections.sort(extensionRouters, new OrderedComparator<ExtensionClass>()); } List<Router> actualRouters = new ArrayList<Router>(); for (ExtensionClass<Router> extensionRouter : extensionRouters) { Router actualRoute = extensionRouter.getExtInstance(); actualRouters.add(actualRoute); } actualRouters.addAll(customRouters); return new RouterChain(actualRouters, consumerBootstrap); } private static HashSet<String> parseExcludeRouter (List<Router> customRouters) { HashSet<String> excludeKeys = new HashSet<String>(); if (CommonUtils.isNotEmpty(customRouters)) { for (Router router : customRouters) { if (router instanceof ExcludeRouter) { ExcludeRouter excludeRouter = (ExcludeRouter) router; String excludeName = excludeRouter.getExcludeName(); if (StringUtils.isNotEmpty(excludeName)) { String excludeRouterName = startsWithExcludePrefix(excludeName) ? excludeName.substring(1 ) : excludeName; if (StringUtils.isNotEmpty(excludeRouterName)) { excludeKeys.add(excludeRouterName); } } customRouters.remove(router); } } } if (!excludeKeys.isEmpty()) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Find exclude routers: {}" , excludeKeys); } } return excludeKeys; } public RouterChain (List<Router> actualRouters, ConsumerBootstrap consumerBootstrap) { this .routers = new ArrayList<Router>(); if (CommonUtils.isNotEmpty(actualRouters)) { for (Router router : actualRouters) { if (router.needToLoad(consumerBootstrap)) { router.init(consumerBootstrap); routers.add(router); } } } }
负载均衡工厂LoadBalancerFactory、地址管理工厂AddressHolderFactory、连接管理工厂ConnectionHolder根据服务消费者配置获取负载均衡器、地址保持器、连接管理器:读取服务消费配置负载均衡策略、地址保持配置、连接管理配置,扩展加载工厂ExtensionLoaderFactory根据负载均衡、地址保持、连接管理相应配置生成负载均衡器、地址保持器、连接管理器实例:
public static LoadBalancer getLoadBalancer (ConsumerBootstrap consumerBootstrap) { try { String loadBalancer = consumerBootstrap.getConsumerConfig().getLoadBalancer(); ExtensionClass<LoadBalancer> ext = ExtensionLoaderFactory .getExtensionLoader(LoadBalancer.class).getExtensionClass(loadBalancer); if (ext == null ) { throw ExceptionUtils.buildRuntime("consumer.loadBalancer" , loadBalancer, "Unsupported loadBalancer of client!" ); } return ext.getExtInstance(new Class[] { ConsumerBootstrap.class }, new Object[] { consumerBootstrap }); } catch (SofaRpcRuntimeException e) { throw e; } catch (Throwable e) { throw new SofaRpcRuntimeException(e.getMessage(), e); } } public static AddressHolder getAddressHolder (ConsumerBootstrap consumerBootstrap) { try { String connectionHolder = consumerBootstrap.getConsumerConfig().getAddressHolder(); ExtensionClass<AddressHolder> ext = ExtensionLoaderFactory.getExtensionLoader(AddressHolder.class) .getExtensionClass(connectionHolder); if (ext == null ) { throw ExceptionUtils.buildRuntime("consumer.addressHolder" , connectionHolder, "Unsupported addressHolder of client!" ); } return ext.getExtInstance(new Class[] { ConsumerBootstrap.class }, new Object[] { consumerBootstrap }); } catch (SofaRpcRuntimeException e) { throw e; } catch (Throwable e) { throw new SofaRpcRuntimeException(e.getMessage(), e); } } public static ConnectionHolder getConnectionHolder (ConsumerBootstrap consumerBootstrap) { try { String connectionHolder = consumerBootstrap.getConsumerConfig().getConnectionHolder(); ExtensionClass<ConnectionHolder> ext = ExtensionLoaderFactory .getExtensionLoader(ConnectionHolder.class).getExtensionClass(connectionHolder); if (ext == null ) { throw ExceptionUtils.buildRuntime("consumer.connectionHolder" , connectionHolder, "Unsupported connectionHolder of client!" ); } return ext.getExtInstance(new Class[] { ConsumerBootstrap.class }, new Object[] { consumerBootstrap }); } catch (SofaRpcRuntimeException e) { throw e; } catch (Throwable e) { throw new SofaRpcRuntimeException(e.getMessage(), e); } }
客户端集群按照服务器启动器配置构造消费调用器ConsumerInvoker,消费调用器invoke()方法使用客户端发送数据给服务器执行服务调用过程,设置调用服务端远程地址同步/单向/Callback/ Future调用远程服务,譬如Bolt客户端传输BoltClientTransport同步调用通过RpcClient获取远程连接使用管道Channel的writeAndFlush(Object msg)方法发送客户端数据,并且添加监听器提供发送失败Future添加失败响应场景进行远程访问调用。过滤链FilterChain根据服务端订阅者配置构造调用端执行链,调用端执行链最底层是调用过滤器:按照自动装载扩展实现逻辑获取用户new实例方式注入的过滤器(优先级高),判断是否需要排除系统过滤器,解析用户通过别名方式注入的过滤器准备数据,解析自动加载的过滤器添加到自定义的过滤器构造执行链:
public ConsumerInvoker (ConsumerBootstrap consumerBootstrap) { super (consumerBootstrap.getConsumerConfig()); this .consumerBootstrap = consumerBootstrap; } public SofaResponse invoke (SofaRequest sofaRequest) throws SofaRpcException { ProviderInfo providerInfo = RpcInternalContext.getContext().getProviderInfo(); String appName = providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_APP_NAME); if (StringUtils.isNotEmpty(appName)) { sofaRequest.setTargetAppName(appName); } return consumerBootstrap.getCluster().sendMsg(providerInfo, sofaRequest); } protected SofaResponse doSendMsg (ProviderInfo providerInfo, ClientTransport transport, SofaRequest request) throws SofaRpcException { RpcInternalContext context = RpcInternalContext.getContext(); RpcInternalContext.getContext().setRemoteAddress(providerInfo.getHost(), providerInfo.getPort()); try { checkProviderVersion(providerInfo, request); String invokeType = request.getInvokeType(); int timeout = resolveTimeout(request, consumerConfig, providerInfo); SofaResponse response = null ; if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) { long start = RpcRuntimeContext.now(); try { response = transport.syncSend(request, timeout); } finally { if (RpcInternalContext.isAttachmentEnable()) { long elapsed = RpcRuntimeContext.now() - start; context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed); } } } else if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) { long start = RpcRuntimeContext.now(); try { transport.oneWaySend(request, timeout); response = buildEmptyResponse(request); } finally { if (RpcInternalContext.isAttachmentEnable()) { long elapsed = RpcRuntimeContext.now() - start; context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed); } } } else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) { SofaResponseCallback sofaResponseCallback = request.getSofaResponseCallback(); if (sofaResponseCallback == null ) { SofaResponseCallback methodResponseCallback = consumerConfig .getMethodOnreturn(request.getMethodName()); if (methodResponseCallback != null ) { request.setSofaResponseCallback(methodResponseCallback); } } context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now()); transport.asyncSend(request, timeout); response = buildEmptyResponse(request); } else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) { context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now()); ResponseFuture future = transport.asyncSend(request, timeout); RpcInternalContext.getContext().setFuture(future); response = buildEmptyResponse(request); } else { throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType); } return response; } catch (SofaRpcException e) { throw e; } catch (Throwable e) { throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, e); } } public static FilterChain buildConsumerChain (ConsumerConfig<?> consumerConfig, FilterInvoker lastFilter) { List<Filter> customFilters = consumerConfig.getFilterRef() == null ? new ArrayList<Filter>() : new CopyOnWriteArrayList<Filter>(consumerConfig.getFilterRef()); HashSet<String> excludes = parseExcludeFilter(customFilters); List<ExtensionClass<Filter>> extensionFilters = new ArrayList<ExtensionClass<Filter>>(); List<String> filterAliases = consumerConfig.getFilter(); if (CommonUtils.isNotEmpty(filterAliases)) { for (String filterAlias : filterAliases) { if (startsWithExcludePrefix(filterAlias)) { excludes.add(filterAlias.substring(1 )); } else { ExtensionClass<Filter> filter = EXTENSION_LOADER.getExtensionClass(filterAlias); if (filter != null ) { extensionFilters.add(filter); } } } } if (!excludes.contains(StringUtils.ALL) && !excludes.contains(StringUtils.DEFAULT)) { for (Map.Entry<String, ExtensionClass<Filter>> entry : CONSUMER_AUTO_ACTIVES.entrySet()) { if (!excludes.contains(entry.getKey())) { extensionFilters.add(entry.getValue()); } } } excludes = null ; if (extensionFilters.size() > 1 ) { Collections.sort(extensionFilters, new OrderedComparator<ExtensionClass<Filter>>()); } List<Filter> actualFilters = new ArrayList<Filter>(); for (ExtensionClass<Filter> extensionFilter : extensionFilters) { actualFilters.add(extensionFilter.getExtInstance()); } actualFilters.addAll(customFilters); return new FilterChain(actualFilters, lastFilter, consumerConfig); }
连接管理器初始化启动重连线程:读取服务引用配置服务接口以及服务端消费者给提供者重连间隔创建线程池启动重连+心跳线程,存活的客户端列表(保持长连接,且一切正常的)检查可用连接,非可用连接(不可用的长连接)从存活丢到重试列表,失败待重试的客户端列表(连上后断开的)命中服务提供者动态配置的重试周期系数进行重连,客户端传输建立长连接(基于Bolt协议的客户端传输实现BoltClientTransport只支持长连接共享模式,RpcClient通过URL缓存和超时时间建立连接,基于第三方协议譬如cxf/resteasy的客户端传输实现AbstractProxyClientTransport,按照客户端配置建立Socket连接),两次验证检查客户端传输是否存活,睡眠防止被连上又被服务端踢下线,设置服务提供者重试周期系数,从重试丢到存活列表,存活的客户端列表和存活但是亚健康节点(连续心跳超时,这种只发心跳,不发请求)原来为空变成非空需要通知状态变成可用,主要包括1.启动成功变成可用时;2.注册中心增加,更新节点后变成可用时;3.重连上从一个可用节点都没有变成有可用节点时:
public void init () { if (reconThread == null ) { startReconnectThread(); } } protected void startReconnectThread () { final String interfaceId = consumerConfig.getInterfaceId(); int reconnect = consumerConfig.getReconnectPeriod(); if (reconnect > 0 ) { reconnect = Math.max(reconnect, 2000 ); reconThread = new ScheduledService("CLI-RC-" + interfaceId, ScheduledService.MODE_FIXEDDELAY, new Runnable() { @Override public void run () { try { doReconnect(); } catch (Throwable e) { LOGGER.errorWithApp(consumerConfig.getAppName(), "Exception when retry connect to provider" , e); } } }, reconnect, reconnect, TimeUnit.MILLISECONDS).start(); } } private void doReconnect () { String interfaceId = consumerConfig.getInterfaceId(); String appName = consumerConfig.getAppName(); int thisTime = reconnectFlag.incrementAndGet(); boolean print = thisTime % 6 == 0 ; boolean isAliveEmptyFirst = isAvailableEmpty(); for (Map.Entry<ProviderInfo, ClientTransport> alive : aliveConnections.entrySet()) { ClientTransport connection = alive.getValue(); if (connection != null && !connection.isAvailable()) { aliveToRetry(alive.getKey(), connection); } } for (Map.Entry<ProviderInfo, ClientTransport> entry : getRetryConnections() .entrySet()) { ProviderInfo providerInfo = entry.getKey(); int providerPeriodCoefficient = CommonUtils.parseNum((Integer) providerInfo.getDynamicAttr(ProviderInfoAttrs.ATTR_RC_PERIOD_COEFFICIENT), 1 ); if (thisTime % providerPeriodCoefficient != 0 ) { continue ; } ClientTransport transport = entry.getValue(); if (LOGGER.isDebugEnabled(appName)) { LOGGER.debugWithApp("Retry connect to {} provider:{} ..." , interfaceId, providerInfo); } try { transport.connect(); if (doubleCheck(interfaceId, providerInfo, transport)) { providerInfo.setDynamicAttr(ProviderInfoAttrs.ATTR_RC_PERIOD_COEFFICIENT, 1 ); retryToAlive(providerInfo, transport); } } catch (Exception e) { if (print) { if (LOGGER.isWarnEnabled(appName)) { LOGGER.warnWithApp("Retry connect to {} provider:{} error ! The exception is " + e .getMessage(), interfaceId, providerInfo); } } else { if (LOGGER.isDebugEnabled(appName)) { LOGGER.debugWithApp("Retry connect to {} provider:{} error ! The exception is " + e .getMessage(), interfaceId, providerInfo); } } } } if (isAliveEmptyFirst && !isAvailableEmpty()) { notifyStateChangeToAvailable(); } }
服务端消费者启动器订阅服务发布列表,获取服务消费者配置直连调用地址,判断直连调用地址是否为空,直连调用地址为空根据注册中心配置从注册中心订阅服务发布列表,遍历服务端订阅者注册中心配置注册中心工厂生成注册中心对象,初始化启动注册中心订阅服务发布列表添加到服务提供者分组;直连调用地址非空按照逗号或者分号分隔直连调用地址遍历转换成服务端提供者添加到默认直连分组,返回直连服务端发布方分组:
public List<ProviderGroup> subscribe () { List<ProviderGroup> result = null ; String directUrl = consumerConfig.getDirectUrl(); if (StringUtils.isNotEmpty(directUrl)) { result = subscribeFromDirectUrl(directUrl); } else { List<RegistryConfig> registryConfigs = consumerConfig.getRegistry(); if (CommonUtils.isNotEmpty(registryConfigs)) { result = subscribeFromRegistries(); } } return result; } protected List<ProviderGroup> subscribeFromDirectUrl (String directUrl) { List<ProviderGroup> result = new ArrayList<ProviderGroup>(); List<ProviderInfo> tmpProviderInfoList = new ArrayList<ProviderInfo>(); String[] providerStrs = StringUtils.splitWithCommaOrSemicolon(directUrl); for (String providerStr : providerStrs) { ProviderInfo providerInfo = convertToProviderInfo(providerStr); if (providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_SOURCE) == null ) { providerInfo.setStaticAttr(ProviderInfoAttrs.ATTR_SOURCE, "direct" ); } tmpProviderInfoList.add(providerInfo); } result.add(new ProviderGroup(RpcConstants.ADDRESS_DIRECT_GROUP, tmpProviderInfoList)); return result; } protected List<ProviderGroup> subscribeFromRegistries () { List<ProviderGroup> result = new ArrayList<ProviderGroup>(); List<RegistryConfig> registryConfigs = consumerConfig.getRegistry(); if (CommonUtils.isEmpty(registryConfigs)) { return result; } int addressWaitTime = consumerConfig.getAddressWait(); int maxAddressWaitTime = SofaConfigs.getIntegerValue(consumerConfig.getAppName(), SofaOptions.CONFIG_MAX_ADDRESS_WAIT_TIME, SofaOptions.MAX_ADDRESS_WAIT_TIME); addressWaitTime = addressWaitTime < 0 ? maxAddressWaitTime : Math.min(addressWaitTime, maxAddressWaitTime); ProviderInfoListener listener = consumerConfig.getProviderInfoListener(); respondRegistries = addressWaitTime == 0 ? null : new CountDownLatch(registryConfigs.size()); Map<String, ProviderGroup> tmpProviderInfoList = new HashMap<String, ProviderGroup>(); for (RegistryConfig registryConfig : registryConfigs) { Registry registry = RegistryFactory.getRegistry(registryConfig); registry.init(); registry.start(); try { List<ProviderGroup> current; try { if (respondRegistries != null ) { consumerConfig.setProviderInfoListener(new WrapperClusterProviderInfoListener(listener, respondRegistries)); } current = registry.subscribe(consumerConfig); } finally { if (respondRegistries != null ) { consumerConfig.setProviderInfoListener(listener); } } if (current == null ) { continue ; } else { if (respondRegistries != null ) { respondRegistries.countDown(); } } for (ProviderGroup group : current) { String groupName = group.getName(); if (!group.isEmpty()) { ProviderGroup oldGroup = tmpProviderInfoList.get(groupName); if (oldGroup != null ) { oldGroup.addAll(group.getProviderInfos()); } else { tmpProviderInfoList.put(groupName, group); } } } } catch (SofaRpcRuntimeException e) { throw e; } catch (Throwable e) { String appName = consumerConfig.getAppName(); if (LOGGER.isWarnEnabled(appName)) { LOGGER.warnWithApp(appName, "Catch exception when subscribe from registry: " + registryConfig.getId() + ", but you can ignore if it's called by JVM shutdown hook" , e); } } } if (respondRegistries != null ) { try { respondRegistries.await(addressWaitTime, TimeUnit.MILLISECONDS); } catch (Exception ignore) { } } return new ArrayList<ProviderGroup>(tmpProviderInfoList.values()); }
遍历直连服务端提供者分组初始化服务端连接(建立长连接),检查服务节点协议类型和序列化方式信息,地址保持器/连接管理器全量更新全部服务端列表,依赖服务端消费者配置、地址保持器服务端发布者分组、直连服务端提供者分组构建服务提供更新事件给事件总线发布,事件总线按照事件订阅者是否同步进行事件同步/异步处理:
public void updateAllProviders (List<ProviderGroup> providerGroups) { List<ProviderGroup> oldProviderGroups = new ArrayList<ProviderGroup>(addressHolder.getProviderGroups()); int count = 0 ; if (providerGroups != null ) { for (ProviderGroup providerGroup : providerGroups) { checkProviderInfo(providerGroup); count += providerGroup.size(); } } if (count == 0 ) { Collection<ProviderInfo> currentProviderList = currentProviderList(); addressHolder.updateAllProviders(providerGroups); if (CommonUtils.isNotEmpty(currentProviderList)) { if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) { LOGGER.warnWithApp(consumerConfig.getAppName(), "Provider list is emptied, may be all " + "providers has been closed, or this consumer has been add to blacklist" ); closeTransports(); } } } else { addressHolder.updateAllProviders(providerGroups); connectionHolder.updateAllProviders(providerGroups); } if (EventBus.isEnable(ProviderInfoUpdateAllEvent.class)) { ProviderInfoUpdateAllEvent event = new ProviderInfoUpdateAllEvent(consumerConfig, oldProviderGroups, providerGroups); EventBus.post(event); } } protected void checkProviderInfo (ProviderGroup providerGroup) { List<ProviderInfo> providerInfos = providerGroup == null ? null : providerGroup.getProviderInfos(); if (CommonUtils.isEmpty(providerInfos)) { return ; } Iterator<ProviderInfo> iterator = providerInfos.iterator(); while (iterator.hasNext()) { ProviderInfo providerInfo = iterator.next(); if (!StringUtils.equals(providerInfo.getProtocolType(), consumerConfig.getProtocol())) { if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) { LOGGER.warnWithApp(consumerConfig.getAppName(), "Unmatched protocol between consumer [{}] and provider [{}]." , consumerConfig.getProtocol(), providerInfo.getProtocolType()); } } if (StringUtils.isEmpty(providerInfo.getSerializationType())) { providerInfo.setSerializationType(consumerConfig.getSerialization()); } } } public static void post (final Event event) { if (!isEnable()) { return ; } CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(event.getClass()); if (CommonUtils.isNotEmpty(subscribers)) { for (final Subscriber subscriber : subscribers) { if (subscriber.isSync()) { handleEvent(subscriber, event); } else { final RpcInternalContext context = RpcInternalContext.peekContext(); AsyncRuntime.getAsyncThreadPool().execute( new Runnable() { @Override public void run () { try { RpcInternalContext.setContext(context); handleEvent(subscriber, event); } catch (Exception e) { RpcInternalContext.removeContext(); } } }); } } } } private static void handleEvent (final Subscriber subscriber, final Event event) { try { subscriber.onEvent(event); } catch (Throwable e) { if (LOGGER.isWarnEnabled()) { LOGGER.warn("Handle " + event.getClass() + " error" , e); } } }
(7)服务端订阅者启动器创建客户端代理调用器ClientProxyInvoker执行链,构建代理调用器通过默认调用端代理执行器构造执行链缓存接口名和序列化类型,注入到动态代理调用处理器提供构建代理类调用器执行参数:
protected ClientProxyInvoker buildClientProxyInvoker (ConsumerBootstrap bootstrap) { return new DefaultClientProxyInvoker(bootstrap); } public DefaultClientProxyInvoker (ConsumerBootstrap bootstrap) { super (bootstrap); cacheCommonData(); }
(8)代理工厂按照服务端订阅者配置代理类型构建客户端代理调用器代理类实例,缓存服务端消费者配置ConsumerConfig,动态代理类型包括基于JDK动态代理实现JDKProxy(默认)和基于Javassist动态代理实现JavassistProxy,其中JDKProxy代理实现通过JDK代理处理器JDKInvocationHandler拦截请求变为Invocation执行invoke()方法远程调用,客户端代理调用器ClientProxyInvoker实施Proxy拦截调用使用客户端集群Cluster最底层调用过滤器,以消费端调用器ConsumerInvoker进行Client发送数据给Server调用过程,即Cluster.sendMsg()->ClientTransport.Send()->RpcClient.invokeWith *()->Channel.writeAndFlush():
public static <T> T buildProxy (String proxyType, Class<T> clazz, Invoker proxyInvoker) throws Exception { try { ExtensionClass<Proxy> ext = ExtensionLoaderFactory.getExtensionLoader(Proxy.class) .getExtensionClass(proxyType); if (ext == null ) { throw ExceptionUtils.buildRuntime("consumer.proxy" , proxyType, "Unsupported proxy of client!" ); } Proxy proxy = ext.getExtInstance(); return proxy.getProxy(clazz, proxyInvoker); } catch (SofaRpcRuntimeException e) { throw e; } catch (Throwable e) { throw new SofaRpcRuntimeException(e.getMessage(), e); } } public Object invoke (Object proxy, Method method, Object[] paramValues) throws Throwable { String methodName = method.getName(); Class[] paramTypes = method.getParameterTypes(); if ("toString" .equals(methodName) && paramTypes.length == 0 ) { return proxyInvoker.toString(); } else if ("hashCode" .equals(methodName) && paramTypes.length == 0 ) { return proxyInvoker.hashCode(); } else if ("equals" .equals(methodName) && paramTypes.length == 1 ) { Object another = paramValues[0 ]; return proxy == another || (proxy.getClass().isInstance(another) && proxyInvoker.equals(JDKProxy.parseInvoker(another))); } SofaRequest sofaRequest = MessageBuilder.buildSofaRequest(method.getDeclaringClass(), method, paramTypes, paramValues); SofaResponse response = proxyInvoker.invoke(sofaRequest); if (response.isError()) { throw new SofaRpcException(RpcErrorType.SERVER_UNDECLARED_ERROR, response.getErrorMsg()); } Object ret = response.getAppResponse(); if (ret instanceof Throwable) { throw (Throwable) ret; } else { if (ret == null ) { return ClassUtils.getDefaultPrimitiveValue(method.getReturnType()); } return ret; } } public SofaResponse invoke (SofaRequest request) throws SofaRpcException { SofaResponse response = null ; Throwable throwable = null ; try { RpcInternalContext.pushContext(); RpcInternalContext context = RpcInternalContext.getContext(); context.setProviderSide(false ); decorateRequest(request); try { if (EventBus.isEnable(ClientStartInvokeEvent.class)) { EventBus.post(new ClientStartInvokeEvent(request)); } response = cluster.invoke(request); } catch (SofaRpcException e) { throwable = e; throw e; } finally { if (!request.isAsync()) { if (EventBus.isEnable(ClientEndInvokeEvent.class)) { EventBus.post(new ClientEndInvokeEvent(request, response, throwable)); } } } decorateResponse(response); return response; } finally { RpcInternalContext.removeContext(); RpcInternalContext.popContext(); } }
解析总结
SOFARPC服务引用流程概括为SOFARPC服务需要创建服务引用配置ConsumerConfig,自定义设置接口名称、调用协议、直连调用地址以及连接超时时间等客户端配置;服务消费者启动类ConsumerBootstrap引用服务:构造客户端集群Cluster封装集群模式、长连接管理、服务路由、负载均衡,初始化客户端集群Cluster(构建Router链routerChain,加载负载均衡策略loadBalancer,获取地址保持器addressHolder,构造Filter链filterChain最底层是调用过滤器,连接管理器启动重连线程,订阅服务发布者,初始化服务端连接(建立长连接),创建客户端代理调用器ClientProxyInvoker执行链,创建客户端调用器代理实现引用服务链路,基于JDK动态代理场景RPC服务远程根据JDK代理处理器JDKInvocationHandler拦截请求转换成Invocation执行invoke()方法以客户端代理调用器ClientProxyInvoker实施Proxy拦截调用,使用客户端集群Cluster调用消费端调用器ConsumerInvoker进行客户端发送数据给服务端完成远程调用过程。