介绍 🐹

gRPC:grpc 是 Google 开发的一种开源 RPC(Remote Procedure Call Protocol 远程过程调用的协议)框架,在 gRPC 中,客户端应用程序可以像本地对象一样直接调用不同机器上的服务器应用程序上的方法,从而使您可以更轻松地创建分布式应用程序和服务。

简单的说:

如果你的 java 项目里有 A 和 B 两个类,A 中有 aa 方法,B 中有 bb 方法,你可以很容易的在 aa 里调用 bb 方法。

但是如果 aa 方法和 bb 方法分别在两台服务器上的不同项目代码模块里呢(也就是在不同的微服务里),你可能会用 http相关的库,比如 httpclient 通过 url 发送 get 或 post 去请求 bb 方法。但是这样做效率并不高效,在微服务和分布式中并不好用,使用 grpc ,你就可以像在调用本地方法一样去调用其他服务器上的方法,直观又高效。

关于 grpc 的使用和文档,可参考官网:grpc

一、gRPC 存根与 proto 🐖

与许多 RPC 系统一样,gRPC 基于定义服务的思想,在 proto 文件中指定远程调用的方法及其参数和返回类型。在服务端,服务端实现这个接口并运行一个 gRPC 服务来处理客户端调用。在客户端,客户端有一个存根即 stub,它提供与服务器相同的方法。

gRPC 的使用简单来说就是:
在 proto 文件定义要调用的方法,要传入的参数、返回值。然后 grpc 会根据 proto 生成两个东西,一个是客户端存根stub,用于给客户端提供调用方法名和塞入参;另一个是服务端接口,用于给服务端实现这个接口,返回给客户端。gRPC 就像一个桥梁一样连接着客户端和服务端,实现了远程调用。

二、grpc的 4 种模式介绍 🚎

1、简单模式(Simple RPC)

简单模式:也称简单 RPC,即客户端发起一次请求,服务端响应处理后返回一个结果给客户端。

在 proto 文件中可如下定义:

rpc SayHello(HelloRequest) returns (HelloResponse);

2、服务端数据流模式(Server-side streaming RPC)

服务端数据流模式:也称服务端流式 RPC,即客户端发起一次请求,服务端可以连续返回数据流。
比如:客户端向服务端发送了一个查询数据库的请求,服务端持续返回多次结果。(即客户端发送一次请求,服务端查询到数据库有一万条数据,服务端分批返回10次,每次返回1000条数据给客户端)。

在 proto 文件中可如下定义:

rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);

3、客户端数据流模式(Client-side streaming RPC)

客户端数据流模式:也称客户端流式 RPC,与服务端数据流模式相反,客户端持续向服务端发送数据流,在发送结束后,由服务端返回一个响应。
比如:客户端有一万条数据 ,分批多次请求服务端,服务端接收后把这些数据都存到数据库,然后返回一次结果给客户端。

在 proto 文件中可如下定义:

rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);

4、双向数据流模式(Bidirectional streaming RPC)

双向数据流模式:也称双向流式 RPC,即客户端和服务端都可以向对方多次收发数据。

比如:客户端有一万条数据 ,分批多次请求服务端,服务端每次接收后存到数据库后都发送一次结果给客户端。

在 proto 文件中可如下定义:

rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);

三、grpc 4 种模式的代码示例 🐯

下面使用 springboot 来开发 4 个接口对 grpc 4 种模式进行代码示例。

下面的只是部分关键代码。

完整代码我已上传到 github,你可以去 grpc-spring-boot-demo 把完整的代码 clone 或下载下来运行。

最终效果如图:

请添加图片描述

1.maven

        <!-- grpc客户端所需依赖-->
        <dependency>
            <groupId>net.devh</groupId>
            <artifactId>grpc-spring-boot-starter</artifactId>
            <version>2.14.0.RELEASE</version>
        </dependency>

        <!-- 用于将grpc-protocol模块里的proto生成的代码打成的jar包给客户端和服务端使用-->
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>grpc-protocol</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

2.proto文件

syntax = "proto3";

package grpcprotocol;

option java_multiple_files = true;
option java_package = "com.example.grpcprotocol";
option java_outer_classname = "FirstTestServiceProtos";


message QueryInfoRequest {
  int32  id = 1; //id
  string  school = 2;
  bool isBoy = 3 ;
  repeated personalInfo personalInfoRecords = 4;
  map<string, string> familyInfo = 5;
}

message personalInfo{
  int64 identityNumber = 1;//身份证号
  string name = 2;
}

