
C#连接Influxdb时序数据库
C# 连接Influxdb时序数据库的方法总结,其中包括初始化,连接,读取,写入等方法。
·
C# 连接Influxdb时序数据库的方法总结,其中包括初始化,连接,读取,写入等方法。
using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Core.Flux.Domain;
using InfluxDB.Client.Linq;
using InfluxDB.Client.Writes;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using static DataBaseDll.Constant;
using static DataBaseDll.Method;
namespace DataBaseDll
{
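/// <summary>
/// Helper for connecting to an InfluxDB time-series database and for writing and querying its data.
/// </summary>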
[Serializable]
public class InfluxDBHelper : TimescaleBase
{
#region 属性
/// <summary>
/// Organization name; passed as the org argument to the write and query calls of this class.
/// </summary>
public string Org { get; set; } = string.Empty;
/// <summary>
/// API token; used by Connect to authenticate when IsAuthenticateToken is true.
/// </summary>
public string Token { get; set; } = string.Empty;
/// <summary>
/// Whether Connect authenticates with the token (true) or with user name and password (false).
/// The constructor overwrites this initializer with its isAuthenticateToken argument.
/// </summary>
public bool IsAuthenticateToken { get; set; } = false;
/// <summary>
/// Maximum number of SQL statements.
/// NOTE(review): not referenced by any code visible in this class - confirm whether it is still used.
/// </summary>
public int MaxSQLTextCount { get; set; } = 10000;
/// <summary>
/// InfluxDB client; created on demand by Connect and disposed by Disconnect.
/// The backing field is excluded from serialization.
/// </summary>
[field: NonSerialized()]
public InfluxDBClient Client { get; set; } = null;
#endregion
#region 构造函数
/// <summary>
/// Creates a helper and stores the connection settings. No connection is opened here.
/// </summary>
/// <param name="userName">User name used when token authentication is disabled.</param>
/// <param name="password">Password used when token authentication is disabled.</param>
/// <param name="token">API token used when token authentication is enabled.</param>
/// <param name="org">Organization name.</param>
/// <param name="dbName">Database (bucket) name.</param>
/// <param name="ip">Server IP address or host name.</param>
/// <param name="port">Server port.</param>
/// <param name="timeOut">Time-out value; Connect interprets it as seconds.</param>
/// <param name="isAuthenticateToken">True to authenticate with the token, false to use user name and password.</param>
public InfluxDBHelper(
    string userName,
    string password,
    string token,
    string org,
    string dbName,
    string ip = LocalHost,
    int port = 8086,
    int timeOut = 10,
    bool isAuthenticateToken = true)
{
    // Assignment order is kept as in the original: several of these members are
    // not declared in this class, so their setters are not visible here.
    this.IP = ip;
    this.Port = port;
    this.UserName = userName;
    this.Password = password;
    this.DBName = dbName;
    this.Token = token;
    this.IsAuthenticateToken = isAuthenticateToken;
    this.Org = org;
    this.TimeOut = timeOut;
}
#endregion
#region 方法
#region 连接
/// <summary>
/// Makes sure a client exists for the configured endpoint. CheckPort(IP, Port) must succeed first;
/// an already created client is reused, otherwise a new one is built with either token or
/// user name / password authentication, depending on IsAuthenticateToken.
/// </summary>
/// <param name="errorString">Empty on success; otherwise a description of the failure.</param>
/// <returns>True when the endpoint check passed and a client is available.</returns>
public bool Connect(out string errorString)
{
    errorString = string.Empty;
    if (!CheckPort(IP, Port))
    {
        errorString = $"{IP}:{Port}无法连接!\n";
        return false;
    }
    // Reuse the cached client; no need to build options again.
    if (Client != null)
    {
        return true;
    }
    try
    {
        InfluxDBClientOptions.Builder builder = new InfluxDBClientOptions.Builder()
            .Url($"http://{IP}:{Port}")
            .TimeOut(TimeSpan.FromSeconds(TimeOut));
        // A missing password is passed as an empty one instead of causing a NullReferenceException.
        InfluxDBClientOptions options = IsAuthenticateToken
            ? builder.AuthenticateToken(Token).Build()
            : builder.Authenticate(UserName, (Password?.ToString() ?? string.Empty).ToCharArray()).Build();
        Client = InfluxDBClientFactory.Create(options);
    }
    catch (Exception ex)
    {
        // Invalid settings (for example an empty token) make the builder or factory throw;
        // report that through errorString like every other failure of this method.
        errorString = $"无法创建InfluxDB客户端!{ex.Message}\n";
        return false;
    }
    if (Client == null)
    {
        errorString = "无法创建InfluxDB客户端!\n";
    }
    return string.IsNullOrEmpty(errorString);
}
/// <summary>
/// Disposes the current client, if any, and clears the Client property.
/// Safe to call when no client was ever created or when already disconnected.
/// </summary>
/// <param name="errorString">Always set to an empty string by the current implementation.</param>
/// <returns>True; the current implementation has no failure path.</returns>
public bool Disconnect(out string errorString)
{
    errorString = string.Empty;
    // Null-conditional call: Client is null before the first Connect and after a previous Disconnect.
    Client?.Dispose();
    Client = null;
    return string.IsNullOrEmpty(errorString);
}
#endregion
#region
/// <summary>
/// Writes one record given as text (passed unchanged to WriteApi.WriteRecord with nanosecond
/// precision) into the bucket DBName of organization Org.
/// </summary>
/// <param name="sqlText">Record text to write; must not be null or empty.</param>
/// <param name="errorString">Empty on success; otherwise the collected failure messages.</param>
/// <returns>True when the input was valid, the connection succeeded and no write failure was reported.</returns>
public bool Write(string sqlText, out string errorString)
{
    errorString = string.Empty;
    if (string.IsNullOrEmpty(sqlText))
    {
        // This is a write, so the message talks about the write statement (was "query statement").
        errorString = $"写入语句为空!\n";
        return false;
    }
    if (!Connect(out errorString))
    {
        return false;
    }
    // An out parameter cannot be captured by a lambda, so failures are gathered in a local buffer.
    StringBuilder failures = new StringBuilder();
    try
    {
        using (WriteApi writeApi = Client.GetWriteApi())
        {
            // WriteApi reports failed writes through its event handler instead of throwing;
            // without this subscription a failed write would look like a success.
            // NOTE(review): relies on Dispose flushing pending writes before returning - confirm
            // against the client version in use.
            writeApi.EventHandler += (sender, eventArgs) =>
            {
                Exception failure = (eventArgs as WriteErrorEvent)?.Exception
                    ?? (eventArgs as WriteRuntimeExceptionEvent)?.Exception;
                if (failure != null)
                {
                    lock (failures)
                    {
                        failures.Append($"写入失败!{failure.Message}。\n");
                    }
                }
            };
            writeApi.WriteRecord(sqlText, WritePrecision.Ns, DBName, Org);
        }
    }
    catch (Exception ex)
    {
        // Synchronous failures (for example argument validation) are reported the same way.
        lock (failures)
        {
            failures.Append($"写入失败!{ex.Message}。\n");
        }
    }
    lock (failures)
    {
        errorString = failures.ToString();
    }
    return string.IsNullOrEmpty(errorString);
}
/// <summary>
/// Writes a list of points into the bucket DBName of organization Org.
/// </summary>
/// <param name="pointDataList">Points to write; must not be null or empty.</param>
/// <param name="errorString">Empty on success; otherwise the collected failure messages.</param>
/// <returns>True when the input was valid, the connection succeeded and no write failure was reported.</returns>
public bool Write(List<PointData> pointDataList, out string errorString)
{
    errorString = string.Empty;
    if (pointDataList == null || pointDataList.Count == 0)
    {
        errorString = $"写入数据为空!\n";
        return false;
    }
    if (!Connect(out errorString))
    {
        return false;
    }
    // An out parameter cannot be captured by a lambda, so failures are gathered in a local buffer.
    StringBuilder failures = new StringBuilder();
    try
    {
        using (WriteApi writeApi = Client.GetWriteApi())
        {
            // WriteApi reports failed writes through its event handler instead of throwing;
            // without this subscription a failed write would look like a success.
            // NOTE(review): relies on Dispose flushing pending writes before returning - confirm
            // against the client version in use.
            writeApi.EventHandler += (sender, eventArgs) =>
            {
                Exception failure = (eventArgs as WriteErrorEvent)?.Exception
                    ?? (eventArgs as WriteRuntimeExceptionEvent)?.Exception;
                if (failure != null)
                {
                    lock (failures)
                    {
                        failures.Append($"写入失败!{failure.Message}。\n");
                    }
                }
            };
            writeApi.WritePoints(pointDataList, DBName, Org);
        }
    }
    catch (Exception ex)
    {
        // Synchronous failures (for example argument validation) are reported the same way.
        lock (failures)
        {
            failures.Append($"写入失败!{ex.Message}。\n");
        }
    }
    lock (failures)
    {
        errorString = failures.ToString();
    }
    return string.IsNullOrEmpty(errorString);
}
/// <summary>
/// Counts the rows that fall into the optional time range and satisfy the optional predicate.
/// The rows are loaded through Read and counted in memory, so a predicate does not reduce
/// the amount of data transferred.
/// </summary>
/// <typeparam name="InfluxDBDataT">Row type to query.</typeparam>
/// <param name="count">Number of matching rows; 0 on failure.</param>
/// <param name="errorString">Empty on success; otherwise a description of the failure.</param>
/// <param name="func">Optional in-memory predicate; null counts every row in the time range.</param>
/// <param name="startTime">Optional inclusive lower time bound.</param>
/// <param name="endTime">Optional inclusive upper time bound.</param>
/// <returns>True when the query succeeded.</returns>
public bool GetCount<InfluxDBDataT>(out int count, out string errorString, Func<InfluxDBDataT, bool> func = null, DateTime? startTime = null, DateTime? endTime = null)
where InfluxDBDataT : InfluxDBDataBase
{
    count = 0;
    // Delegating to Read keeps the time-range handling identical in both methods (bounds are
    // converted to UTC rather than converting each row to local time) and lets Read deal with
    // a null predicate, which previously caused a NullReferenceException here.
    if (!Read<InfluxDBDataT>(out List<InfluxDBDataT> matches, out errorString, func, startTime, endTime))
    {
        return false;
    }
    count = matches.Count;
    return true;
}
/// <summary>
/// Reads the rows that fall into the optional time range and, when a predicate is given,
/// keeps only the rows it accepts. The predicate is applied in memory after the query.
/// </summary>
/// <typeparam name="InfluxDBDataT">Row type to query.</typeparam>
/// <param name="influxDBDataTList">Resulting rows; null when the connection or the query failed.</param>
/// <param name="errorString">Empty on success; otherwise a description of the failure.</param>
/// <param name="func">Optional in-memory predicate.</param>
/// <param name="startTime">Optional inclusive lower time bound; converted to UTC for the comparison.</param>
/// <param name="endTime">Optional inclusive upper time bound; converted to UTC for the comparison.</param>
/// <returns>True when the query succeeded.</returns>
public bool Read<InfluxDBDataT>(out List<InfluxDBDataT> influxDBDataTList, out string errorString, Func<InfluxDBDataT, bool> func = null, DateTime? startTime = null, DateTime? endTime = null)
where InfluxDBDataT : InfluxDBDataBase
{
    influxDBDataTList = null;
    // Connect leaves errorString empty on success and filled on failure.
    if (!Connect(out errorString))
    {
        return false;
    }
    List<InfluxDBDataT> rows;
    try
    {
        var source = InfluxDBQueryable<InfluxDBDataT>.Queryable(DBName, Org, Client.GetQueryApiSync());
        // The predicate below is handed to the query provider as an expression; keep its shape as is.
        rows = source.
            Where(p => (startTime == null || p.Timestamp >= ((DateTime)startTime).ToUniversalTime()) &&
            (endTime == null || p.Timestamp <= ((DateTime)endTime).ToUniversalTime())).ToList();
    }
    catch (Exception ex)
    {
        errorString = $"查询失败!{ex.Message}。\n";
        return false;
    }
    influxDBDataTList = func == null ? rows : rows.Where(func).ToList();
    return string.IsNullOrEmpty(errorString);
}
/// <summary>
/// Reads the rows in the optional time range whose Value has exactly the runtime type ValueT
/// (and that satisfy the optional predicate), and returns their values and local-time
/// timestamps as two parallel lists. Filtering happens in memory after the query.
/// </summary>
/// <typeparam name="ValueT">Exact runtime type the row values must have.</typeparam>
/// <typeparam name="InfluxDBDataT">Row type to query.</typeparam>
/// <param name="valueList">Values of the matching rows; null on failure.</param>
/// <param name="timeList">Timestamps of the matching rows converted to local time; null on failure.</param>
/// <param name="errorString">Empty on success; otherwise a description of the failure.</param>
/// <param name="func">Optional in-memory predicate, evaluated before the type check.</param>
/// <param name="startTime">Optional inclusive lower time bound.</param>
/// <param name="endTime">Optional inclusive upper time bound.</param>
/// <returns>True when the query succeeded.</returns>
public bool Read<ValueT, InfluxDBDataT>(out List<ValueT> valueList, out List<DateTime> timeList, out string errorString, Func<InfluxDBDataT, bool> func = null, DateTime? startTime = null, DateTime? endTime = null)
where InfluxDBDataT : InfluxDBDataBase
{
    valueList = null;
    timeList = null;
    // Rows without a value are skipped instead of throwing on GetType().
    // NOTE(review): assumes Value is a reference-typed member (e.g. object) - confirm on InfluxDBDataBase.
    Func<InfluxDBDataT, bool> typedFilter = p =>
        (func == null || func(p)) && p.Value != null && p.Value.GetType() == typeof(ValueT);
    // The single-type Read overload performs the connection check itself.
    if (!Read<InfluxDBDataT>(out List<InfluxDBDataT> rows, out errorString, typedFilter, startTime, endTime))
    {
        return false;
    }
    valueList = rows.Select(p => (ValueT)p.Value).ToList();
    timeList = rows.Select(p => p.Timestamp.ToLocalTime()).ToList();
    return true;
}
#endregion
#endregion
}
}
点击阅读全文
更多推荐
所有评论(0)