开发环境

  • grpc-go: v1.24
  • consul:1.6
  • 设备: docker-compose、 mac

搭建一个单节点的consul环境(无acl)

共两个节点,server和client。

# Two Consul 1.6 agents in one datacenter (dc1): a single server and a client.
version: '2'
services:
  # Server agent. -bootstrap-expect=1 tells it to expect exactly one server.
  consul_server: 
    image: consul:1.6
    restart: always
    ports: 
      # NOTE(review): 8301 is presumably the Serf LAN gossip port - confirm it needs publishing.
      - "8301:8301"
    command: agent -server -bind=0.0.0.0 -client=0.0.0.0 -bootstrap-expect=1 -node=consul1 -datacenter=dc1
  # Client agent with the web UI enabled; applications talk to this agent on 8500.
  consul_client:
    image: consul:1.6
    restart: always
    ports: 
      # NOTE(review): 8500 is the HTTP API used by the Go code in this article.
      # 8600 is presumably DNS and is only mapped for TCP here - confirm whether "/udp" is needed.
      - "8400:8400"
      - "8500:8500"
      - "8600:8600"
    depends_on: 
      - consul_server
    # NOTE(review): both -retry-join and -join point at consul_server; one of them looks redundant.
    command: agent -bind=0.0.0.0 -client=0.0.0.0 -retry-join=consul_server -ui -node=client1 -datacenter=dc1  -join consul_server

其中consul_server为单节点server,并自举(bootstrap)为leader节点。consul_client为当前主机使用的client。

服务注册

grpc中预留了服务注册、服务解析和服务负载均衡的接口,均可以自行实现。
其中服务注册最易实现,也比较好理解。分以下几个步骤

  • 1、自行实现一个注册服务用的结构体记录服务信息
  • 2、创建一个consul/api 的客户端,连接到先前安装配置好的单节点consul集群。
  • 3、实现check功能,因为是grpc的服务直接选择了grpc类型。
  • 4、实现check函数,并注册进consul
regis.go 服务实际脚本,实现了 健康检查
package regis

import (
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/reflection"
	"library/microservice"
	"net"
)

// Server holds the gRPC server instance shared by this package.
type Server struct {
	// server is the underlying gRPC server; it is assigned in init.
	server *grpc.Server
}

// svr is the package-level Server; its gRPC server is created in init.
var svr Server

// init creates the gRPC server so it is available as soon as the package
// is imported.
func init() {
	svr = Server{server: grpc.NewServer()}
}
// GetServer returns the gRPC server held by the package-level svr.
func GetServer() *grpc.Server {
	srv := svr.server
	return srv
}
// Start binds the service port, registers the "user" service with Consul and
// serves gRPC on the package-level server. It blocks while the server is
// running and panics if any start-up step fails.
func Start() {
	// Print the start-up error message only when a failure actually happened,
	// then re-raise the panic so callers still observe it.
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("启动错误")
			panic(r)
		}
	}()

	// Describe this service instance for Consul.
	register := microservice.NewConsulRegister()
	register.Port = 8079
	register.Name = "user"
	register.Tag = []string{"user"}

	// Bind the port before registering, so a listen failure never leaves a
	// dead instance registered in Consul.
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", register.Port))
	if err != nil {
		panic(err)
	}

	// The health service must be installed before Consul starts probing it.
	grpc_health_v1.RegisterHealthServer(svr.server, &microservice.HealthImpl{Status: grpc_health_v1.HealthCheckResponse_SERVING})
	reflection.Register(svr.server)

	if err := register.Register(); err != nil {
		_ = lis.Close() // best effort; the panic below is the real failure
		panic(err)
	}

	fmt.Println("user server ready listen")
	if err := svr.server.Serve(lis); err != nil {
		panic(err)
	}
}
简单的健康检查功能
package microservice

import (
	"context"
	"fmt"
	"google.golang.org/grpc/health/grpc_health_v1"
)

