Fdbus实例教程

Fdbus简介

Fdbus 全称 Fast Distributed Bus(高速分布式总线),提供IPC+RPC功能。适用于多种OS:

  • Linux
  • QNX
  • AnroidOS
  • Window

Fdbus本质是Socket,IPC基于Unix domain socket,RPC基于TCP。使用Google Protobuf进行序列化和反序列化。

利用它,可以实现同域间的IPC通信,比如应用和OS间。也可以实现跨域、跨设备间通信,比如不同设备间数据传输,比如host/gust间的数据传输。
在这里插入图片描述

Fdbus的一些特点

  • 支持多种通信模式,比如点对点、注册/发布、广播等等。
  • 支持服务动态发现,自带心跳检测、重连,上线通知、离线检测等功能。
  • 支持一定程度的安全策略配置,token、证书、访问鉴权等等。
  • 支持Log调试,可以通过调试工具抓取在Fdbus上传输的内容。

其作者jeremy_cz(膜拜大神)对于FDBus的介绍非常详细。本文就不介绍了。关于FDBUS的内容,可以阅读作者的《Fast Distributed Bus - FDBus:高速分布式总线以及中间件开发框架》

Fdbus实例教程

基于Fdbus在一台Linux上,写一个C/S通信的Hellworld(例子)。
PC OS:Ubutun 20.04

编译Fdbus
  • 编译protobuf
git clone https://github.com/protocolbuffers/protobuf.git
git submodule update --init --recursive
mkdir -p build
cd build
# 这里把/usr作为了安装目录 
cmake -DCMAKE_INSTALL_PREFIX=/usr -DBUILD_SHARED_LIBS=1 ../cmake
make -j4
make install
  • 编译fdbus

git clone https://gitee.com/jeremyczhen/fdbus.git
cd fdbus
mkdir -p build/install
cd build
cmake -DCMAKE_INSTALL_PREFIX=install -Dfdbus_LOG_TO_STDOUT ../cmake
make -j4

编译成功后,会在build目录下生成大概以下文件,其中name_server是FDBUS用来管理服务名的进程,一会跑test需要运行它(具体请查看Fdbus作者jeremy_cz的博客)。编译生成内容大概如下

CMakeCache.txt
CMakeFiles
cmake_install.cmake
cmake_uninstall.cmake
fdbclienttest
fdbservertest
fdbxclient
fdbxserver
host_server
libfdbus-clib.so
libfdbus.so
logsvc
logviewer
lsclt
lsdp
lsevt
lshost
lssvc
Makefile
name_server
ntfcenter
HellWorld

这里直接在fdbus/example中新建了HelloWorld目录,目录结构(主要为了方便,直接借用fdbus源码的cmake。使用其他方式也可以,只要引用fdbus的lib和头文件即可。)

HelloWorld
-- HelloClient.cpp
-- HelloServer.cpp
-- HelloWorld.pb.h
-- HelloWorld.pb.cc
-- HelloWorld.proto

其中HelloWorld.pb.cc和HelloWorld.pb.h,是通过HelloWorld.proto生成的。生成命令

protoc --proto_path=. --cpp_out=. HelloWorld.proto
  • HelloWorld.proto
syntax = "proto2";

message HelloWorld
{
    required string name = 1; 
}
  • HelloServer.cpp
#define FDB_LOG_TAG "HELLO_SERVER"
#include <fdbus/fdbus.h>
#include "HelloWorld.pb.h"
#include <fdbus/CFdbProtoMsgBuilder.h>
#include <fdbus/cJSON/cJSON.h>
#include <fdbus/CFdbCJsonMsgBuilder.h>

using namespace ipc::fdbus;
// FDBUS提供的功能线程
static CBaseWorker main_worker;

static const int METHOD_ID = 1;

class HelloServer : public CBaseServer
{
public:
    HelloServer(const char*name, CBaseWorker* work = 0)
            : CBaseServer(name, work)
    {
            // Empty
    }


protected:
    void onOnline(const CFdbOnlineInfo &info)
    {
        std::cout << "connected to the client" << std::endl;
    }

    void onOffline(const CFdbOnlineInfo &info)
    {
        std::cout << "disconnected from client" << std::endl;
    }

