补充:2018-04-20

值得一说的是:下方的 “透明” 是通过 动态代理对 “负载均衡和容错”的封装 。

这里写图片描述

此图配合下方案例代码可以更好的理解 分布式服务框架-RPC原理。

协议:
这里写图片描述

说明 :内容为小编个人见解,同时做备忘用

基础准备 : java Socket , serverSocket , RPC 协议。

(1) 网络通信数据传输靠的就是 IO 流(byte[] 字节) 。

(2) RPC 协议是指 : 利用tcp 通信,对byte[] 加上自己的规则, 服务端和客户端可以通过此规则进行数据的封装和解析。

dubbo + zookeeper 代码演练

服务提供者 :

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>tony.test</groupId>
    <artifactId>dubbo-provider</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>dubbo</artifactId>
            <version>2.5.3</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.8</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.7</version>
        </dependency>
    </dependencies>
</project>

provider.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans        http://www.springframework.org/schema/beans/spring-beans.xsd        http://code.alibabatech.com/schema/dubbo        http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- 提供方应用信息,用于计算依赖关系 -->
    <dubbo:application name="hello-world-app"  />

    <!-- 使用zookeeper注册中心暴露服务地址 -->
    <dubbo:registry address="zookeeper://localhost:2181" />

    <!-- 用dubbo协议在20880端口暴露服务 -->
    <!-- serialization 协议序列化方式,当协议支持多种序列化方式时使用,,默认hessian2
        比如:dubbo协议的dubbo,hessian2,java,compactedjava,以及http协议的json等 
    -->
    <dubbo:protocol name="dubbo" port="20880" serialization="fastjson"/>

    <!-- 声明需要暴露的服务接口 -->
    <dubbo:service interface="com.tony.test.dubbo.DemoService" ref="demoService" />

    <!-- 和本地bean一样实现服务 -->
    <bean id="demoService" class="com.tony.test.dubbo.provider.DemoServiceImpl" />

</beans>

服务接口 :

package com.tony.test.dubbo;

public interface DemoService {

    String sayHello(User user, String mark);
}

子类 :

package com.tony.test.dubbo.provider;

import com.tony.test.dubbo.DemoService;
import com.tony.test.dubbo.User;

public class DemoServiceImpl implements DemoService {

    public String sayHello(User user, String mark) {

        System.out.println("进来了----");

        return "agui>" + mark + " name>" + user.getName();
    }
}

main 入口 :

或者直接启动tomcat

package com.tony.test.dubbo.provider;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ProviderMain {
    // 服务提供者
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                new String[] { "provider.xml" });
        context.start();

        System.in.read();
    }
}

服务消费者

pom.xml 同上 , 接口(interface)同上 。

consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans        http://www.springframework.org/schema/beans/spring-beans.xsd        http://code.alibabatech.com/schema/dubbo        http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 -->
    <dubbo:application name="consumer-of-helloworld-app"  />

    <!-- 使用multicast广播注册中心暴露发现服务地址 -->
    <dubbo:registry address="zookeeper://localhost:2181" />

    <!-- 生成远程服务代理,可以和本地bean一样使用demoService -->
    <dubbo:reference id="demoService" interface="com.tony.test.dubbo.DemoService" timeout="30000"/>

</beans>

main 入口

或者启动tomcat

package com.tony.test.dubbo.consumer;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.tony.test.dubbo.DemoService;
import com.tony.test.dubbo.User;

public class ConsumerMain {

    // 服务消费者
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "consumer.xml" });
        context.start();
        DemoService demoService = (DemoService) context.getBean("demoService"); 
        User user = new User();
        user.setName("agui");
        String hello = demoService.sayHello(user, "2"); 

        System.out.println("RPC调用结果:" + hello); 
    }

}

以上就是 dubbo + zookeeper 简单demo 实现。

接下来讲解一下实现原理

之前提到过:带有规则的byte[] 字节 ,那么阿里的dubbo 对bute[] 规则 如图 :

这里写图片描述

这里写图片描述

基本结构分为:header , body 两部分,详细内容参考上图。

整体流程 :

1) 启动配置文件,spring bean 容器加载我们的接口实现类,服务提供者封装serverSocket , 并通过 zookeeper客户端在 zookeeper中创建节点, 节点中记录服务端的ip, 端口,暴露的接口等信息。

2) 客户端通过zookeeper客户端拿到服务端ip , 端口等信息。

3)构建header 和 body 的字节数组(byte) , 通过socket 拿到输出流,write() 方法输出字节数组。

4)服务端拿到输入流之后,会得到接口信息,调用的方法名称等,可以通过spring的bean 工厂类,拿到 注入到bean 中的 该接口子类对象,然后去掉用具体的方法,拿到返回值后可以进行序列化,在用 socket 和 serverSocket 建立的管道 输出流 输出 子类方法返回的值,此时 http 请求,响应完毕。

说明:http 通信字节流,转为字符流,反序列化可以用阿里的

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.7</version>
</dependency>

模拟消费端源码 :

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>tony.test</groupId>
    <artifactId>tony-dubbo-client</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.7</version>
        </dependency>
    </dependencies>
</project>

byte[]原数据封装类 :

package com.agui.test.consumer;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;

import com.alibaba.fastjson.JSON;

/**
 * 发起dubbo调用请求
 */
public class AguiDubboRpcRequest {

    String host; // 服务提供者IP
    int port; // 服务提供者端口

    // 协议中header段
    ByteBuffer header = ByteBuffer.allocate(16);