// HealthImpl is a gRPC health service whose status is switched manually
// through OnLine and OffLine.
type HealthImpl struct {
	// Status is the serving status reported by Check.
	Status grpc_health_v1.HealthCheckResponse_ServingStatus
	// Reason is the message given to the most recent OnLine/OffLine call.
	Reason string
}

// Watch is a stub: it ignores both arguments and returns nil immediately
// without sending anything on the stream.
func (h *HealthImpl) Watch(_ *grpc_health_v1.HealthCheckRequest, _ grpc_health_v1.Health_WatchServer) error {
	return nil
}

// OffLine switches the reported status to NOT_SERVING, stores reason and
// prints it to stdout.
func (h *HealthImpl) OffLine(reason string) {
	h.Status, h.Reason = grpc_health_v1.HealthCheckResponse_NOT_SERVING, reason
	fmt.Println(h.Reason)
}
// OnLine switches the reported status to SERVING, stores reason and prints
// it to stdout.
func (h *HealthImpl) OnLine(reason string) {
	h.Status, h.Reason = grpc_health_v1.HealthCheckResponse_SERVING, reason
	fmt.Println(h.Reason)
}

// Check answers a health probe with the currently stored Status. The context
// and the request are not inspected, and the returned error is always nil.
func (h *HealthImpl) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
	resp := new(grpc_health_v1.HealthCheckResponse)
	resp.Status = h.Status
	return resp, nil
}

consul_register.go 注册用 ,Address为自己的consul api地址
package microservice

import (
	"fmt"
	"github.com/hashicorp/consul/api"
	"net"
	"time"
)

// NewConsulRegister returns a ConsulRegister filled with default values.
// Callers normally overwrite Name, Tag and Port before calling Register.
func NewConsulRegister() *ConsulRegister {
	return &ConsulRegister{
		// The address must be a string literal; unquoted it does not compile.
		// NOTE(review): GetConsulHost in this package returns the same dev
		// address - consider sharing it.
		Address:                        "192.168.1.118:8500", // Consul HTTP API address
		Name:                           "unknown",
		Tag:                            []string{},
		Port:                           3000,
		DeregisterCriticalServiceAfter: time.Duration(1) * time.Minute,
		Interval:                       time.Duration(10) * time.Second,
	}
}

// ConsulRegister describes one service instance to register with Consul.
type ConsulRegister struct {
	// Address is the host:port of the Consul HTTP API.
	Address string
	// Name is the service name; it is also part of the service ID and of the
	// gRPC health-check target.
	Name string
	// Tag holds the service tags; it may be empty.
	Tag []string
	// Port is the port the service listens on.
	Port int
	// DeregisterCriticalServiceAfter is passed to the Consul check of the
	// same name.
	DeregisterCriticalServiceAfter time.Duration
	// Interval is the time between two health checks.
	Interval time.Duration
}

// Register registers this service instance, together with a gRPC health
// check, on the Consul agent at r.Address. It returns any error from creating
// the client or from the registration call.
func (r *ConsulRegister) Register() error {
	cfg := api.DefaultConfig()
	cfg.Address = r.Address

	client, err := api.NewClient(cfg)
	if err != nil {
		return err
	}

	// NOTE(review): LocalIP returns "" when no address is found; the
	// registration is still attempted with an empty address in that case.
	host := LocalIP()

	// Health check: Consul probes the gRPC health service of this instance.
	check := &api.AgentServiceCheck{
		Interval: r.Interval.String(), // time between checks
		// Check target in the form host:port/service.
		GRPC: fmt.Sprintf("%v:%v/%v", host, r.Port, r.Name),
		// How long a critical service is kept before it is deregistered.
		DeregisterCriticalServiceAfter: r.DeregisterCriticalServiceAfter.String(),
	}

	registration := &api.AgentServiceRegistration{
		ID:      fmt.Sprintf("%v-%v-%v", r.Name, host, r.Port), // ID of this service instance
		Name:    r.Name,                                        // service name
		Tags:    r.Tag,                                         // tags, may be empty
		Port:    r.Port,                                        // service port
		Address: host,                                          // service IP
		Check:   check,
	}

	return client.Agent().ServiceRegister(registration)
}