    /* called when client calls invoke() */
    void onInvoke(CBaseJob::Ptr &msg_ref)
    {
        auto msg = castToMessage<CBaseMessage *>(msg_ref);
        static int32_t elapse_time = 0;
        switch (msg->code())
        {
            case METHOD_ID:
                {
                HelloWorld client;
                CFdbProtoMsgParser parser(client);
                if (!msg->deserialize(parser))
                {
                    return;
                }

                std::cout << "Client name " << client.name() << std::endl;
                /* fill in protocol buffer and reply to client */
                HelloWorld server;
                server.set_name("Linduo");
                CFdbProtoMsgBuilder builder(server);
                msg->reply(msg_ref, builder);
                }
                break;
            default:
                break;
        }
    }
};


int main(int argc, char*argv[])
{
/* start fdbus context thread */
    FDB_CONTEXT->start();
    fdbLogAppendLineEnd(true);
    FDB_CONTEXT->registerNsWatchdogListener([](const tNsWatchdogList &dropped_list)
    {
        for (auto it = dropped_list.begin(); it != dropped_list.end(); ++it)
        {
                printf("Error!!! Endpoint drops - name: %s, pid: %d\n",
                        it->mClientName.c_str(), it->mPid);
        }
    });

    CBaseWorker *worker_ptr = &main_worker;
    /* start worker thread */
    worker_ptr->start();

    /* create servers and bind the address: svc://service_name */
    for (int i = 1; i < argc; ++i)
    {
        std::string server_name = argv[i];
        std::string url(FDB_URL_SVC);
        url += server_name;
        server_name += "_server";
        auto server = new HelloServer(server_name.c_str(), worker_ptr);
        server->enableWatchdog(true);
        server->enableUDP(true);
        server->setExportableLevel(FDB_EXPORTABLE_SITE);
        server->bind(url.c_str());
    }

    // 将主线程变为WorkThread
    CBaseWorker background_worker;
    background_worker.start(FDB_WORKER_EXE_IN_PLACE);
}
  • HelloClient.cpp
#define FDB_LOG_TAG "HELLO_CLIENT"
#include <fdbus/fdbus.h>
#include <thread>
#include "HelloWorld.pb.h"
#include <fdbus/CFdbProtoMsgBuilder.h>
#include <fdbus/cJSON/cJSON.h>
#include <fdbus/CFdbCJsonMsgBuilder.h>

// 引用命名空间
// 正式开发代码不建议
using namespace ipc::fdbus;
// FDBUS提供的功能线程
static CBaseWorker main_worker;
static const int METHOD_ID = 1;

class HelloClient : public CBaseClient
{
public:
	HelloClient(const char*name, CBaseWorker* work = 0)
		: CBaseClient(name, work)
	{
		// Empty
	}

	void callServerSync()
	{
		// ProtoBuffer
        HelloWorld my;
        my.set_name("Adver");
        // 序列化
        CFdbProtoMsgBuilder builder(my);
        // 同步调用
        CBaseJob::Ptr ref(new CBaseMessage(1));
        invoke(ref, builder);

        // 解析Server返回的内容
        auto msg = castToMessage<CBaseMessage *>(ref);
        if (msg->isStatus()) {
        	// Error;
        	return;
        }

        // 反序列化
        HelloWorld server;
        CFdbProtoMsgParser parser(server);
        bool result = msg->deserialize(parser);
        if (result) {
    		std::cout << "My Name:" << std::endl;
        	std::cout << server.name() << std::endl;
        }
	}


	void callServerAsync()
	{
		// ProtoBuffer
        HelloWorld my;
        my.set_name("Adver");
        // 序列化
        CFdbProtoMsgBuilder builder(my);
        // 异步调用
        invoke(METHOD_ID, builder);
	}

protected:
    void onOnline(const CFdbOnlineInfo &info)
    {
    	std::cout << "connected to the server" << std::endl;
        callServerAsync();
    }

     void onOffline(const CFdbOnlineInfo &info)
     {
     	std::cout << "disconnected from server" << std::endl;
     }

