继1年前的系列文章《GO语言实现redis客户端》之后,几乎就没怎么碰过Golang这门语言了,在云原生时代,go语言无疑是无可替代的。近期,我们顺手把go语言重新捡起来,边写边学,手撸一个数据配置中心玩玩。笔者的习惯是,不喜欢在自己文章中赘述别人已经讲的很好的东西了,所以关于什么是数据配置中心?配置中心的作用和意义、以及当下热门的产品等话题,请移步这里(文章讲的很好):https://developer.51cto.com/art/202102/645471.htm

直接上代码,先考虑一下初步的设计,主要分为管理门户、注册订阅客户端、存储模块以及推送模块等等,我们初期先尝试实现单机版,基于http协议通信。工程目录如下:

先看一下配置中心的存储模型定义,包括命名空间、用户空间、以及元数据空间等等,实现如下:

config_service.go

package configservice

import "time"

//命名空间
type Namespace struct {
	Appid       string
	Appsecret   string
	Appname     string
	Owner       string
	Contact     string
	Description string
	Create      time.Time
	Modify      time.Time
}

//元数据空间
type Metaspace struct {
	Appid string
	//元数据,json字符串
	Properties     string
	LastAccessTime time.Time
	LastModifyBy   string
	Create         time.Time
	Modify         time.Time
}

//用户空间
type Userspace struct {
	Uid      string
	Password string
	//角色:SuperStar、Admin、Leader、Follower
	Role string
	//readOnly、read/write
	Authority string
	Create    time.Time
	Modify    time.Time
}

//用户命名空间绑定关系
type Binding struct {
	Uid    string
	Appid  string
	Create time.Time
	Modify time.Time
}

管理门户模块的大概实现。基本思路就是提供几个http接口,实现配置的增删改查功能。

meta_controller.go

package portal

import (
	"configservice"
	"encoding/json"
	"log"
	"net/http"
)

const HOST = "127.0.0.1:8081"
const (
	LOGIN      = "/v1.0/api/login"
	REGISTRY   = "/v1.0/api/users/add"
	NAMESPACES = "/v1.0/api/namespaces"
)

func init() {
	configservice.LoggerInit()
}

func WebServer() {
	http.HandleFunc(LOGIN, login)
	http.HandleFunc(REGISTRY, registry)
	http.HandleFunc(NAMESPACES, getNamespaces)

	//服务器要监听的主机地址和端口号
	err := http.ListenAndServe(HOST, nil)

	if err != nil {
		log.Println("ListenAndServe error: ", err.Error())
	}
	log.Println("服务启动成功,正在监听")
}

//登陆接口
func login(w http.ResponseWriter, req *http.Request) {
	log.Println("登录接口调用")
	//获取客户端通过GET/POST方式传递的参数
	var loginReq LoginRequest

	// 将请求体中的 JSON 数据解析到结构体中
	// 发生错误,返回400 错误码
	err := json.NewDecoder(req.Body).Decode(&loginReq)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	loginResponse := Login(loginReq)
	handlerResponse(w, &loginResponse)
}

//注册用户接口
func registry(w http.ResponseWriter, req *http.Request) {
	log.Println("注册接口调用")
	var user configservice.Userspace
	err := json.NewDecoder(req.Body).Decode(&user)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	response := Registry(user)
	handlerResponse(w, &response)
}

func getNamespaces(w http.ResponseWriter, req *http.Request) {

}

func handlerResponse(w http.ResponseWriter, response *CommonResponse) {
	w.Header().Set("Content-Type", "application/json")
	error := json.NewEncoder(w).Encode(response)
	if error != nil {
		http.Error(w, error.Error(), http.StatusBadRequest)
		return
	}
}

meta_service.go

package portal

import (
	"configservice"
	"crypto/md5"
	"encoding/hex"
	"go.mongodb.org/mongo-driver/bson"
	"net/http"
	"storage"
	"time"
)

var mgo = storage.Mgo{"mongodb://localhost:27017", "meta-config", "user_space"}

