RPC远程调用框架

今天我们来聊一聊远程调用这个技术点:通俗的讲就是调用远端(另外一台服务器应用上)的方法,当时了解这个RPC的时候是在dubbo这个框架并且自己应用了dubbo,只是会简单的使用,但是并没有搞懂它底层如何如何实现方法的调用的,带着这些疑问自己也一直在不断的学习,有幸在前公司参加过一个网关框架的开发,所以对RPC框架的底层有了深厚的理解,通过自己了解,学习,然后自己实践应用,我自己封装了一套小型的RPC框架,说白了就是dubbo的阉割版,所以我还取名叫做mini-dubbo,我放到了github上,欢迎大家去点击,地址在下方

mini-dubbo源码地址

简述远程调用是如何实现(白话)

首先要建立两台服务器之间的通信(服务提供者和服务消费者之间的通信),首先服务消费者需要知道服务提供者的地址,所以需要将服务提供者的地址保存到一个中心,而这个中心是提供者,消费者都能够访问的,建立连接后,服务消费者调用服务的提供者需要传递参数,因为服务提供者在本地执行不知道执行那个方法,所以服务消费者需要传递的参数过来,参数传递过来后服务提供者收到请求需要去执行本地方法,如何执行呢?我们知道java已经将类编译好了,可以通过java的反射机制来执行方法,而方法的执行需要哪些参数呢?首先你要知道是哪个接口,接口中的哪个方法,方法中参数类型,参数列表,知道这些后服务提供者在本地执行该方法然后返回执行结果给服务消费者,以上就是远程方法执行获取结果的过程。

在这里插入图片描述

实现设计到的知识点

  1. 服务提供者保存接口地址的注册中心
  2. 服务消费者本地生成服务提供者接口的代理对象
  3. 请求过程中的序列化和反序列化
  4. java中的反射机制
  5. 请求数据和响应数据的定义(协议)
  6. 服务提供者的集群涉及到负载均衡算法

注册中心

注册中心就相当于一个持久化的过程将接口地址保存起来提供个消费者获取然后调用,可以使用数据库,redis,zookeeper。

数据库:太多的io操作有瓶颈

redis:对于订阅事件比较麻烦,服务上下线需要自己实现数据的清理

zookeeper:天生支持订阅事件,无需更多的业务操作代码编写

  • 使用zookeeper作为注册中心(接口级别)

zookeeper保存数据的结构是树的结构,类似于文件系统的结构

当服务提供者启动的同时会将标记为服务提供者(@Provider)的对象注册到注册中心上,mini-dubbo使用过该类AppServiceRegister,将服务,应用的部署信息注册上去的

/mini-dubbo/app-service 为根节点存放接口

/mini-dubbo/app-deploy 为根节点存放部署的应用

/com.yu.user.api.UserApi 表示接口

/provider 表示接口提供者,下面的数据表示提供给消费者消费的的地址

/custome 表示接口消费者,下面的数据目前没意义

在这里插入图片描述

// 关键的方法 
// 在容器中扫描使用了@Provider注解的对象
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(Provider.class);
        List<String> beanNames = new ArrayList<>();
        for (Map.Entry<String, Object> entry : beanMap.entrySet()) {
            beanNames.add(entry.getKey());
        }
        if (StringUtils.isEmpty(SpringContextHolder.getProperties("spring.application.name")) || StringUtils.isEmpty(SpringContextHolder.getProperties("server.port"))) {
            throw new RuntimeException("please check server name{spring.application.name} and port{server.port}");
        }

        String providerAddress = CommonUtil.getLocalServerAddress() + "/" + SpringContextHolder.getProperties("spring.application.name") + AppserverAddress +
                "?weight=" + (StringUtils.isEmpty(SpringContextHolder.getProperties("spring.mini-dubbo.weight")) ? 1 : SpringContextHolder.getProperties("spring.mini-dubbo.weight"));

        for (String beanName : beanNames) {
            if (beanName.indexOf("org.springframework") != -1) {
                continue;
            }
            if (type.isAnnotationPresent(Provider.class)) {
                Class<?>[] define = type.getInterfaces();
                for (Class<?> cls : define) {

                    if (cls.getName().indexOf("org.springframework") != -1) {
                        continue;
                    }

                    if (cls.getName().startsWith("private")) {
                        throw new RuntimeException("接口名不能以private开头 ");
                    }
                    log.info("[app-server] registry providerAddress to registry-center: {}, url: {}", cls.getName(), providerAddress);
                    // 注册接口
                    RegistryStrategy.registerProvider(cls.getName(), providerAddress);
                }
            }
        }
        // 注册部署应用信息
        registerAppDeploy();
    }