// LocalIP returns the first non-loopback IPv4 address among the interface
// addresses of this machine. It returns "" if the addresses cannot be listed
// or none qualifies.
func LocalIP() string {
	ifaceAddrs, err := net.InterfaceAddrs()
	if err != nil {
		return ""
	}
	for _, a := range ifaceAddrs {
		ipNet, isIPNet := a.(*net.IPNet)
		if !isIPNet || ipNet.IP.IsLoopback() {
			continue
		}
		// To4 is nil for addresses that are not IPv4.
		if v4 := ipNet.IP.To4(); v4 != nil {
			return ipNet.IP.String()
		}
	}
	return ""
}

服务发现

服务发现同样是自行实现的,结合了grpc自带的round_robin负载均衡功能对发现的地址进行轮询使用,负载均衡也可以自行实现,这里使用了grpc自带的。
roundrobin.go
流程如下:

  • 1、注册自行实现的resolver 接口
  • 2、初始化resolver实例
  • 3、grpc.Dial 并配置。其中 grpc.WithBalancerName(roundrobin.Name), 虽然废弃了但还可以使用。
  • 4、特别注意,虽然这样就配好了,但其中有挺多坑会导致无法建立连接,注释中有解释。
  • 5、resolve原理:解析接口实现后通过定时调用consul api获取相应服务的信息,通过cr.cc.UpdateState(resolver.State{Addresses:newAddrs})更新进入grpc.conn中。等待balance处理
  • 6、balance原理:当调用rpc函数时 invoke 函数会调用当前的负载均衡器:本文中就是round_robin, round_robin通过pick函数调用下一个可用的服务地址,并用新的地址替换旧的地址。连接成功后即可调用远程函数。

调用链分析:

  • 过年了,剩下的等我空了补充~~~
consul_resolver.go
package microservice

import (
	"errors"
	"fmt"
	"github.com/hashicorp/consul/api"
	"google.golang.org/grpc/resolver"
	"net"
	"regexp"
	"strconv"
	"sync"
	"time"
)

// defaultPort is the port parseTarget falls back to when the target does not
// specify one.
const defaultPort = "8500"

var (
	// errMissingAddr is returned by parseTarget for an empty target.
	errMissingAddr = errors.New("consul resolver: missing address")

	// errAddrMisMatch is returned by parseTarget when the target does not
	// match regexConsul.
	errAddrMisMatch = errors.New("consul resolver: invalied uri")

	// errEndsWithColon is not referenced by the code visible in this file.
	errEndsWithColon = errors.New("consul resolver: missing port after port-separator colon")

	// regexConsul matches "host[:port]/service" and captures the host, the
	// optional port (including its leading colon) and the service name.
	// The ranges are spelled out as A-Za-z: the range A-z would also accept
	// the punctuation between 'Z' and 'a' ([ \ ] ^ _ `). Hyphens and digits
	// are accepted in host and service names. MustCompile makes a broken
	// pattern fail at start-up instead of leaving a nil regexp.
	regexConsul = regexp.MustCompile(`^([A-Za-z0-9.-]+)(:[0-9]{1,5})?/([A-Za-z0-9_-]+)$`)

	// builderInstance is the single shared builder returned by CacheBuilder.
	builderInstance = &consulBuilder{}
)

// registerOnce makes sure the consul builder is registered only once.
var registerOnce sync.Once

// Init registers the consul resolver builder with gRPC. It is called on every
// RunConsulClient, while resolver.Register is documented as not thread-safe
// and meant for initialization time, so the registration runs at most once.
func Init() {
	registerOnce.Do(func() {
		fmt.Printf("calling consul init\n")
		resolver.Register(CacheBuilder())
	})
}

// consulBuilder creates consulResolver instances for the "consul" scheme.
// It carries no state.
type consulBuilder struct{}

