1. 引言

微服务已经成为当下最为流行的分布式架构了。
通过将系统拆分成若干个服务,将业务进行横向、纵向切分,而诞生出各个高度内聚、轻度耦合的微服务,组成微服务架构。
微服务架构在其可维护性、责任分工上都有着很大的优势,更加有利于系统的组建、维护、问题的快速响应和解决。
但是,微服务架构也存在着难以治理的缺点,由于服务数量众多,每个服务又有多台服务器提供服务,如何实时监控每台服务器的运行健康情况,如何实现服务的平滑切换与扩容,都是微服务架构组建之前需要首要考虑的。
目前基于 java 的 RPC 架构包括阿里开源的 dubbo、sofa,腾讯开源的 TARS,新浪微博开源的 montan 等都具备比较强大的服务注册、发现、水平扩容等诸多功能的服务治理,但 python 目前尚没有很好的开源解决方案。
本文,我们就基于 thrift、zookeeper、redis 来实现一套基于 python 的基本的服务治理方案,我们主要解决的问题有:

  1. 服务注册、发现
  2. 服务端负载均衡
  3. 服务提供者动态权重调整
  4. 服务手动停止、开启
  5. 服务调用监控
  6. python thrift 的使用简化

2. 服务治理的出现

在微服务环境下,服务间通信可以采用最基本的 Restful http 协议通信,但这样有以下几个问题:

  1. 负载均衡主要依靠 nginx 等方式,难以监控和控制
  2. 难以实现服务节点的手动开关
  3. 服务间依赖关系不清晰
  4. 开发维护强依赖文档
  5. 接口规范缺乏保障和约束
  6. 使用者自行维护编码、传输、解码过程

综合上面几个问题,RPC 诞生了,目前跨语言、综合性能优势较大的成熟 RPC 系统主要有 gRPC 和 thrift,两者分别是 Google 和 Apache 的开源产品。
在简单地使用 thrift、gRPC 搭建微服务架构时,最简单的治理方法是利用 RMI 或 Hessian 等工具,简单地暴露和引用远程服务,但这仍然具有上述服务间依赖关系不清晰、难以实现服务节点的手动开关等问题,因此服务治理平台的搭建是十分必要的。

3. 服务治理平台的职责

服务治理平台作为服务提供方与服务调用方之间的一道桥梁,需要至少具备以下几个重要因素:

  1. 服务注册管理
  2. 访问路由
  3. 服务状态监控
  4. 接口状态监控
  5. 手动开关控制

# 此处有图片 1

4. 设计思路

# 此处有图片 2

通过 zookeeper 提供的 EPHEMERAL 节点功能,我们可以十分方便的实现服务的自动注册和摘除,从而向客户端隐藏服务端具体的服务提供者,实现动态流量控制和负载均衡。
而通过与 Redis 的同步,就可以实现用户设置的保存与读取,实现更加丰富的自定义功能。
而前端页面则给与使用者友好的页面来查看、操作和控制服务的监控、开关等。

5. 代码实现

5.1. 目录树

# 此处有图片 3

我们通过编写一个 python 包来让各服务来引用和使用。

5.2. server.py

server.py 是提供给服务提供者调用的 python 文件。
我们通过 appkey 来防止多个服务提供了相同 service_name 造成服务发生混淆的异常情况发生。
通过 mysql unique key 来实现其唯一性,并通过 appkey 维护服务的更多持久化信息。
服务提供者只需引用 probe 包,调用 start_service 方法即可实现服务的注册和提供。
同时我们通过 hostname 来区分了线上和线下两套环境,实现环境间的隔离,当然,生产环境中,我们可以定义更加严格的方式来实现环境的判断和获取。
如上所述,我们通过 zookeeper 提供的 EPHEMERAL 节点功能实现服务的自动注册和摘除,同时在创建节点前,我们先去 redis 中获取了节点信息,来同步上一次节点运行时的设置参数。

  • 需要注意的是,zookeeper 的 EPHEMERAL 节点在异常断开而没有主动 close 与 zookeeper 的连接,该节点会在 sessiontimeout 时间内被检测和移除,这意味着,如果 sessiontimeout 设置过大,则已经失效的节点将在很长时间内仍停留在 zookeeper 中造成误判
import json
import logging
import os
import re
import socket