	void onReply(CBaseJob::Ptr &msg_ref)
	{
        auto msg = castToMessage<CBaseMessage *>(msg_ref);

     	switch (msg->code())
        {
            case METHOD_ID:
	            {
	                if (msg->isStatus())
	                {
	      				std::cout << "Error" << std::endl;
	                    if (msg->isError())
	                    {
	                    	return;
	                    }
	                    return;
	                }

	                HelloWorld server;
	                CFdbProtoMsgParser parser(server);
	                if (msg->deserialize(parser))
	                {
	           			std::cout << "Server name " << server.name() << std::endl;
	                }

                    callServerAsync();
	            }
	            break;
            default:
            	break;
        }
	}
};


int main(int argc, char*argv[])
{
    /* start fdbus context thread */
    FDB_CONTEXT->start();
    fdbLogAppendLineEnd(true);
    FDB_CONTEXT->registerNsWatchdogListener([](const tNsWatchdogList &dropped_list)
    {
        for (auto it = dropped_list.begin(); it != dropped_list.end(); ++it)
        {
        	printf("Error!!! Endpoint drops - name: %s, pid: %d\n",
        		it->mClientName.c_str(), it->mPid);
        }
    });

	CBaseWorker *worker_ptr = &main_worker;
    /* start worker thread */
    worker_ptr->start();

    for (int i = 1; i < argc; ++i)
    {
        std::string server_name = argv[i];
        std::string url(FDB_URL_SVC);
        url += server_name;
        server_name += "_client";
        auto client = new HelloClient(server_name.c_str(), worker_ptr);
        
        client->enableReconnect(true);
        client->enableUDP(true);
        client->enableTimeStamp(true);
        client->connect(url.c_str());
    }

    // 将主线程变为WorkThread
    CBaseWorker background_worker;
    background_worker.start(FDB_WORKER_EXE_IN_PLACE);
}
  • 编译部分:加到了dbus/cmake/pb-example/example.cmake中,在cmake中编译上面的cpp
add_executable(helloworldclient
    ${PACKAGE_SOURCE_ROOT}/example/HelloWorld/HelloClient.cpp
    ${PACKAGE_SOURCE_ROOT}/example/HelloWorld/HelloWorld.pb.cc
    ${PACKAGE_SOURCE_ROOT}/example/HelloWorld/HelloWorld.pb.h
)

add_executable(helloworldserver
    ${PACKAGE_SOURCE_ROOT}/example/HelloWorld/HelloServer.cpp
    ${PACKAGE_SOURCE_ROOT}/example/HelloWorld/HelloWorld.pb.cc
    ${PACKAGE_SOURCE_ROOT}/example/HelloWorld/HelloWorld.pb.h
)
  • 在fdbus/cmake/CMakeLists.txt中引入了这个cmake,并且加入了C++14的Flag。
if (MSVC)
    add_definitions("-D__WIN32__")
elseif(fdbus_ANDROID)
    add_definitions("-D__LINUX__")
else()
    if(CMAKE_COMPILER_IS_GNUCXX)
        set(CMAKE_CXX_FLAGS "-std=gnu++14 -Wall -Wl,--copy-dt-needed-entries ${CMAKE_CXX_FLAGS}")
    endif()
    #add_compile_options(-g -O0)
    add_definitions("-D__LINUX__")
endif()


include(pb-example/example.cmake)
  • 重新运行cmake,然后make。编译出helloworldclient和helloworldserver两个文件。
cmake -DCMAKE_INSTALL_PREFIX=install -Dfdbus_LOG_TO_STDOUT ../cmake
make -j4
运行HelloWorld
  • 开三个终端,分别运行,无先后顺序。Server/Client运行无需先后顺序,这点也是Fdbus的有点。大概原理是,通过服务动态绑定,当网络中的Server上线时,Client会自动连接该Service。
# 开3个终端,分别运行。其中name_server(用来实现服务动态绑定)
./name_server
./helloworldserver my
./helloworldclient my

执行成功后:
helloworldserver会输出从客户端收到的内容。

connected to the client
Client name Adver

helloworldclient会输出从服务端收到的内容。

connected to the server
Server name Linduo

PS:name_server在服务端和客户端连接时,不是必须得。如果两端指定了IPC端口,可以不使用name_server连接。但是不使用name_server时,就需要控制好连接时序。所以一般来讲,应用时都是启动一个name_server。

Logo

欢迎加入西安开发者社区!我们致力于为西安地区的开发者提供学习、合作和成长的机会。参与我们的活动,与专家分享最新技术趋势,解决挑战,探索创新。加入我们,共同打造技术社区!

更多推荐