message QueryInfoResponse {
  int32  id = 1; //id
  string name = 2; //姓名
  string uuid = 3;
  repeated subjectMarks records = 4;
  map<string, string> subjectTeachs = 5;
}

message subjectMarks{
  string subjectName = 1;
  double score = 2;
}

//-------------------------------------------------

message QuerySomeDataRequest{
  int32 id = 1;
}

message QuerySomeDataResponse{
  int32 id = 1;
  repeated productInfo records = 2;
}

message productInfo{
  int32 productId = 1;
  string time = 2;
}

//-------------------------------------------------
message SendBooksInfoRequest{
  int32 id = 1;
  map<string, string> data = 2;
}

message SendBooksInfoResponse{
  int32 code = 1; // 返回值
  string note = 2; //返回消息
}

//-------------------------------------------------
message CalculateSumRequest{
  int32 id = 1;
  repeated int32 number= 2;
}

message CalculateSumResponse{
  int32 code = 1; // 返回值
  string note = 2; //返回消息
  int32 sum = 3;
}

service FirstTestService {
  //简单grpc
  rpc QueryInfo(QueryInfoRequest) returns (QueryInfoResponse){};
  //服务端流式grpc
  rpc QuerySomeData(QuerySomeDataRequest) returns (stream QuerySomeDataResponse){};
  //客户端流式grpc
  rpc SendBooksInfo(stream SendBooksInfoRequest) returns (SendBooksInfoResponse){};
  //双向流
  rpc CalculateSum(stream CalculateSumRequest) returns (stream CalculateSumResponse){};
}

3.客户端

package com.example.grpcclient.domo.service;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.example.grpcclient.domo.model.IdentityModel;
import com.example.grpcclient.domo.model.PersonInfoModel;
import com.example.grpcprotocol.*;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static com.example.grpcclient.domo.util.ParamUtils.*;

@Service
public class ClientConsumer {
    @GrpcClient("mygrpc-server")
    //xxxBlockingStub用于简单grpc、服务端流式grpc
    private FirstTestServiceGrpc.FirstTestServiceBlockingStub stub;

    @GrpcClient("mygrpc-server")
    //xxxServiceStub用于客户端流式grpc、双向流grpc
    private FirstTestServiceGrpc.FirstTestServiceStub stub1;

    //查询人员信息(简单grpc)
    public JSONObject queryInfo(PersonInfoModel customerListModel) {
        //构建请求参数
        QueryInfoRequest.Builder requestBuild = QueryInfoRequest.newBuilder();
        Optional.ofNullable(customerListModel.getId()).ifPresent(requestBuild::setId);
        Optional.ofNullable(customerListModel.getSchool()).ifPresent(requestBuild::setSchool);
        Optional.ofNullable(customerListModel.isBoy()).ifPresent(requestBuild::setIsBoy);
        List<IdentityModel> list = customerListModel.getPersonalInfos();
        if (null != list) {
            list.forEach((v) -> {
                personalInfo.Builder ryjblbBuilder = personalInfo.newBuilder()
                        .setIdentityNumber(getInt(v.getIdentityNumber()))
                        .setName(getString(v.getName()));
                requestBuild.addPersonalInfoRecords(ryjblbBuilder.build());
            });
        }
        Optional.ofNullable(customerListModel.getSchool()).ifPresent(requestBuild::setSchool);
        Map<String, String> map = new HashMap<>();
        map.put("父亲", "张明");
        map.put("母亲", "李梅");
        requestBuild.putAllFamilyInfo(map);

        //发送请求到服务端
        QueryInfoRequest request = requestBuild.build();
        QueryInfoResponse response = stub.queryInfo(request);

        //获取服务端返回的结果
        int id = response.getId();
        String name = response.getName();
        String uuid = response.getUuid();
        List<subjectMarks> recordsList = response.getRecordsList();
        Map<String, String> subjectTeachs = response.getSubjectTeachsMap();

        JSONObject json = new JSONObject();
        json.put("id", id);
        json.put("name", name);
        json.put("uuid", uuid);
        JSONArray array = new JSONArray();
        recordsList.forEach(v -> {
            JSONObject j = new JSONObject();
            j.put("score", v.getScore());
            j.put("subjectName", v.getSubjectName());
            array.add(j);
        });

        json.put("subjectMarks", array);
        json.put("subjectTeachs", subjectTeachs);
        return json;
    }