import pymysql
import thriftpy2
import yaml
from kazoo.client import KazooClient
from redis import Redis
from rediscluster import StrictRedisCluster
from thriftpy2 import rpc


def start_service(thrift_path, appkey, service_name, handler, port=8081):
    if type(service_name).__name__ != 'str' or '' == service_name or re.match('^\w+$', service_name) is None:
        raise Exception('错误的 service_name (' + service_name + '), service_name 只能取字母、数字或下划线的组合')

    try:
        hostname = socket.getfqdn(socket.gethostname())
        env = 'dev'
        if re.match('^prod\d+$', hostname) is not None:
            env = 'prod'

        if env == 'prod':
            logpath = '/var/log/thriftprobe'
            if not os.path.exists(logpath):
                os.system('mkdir -p ' + logpath)
            logpath = logpath + '/appkey_' + service_name + '_server.log'
        else:
            logpath = ''
        logging.basicConfig(filename=logpath, level=logging.INFO,
                            format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')

        filename = os.path.join(os.path.dirname(__file__), 'config', 'config.yml')
        yamlfd = open(filename, encoding="utf-8")
        config = yaml.load(yamlfd)

        dbclient = pymysql.connect(host=config['mysql'][env]['host'], port=config['mysql'][env]['port'],
                                   user=config['mysql'][env]['username'], database=config['mysql'][env]['database'],
                                   password=config['mysql'][env]['password'], connect_timeout=1)
        sql = 'SELECT count(*) as total from serviceapp WHERE appkey = %s'
        cursor = dbclient.cursor(pymysql.cursors.DictCursor)
        values = [appkey]
        cursor.execute(sql, values)
        total = cursor.fetchone()
        if total is None or total['total'] <= 0:
            raise Exception('appkey - ' + appkey + ' 尚未注册')

        module_name = appkey + '_thrift'
        serviceThrift = thriftpy2.load(thrift_path, module_name=module_name)
        server = rpc.make_server(getattr(serviceThrift, service_name), handler, '0.0.0.0', port)
        logging.info('启动 thrift 服务 [appkey:' + appkey + ', service_name:' + service_name + ', port:' + str(port) + ']')
        __zk_connect(config, env, appkey, service_name, port)
        server.serve()
    except Exception as e:
        logging.error(e)
        raise e


def __zk_connect(config, env, appkey, service_name, port):
    local_ip = socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    if config['redis'][env]['cluster'] == 1:
        redis_nodes = [{'host': config['redis'][env]['host'], 'port': config['redis'][env]['port']}]
        redis_instance = StrictRedisCluster(startup_nodes=redis_nodes,
                                           password=config['redis'][env]['password'], socket_connect_timeout=1)
    else:
        redis_instance = Redis(host=config['redis'][env]['host'], port=config['redis'][env]['port'],
                               password=config['redis'][env]['password'])

    nodeinfos = None
    redis_key = 'thriftprobe_' + env
    node_field = appkey + '|' + env + '|' + service_name + '|' + local_ip + '|' + str(port)
    try:
        nodeinfos = json.loads(
            redis_instance.hget(redis_key, node_field))
    except:
        pass

    if nodeinfos is None:
        nodeinfos = {
            'host': local_ip,
            'port': port,
            'weight': 10,
            'valid': 1
        }
        redis_instance.hset(redis_key, node_field, json.dumps(nodeinfos))
        logging.info('更新节点配置信息到缓存: ' + node_field + ' - ' + json.dumps(nodeinfos))
    else:
        logging.info('从缓存中加载节点配置信息: ' + node_field + '-' + json.dumps(nodeinfos))

    host = config['zkserver'][env]['host'] + ':' + str(config['zkserver'][env]['port'])
    zk = KazooClient(hosts=host)
    zk.start(200)

    full_name = '/services' + '/' + env + '/' + appkey

    exists = zk.exists(full_name)
    if exists is None:
        zk.create(full_name, makepath=True)

    zk.create(full_name + '/' + service_name + '_' + local_ip + '_' + str(port),
              value=bytes(json.dumps(nodeinfos), encoding='utf-8'),
              ephemeral=True)
    logging.info('创建节点成功:' + json.dumps(nodeinfos))

5.3. client.py

client.py 的主要功能很简单,就是创建 thrift 的 TClient 对象,这里我们重写了 TClient 对象,从而实现接口访问时服务端地址的动态获取。
用户只需调用 build_client 即可创建 TClient 对象,实现接口的调用。

import logging
import os
import re
import socket
from time import sleep

import thriftpy2

from thriftprobe.probe.ProbTClient import ProbTClient


def build_client(thrift_path, appkey, service_name, socket_timeout = 500):
    hostname = socket.getfqdn(socket.gethostname())
    env = 'dev'
    if re.match('^prod\d+$', hostname) is not None:
        env = 'prod'

    if env == 'prod':
        logpath = '/var/log/thriftprobe'
        if not os.path.exists(logpath):
            os.system('mkdir -p ' + logpath)
        logpath = logpath + '/' + appkey + '_' + service_name + '_client.log'
    else:
        logpath = ''
    logging.basicConfig(filename=logpath, level=logging.INFO,
                        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')

    module_name = appkey + '_thrift'
    client_thrift = thriftpy2.load(thrift_path, module_name=module_name)
    service = getattr(client_thrift, service_name)
    return ProbTClient(env, service, appkey, service_name, socket_timeout)

5.4. ProbTClient.py

ProbTClient 就是我们重写的 thrift TClient,每一次接口访问,我们都通过随机的方式在现有存活的 zk 服务端节点中找到可用的节点信息,随机动态分配访问地址,实现接口的访问。

import functools
import json
import logging
import os
import random
import re
import uuid

import yaml
from kazoo.client import KazooClient
from thrift.Thrift import TApplicationException
from thriftpy2.protocol import TBinaryProtocolFactory
from thriftpy2.thrift import TMessageType, args2kwargs
from thriftpy2.transport import TSocket, TBufferedTransportFactory


class ProbTClient(object):

    def __init__(self, env, service, appkey, service_name, socket_timeout):
        self._service = service
        self._appkey = appkey
        self._service_name = service_name
        self._env = env
        self._seqid = 0
        self._iprot = None
        self._oprot = None
        self._socket_timeout = socket_timeout
        if env == 'prod':
            logpath = '/var/log/thriftprobe'
            if not os.path.exists(logpath):
                os.system('mkdir -p ' + logpath)
            logpath = logpath + '/appkey_' + service_name + '_client.log'
        else:
            logpath = ''
        logging.basicConfig(filename=logpath, level=logging.INFO,
                            format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')

    def __getattr__(self, _api):
        if _api in self._service.thrift_services:
            return functools.partial(self._req, _api)

        raise AttributeError("{} instance has no attribute '{}'".format(
            self.__class__.__name__, _api))

    def __dir__(self):
        return self._service.thrift_services

    def _req(self, _api, *args, **kwargs):
        logging.info(self._service_name + '.' + _api + '[' + self._uuid + ']' + ' 开始调用')
        try:
            _kw = args2kwargs(getattr(self._service, _api + "_args").thrift_spec,
                              *args)
            kwargs.update(_kw)
            result_cls = getattr(self._service, _api + "_result")

            self._connect()
            self._send(_api, **kwargs)
            # wait result only if non-oneway
            if not getattr(result_cls, "oneway"):
                return self._recv(_api)
        finally:
            logging.info(self._service_name + '.' + _api + '[' + self._uuid + ']' + ' 完成调用')
            self.close()

    def _send(self, _api, **kwargs):
        self._oprot.write_message_begin(_api, TMessageType.CALL, self._seqid)
        args = getattr(self._service, _api + "_args")()
        for k, v in kwargs.items():
            setattr(args, k, v)
        args.write(self._oprot)
        self._oprot.write_message_end()
        self._oprot.trans.flush()

    def _recv(self, _api):
        logging.info(self._service_name + '.' + _api + '[' + self._uuid + ']' + ' 开始获取数据')
        try:
            fname, mtype, rseqid = self._iprot.read_message_begin()
            if mtype == TMessageType.EXCEPTION:
                x = TApplicationException()
                x.read(self._iprot)
                self._iprot.read_message_end()
                raise x
            result = getattr(self._service, _api + "_result")()
            result.read(self._iprot)
            self._iprot.read_message_end()

            if hasattr(result, "success") and result.success is not None:
                return result.success

            # void api without throws
            if len(result.thrift_spec) == 0:
                return

            # check throws
            for k, v in result.__dict__.items():
                if k != "success" and v:
                    raise v

            # no throws & not void api
            if hasattr(result, "success"):
                raise TApplicationException(TApplicationException.MISSING_RESULT)
        finally:
            logging.info(self._service_name + '.' + _api + '[' + self._uuid + ']' + ' 完成数据获取')

    def _connect(self):
        host, port = self._get_service_addr()
        socket = TSocket(host, port, socket_timeout=self._socket_timeout)

        transport = TBufferedTransportFactory().get_transport(socket)
        protocol = TBinaryProtocolFactory().get_protocol(transport)
        transport.open()
        self._iprot = self._oprot = protocol
        self._uuid = uuid.uuid1()

    def _get_service_addr(self):
        yamlfd = open('config/config.yml', encoding="utf-8")
        config = yaml.load(yamlfd)

        host = config['zkserver'][self._env]['host'] + ':' + str(config['zkserver'][self._env]['port'])
        zk = KazooClient(hosts=host)
        zk.start(200)

        zk_full_name = '/services' + '/' + self._env + '/' + self._appkey
        exists = zk.exists(zk_full_name)
        if exists is None:
            raise Exception('服务未启动 - appkey: ' + self._appkey + '; service: ' + self._service_name)

        services = zk.get_children(zk_full_name)
        nodes = list()
        for service in services:
            if re.match(self._service_name + '_', service) is not None:
                nodes.append(zk.get(zk_full_name + '/' + service))
        addrs = list()
        total_weight = 0
        for node in nodes:
            infos = json.loads(node[0])
            if infos['valid'] == 1:
                addrs.append(infos)
                total_weight += infos['weight']
        if len(addrs) < 1:
            raise Exception('服务未启动 - appkey: ' + self._appkey + '; service: ' + self._service_name)
        randint = random.randint(1, total_weight)
        base_weight = 0
        base_addr = None
        for addr in addrs:
            base_addr = addr
            base_weight += addr['weight']
            if randint <= base_weight:
                break
        return base_addr['host'], base_addr['port']

    def close(self):
        if self._iprot is not None:
            self._iprot.trans.close()
        if self._iprot != self._oprot and self._oprot is not None:
            self._oprot.trans.close()

5.5. setup.py

关于 setup.py 的编写和 pipy 包的打包上传,可以参考上一篇文章。
通过 Nexus3 搭建 pypi 私服

# coding=utf-8
from setuptools import setup

setup(
    name='thriftprobe',
    version='1.0',
    author="techlog",
    license="MIT",
    packages=[
        'probe',
        'probe/config'
    ],
    package_data={'': ['*.*']},
    install_requires=[
        'kazoo',
        'thriftpy2',
        'redis-py-cluster',
        'redis',
        'pymysql',
        'pyyaml',
    ],
    classifiers=[
        "Topic :: Utilities",
        "Topic :: Internet",
        "Topic :: Software Development :: Libraries :: Python Modules"
    ],
)

6. thriftprobe 包的使用

我们的服务治理包本着简单易用的角度出发,使用非常简单。

6.1. 编写 idl

service DemoService {
    string say();
}

6.2. server 端代码

from probe.server import start_service

class DemoServiceHandler(object):
    def say(self):
        return "hello world"

start_service('demo.thrift', 'hello_www', 'DemoService', DemoServiceHandler())

6.3. client 端代码

from probe.client import build_client

client = build_client('demo.thrift', 'hello_www', 'DemoService')
print(client.say())

7. 后续优化

上面我们构建了基本的服务治理,目前包含了服务注册、发现与状态监控、手动控制。
后续我们还可以在以下几个方面继续优化和改进上面的服务治理系统。

  1. 包括版本控制、服务优先级管理、请求数量限制、节点容错等进一步的功能后续需要添加和完善
  2. 客户端、服务端各自的每次调用、连接与响应的时长打点和监控需要进一步添加,但这意味着我们需要复写 TThreadedServer 类等代码
  3. 客户端无需每次重新连接 zookeeper,也无需每次重新请求节点列表,而是通过 @zk.ChildrenWatch 或 @zk.DataWatch 来注册节点变更的触发器,从而在节点信息发生变化时,更新本地路由表

8. 微信公众号

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,全部原创,只有干货没有鸡汤。

# 此处有图片 4

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