// consulResolver resolves one service name through a Consul agent and pushes
// the resulting addresses into a gRPC ClientConn.
type consulResolver struct {
	// address is the Consul agent address used by the watcher.
	address string
	// wg is not used by the code visible in this file.
	wg sync.WaitGroup
	// cc receives the resolved addresses via UpdateState.
	cc resolver.ClientConn
	// name is the Consul service name being resolved.
	name string
	// disableServiceConfig is copied from the build options.
	disableServiceConfig bool
	// Ch wakes the watcher for an immediate lookup (see ResolveNow).
	Ch chan int
}

// NewBuilder returns a freshly allocated consul resolver builder.
func NewBuilder() resolver.Builder {
	return new(consulBuilder)
}

// CacheBuilder returns the shared package-level builder instance.
func CacheBuilder() resolver.Builder {
	var shared resolver.Builder = builderInstance
	return shared
}

// Build parses the dial target ("authority/endpoint"), creates a
// consulResolver for it and starts the resolver's watcher goroutine.
// It returns the parse error when the target is malformed.
func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	rawTarget := fmt.Sprintf("%s/%s", target.Authority, target.Endpoint)
	host, port, name, err := parseTarget(rawTarget)
	if err != nil {
		fmt.Println("parse err")
		return nil, err
	}

	summary := fmt.Sprintf("consul service ==> host:%s, port%s, name:%s", host, port, name)
	fmt.Println(summary)

	consulAddr := fmt.Sprintf("%s%s", host, port)
	res := &consulResolver{
		address:              consulAddr,
		name:                 name,
		cc:                   cc,
		disableServiceConfig: opts.DisableServiceConfig,
		Ch:                   make(chan int), // unbuffered wake-up channel
	}
	// The watcher polls Consul in the background for as long as it runs.
	go res.watcher()
	return res, nil
}

// watcher polls Consul for the healthy instances of cr.name and pushes the
// resulting address list into the gRPC ClientConn. A lookup runs every two
// seconds and whenever a value arrives on cr.Ch. The loop has no exit
// condition; the function returns early only if the Consul client cannot be
// created.
func (cr *consulResolver) watcher() {
	fmt.Printf("calling [%s] consul watcher\n", cr.name)
	config := api.DefaultConfig()
	config.Address = cr.address
	client, err := api.NewClient(config)
	if err != nil {
		fmt.Printf("error create consul client: %v\n", err)
		return
	}
	t := time.NewTicker(2000 * time.Millisecond)
	defer func() {
		// Release the ticker if the loop is ever left (e.g. by a panic).
		t.Stop()
		fmt.Println("defer done")
	}()
	for {
		// Wait for the next tick or an explicit ResolveNow request.
		select {
		case <-t.C:
		case <-cr.Ch:
		}
		// A plain (non-blocking) query is used: no lastIndex is passed in
		// the query options.
		services, _, err := client.Health().Service(cr.name, "", true, &api.QueryOptions{})
		if err != nil {
			fmt.Printf("error retrieving instances from Consul: %v\n", err)
			// Keep the previously published addresses. Publishing an empty
			// list here would drop every backend on a transient Consul error.
			continue
		}

		newAddrs := make([]resolver.Address, 0, len(services))
		for _, service := range services {
			addr := net.JoinHostPort(service.Service.Address, strconv.Itoa(service.Service.Port))
			newAddrs = append(newAddrs, resolver.Address{
				Addr: addr,
				// Type is left unset on purpose: it must not be grpclb,
				// because grpclb handling removes the last address
				// (see balancer_conn_wrappers => updateClientConnState).
				ServerName: service.Service.Service,
			})
		}
		cr.cc.UpdateState(resolver.State{Addresses: newAddrs})
	}
}

// Scheme returns the URL scheme handled by this builder.
func (cb *consulBuilder) Scheme() string {
	const scheme = "consul"
	return scheme
}

// ResolveNow asks the watcher to run a lookup as soon as possible.
// The send is non-blocking: cr.Ch is unbuffered, so a plain send would stall
// the caller while the watcher is busy querying Consul, and would block
// forever if the watcher has already returned. A dropped request is harmless
// because the watcher also polls on a timer.
func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {
	select {
	case cr.Ch <- 1:
	default:
	}
}

