实现效果:

设计思路:
1. 开启数据库及表的cdc,定时查询cdc表数据,封装sql语句(通过执行类型,主键;修改类型的cdc数据只取最后更新的记录),添加到离线数据表;
2. 线程定时查询离线数据表,更新远程库数据;
3. 远程库数据被更改又会产生cdc数据,对此数据进行拦截;

配置文件说明:

{
    "AsyncInterval": 10000,
    "Drivers": [
        {
            "SyncDBModel": 1,
            "SyncTBWait": 1,
            "RefreshTime": 3000,
            "Enable": 1,
            "SrcConnect": "Data Source=192.168.8.81;Initial Catalog=master;User ID=sa;Pwd=Nflg1234",
            "SrcMap": [ "SmartFactory|*" ],
            "SrcActionCDC": 0,
            "DstConnect": [ "Data Source=192.168.8.77;Initial Catalog=master;User ID=sa;Pwd=Nflg1234" ]
        }
    ]
}

1. 设置同步间隔时间(AsyncInterval)
2. 根据不同的配置文件,加载不同的Drivers(单向同步,双向同步,多库汇总)
3. 数据库同步模式(SyncDBModel 0:增量同步;1:全量同步)
4. 数据表同步等待(SyncTBWait 0:并行;1:按序 - 按SrcMap的表顺序执行,解决关联表中存在外键约束问题)
5. 设置刷新时间(RefreshTime)
6. 是否启用(Enable)
7. 源库链接(SrcConnect)
8. 源表信息(SrcMap, "SrcMap": [ "SmartFactory|LabProportionHopper,LabProportionHopperDetail", "SmartFactory|*" ] 先同步LabProportionHopper,LabProportionHopperDetail表,再同步其他表)
9. cdc操作 (SrcActionCDC -1:停用;0:不处理;1:更新)
10. 目标数据库连接字符串(可连接多库) (DstConnect,JSON数组,多个连接字符串之间以逗号隔开)

备注:
增量同步首次使用时(尤其是在全量同步之后),需要在主库和从库上都设置SrcActionCDC=1,以重新开启cdc并清空cdc日志;之后可以改回SrcActionCDC=0
SmartFactory|* 表示监听该数据库下的所有表
SmartFactory|*#AbpUsers,AbpUserRoles 表示监听该数据库下除了AbpUsers和AbpUserRoles外的表

数据表说明:

async_data        离线数据表
id                主键自增            INTEGER
connect_str        连接字符串            NVARCHAR(255)
excute_sql        需要同步的sql语句    NVARCHAR(255)
cdc_time        cdc时间                DATETIME
event_time        event时间            DATETIME
db_name            数据库名            NVARCHAR(255)
table_name        表名                NVARCHAR(255)
table_pk        表主键                NVARCHAR(255)
excute_type        执行类型(I/U/D)    NVARCHAR(255)

sqlserver cdc表(日志表)中如果一条id多次更新,取最新一条数据
sqlite async_data表(离线数据表)入库时,按 db_name + table_name + table_pk 查询:无记录则添加;有记录则比较cdc时间,如果新记录的cdc时间更晚,则更新sql语句