生成代理对象

在服务消费端生成一个要使用的接口代理对象

消费端通过使用注解@Reference来讲代理对象赋值给这个变量

// 消费端使用
@RestController
public class OrderController {
    @Reference
    public static UserApi userApi;
}

如何将代理对象赋值给上述的变量上,大家可以去了解一下使用@Autowired的底层原理

mini-dubbo中是通过AppClient类扫描容器中的bean中使用@Reference

通过proxy方法创建代理对象,该方法获取了注册中心的提供者,并将地址缓存起来

   /** 将引用的接口赋上代理对象的值
     *
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextHolder.applicationContext = applicationContext;
        String[] beanNamesForType = applicationContext.getBeanNamesForType(Object.class);
        for (String beanName : beanNamesForType) {
            Object bean = applicationContext.getBean(beanName);
            Class<?> type = applicationContext.getType(beanName);
            ReflectionUtils.doWithFields(type, field -> {
                if (field.isAnnotationPresent(Reference.class)) {
                    Object proxy = AppClient.proxy(field.getType());
                    field.set(bean, proxy);
                }
            });
        }
     }

    /**
     * 创建直连代理实例
     *
     * @param interfaces
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T proxy(Class<T> interfaces) {
        if (StringUtils.isEmpty(SpringContextHolder.getProperties("spring.application.name")) || StringUtils.isEmpty(SpringContextHolder.getProperties("server.port"))) {
            throw new RuntimeException("please check server name{spring.application.name} and port{server.port}");
        }

        String consumerAddress = CommonUtil.getLocalServerAddress() +
                "/" + SpringContextHolder.getProperties("spring.application.name");
        // 缓存调用接口类型
        AppClientHandler.cacheServiceName(interfaces.getName());

        AppServiceDomain.Provider provider = RegistryStrategy.getProviderFromRegistry(interfaces.getName());
        // 注册消费者
        RegistryStrategy.registerConsumer(interfaces.getName(), consumerAddress);
        log.info("[app-client] registry customerAddress to registry-center: url: {},service: {}", consumerAddress, interfaces.getName());
        // 缓存type.getName()对应的服务地址
        if (provider != null && provider.getAddressList() != null) {
            AppClientHandler.cacheAppServerAddress(provider.getServiceName(), provider.getAddressList());
        }
// 关键
        return (T) Proxy.newProxyInstance(AppClient.class.getClassLoader(), new Class<?>[]{interfaces}, (proxy, method, args) -> {
            AppClientHandler handler = new AppClientHandler(interfaces, method, args);
            return executor.submit(handler).get();
        });
    }

序列化和反序列化

  1. 请求服务提供者需要将参数序列化
  2. 服务提供者收到后需要反序列化
  3. 服务提供者返回结果需要序列化
  4. 消费者收到结果后需要反序列化

在这里插入图片描述

网络之间数据的传输都是通过字节数组,mini-dubbo中使用kryo进行对象之间的序列化处理

反射机制

  • 消费者的调用

消费者通过使用代理对象调用方法后,mini-dubbo中是通过AppClientHandler类将请求数据包装了一个请求对象RequestDomain,如下。然后进行序列化发送一个post请求

public class RequestDomain {
    /**
     * 请求号
     */
    private String requestNo;

    /**
     * 接口class name
     */
    private String className;

    /**
     * 目标方法名
     */
    private String methodName;

    /**
     * 入参类型
     */
    private String[] paramTypeNames;

    /**
     * 入参
     */
    private Object[] paramInputs;

    public RequestDomain() {
    }
  • 提供端的执行

