微服务 Dubbo + Zookeeper 原理解析
补充:2018-04-20值得一说的是:下方的 “透明” 是通过 动态代理对 “负载均衡和容错”的封装 。此图配合下方案例代码可以更好的理解 分布式服务框架-RPC原理。协议:说明 :内容为小编个人见解,同时做备忘用基础准备 : javaSocket , serverSocket, RPC 协议。(1) 网络通信数据传输靠的就是 IO 流(byte[] 字...
补充:2018-04-20
值得一说的是:下方的 “透明” 是通过 动态代理对 “负载均衡和容错”的封装 。
此图配合下方案例代码可以更好的理解 分布式服务框架-RPC原理。
协议:
说明 :内容为小编个人见解,同时做备忘用
基础准备 : java Socket , serverSocket , RPC 协议。
(1) 网络通信数据传输靠的就是 IO 流(byte[] 字节) 。
(2) RPC 协议是指 : 利用tcp 通信,对byte[] 加上自己的规则, 服务端和客户端可以通过此规则进行数据的封装和解析。
dubbo + zookeeper 代码演练
服务提供者 :
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>tony.test</groupId>
<artifactId>dubbo-provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.8</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
</dependencies>
</project>
provider.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="hello-world-app" />
<!-- 使用zookeeper注册中心暴露服务地址 -->
<dubbo:registry address="zookeeper://localhost:2181" />
<!-- 用dubbo协议在20880端口暴露服务 -->
<!-- serialization 协议序列化方式,当协议支持多种序列化方式时使用,,默认hessian2
比如:dubbo协议的dubbo,hessian2,java,compactedjava,以及http协议的json等
-->
<dubbo:protocol name="dubbo" port="20880" serialization="fastjson"/>
<!-- 声明需要暴露的服务接口 -->
<dubbo:service interface="com.tony.test.dubbo.DemoService" ref="demoService" />
<!-- 和本地bean一样实现服务 -->
<bean id="demoService" class="com.tony.test.dubbo.provider.DemoServiceImpl" />
</beans>
服务接口 :
package com.tony.test.dubbo;
public interface DemoService {
String sayHello(User user, String mark);
}
子类 :
package com.tony.test.dubbo.provider;
import com.tony.test.dubbo.DemoService;
import com.tony.test.dubbo.User;
public class DemoServiceImpl implements DemoService {
public String sayHello(User user, String mark) {
System.out.println("进来了----");
return "agui>" + mark + " name>" + user.getName();
}
}
main 入口 :
或者直接启动tomcat
package com.tony.test.dubbo.provider;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ProviderMain {
// 服务提供者
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { "provider.xml" });
context.start();
System.in.read();
}
}
服务消费者
pom.xml 同上 , 接口(interface)同上 。
consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 -->
<dubbo:application name="consumer-of-helloworld-app" />
<!-- 使用multicast广播注册中心暴露发现服务地址 -->
<dubbo:registry address="zookeeper://localhost:2181" />
<!-- 生成远程服务代理,可以和本地bean一样使用demoService -->
<dubbo:reference id="demoService" interface="com.tony.test.dubbo.DemoService" timeout="30000"/>
</beans>
main 入口
或者启动tomcat
package com.tony.test.dubbo.consumer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.tony.test.dubbo.DemoService;
import com.tony.test.dubbo.User;
public class ConsumerMain {
// 服务消费者
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "consumer.xml" });
context.start();
DemoService demoService = (DemoService) context.getBean("demoService");
User user = new User();
user.setName("agui");
String hello = demoService.sayHello(user, "2");
System.out.println("RPC调用结果:" + hello);
}
}
以上就是 dubbo + zookeeper 简单demo 实现。
接下来讲解一下实现原理
之前提到过:带有规则的byte[] 字节 ,那么阿里的dubbo 对bute[] 规则 如图 :
基本结构分为:header , body 两部分,详细内容参考上图。
整体流程 :
1) 启动配置文件,spring bean 容器加载我们的接口实现类,服务提供者封装serverSocket , 并通过 zookeeper客户端在 zookeeper中创建节点, 节点中记录服务端的ip, 端口,暴露的接口等信息。
2) 客户端通过zookeeper客户端拿到服务端ip , 端口等信息。
3)构建header 和 body 的字节数组(byte) , 通过socket 拿到输出流,write() 方法输出字节数组。
4)服务端拿到输入流之后,会得到接口信息,调用的方法名称等,可以通过spring的bean 工厂类,拿到 注入到bean 中的 该接口子类对象,然后去掉用具体的方法,拿到返回值后可以进行序列化,在用 socket 和 serverSocket 建立的管道 输出流 输出 子类方法返回的值,此时 http 请求,响应完毕。
说明:http 通信字节流,转为字符流,反序列化可以用阿里的
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
模拟消费端源码 :
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>tony.test</groupId>
<artifactId>tony-dubbo-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
</dependencies>
</project>
byte[]原数据封装类 :
package com.agui.test.consumer;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import com.alibaba.fastjson.JSON;
/**
* 发起dubbo调用请求
*/
public class AguiDubboRpcRequest {
String host; // 服务提供者IP
int port; // 服务提供者端口
// 协议中header段
ByteBuffer header = ByteBuffer.allocate(16);
// 协议中body段
StringBuffer rpcBody = new StringBuffer();
public TonyDubboRpcRequest(String host, int port) {
this.host = host;
this.port = port;
// 魔数 da bb
header.put((byte) 0xda);
header.put((byte) 0xbb);
// 标识 固定为 C6
header.put((byte) 0xC6);
// 响应状态 ,这里没有
header.put((byte) 0x00);
// 消息ID 编号传进来
// 数据长度 这个要在最后计算
}
/** 需要是完整的类名 */
public TonyDubboRpcRequest dubboVersion(String dubboVersion) {
rpcBody.append(JSON.toJSONString(dubboVersion));
rpcBody.append("\r\n");
return this;
}
/** 需要是完整的类名 */
public TonyDubboRpcRequest path(String path) {
rpcBody.append(JSON.toJSONString(path));
rpcBody.append("\r\n");
return this;
}
/** 服务版本号 */
public TonyDubboRpcRequest serviceVersion(String serviceVersion) {
rpcBody.append(JSON.toJSONString(serviceVersion));
rpcBody.append("\r\n");
return this;
}
/** 方法名 */
public TonyDubboRpcRequest method(String method) {
rpcBody.append(JSON.toJSONString(method));
rpcBody.append("\r\n");
return this;
}
/** 参数类型 */
public TonyDubboRpcRequest desc(String desc) {
rpcBody.append(JSON.toJSONString(desc));
rpcBody.append("\r\n");
return this;
}
/** 参数值 */
public TonyDubboRpcRequest values(Object... values) {
for (Object value : values) {
rpcBody.append(JSON.toJSONString(value));
rpcBody.append("\r\n");
}
return this;
}
/** 隐式传参 */
public TonyDubboRpcRequest attachments(Object attachment) {
rpcBody.append(JSON.toJSONString(attachment));
rpcBody.append("\r\n");
return this;
}
/** 补齐header */
public TonyDubboRpcRequest build(long msgId) {
// 消息ID
byte[] msgIdBytes = new byte[8];
long2bytes(msgId, msgIdBytes, 0);
this.header.put(msgIdBytes);
// 计算body长度
int length = this.rpcBody.toString().getBytes().length;
byte[] lenBytes = new byte[4];
int2bytes(length, lenBytes, 0);
this.header.put(lenBytes);
return this;
}
/** int转4字节数组 */
private void int2bytes(int v, byte[] b, int off) {
b[off + 3] = (byte) v;
b[off + 2] = (byte) (v >>> 8);
b[off + 1] = (byte) (v >>> 16);
b[off + 0] = (byte) (v >>> 24);
}
/** long转8字节数组 */
private void long2bytes(long v, byte[] b, int off) {
b[off + 7] = (byte) v;
b[off + 6] = (byte) (v >>> 8);
b[off + 5] = (byte) (v >>> 16);
b[off + 4] = (byte) (v >>> 24);
b[off + 3] = (byte) (v >>> 32);
b[off + 2] = (byte) (v >>> 40);
b[off + 1] = (byte) (v >>> 48);
b[off + 0] = (byte) (v >>> 56);
}
public String run() throws Exception {
System.out.println("###########输出rcpBody内容:##########");
System.out.println(this.rpcBody.toString());
System.out.println("###########结束输出###########");
Socket socket = new Socket(this.host, this.port);
try {
OutputStream rpcOPS = socket.getOutputStream();
// 发起调用请求
rpcOPS.write(header.array());
rpcOPS.write(this.rpcBody.toString().getBytes());
// 输出RPC调用结果
InputStream rpcRsp = socket.getInputStream();
byte[] header = new byte[16];
rpcRsp.read(header);
byte[] resp = new byte[1024];
rpcRsp.read(resp);
// 忽略header,取body
return new String(resp);
} catch (Exception e) {
e.printStackTrace();
} finally {
socket.close();
}
return null;
}
}
程序入口 :
package com.agui.test.consumer;
import com.alibaba.fastjson.JSONObject;
public class AguiConsumerMain {
// 手写dubbo客户端
public static void main(String[] args) throws Exception {
// TODO 配置注册中心
// TODO 获取服务信息
// 本实例重点在于分析dubbo rpc协议内容,故此消费者假定已经知晓服务端的信息。
JSONObject user = JSONObject.parseObject("{\"name\":\"tony\"}");
TonyDubboRpcRequest tonyDubboRpcRequest = new TonyDubboRpcRequest("127.0.0.1", 20880);
tonyDubboRpcRequest
.dubboVersion("2.5.3")
.path("com.tony.test.dubbo.DemoService")
.serviceVersion("0.0.0.0")
.method("sayHello")
.desc("Lcom/tony/test/dubbo/User;Ljava/lang/String;")
.values(user, "测试tony_1")
.attachments(JSONObject.parseObject("{\"path\":\"com.tony.test.dubbo.DemoService\",\"interface\":\"com.tony.test.dubbo.DemoService\",\"version\":\"0.0.0\"}"))
.build(3);
String result = tonyDubboRpcRequest.run();
System.out.println("RPC调用结果:");
System.out.println(result);
}
}
运行程序的时候需要启动上面的:服务提供者,zookeeper 。
更多推荐
所有评论(0)