//登陆业务处理
func Login(loginReq LoginRequest) CommonResponse {
	mongoTemplate := storage.NewMongoTemplate(&mgo)
	result := mongoTemplate.Query(bson.D{
		{"uid", loginReq.UserName},
		{"password", loginReq.Password}})

	var loginResponse CommonResponse
	if result == nil {
		loginResponse = CommonResponse{Code: http.StatusBadRequest, Timestamp: time.Now(), Message: "登陆失败,用户名或密码错误"}
	} else {
		var user configservice.Userspace
		result.Decode(&user)
		loginResponse = CommonResponse{Code: http.StatusOK, Timestamp: time.Now(), Message: "登陆成功", Data: &LoginDataResponse{Token: GetToken(user.Uid)}}
	}
	return loginResponse
}

//新增用户业务处理

func Registry(user configservice.Userspace) CommonResponse {
	user.Create = time.Now()
	user.Modify = time.Now()
	mongoTemplate := storage.NewMongoTemplate(&mgo)
	result, _ := mongoTemplate.Insert(user)
	return CommonResponse{Code: http.StatusOK, Timestamp: time.Now(), Message: "注册成功", Data: result}
}

func GetToken(s string) string {
	md5 := md5.New()
	md5.Write([]byte(s))
	md5Str := hex.EncodeToString(md5.Sum(nil))
	return md5Str
}

model.go

package portal

import "time"

//登陆请求入参
type LoginRequest struct {
	UserName string
	Password string
}

//登陆响应结果
type CommonResponse struct {
	Code      int
	Data      interface{}
	Timestamp time.Time
	Message   string
}

//登陆响应数据
type LoginDataResponse struct {
	Token string
}

配置中心的底层存储我们采用mongodb(分布式、基于文档类型),相关操作代码如下:

db.go

package storage

import (
	"context"
	"fmt"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/x/bsonx"
	"log"
	"time"
)

type Mgo struct {
	Uri        string //数据库网络地址
	Database   string //要连接的数据库
	Collection string //要连接的集合
}

type MongoTemplate struct {
	collection *mongo.Collection
}

func NewMongoTemplate(mgo *Mgo) *MongoTemplate {
	collection, _ := mgo.Connect()
	return &MongoTemplate{collection: collection}
}

func (t *MongoTemplate) GetCollection() *mongo.Collection {
	return t.collection
}

func (m *Mgo) Connect() (*mongo.Collection, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	client, err := mongo.Connect(ctx, options.Client().ApplyURI(m.Uri).SetMaxPoolSize(20))
	if err != nil {
		log.Print(err)
	}
	collection := client.Database(m.Database).Collection(m.Collection)
	return collection, nil
}

/**
 * 插入数据
 */
func (t *MongoTemplate) Insert(document interface{}) (interface{}, error) {
	insertResult, err := t.collection.InsertOne(context.TODO(), document)
	if err != nil {
		log.Fatal(err)
		return nil, err
	}

	fmt.Println("Inserted a single document: ", insertResult.InsertedID)
	return insertResult.InsertedID, nil
}

/**
 * 查询单条数据
 */
func (t *MongoTemplate) Query(filter bson.D) *mongo.SingleResult {
	return t.collection.FindOne(context.TODO(), filter)
}

/**
 * 更新数据
 */
func (t *MongoTemplate) Update(filter bson.D, update bson.D) (interface{}, error) {
	updateResult, err := t.collection.UpdateOne(context.TODO(), filter, update)
	if err != nil {
		log.Fatal(err)
		return nil, err
	}
	fmt.Printf("Matched %v documents and updated %v documents.\n", updateResult.MatchedCount, updateResult.ModifiedCount)
	return updateResult.ModifiedCount, nil
}

/**
 * 删除数据
 */
func (t *MongoTemplate) Delete(filter bson.D) (interface{}, error) {
	deleteResult, err := t.collection.DeleteOne(context.TODO(), filter)
	if err != nil {
		log.Fatal(err)
		return nil, err
	}
	fmt.Printf("Deleted %v documents in the trainers collection\n", deleteResult.DeletedCount)
	return deleteResult, nil
}

//设置索引的过期时间
func (t *MongoTemplate) Expire() (interface{}, error) {
	k := mongo.IndexModel{
		Keys:    bsonx.Doc{{"expire", bsonx.Int32(1)}},
		Options: options.Index().SetExpireAfterSeconds(1 * 60 * 60), //一小时后过期
	}
	return t.collection.Indexes().CreateOne(context.TODO(), k)
}

至此,我们第一期的实现先到这里,后续有时间,我会继续更新。

项目地址:

https://github.com/zhangxiaomin1993/meta_config

欢迎大家多多关注,一起学习!

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