服务提供端收到请求数据后将其反序列化获取对象是RequestDomain,其中包含了反射需要的参数,由于提供端的容器中存在服务端需要调用的对象,调用方法,返回结果,mini-dubbo是通过AppServer类实现的

    private void call(HttpServletRequest req, HttpServletResponse resp) {

        try {
            OutputStream os = resp.getOutputStream();
            byte[] requestData = parseRequestData(req);
            long start = System.currentTimeMillis();
            Object result = null;

            RequestDomain appRequestDomain = null;
            ResponseDomain appResponseDomain = new ResponseDomain();

            try {
                appRequestDomain = CodecUtil.decodeRequest(requestData);

                if (appRequestDomain.getClassName() == null) {
                    throw new RuntimeException("[app-server] class does not exist");
                }
                String className = appRequestDomain.getClassName();
                String methodName = appRequestDomain.getMethodName();
                String[] paramTypeNames = appRequestDomain.getParamTypeNames();
                Object[] paramInputs = appRequestDomain.getParamInputs();

                Class<?> cls = Class.forName(className);
                Class<?>[] inputTypes = null;
                if (paramTypeNames != null) {
                    inputTypes = new Class<?>[paramTypeNames.length];
                    for (int i = 0; i < paramTypeNames.length; i++) {
                        inputTypes[i] = Class.forName(paramTypeNames[i]);
                    }
                }

                Method method = null;
                if (inputTypes != null && inputTypes.length > 0) {
                    method = ReflectionUtils.findMethod(cls, methodName, inputTypes);
                } else {
                    method = ReflectionUtils.findMethod(cls, methodName);
                }
                if (method == null) {
                    throw new RuntimeException("[app-server] method does not exist : " + methodName);
                }
                Object targetObject = SpringContextHolder.getBean(cls);
                if (targetObject == null) {
                    throw new RuntimeException("[app-server] no interface implementation class found" + cls.getName());
                }
                
                // 关键的执行本地方法返回结果
                if (paramInputs != null && paramInputs.length > 0) {
                    result = ReflectionUtils.invokeMethod(method, targetObject, paramInputs);
                } else {
                    result = ReflectionUtils.invokeMethod(method, targetObject);
                }
                appResponseDomain.setCode(0);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                String msg = e.getMessage();

                appResponseDomain.setCode(501);

                if (msg != null && !msg.matches("\\s*")) {
                    appResponseDomain.setMessage(msg);
                } else {
                    appResponseDomain.setMessage("[app-server] service exception");
                }
            } finally {
                long end = System.currentTimeMillis();
                appResponseDomain.setCostTime((int) (end - start));
                appResponseDomain.setResult(result);

                if (os != null) {
                    try {
                        // 序列化返回值对象
                        byte[] out = CodecUtil.encodeResponse(appResponseDomain);
                        os.write(out);
                    } catch (Exception e2) {
                        log.error(e2.getMessage(), e2);
                    }
                    try {
                        os.flush();
                        os.close();
                    } catch (Exception e2) {
                        log.error(e2.getMessage(), e2);
                    }
                }

                if (appRequestDomain != null) {
                    log.info("[app-server] execute done, from client ip: {}, class: {}, method: {}, cost: {}ms ", req.getRemoteAddr(), appRequestDomain.getClassName(), appRequestDomain.getMethodName(), (end - start));
                } else {
                    log.info("[app-server] from client ip: {}, cost: {}ms ", req.getRemoteAddr(), (end - start));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
  • 提供端的返回
  public class ResponseDomain {
    /**
     * 状态码 0 成功  5xx 服务端异常   3xx 客户端异常
     */
    private Integer code = 0;

    /**
     * 业务执行耗费时间
     */
    private Integer costTime = -1;

    /**
     * 错误信息
     */
    private String message;

    /**
     * 返回的数据对象
     */
    private Object result;

到这我们已经走通了消费者到提供者的调用以及结果的返回,肯定有朋友疑问消费者调用提供者的地址,提供者是怎么知道呀?其实这个我们再mini-dubbo中配置了一个自定义的servlet专门约定用来处理消费者发送的请求,

@Configuration
@AutoConfigureAfter({ServletWebServerFactoryAutoConfiguration.class})
public class MiniDubboServerAutoConfiguration {
    private static final String DEFAULT_MINI_DUBBO_SERVLET_BEAN_NAME = "miniDubboServerServlet";
    public static final String DEFAULT_MINI_DUBBO_SERVLET_BEAN_NAME_REGISTRATION_BEAN_NAME = "miniDubboServerServletRegistration";

    @Bean(name = {DEFAULT_MINI_DUBBO_SERVLET_BEAN_NAME_REGISTRATION_BEAN_NAME})
    public ServletRegistrationBean miniMvcServletFastRegistration() {
        ServletRegistrationBean registration = new ServletRegistrationBean(new AppServer(), AppServiceRegister.AppserverAddress);
        registration.setName(DEFAULT_MINI_DUBBO_SERVLET_BEAN_NAME);
        return registration;
    }
}

负载均衡算法

dubbo中支持最小活跃数,权重随机轮询,权重轮询,一致性哈希

而我写的mini-dubbo中同样支持权重随机轮询,权重轮询,一致性哈希。就是参考dubbo中实现的

总结

dubbo中是使用基于nio模式netty框架绑定指定的端口来进行数据的传输,而mini-dubbo中我是通过监听指定的接口地址进行http请求进行数据的传输

具体的mini-dubbo的使用方法可上github查看readme
在这里插入图片描述

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