    // 协议中body段
    StringBuffer rpcBody = new StringBuffer();

    public TonyDubboRpcRequest(String host, int port) {
        this.host = host;
        this.port = port;

        // 魔数 da bb
        header.put((byte) 0xda);
        header.put((byte) 0xbb);

        // 标识 固定为 C6
        header.put((byte) 0xC6);

        // 响应状态 ,这里没有
        header.put((byte) 0x00);

        // 消息ID 编号传进来

        // 数据长度 这个要在最后计算

    }

    /** 需要是完整的类名 */
    public TonyDubboRpcRequest dubboVersion(String dubboVersion) {
        rpcBody.append(JSON.toJSONString(dubboVersion));
        rpcBody.append("\r\n");
        return this;
    }

    /** 需要是完整的类名 */
    public TonyDubboRpcRequest path(String path) {
        rpcBody.append(JSON.toJSONString(path));
        rpcBody.append("\r\n");
        return this;
    }

    /** 服务版本号 */
    public TonyDubboRpcRequest serviceVersion(String serviceVersion) {
        rpcBody.append(JSON.toJSONString(serviceVersion));
        rpcBody.append("\r\n");
        return this;
    }

    /** 方法名 */
    public TonyDubboRpcRequest method(String method) {
        rpcBody.append(JSON.toJSONString(method));
        rpcBody.append("\r\n");
        return this;
    }

    /** 参数类型 */
    public TonyDubboRpcRequest desc(String desc) {
        rpcBody.append(JSON.toJSONString(desc));
        rpcBody.append("\r\n");
        return this;
    }

    /** 参数值 */
    public TonyDubboRpcRequest values(Object... values) {
        for (Object value : values) {
            rpcBody.append(JSON.toJSONString(value));
            rpcBody.append("\r\n");
        }
        return this;
    }

    /** 隐式传参 */
    public TonyDubboRpcRequest attachments(Object attachment) {

        rpcBody.append(JSON.toJSONString(attachment));
        rpcBody.append("\r\n");
        return this;
    }

    /** 补齐header */
    public TonyDubboRpcRequest build(long msgId) {
        // 消息ID
        byte[] msgIdBytes = new byte[8];
        long2bytes(msgId, msgIdBytes, 0);
        this.header.put(msgIdBytes);

        // 计算body长度
        int length = this.rpcBody.toString().getBytes().length;
        byte[] lenBytes = new byte[4];
        int2bytes(length, lenBytes, 0);
        this.header.put(lenBytes);

        return this;
    }

    /** int转4字节数组 */
    private void int2bytes(int v, byte[] b, int off) {
        b[off + 3] = (byte) v;
        b[off + 2] = (byte) (v >>> 8);
        b[off + 1] = (byte) (v >>> 16);
        b[off + 0] = (byte) (v >>> 24);
    }

    /** long转8字节数组 */
    private void long2bytes(long v, byte[] b, int off) {
        b[off + 7] = (byte) v;
        b[off + 6] = (byte) (v >>> 8);
        b[off + 5] = (byte) (v >>> 16);
        b[off + 4] = (byte) (v >>> 24);
        b[off + 3] = (byte) (v >>> 32);
        b[off + 2] = (byte) (v >>> 40);
        b[off + 1] = (byte) (v >>> 48);
        b[off + 0] = (byte) (v >>> 56);
    }

    public String run() throws Exception {
        System.out.println("###########输出rcpBody内容:##########");
        System.out.println(this.rpcBody.toString());
        System.out.println("###########结束输出###########");

        Socket socket = new Socket(this.host, this.port);
        try {
            OutputStream rpcOPS = socket.getOutputStream();
            // 发起调用请求
            rpcOPS.write(header.array());
            rpcOPS.write(this.rpcBody.toString().getBytes());

            // 输出RPC调用结果
            InputStream rpcRsp = socket.getInputStream();
            byte[] header = new byte[16];
            rpcRsp.read(header);

            byte[] resp = new byte[1024];
            rpcRsp.read(resp);

            // 忽略header,取body
            return new String(resp);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            socket.close();
        }
        return null;
    }
}

程序入口 :

package com.agui.test.consumer;

import com.alibaba.fastjson.JSONObject;

public class AguiConsumerMain {

    // 手写dubbo客户端
    public static void main(String[] args) throws Exception {
        // TODO 配置注册中心
        // TODO 获取服务信息
        // 本实例重点在于分析dubbo rpc协议内容,故此消费者假定已经知晓服务端的信息。

        JSONObject user = JSONObject.parseObject("{\"name\":\"tony\"}");

        TonyDubboRpcRequest tonyDubboRpcRequest = new TonyDubboRpcRequest("127.0.0.1", 20880);
        tonyDubboRpcRequest
        .dubboVersion("2.5.3")
        .path("com.tony.test.dubbo.DemoService")
        .serviceVersion("0.0.0.0")
        .method("sayHello")
        .desc("Lcom/tony/test/dubbo/User;Ljava/lang/String;")
        .values(user, "测试tony_1")
        .attachments(JSONObject.parseObject("{\"path\":\"com.tony.test.dubbo.DemoService\",\"interface\":\"com.tony.test.dubbo.DemoService\",\"version\":\"0.0.0\"}"))
        .build(3);

        String result = tonyDubboRpcRequest.run();

        System.out.println("RPC调用结果:");
        System.out.println(result);
    }

}

运行程序的时候需要启动上面的:服务提供者,zookeeper 。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