// Close is a no-op.
// NOTE(review): the goroutine started in Build is not stopped here, so it
// appears to keep polling after the resolver is closed - confirm and add a
// shutdown signal if so.
func (cr *consulResolver) Close() {
}

// parseTarget splits a target of the form "host[:port]/service" into its
// parts.
//
// The returned port always carries its leading colon (for example ":8500"),
// so host+port is a valid address. The regexp capture for an explicit port
// already includes the colon. Previously the default port had none, which
// turned "host/service" into the address "host8500".
//
// It returns errMissingAddr for an empty target and errAddrMisMatch when the
// target does not match regexConsul.
func parseTarget(target string) (host, port, name string, err error) {
	if target == "" {
		return "", "", "", errMissingAddr
	}

	groups := regexConsul.FindStringSubmatch(target)
	if groups == nil {
		return "", "", "", errAddrMisMatch
	}

	host, port, name = groups[1], groups[2], groups[3]
	if port == "" {
		port = ":" + defaultPort
	}
	return host, port, name, nil
}

grpc_client.go
package microservice

import (
	"fmt"
	"google.golang.org/grpc/balancer/roundrobin"

	//_ "github.com/mbobakov/grpc-consul-resolver" // It's important
	"google.golang.org/grpc"
	"library/util/constant"
)

// GetConsulHost returns the Consul HTTP API address for the current
// environment: one address when constant.Env equals constant.Dev, another
// for every other environment.
func GetConsulHost() string {
	if constant.Env != constant.Dev {
		return "192.168.45.10:8500"
	}
	return "192.168.1.118:8500"
}

// GrpcClient holds a gRPC client connection and the settings used to dial it.
type GrpcClient struct {
	// Conn is set by RunGrpcClient or RunConsulClient after a successful dial.
	Conn *grpc.ClientConn
	// RpcTarget is the address dialed directly by RunGrpcClient.
	RpcTarget string
	// Name is the Consul service name dialed by RunConsulClient.
	Name string
}

// RunGrpcClient dials s.RpcTarget directly, without transport security, and
// stores the connection in s.Conn. On failure the error is printed and
// s.Conn is left untouched.
func (s *GrpcClient) RunGrpcClient() {
	conn, dialErr := grpc.Dial(s.RpcTarget, grpc.WithInsecure())
	if dialErr == nil {
		s.Conn = conn
		fmt.Println("grpc client start success")
		return
	}
	fmt.Println(dialErr)
}
// RunConsulClient dials the service s.Name through the consul resolver with
// round-robin load balancing and stores the connection in s.Conn. On failure
// the error is printed and s.Conn is left untouched.
func (s *GrpcClient) RunConsulClient() {
	// Register the consul resolver builder.
	Init()

	target := fmt.Sprintf("%s://%s/%s", "consul", GetConsulHost(), s.Name)
	dialOpts := []grpc.DialOption{
		// grpc.WithBlock() must not be used: with a blocking picker the
		// round-robin pick is skipped and the call fails right away
		// (picker_wrapper => picker).
		grpc.WithInsecure(),
		// Select the built-in round_robin balancer. A custom balancer can be
		// plugged in later in the same way as the register and resolver.
		grpc.WithBalancerName(roundrobin.Name),
	}

	conn, err := grpc.Dial(target, dialOpts...)
	if err != nil {
		fmt.Println("dial err:", err)
		return
	}
	s.Conn = conn

	msg := fmt.Sprintf("gRpc consul client [%s] start success", s.Name)
	fmt.Println(msg)
}
使用(自己封装的)
// Usage example: dial the "finance" service through Consul and build a typed client on the connection.
// NOTE(review): Conn stays nil when the dial fails - presumably worth checking before use.
financeClient := &microservice.GrpcClient{Name: "finance"}
	financeClient.RunConsulClient()
	ssvr := settlement.NewSettlementClient(financeClient.Conn)
// ssvr is the RPC client instance; remote functions are called on it, e.g. ssvr.SayHello(ctx ...).
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