    //查询数据(服务端流式grpc)
    public JSONObject querySomeData(int id) {
        //构建请求参数
        QuerySomeDataRequest.Builder requestBuild = QuerySomeDataRequest.newBuilder();
        requestBuild.setId(id);

        JSONObject json = new JSONObject();
        //流式服务端返回的多个结果要用迭代器接收
        Iterator<QuerySomeDataResponse> iteraResponse = stub.querySomeData(requestBuild.build());
        while (iteraResponse.hasNext()) {
            QuerySomeDataResponse response = iteraResponse.next();
            int id1 = response.getId();
            List<productInfo> recordsList = response.getRecordsList();
            JSONArray array = new JSONArray();
            recordsList.forEach(v -> {
                JSONObject j = new JSONObject();
                j.put("productId", v.getProductId());
                j.put("time", v.getTime());
                array.add(j);
            });
            json.put("productInfo", array);
            json.put("id", id1);
        }
        return json;
    }

    //发送书籍信息(客户端流式grpc)
    public JSONObject sendBooksInfo(int id) {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        SendBooksInfoResponse.Builder resp = SendBooksInfoResponse.newBuilder();
        StreamObserver<SendBooksInfoResponse> responseStreamObserver = new StreamObserver<SendBooksInfoResponse>() {
            @Override
            public void onNext(SendBooksInfoResponse value) {
                resp.setCode(value.getCode());
                resp.setNote(value.getNote());
            }

            @Override
            public void onError(Throwable t) {
                resp.setCode(-1);
                resp.setNote(t.getMessage());
                countDownLatch.countDown();
            }

            @Override
            public void onCompleted() {
                countDownLatch.countDown();
            }
        };
        StreamObserver<SendBooksInfoRequest> sendBooksStreamObserver = stub1.sendBooksInfo(responseStreamObserver);
        for (int i = 1; i < 4; i++) {
            SendBooksInfoRequest.Builder req = SendBooksInfoRequest.newBuilder();
            Map<String, String> books = new HashMap<>();
            books.put("书籍", String.valueOf(i));
            books.put("作者", "张三" + i);
            req.putAllData(books);
            req.setId(id);
            //发送请求到服务端
            sendBooksStreamObserver.onNext(req.build());
        }
        //3次发送完成,结束发送
        sendBooksStreamObserver.onCompleted();

        try {
            //等待线程执行完服务端返回结果(超时时间为2秒)
            countDownLatch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //获取结果
        SendBooksInfoResponse response = resp.build();
        JSONObject json = new JSONObject();
        json.put("code", response.getCode());
        json.put("note", response.getNote());
        return json;
    }

    //计算一到一万的和(双向流grpc)
    public JSONObject calculateSum(int id) {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        CalculateSumResponse.Builder resp = CalculateSumResponse.newBuilder();
        List<Integer> sumList = new ArrayList<>();
        StreamObserver<CalculateSumResponse> responseStreamObserver = new StreamObserver<CalculateSumResponse>() {
            @Override
            public void onNext(CalculateSumResponse value) {
                //获取服务端每次返回的结果,把结果放入sumList
                resp.setCode(value.getCode());
                resp.setNote(value.getNote());
                resp.setSum(value.getSum());
                sumList.add(value.getSum());
                System.out.println("calculateSum sum:"+value.getSum());
            }

            @Override
            public void onError(Throwable t) {
                resp.setCode(-1);
                resp.setNote(t.getMessage());
                countDownLatch.countDown();
            }

            @Override
            public void onCompleted() {
                countDownLatch.countDown();
            }
        };
        StreamObserver<CalculateSumRequest> calculateSumStreamObserver = stub1.calculateSum(responseStreamObserver);

        CalculateSumRequest.Builder req = CalculateSumRequest.newBuilder();
        req.setId(id);
        for (int i = 1; i <= 10000; i++) {
            req.addNumber(i);
            //每一千个数发送到服务端
            if (i % 1000 == 0) {
                //发送请求到服务端
                calculateSumStreamObserver.onNext(req.build());
                //发送完后清除那一千个数
                req.clearNumber();
            }
        }
        //发送完成,结束发送
        calculateSumStreamObserver.onCompleted();

        try {
            //等待线程执行完服务端返回结果(超时时间为2秒)
            countDownLatch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //获取结果
        CalculateSumResponse response = resp.build();
        JSONObject json = new JSONObject();
        json.put("code", response.getCode());
        json.put("note", response.getNote());
        //计算sumList里的十个数得到一到一万的和
        json.put("sum",  sumList.stream().reduce(0, Integer::sum));
        return json;
    }
}

4.服务端

package com.example.grpcserver.demo.provider;


import com.example.grpcprotocol.*;
import com.example.grpcserver.demo.model.SubjectMarksModel;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;

import java.util.*;

@GrpcService
public class ServiceProvider extends FirstTestServiceGrpc.FirstTestServiceImplBase {

    @Override
    //查询人员信息(简单grpc)
    public void queryInfo(QueryInfoRequest request, StreamObserver<QueryInfoResponse> responseObserver) {
        //获取从客户端传来的id、school、isboy、personalInfoRecordsList
        int id = request.getId();
        String school = request.getSchool();
        boolean isBoy = request.getIsBoy();
        List<personalInfo> personalInfoRecordsList = request.getPersonalInfoRecordsList();
        System.out.println("queryInfoDemo id:" + id + ",school:" + school + ",isBoy:" + isBoy);

        //构造要返回到客户端的数据
        QueryInfoResponse.Builder responseBuild = QueryInfoResponse.newBuilder();
        responseBuild.setId(id);
        responseBuild.setName("张三");
        String uuid = UUID.randomUUID().toString().replace("-", "");
        responseBuild.setUuid(uuid);

        List<SubjectMarksModel> list = new ArrayList<>();
        list.add(new SubjectMarksModel("小明", 98.5));
        list.add(new SubjectMarksModel("小王", 90.5));
        list.forEach((v) -> {
            subjectMarks record = subjectMarks.newBuilder()
                    .setScore(v.getScore())
                    .setSubjectName(v.getSubjectName())
                    .build();
            responseBuild.addRecords(record);
        });

        Map<String, String> map = new HashMap<>();
        map.put("语文", "王刚");
        map.put("数学", "李磊");
        map.put("英语", "刘墨");
        responseBuild.putAllSubjectTeachs(map);

        //把数据返回客户端
        responseObserver.onNext(responseBuild.build());
        responseObserver.onCompleted();
    }

    @Override
    //查询数据(服务端流式grpc)
    public void querySomeData(QuerySomeDataRequest request, StreamObserver<QuerySomeDataResponse> responseObserver) {
        int id = request.getId();
        System.out.println("querySomeData id:" + id);

        QuerySomeDataResponse.Builder builder = QuerySomeDataResponse.newBuilder();
        for (int i = 1; i < 4; i++) {
            productInfo.Builder productInfoBuild = productInfo.newBuilder();
            productInfoBuild.setProductId(1000 + i);
            productInfoBuild.setTime(String.valueOf(System.currentTimeMillis() / 1000));
            builder.setId(i)
                    .addRecords(productInfoBuild)
                    .build();
            responseObserver.onNext(builder.build());
        }
        responseObserver.onCompleted();
    }

    @Override
    //发送书籍信息(客户端流式grpc)
    public StreamObserver<SendBooksInfoRequest> sendBooksInfo(StreamObserver<SendBooksInfoResponse> responseObserver) {
        List<Map<String, String>> datas = new ArrayList<>();
        return new StreamObserver<SendBooksInfoRequest>() {
            @Override
            public void onNext(SendBooksInfoRequest request) {
                //接收客户端每一次发送来的数据放到datas
                datas.add(request.getDataMap());
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                //当客户端数据发送完毕后调用此方法,返回客户端数据
                SendBooksInfoResponse.Builder response = SendBooksInfoResponse.newBuilder();
                try {
                    System.out.println("sendBooksInfo:" + datas);
                    response.setCode(1);
                    response.setNote("成功");
                } catch (Exception e) {
                    e.printStackTrace();
                    responseObserver.onError(new StatusException(Status.INTERNAL));
                    return;
                }
                SendBooksInfoResponse resp = response.build();
                //返回客户端
                responseObserver.onNext(resp);
                responseObserver.onCompleted();
            }
        };
    }


    @Override
    //计算一到一万的和(双向流grpc)
    public StreamObserver<CalculateSumRequest> calculateSum(StreamObserver<CalculateSumResponse> responseObserver) {
        return new StreamObserver<CalculateSumRequest>() {
            @Override
            public void onNext(CalculateSumRequest request) {
                //接收客户端每一次发送来的1000个数,求和后返回给客户端
                int id = request.getId();
                List<Integer> numberList = request.getNumberList();
                int sum = numberList.stream().reduce(0, Integer::sum);
                System.out.println("calculateSum sum:" + sum);
                CalculateSumResponse calculateSumResponse = CalculateSumResponse.newBuilder()
                        .setCode(1)
                        .setNote("成功")
                        .setSum(sum)
                        .build();
                responseObserver.onNext(calculateSumResponse);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                //当客户端数据发送完毕后调用此方法,返回客户端
                CalculateSumResponse.Builder response = CalculateSumResponse.newBuilder();
                response.setCode(1);
                response.setNote("成功");
                //返回客户端
                responseObserver.onNext(response.build());
                responseObserver.onCompleted();
            }
        };
    }

}
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