特殊数据处理:
uniqueidentifier类型的数据为空时转为NULL;数据中含有单引号(')的,替换为两个单引号('')
sqlserver表中的字段用[]标识,比如[Key],防止字段名为数据库的关键词

问题:
INSERT 语句与 FOREIGN KEY 约束"FK_IdentityServerApiClaims_IdentityServerApiResources_ApiResourceId"冲突。该冲突发生于数据库"SmartFactory",表"dbo.IdentityServerApiResources", column 'Id'。

指定数据表同步等待 SyncTBWait  -> 1;
再指定表同步执行顺序 SrcMap -> SmartFactory|LabProportionHopper,LabProportionHopperDetail;

或者执行两次同步,第1次先把基础信息补齐,会提示关联错误;第2次同步,有了基础数据就不会再报错

using SqlServerAsync.Util.config;
using SqlServerAsync.Util.dto;
using SqlServerAsync.Util.sqlite;
using SqlServerAsync.Util.sqlite.model;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;

namespace SqlServerAsync.Util
{
    public class SqlServerData
    {
        /// <summary>
        /// Map entries whose startup failed; they are retried later by <c>ReStart</c>.
        /// </summary>
        public static Queue queue_err = new Queue();

        /// <summary>
        /// Starts listening for one configured driver: every entry of <c>driver.SrcMap</c> is started
        /// on its own, and entries that fail to start are queued and retried in the background.
        /// </summary>
        /// <param name="driver">Driver configuration (source/target connections, mode flags, table maps).</param>
        public void Listen(Driver driver)
        {
            // The configuration stores these flags as integers where 1 means "on".
            bool isFullCopy = driver.SyncDBModel == 1;
            bool isSequential = driver.SyncTBWait == 1;
            bool isEnabled = driver.Enable == 1;

            foreach (var map in driver.SrcMap)
            {
                var dto = new QueueDto
                {
                    srcconnect = driver.SrcConnect,
                    dstconnect = driver.DstConnect,
                    refreshTime = driver.RefreshTime,
                    enable = isEnabled,
                    action_cdc = driver.SrcActionCDC,
                    map = map,
                    copy = isFullCopy,
                    wait = isSequential
                };

                if (Start(dto))
                {
                    continue;
                }

                // Initialization failed: park the entry so the retry loop can pick it up.
                queue_err.Enqueue(dto);
            }

            if (queue_err.Count == 0)
            {
                return;
            }

            Task.Run(() => ReStart());
        }

        /// <summary>
        /// Starts synchronization for a single source-map entry ("database|tables").
        /// </summary>
        /// <remarks>
        /// The FreeSql instance is created only after the entry is known to be enabled, and inside the
        /// <c>try</c> block, so that a disabled entry allocates nothing and a failure while building the
        /// instance is reported like any other startup failure (logged, <c>false</c> returned).
        /// </remarks>
        /// <param name="dto">Connection strings, flags and the map entry to start.</param>
        /// <returns>
        /// <c>true</c> when the entry is disabled or its background worker was launched;
        /// <c>false</c> when startup threw, in which case the FreeSql instance (if any) is disposed.
        /// </returns>
        bool Start(QueueDto dto)
        {
            string srcconnect = dto.srcconnect;
            List<string> dstconnect = dto.dstconnect;
            int refreshTime = dto.refreshTime;
            bool enable = dto.enable;
            int action_cdc = dto.action_cdc;
            string map = dto.map;
            bool copy = dto.copy;
            bool wait = dto.wait;

            IFreeSql freeSql = null;

            try
            {
                Program.AddLog($"==============================");

                // A map entry has the form "<database>|<table list>".
                var arrayMap = map.Split('|');
                var db = arrayMap[0];
                var tbs = arrayMap[1];

                var dstStr = string.Join(" ", dstconnect);

                if (!enable)
                {
                    Program.AddLog($"禁用监听,来源={srcconnect},目标数={dstconnect.Count},目标={dstStr},db={db},Tables={tbs}");
                    return true;
                }

                Program.AddLog($"启用监听,来源={srcconnect},目标数={dstconnect.Count},目标={dstStr},db={db},Tables={tbs}");

                freeSql = new FreeSql.FreeSqlBuilder()
                         .UseConnectionString(FreeSql.DataType.SqlServer, srcconnect)
                         .UseNoneCommandParameter(true)// do not use command parameters
                         .UseAutoSyncStructure(false)// do not sync table structure
                         .Build();

                ThreadDto entity = new ThreadDto() { freeSql = freeSql, db = db, refreshTime = refreshTime, srcconnect = srcconnect, dstconnect = dstconnect, wait = wait };

                HandleTable(freeSql, db, tbs, copy, action_cdc, entity.dicTable);

                var note = wait ? "串行" : "并行";

                if (copy)
                {
                    Program.AddLog($"全量同步({note}),db={db}");

                    Task.Run(() => CopyDB(entity));
                }
                else
                {
                    Program.AddLog($"增量同步({note}),db={db}");

                    Task.Run(() => MonitorDB(entity));// watch data changes and store them in sqlite
                }

                return true;
            }
            catch (Exception ex)
            {
                // Release the connection resources of a half-initialized entry before reporting failure.
                freeSql?.Dispose();

                Program.AddLog($"[Error] 启动异常,errmsg:{ex.Message}");

                return false;
            }
        }

        /// <summary>
        /// Background retry loop: once per minute takes one failed entry from <c>queue_err</c> and tries to
        /// start it again, re-queuing it when it fails again. The loop ends when the queue is empty.
        /// </summary>
        /// <remarks>
        /// Returns a <see cref="Task"/> instead of being <c>async void</c>, so an unexpected exception is
        /// captured in the task rather than being rethrown on the thread pool. Queue access is guarded with
        /// <c>queue_err.SyncRoot</c> because the queue is static and <c>Listen</c> launches one loop per call
        /// that ends with pending entries, so several loops may run at once.
        /// </remarks>
        /// <returns>A task that completes when no failed entries remain.</returns>
        async Task ReStart()
        {
            while (true)
            {
                await Task.Delay(60000);// retry once per minute

                QueueDto dto;
                lock (queue_err.SyncRoot)
                {
                    if (queue_err.Count == 0)
                    {
                        break;
                    }

                    dto = (QueueDto)queue_err.Dequeue();
                }

                Program.AddLog($"********* 开始重试 *********");

                if (!Start(dto))
                {
                    // Still failing: put it back at the end of the queue for a later attempt.
                    lock (queue_err.SyncRoot)
                    {
                        queue_err.Enqueue(dto);
                    }
                }
            }
        }

        /// <summary>
        /// Prepares the tables of one source database: resolves the table list, optionally disables/enables
        /// CDC on the database and on each table, and loads every table's column metadata into
        /// <paramref name="dicTable"/>.
        /// </summary>
        /// <remarks>
        /// Table exclusions and primary-key detection use exact, case-insensitive name matching. Substring
        /// matching would exclude "Users" because "AbpUsers" is excluded, or treat column "Id" as a key
        /// column because the key is "UserId".
        /// </remarks>
        /// <param name="freeSql">FreeSql instance connected to the source server.</param>
        /// <param name="db">Source database name.</param>
        /// <param name="tbs">
        /// Table specification: a comma separated table list, or a specification containing "*" meaning
        /// every dbo table; any table names that appear next to the "*" are excluded.
        /// </param>
        /// <param name="copy">True for full copy; metadata is then loaded even when database CDC is off.</param>
        /// <param name="action_cdc">-1 disables CDC, 1 disables then re-enables it, any other value leaves CDC untouched.</param>
        /// <param name="dicTable">Receives one <see cref="Table"/> per prepared table, keyed by table name.</param>
        void HandleTable(IFreeSql freeSql, string db, string tbs, bool copy, int action_cdc, Dictionary<string, Table> dicTable)
        {
            string sql = string.Empty; string[] arrayTB = null;

            if (tbs.Contains("*"))
            {
                // Every table name written in the specification besides the wildcard is an exclusion.
                var excluded = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
                foreach (var token in tbs.Split(new[] { '*', '#', ',' }, StringSplitOptions.RemoveEmptyEntries))
                {
                    var name = token.Trim();
                    if (name.Length > 0)
                    {
                        excluded.Add(name);
                    }
                }

                // Query all table names of the database (ordered by name).
                bool all = true;
                sql = $"use {db};select TABLE_NAME from {db}.information_schema.tables where TABLE_SCHEMA='dbo' and TABLE_NAME not in('systranschemas','sysdiagrams') order by TABLE_NAME asc";
                DataTable dtAll = freeSql.Ado.ExecuteDataTable(sql);
                List<string> lstTB = new List<string>();
                foreach (DataRow row in dtAll.Rows)
                {
                    var tablename = row["TABLE_NAME"].ToString();

                    if (excluded.Contains(tablename))
                    {
                        all = false;
                        Program.AddLog($"排除数据表 {db} {tablename} ×");
                    }
                    else
                    {
                        lstTB.Add(tablename);
                    }
                }

                arrayTB = lstTB.ToArray();

                if (all) Program.AddLog($"整库同步({db}) √");
            }
            else
            {
                arrayTB = tbs.Split(',');
            }

            if (null == arrayTB || 0 == arrayTB.Length)
            {
                Program.AddLog($"数据库{db},查无数据表 ×");
                return;
            }

            // Database-level CDC action.
            switch (action_cdc)
            {
                case -1:
                    // Disable CDC on the database.
                    sql = $"use {db};if exists(select 1 from {db}.sys.databases where name='{db}' and is_cdc_enabled=1)\n" +
                                    "begin\n" +
                                        $"exec {db}.sys.sp_cdc_disable_db\n" +
                                    "end";
                    freeSql.Ado.ExecuteNonQuery(sql);
                    break;
                case 1:
                    // Enable CDC on the database.
                    sql = $"use {db};if exists(select 1 from {db}.sys.databases where name='{db}' and is_cdc_enabled=0)\n" +
                                "begin\n" +
                                    $"exec {db}.sys.sp_cdc_enable_db\n" +
                                "end";
                    freeSql.Ado.ExecuteNonQuery(sql);
                    break;
            }

            // Check whether CDC is actually enabled on the database.
            sql = $"use {db};select is_cdc_enabled from {db}.sys.databases where name='{db}'";
            DataTable dtCDC_DB = freeSql.Ado.ExecuteDataTable(sql);
            if (dtCDC_DB.Rows.Count <= 0 || !Convert.ToBoolean(dtCDC_DB.Rows[0]["is_cdc_enabled"]))
            {
                Program.AddLog($"数据库CDC状态:关闭({db}) ×");
                if (!copy)
                {
                    // Incremental sync cannot work without CDC; a full copy can still proceed.
                    return;
                }
            }
            else
            {
                Program.AddLog($"数据库CDC状态:开启({db}) √");
            }

            foreach (var rawTable in arrayTB)
            {
                // Tolerate "A, B" style lists: surrounding whitespace is not part of the table name.
                var table = rawTable == null ? null : rawTable.Trim();
                if (string.IsNullOrEmpty(table)) continue;

                // Table-level CDC action.
                if (-1 == action_cdc || 1 == action_cdc)
                {
                    // Disable CDC on this table (for action 1 this resets it before re-enabling).
                    sql = $"use {db};if exists(select 1 from {db}.sys.tables where name='{table}' AND is_tracked_by_cdc=1)\n" +
                          "begin\n" +
                             $"exec {db}.sys.sp_cdc_disable_table @source_schema='dbo',@source_name='{table}',@capture_instance='dbo_{table}'" +
                          "end";
                    freeSql.Ado.ExecuteNonQuery(sql);
                }

                if (1 == action_cdc)
                {
                    // Enable CDC on this table.
                    sql = $"use {db};if exists(select 1 from {db}.sys.tables where name='{table}' AND is_tracked_by_cdc=0)\n" +
                              "begin\n" +
                                  $"exec {db}.sys.sp_cdc_enable_table\n" +
                                      "@source_schema='dbo',\n" +
                                      $"@source_name='{table}',\n" +
                                      "@capture_instance=NULL,\n" +
                                      "@supports_net_changes=1,\n" +
                                      "@role_name=NULL\n" +
                              "end";
                    freeSql.Ado.ExecuteNonQuery(sql);

                    // Check whether CDC is actually enabled on the table.
                    sql = $"use {db};select is_tracked_by_cdc from {db}.sys.tables WHERE name='{table}'";
                    DataTable dtCDC_TB = freeSql.Ado.ExecuteDataTable(sql);
                    if (dtCDC_TB.Rows.Count <= 0 || !Convert.ToBoolean(dtCDC_TB.Rows[0]["is_tracked_by_cdc"]))
                    {
                        Program.AddLog($"数据表CDC开启失败({table}) ×");
                        continue;
                    }
                    Program.AddLog($"数据表CDC开启成功({table}) √");
                }

                // Collect the table's metadata.
                Table tb = new Table() { Name = table };

                // Primary-key column names of the table.
                sql = $"use {db};select b.column_name\n" +
                        $"from information_schema.table_constraints a\n" +
                        $"inner join information_schema.constraint_column_usage b\n" +
                        $"on a.constraint_name = b.constraint_name\n" +
                        $"where a.constraint_type='PRIMARY KEY' and a.table_name='{table}'";
                var lstPk = freeSql.Ado.Query<string>(sql);

                var pkNames = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
                if (null != lstPk)
                {
                    foreach (var pk in lstPk)
                    {
                        if (!string.IsNullOrEmpty(pk))
                        {
                            pkNames.Add(pk);
                        }
                    }
                }

                // Column names and type names of the table.
                sql = $"use {db};SELECT distinct col.name AS 'Name', TYPE_NAME(system_type_id) as 'Type'\n" +
                            $"FROM sys.columns col\n" +
                            $"LEFT JOIN sys.index_columns idxcol ON col.object_id=idxcol.object_id AND col.column_id=idxcol.column_id\n" +
                            $"LEFT JOIN sys.indexes idx ON idxcol.object_id=idx.object_id AND idxcol.index_id=idx.index_id\n" +
                            $"WHERE col.object_id=OBJECT_ID('{table}')";

                List<Field> lstField = freeSql.Ado.Query<Field>(sql);
                foreach (var field in lstField)
                {
                    if (pkNames.Contains(field.Name))
                    {
                        tb.LstPKField.Add(field);// key columns, used to build update/delete conditions
                    }
                    else
                    {
                        tb.LstDataField.Add(field);
                    }
                }

                // Indexer instead of Add: a table listed twice must not abort the whole startup.
                dicTable[table] = tb;
            }
        }

        /// <summary>
        /// Full copy: for every prepared table, queues a "delete all rows" statement followed by one insert
        /// per source row into the SQLite offline-data store, once per target connection.
        /// </summary>
        /// <remarks>
        /// Each table is processed in its own task. All working state, including the SQL text, is local to
        /// that task, so tables copied in parallel cannot overwrite each other's query. A failure in one
        /// table is logged and does not stop the remaining tables. A table with no known columns is skipped
        /// entirely, so its target rows are not deleted without being refilled.
        /// </remarks>
        /// <param name="dto">Source FreeSql instance, database name, target connections, table metadata and the sequential flag.</param>
        void CopyDB(ThreadDto dto)
        {
            var freeSql = dto.freeSql;
            var db = dto.db;
            var dstconnect = dto.dstconnect;
            var dicTable = dto.dicTable;
            var wait = dto.wait;

            foreach (var item in dicTable)
            {
                var t = Task.Run(() =>
                {
                    var table_name = item.Key;
                    var tableEntity = item.Value;

                    try
                    {
                        Program.AddLog($"开始同步数据表,db={db},table={table_name}");

                        if (tableEntity == null)
                        {
                            return;
                        }

                        // Column order of the generated inserts: key columns first, then data columns.
                        var fields = new List<Field>(tableEntity.LstPKField);
                        fields.AddRange(tableEntity.LstDataField);

                        if (fields.Count == 0)
                        {
                            Program.AddLog($"[Error] 全量同步跳过,未获取到字段信息,db={db},table={table_name}");
                            return;
                        }

                        // The column list is the same for every row, so build it once.
                        var columnNames = new List<string>(fields.Count);
                        foreach (var field in fields)
                        {
                            columnNames.Add($"[{field.Name}]");
                        }
                        string insertField = string.Join(",", columnNames);

                        List<string> lstSqlServer = new List<string>
                        {
                            $"use {db};delete from {table_name}"
                        };

                        string sql = $"use {db};select * from {table_name}";
                        var dt = freeSql.Ado.ExecuteDataTable(sql);

                        foreach (DataRow row in dt.Rows)
                        {
                            var values = new List<string>(fields.Count);
                            foreach (var field in fields)
                            {
                                values.Add(HandleSpecialData(field.Type, row[field.Name]));
                            }

                            string insertValue = string.Join(",", values);
                            lstSqlServer.Add($"use {db};insert into {table_name} ({insertField}) values({insertValue})");
                        }

                        // Queue every statement once per target connection.
                        List<string> lstSqlite = new List<string>();
                        foreach (var dst in dstconnect)
                        {
                            foreach (var sql_execute in lstSqlServer)
                            {
                                var asyncdata = new AsyncData() { ConnectStr = dst, ExcuteSQL = sql_execute, CDCTime = DateTime.Now, EventTime = DateTime.Now, DBName = db, TableName = table_name, TablePK = "", ExcuteType = "" };
                                string sql_sqlite_insert = SqliteHelper.GetInsertAsyncData(asyncdata);
                                lstSqlite.Add(sql_sqlite_insert);
                            }
                        }

                        if (lstSqlite.Count > 0)
                        {
                            SqliteHelper.BathSqlWithTran(lstSqlite);
                        }
                    }
                    catch (Exception ex)
                    {
                        Program.AddLog($"CopyDB Error,ex:{ex.Message}");
                    }
                });

                if (wait) t.Wait();// sequential mode: finish this table before starting the next
            }
        }

        /// <summary>
        /// Incremental sync: periodically reads the CDC change table of every prepared table, turns the
        /// change rows into SQL statements and queues them in the SQLite offline-data store.
        /// </summary>
        /// <remarks>
        /// The loop has no exit condition; it waits <c>refreshTime</c> milliseconds between rounds.
        /// Each table is handled in its own task; in sequential mode the task is awaited synchronously
        /// before the next table is started. Errors inside a table's task are logged and swallowed.
        /// </remarks>
        /// <param name="dto">Source FreeSql instance, database name, refresh interval, source/target connections, table metadata and the sequential flag.</param>
        async void MonitorDB(ThreadDto dto)
        {
            var freeSql = dto.freeSql;
            var db = dto.db;
            var refreshTime = dto.refreshTime;
            var srcconnect = dto.srcconnect;
            var dstconnect = dto.dstconnect;
            var dicTable = dto.dicTable;
            var wait = dto.wait;

            while (true)
            {
                await Task.Delay(refreshTime);

                foreach (var item in dicTable)
                {
                    var t = Task.Run(() =>
                    {
                        var table_name = item.Key;
                        var tableEntity = item.Value;

                        try
                        {
                            if (tableEntity == null)
                            {
                                return;
                            }

                            // One round per table at a time: the per-table work lists below are shared state.
                            lock (tableEntity.Lock)
                            { 
                                //Stopwatch sw = new Stopwatch();
                                //sw.Start();

                                var sql = string.Empty;
                                // Reset the per-round caches/work lists.
                                tableEntity.DicTBUpdatePK.Clear();
                                tableEntity.LstSqlServer.Clear();
                                tableEntity.LstSqlite.Clear();

                                // Query the CDC change table.
                                //__$start_lsn : log sequence number (LSN) of the commit of the transaction that made the change
                                //__$end_lsn : (always NULL in SQL Server 2008)
                                //__$seqval : order of the row changes within a transaction
                                //__$operation : DML operation on the source table
                                var cdc_table_name = $"{db}.cdc.dbo_{table_name}_CT";
                                sql = $"use {db};select top 5000 sys.fn_cdc_map_lsn_to_time(__$start_lsn) as cdctime,* from {cdc_table_name}";// also fetch the cdc time
                                var dt = freeSql.Ado.ExecuteDataTable(sql);

                                // From here on table_name is the fully qualified source table name.
                                table_name = $"{db}.dbo." + table_name;
                                for (int i = 0; i < dt.Rows.Count; i++)
                                {
                                    var row = dt.Rows[i];

                                    var lstPKField = tableEntity.LstPKField;
                                    var lstDataField = tableEntity.LstDataField;

                                    var cdctime = Convert.ToDateTime(row["cdctime"]);
                                    var operation = Convert.ToInt32(row["__$operation"]);
                                    var seqval = (byte[])(row["__$seqval"]);// __$start_lsn stands for the event time and can repeat under concurrency, so __$seqval is used instead
                                    var str_seqval = BitConverter.ToString(seqval, 0).Replace("-", string.Empty);

                                    // Operation 3 rows are skipped entirely (not deleted, not replayed).
                                    // NOTE(review): per the SQL Server CDC docs 3 is the pre-update row image — confirm.
                                    if (3 == operation)
                                    {
                                        continue;
                                    }

                                    var sql_cdc_execute = string.Empty;

                                    // Build the "[pk]='value' and ..." condition identifying the row.
                                    string table_pk = string.Empty;
                                    foreach (var field1 in lstPKField)
                                    {
                                        table_pk += $"[{field1.Name}]='{row[field1.Name]}' and ";
                                    }

                                    // Strip the trailing " and "; stays empty for tables without a primary key.
                                    if (!string.IsNullOrEmpty(table_pk)) table_pk = table_pk.Substring(0, table_pk.Length - 5);

                                    // When one row was updated several times, only its latest change row is replayed
                                    // (already looked-up rows are cached in a dictionary); requires a primary key.
                                    string str_seqval1 = string.Empty;
                                    if (4 == operation && !string.IsNullOrEmpty(table_pk))
                                    {
                                        string cdc_dic_pk = table_name + ";" + table_pk;

                                        if (tableEntity.DicTBUpdatePK.ContainsKey(cdc_dic_pk))
                                        {
                                            str_seqval1 = tableEntity.DicTBUpdatePK[cdc_dic_pk];
                                        }
                                        else
                                        {
                                            // Look up the latest update of this row in the change table.
                                            sql = $"use {db};select top 1 __$seqval from {cdc_table_name} where {table_pk} and __$operation=4 order by __$seqval desc";
                                            var dtlsn = freeSql.Ado.ExecuteDataTable(sql);
                                            var seqval1 = (byte[])(dtlsn.Rows[0]["__$seqval"]);
                                            str_seqval1 = BitConverter.ToString(seqval1, 0).Replace("-", string.Empty);
                                            tableEntity.DicTBUpdatePK.Add(cdc_dic_pk, str_seqval1);
                                        }
                                    }

                                    // Queue the deletion of this change row from the CDC table.
                                    sql = $"use {db};delete from {cdc_table_name} where __$seqval=CONVERT(BINARY(10), '{str_seqval}', 2)";
                                    tableEntity.LstSqlServer.Add(sql);

                                    string excute_type = string.Empty;

                                    switch (operation)
                                    {
                                        case 1:
                                            // Delete (only possible with a primary key).
                                            if (!string.IsNullOrEmpty(table_pk))
                                            {
                                                excute_type = BaseEnum.Delete;
                                                sql_cdc_execute = $"use {db};delete from {table_name} where {table_pk}";
                                            }
                                            break;
                                        case 2:
                                            // Insert: key columns first, then data columns.
                                            excute_type = BaseEnum.Insert;
                                            string insertField = string.Empty;
                                            string insertValue = string.Empty;

                                            foreach (var field1 in lstPKField)
                                            {
                                                insertField += $"[{field1.Name}],";
                                                insertValue += HandleSpecialData(field1.Type, row[field1.Name]) + ",";
                                            }

                                            foreach (var field2 in lstDataField)
                                            {
                                                insertField += $"[{field2.Name}],";
                                                insertValue += HandleSpecialData(field2.Type, row[field2.Name]) + ",";
                                            }

                                            // Drop the trailing commas.
                                            insertField = insertField.Substring(0, insertField.Length - 1);
                                            insertValue = insertValue.Substring(0, insertValue.Length - 1);
                                            sql_cdc_execute = $"use {db};insert into {table_name} ({insertField}) values({insertValue})");
                                            break;
                                        case 3:
                                            break;
                                        case 4:
                                            // Update: replay only the latest change row of this primary key.
                                            if (str_seqval == str_seqval1 && !string.IsNullOrEmpty(table_pk))// this is the latest change
                                            {
                                                excute_type = BaseEnum.Update;
                                                string updateData = string.Empty;

                                                foreach (var field2 in lstDataField)
                                                {
                                                    updateData += $"[{field2.Name}]={HandleSpecialData(field2.Type, row[field2.Name])},";
                                                }

                                                updateData = updateData.Substring(0, updateData.Length - 1);
                                                sql_cdc_execute = $"use {db};update {table_name} set {updateData} where {table_pk}";
                                            }
                                            break;
                                    }

                                    if (!string.IsNullOrEmpty(sql_cdc_execute))
                                    {
                                        foreach (var dst in dstconnect)
                                        {
                                            bool add = true;

                                            string key1 = srcconnect + "_" + table_name + "_" + table_pk + "_" + excute_type; // A syncs to B; B's resulting change comes back to A through CDC and is intercepted here
                                            if (Program.DicExecuted.ContainsKey(key1))
                                            {
                                                // This change is the echo of a statement already executed: drop it and consume the marker.
                                                add = false;
                                                string removedValue;
                                                Program.DicExecuted.TryRemove(key1, out removedValue);
                                            }
                                            else
                                            {
                                                // For an already queued row, the change with the latest cdc time wins.
                                                var entity = SqliteHelper.GetUpdateAsyncData(db, table_name, table_pk);

                                                if (null == entity)
                                                {
                                                    var asyncdata = new AsyncData() { ConnectStr = dst, ExcuteSQL = sql_cdc_execute, CDCTime = cdctime, EventTime = DateTime.Now, DBName = db, TableName = table_name, TablePK = table_pk, ExcuteType = excute_type };
                                                    string sql_sqlite_insert = SqliteHelper.GetInsertAsyncData(asyncdata);
                                                    tableEntity.LstSqlite.Add(sql_sqlite_insert);
                                                }
                                                else
                                                {
                                                    // Compare cdc times: replace the queued statement only when this change is newer.
                                                    if (DateTime.Compare(entity.CDCTime, cdctime) < 0)
                                                    {
                                                        string sql_sqlite_update = SqliteHelper.GetUpdateAsyncData(dst, sql_cdc_execute, entity.Id);
                                                        tableEntity.LstSqlite.Add(sql_sqlite_update);
                                                    }
                                                    else
                                                    {
                                                        add = false;
                                                    }
                                                }

                                                // Debug hook only; has no effect.
                                                if (add)
                                                {
                                                    if (dst.Contains("192.168.8.81"))
                                                    {
                                                        //Console.WriteLine("111");
                                                    }
                                                    //Program.AddLog($"添加,dst:{dst},sql:{sql_cdc_execute},key:{key1}");// debug
                                                }
                                            }
                                        }
                                    }
                                }

                                if (tableEntity.LstSqlServer.Count > 0)
                                {
                                    Program.ServerSqlBatchSqlWithTran(freeSql, tableEntity.LstSqlServer);  // run the sqlserver statements as one transactional batch
                                }

                                if (tableEntity.LstSqlite.Count > 0)
                                {
                                    SqliteHelper.BathSqlWithTran(tableEntity.LstSqlite);
                                }

                                //sw.Stop();
                                //Program.AddLog($"抓取并存储,记录数:{dt.Rows.Count},耗时:{sw.ElapsedMilliseconds / 1000}秒");
                            }
                        }
                        catch (Exception ex)
                        {
                            Program.AddLog($"MonitorDB Error,ex:{ex.Message}");
                        } 
                    });

                    if (wait) t.Wait();// sequential mode: finish this table before starting the next
                }
            }
        }

        /// <summary>
        /// Converts a column value read from a <c>DataRow</c> into the text of a T-SQL literal.
        /// </summary>
        /// <remarks>
        /// Rules, in order:
        /// 1. <c>null</c> / <see cref="DBNull"/> become <c>NULL</c>, so a NULL in the source stays NULL.
        /// 2. <c>byte[]</c> becomes an unquoted binary literal (<c>0x...</c>).
        /// 3. Dates are written in ISO 8601 form and other formattable values with the invariant culture,
        ///    so the generated SQL does not depend on the culture of the machine running the sync.
        /// 4. An empty value for a uniqueidentifier column becomes <c>NULL</c>.
        /// 5. Everything else is wrapped in single quotes, with embedded single quotes doubled.
        /// </remarks>
        /// <param name="type">SQL Server type name of the column (e.g. "uniqueidentifier"); compared case-insensitively, may be null.</param>
        /// <param name="val">The column value.</param>
        /// <returns>The literal text to embed in an INSERT/UPDATE statement.</returns>
        static string HandleSpecialData(string type, object val)
        {
            if (null == val || val is DBNull) return "NULL";

            var bytes = val as byte[];
            if (null != bytes)
            {
                return "0x" + BitConverter.ToString(bytes).Replace("-", string.Empty);
            }

            var invariant = System.Globalization.CultureInfo.InvariantCulture;
            string ret;

            if (val is DateTime)
            {
                // datetime accepts at most 3 fractional digits; datetime2 can carry 7.
                var format = string.Equals(type, "datetime2", StringComparison.OrdinalIgnoreCase)
                    ? "yyyy-MM-dd'T'HH:mm:ss.fffffff"
                    : "yyyy-MM-dd'T'HH:mm:ss.fff";
                ret = ((DateTime)val).ToString(format, invariant);
            }
            else if (val is DateTimeOffset)
            {
                ret = ((DateTimeOffset)val).ToString("yyyy-MM-dd'T'HH:mm:ss.fffffffzzz", invariant);
            }
            else
            {
                var formattable = val as IFormattable;
                ret = null != formattable ? formattable.ToString(null, invariant) : val.ToString();
            }

            if (null == ret) ret = string.Empty;

            // An empty uniqueidentifier cannot be written as '' — use NULL instead.
            if (string.Equals(type, "uniqueidentifier", StringComparison.OrdinalIgnoreCase) && string.IsNullOrEmpty(ret))
            {
                return "NULL";
            }

            // Double embedded single quotes, then quote the whole value.
            return $"'{ret.Replace("'", "''")}'";
        }
    }

    /// <summary>
    /// Metadata and per-round working state of one synchronized table.
    /// </summary>
    public class Table
    {
        /// <summary>Table name as given in the configuration or read from the database.</summary>
        public string Name { get; set; }
        /// <summary>Primary-key columns; used to build the row condition of update/delete statements.</summary>
        public List<Field> LstPKField { get; set; } = new List<Field>();
        /// <summary>All columns that are not part of the primary key.</summary>
        public List<Field> LstDataField { get; set; } = new List<Field>();
        /// <summary>Lock object taken while a monitoring round processes this table.</summary>
        public object Lock { get; set; } = new object();
        /// <summary>Per-round cache: "table;pk-condition" mapped to the hex __$seqval of that row's latest update.</summary>
        public Dictionary<string, string> DicTBUpdatePK { get; set; } = new Dictionary<string, string>();
        /// <summary>Per-round list of statements to run on the source SQL Server (deletes of processed CDC rows).</summary>
        public List<string> LstSqlServer { get; set; } = new List<string>();
        /// <summary>Per-round list of statements to run on the SQLite offline-data store.</summary>
        public List<string> LstSqlite { get; set; } =    new List<string>();
    }

    /// <summary>
    /// One column of a synchronized table, filled from the column-metadata query ('Name' and 'Type' columns).
    /// </summary>
    public class Field
    {
        /// <summary>Column name.</summary>
        public string Name { get; set; } 
        /// <summary>SQL Server type name of the column.</summary>
        public string Type { get; set; }// e.g. uniqueidentifier (GUID): an empty value is written as NULL
    }
}

Logo

加入「COC·上海城市开发者社区」,成就更好的自己!

更多推荐