Python实现NI USB-4431数据采集卡的数据采集与保存方法
本文介绍了使用NI USB-4431数据采集卡采集音频数据并保存的方法。通过Python的nidaqmx库,可以配置采样参数(采样率、时长、电压范围等),采集指定通道的数据,并将结果同时保存为CSV和WAV格式文件。CSV文件包含时间-电压值数据及元信息,WAV文件则存储为24位音频格式。程序提供了设备检测、自动文件名生成和错误处理功能,适用于音频信号采集与分析场景。
NI USB-4431数据采集卡的数据采集与保存方法

代码如下
"""
NI USB-4431数据采集卡 - 采集第一个通道数据并保存到CSV文件
作者: Assistant
日期: 2025-10-21
"""
import nidaqmx
import nidaqmx.system
from nidaqmx.constants import AcquisitionType, Edge, READ_ALL_AVAILABLE
import csv
import datetime
import numpy as np
import time
import os
import wave
def list_devices():
"""列出系统中所有可用的NI设备"""
print("检测到的NI设备:")
system = nidaqmx.system.System.local()
for device in system.devices:
print(f" 设备名称: {device.name}")
print(f" 产品类型: {device.product_type}")
print(f" 序列号: {device.dev_serial_num}")
print(f" AI物理通道数: {device.ai_physical_chans}")
print()
def acquire_and_save_to_csv_and_wav(device_name="Dev1", channel="ai0",
sample_rate=48000, duration=5.0,
output_file=None,
voltage_range=1.0):
"""
使用NI USB-4431采集卡采集指定通道的数据并保存到CSV和WAV文件
参数:
device_name (str): 设备名称,默认为"Dev1"
channel (str): 要采集的通道,默认为"ai0"(第一个通道)
sample_rate (float): 采样率,单位Hz,默认为48000Hz
duration (float): 采集时长,单位秒,默认为5秒
output_file (str): 输出CSV文件路径,如果为None则自动生成
voltage_range (float): 电压范围,单位V,默认为±1V
"""
# 如果没有指定输出文件名,则自动生成
if output_file is None:
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
output_file = f"usb4431_ch{channel[-1]}_data_{timestamp}.csv"
# 生成WAV文件名
wav_file = output_file.replace('.csv', '.wav')
# 计算需要采集的样本数
samples_per_channel = int(sample_rate * duration)
print(f"开始采集数据...")
print(f"设备: {device_name}")
print(f"通道: {channel}")
print(f"采样率: {sample_rate} Hz")
print(f"采集时长: {duration} 秒")
print(f"电压范围: ±{voltage_range} V")
print(f"总样本数: {samples_per_channel}")
print(f"输出CSV文件: {output_file}")
print(f"输出WAV文件: {wav_file}")
print("-" * 50)
try:
# 创建任务
with nidaqmx.Task() as task:
# 添加模拟输入电压通道
# USB-4431支持AC/DC耦合,这里使用默认设置
task.ai_channels.add_ai_voltage_chan(
f"{device_name}/{channel}",
min_val=-voltage_range,
max_val=voltage_range
)
# 配置采样时钟
task.timing.cfg_samp_clk_timing(
rate=sample_rate,
sample_mode=AcquisitionType.FINITE,
samps_per_chan=samples_per_channel
)
# 开始采集
start_time = time.time()
data = task.read(number_of_samples_per_channel=samples_per_channel)
end_time = time.time()
print(f"数据采集完成,耗时: {end_time - start_time:.2f} 秒")
print(f"采集到 {len(data)} 个数据点")
print(f"原始数据范围: [{np.min(data):.6f}, {np.max(data):.6f}] V")
# 保存到CSV文件
save_to_csv(data, sample_rate, output_file, voltage_range)
# 转换并保存到WAV文件
save_to_wav(data, sample_rate, wav_file)
print(f"数据已保存到: {output_file}")
print(f"WAV已保存到: {wav_file}")
return output_file, wav_file
except nidaqmx.DaqError as e:
print(f"NI-DAQmx错误: {e}")
return None, None
except Exception as e:
print(f"发生错误: {e}")
return None, None
def save_to_csv(data, sample_rate, filename, voltage_range):
"""
将采集到的数据保存到CSV文件
参数:
data (list): 采集到的数据点列表
sample_rate (float): 采样率
filename (str): 输出文件名
voltage_range (float): 电压范围
"""
# 计算时间戳
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
# 写入元数据
writer.writerow(['Data Acquisition Information'])
writer.writerow(['Device', 'NI USB-4431'])
writer.writerow(['Timestamp', timestamp])
writer.writerow(['Sampling', str(int(sample_rate))])
writer.writerow(['Voltage Range', f"±{voltage_range}V"])
writer.writerow(['Channel', 'CH1'])
writer.writerow([]) # 空行
# 写入列标题
writer.writerow(['Time(s)', 'CH1(V)'])
# 写入数据
dt = 1.0 / sample_rate # 时间间隔
for i, value in enumerate(data):
time_point = i * dt
writer.writerow([f"{time_point:.6f}", f"{value:.6f}"])
def save_to_wav(data, sample_rate, filename, bit_depth=24):
"""
将采集到的数据转换并保存为WAV文件
参数:
data (list): 采集到的数据点列表
sample_rate (float): 采样率
filename (str): 输出WAV文件名
bit_depth (int): 位深度,默认为24位
"""
# 将数据转换为numpy数组
audio_data = np.array(data, dtype=np.float32)
# 确保数据在±1范围内(已经是±1V范围,对应WAV的±1振幅)
# 数据已经是正确的范围,不需要额外缩放
print(f"使用采样率: {sample_rate} Hz")
print(f"输出位深度: {bit_depth}位")
print(f"WAV数据范围: [{np.min(audio_data):.6f}, {np.max(audio_data):.6f}]")
if bit_depth == 16:
# 转换为16位整数格式 (-32768 到 32767)
audio_data_int = (audio_data * 32767).astype(np.int16)
sampwidth = 2 # 2字节 = 16位
elif bit_depth == 24:
# 转换为24位整数格式 (-8388608 到 8388607)
audio_data_int = (audio_data * 8388607).astype(np.int32)
sampwidth = 3 # 3字节 = 24位
elif bit_depth == 32:
# 转换为32位浮点格式 (-1.0 到 1.0)
audio_data_int = audio_data.astype(np.float32)
sampwidth = 4 # 4字节 = 32位浮点
else:
raise ValueError("只支持16位、24位或32位深度")
# 创建WAV文件
with wave.open(filename, 'w') as wav_file:
wav_file.setnchannels(1) # 单声道
wav_file.setsampwidth(sampwidth)
wav_file.setframerate(sample_rate)
if bit_depth == 16:
wav_file.writeframes(audio_data_int.tobytes())
elif bit_depth == 24:
frames = audio_data_int.tobytes()
# 只取每个32位整数的低24位(3个字节)
frames_24bit = bytearray()
for i in range(0, len(frames), 4):
# 取低3个字节(24位)
frames_24bit.extend(frames[i:i+3])
wav_file.writeframes(bytes(frames_24bit))
elif bit_depth == 32:
wav_file.writeframes(audio_data_int.tobytes())
print(f"成功转换WAV: {filename}")
print(f"采样率: {sample_rate} Hz")
print(f"数据点数: {len(audio_data)}")
print(f"时长: {len(audio_data) / sample_rate:.2f} 秒")
def continuous_acquisition(device_name="Dev1", channel="ai0",
sample_rate=48000, duration=5.0,
num_cycles=3, voltage_range=1.0):
"""
连续多次采集数据并同时生成CSV和WAV文件
参数:
device_name (str): 设备名称
channel (str): 通道名称
sample_rate (float): 采样率
duration (float): 每次采集时长
num_cycles (int): 采集次数
voltage_range (float): 电压范围
"""
print(f"开始连续采集 {num_cycles} 次数据...")
print("=" * 60)
output_files = []
for cycle in range(num_cycles):
print(f"\n第 {cycle + 1}/{num_cycles} 次采集:")
print("-" * 30)
# 生成文件名
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
output_file = f"usb4431_ch{channel[-1]}_data_{timestamp}_cycle{cycle+1}.csv"
# 采集数据并生成CSV和WAV文件
csv_file, wav_file = acquire_and_save_to_csv_and_wav(
device_name=device_name,
channel=channel,
sample_rate=sample_rate,
duration=duration,
output_file=output_file,
voltage_range=voltage_range
)
if csv_file and wav_file:
output_files.append((csv_file, wav_file))
print(f"第 {cycle + 1} 次采集完成")
else:
print(f"第 {cycle + 1} 次采集失败")
# 在采集之间稍作延迟(除了最后一次)
if cycle < num_cycles - 1:
time.sleep(1)
print("\n" + "=" * 60)
print(f"连续采集完成,共生成 {len(output_files)} 对文件:")
for csv_file, wav_file in output_files:
print(f" - {csv_file}")
print(f" - {wav_file}")
def main():
"""主函数"""
print("NI USB-4431 数据采集与CSV/WAV保存工具")
print("=" * 50)
# 首先列出系统中的设备
try:
list_devices()
except Exception as e:
print(f"无法列出设备: {e}")
print("请确保已正确安装NI-DAQmx驱动和nidaqmx Python包")
print("安装命令: pip install nidaqmx")
return
# 示例1: 单次采集
print("\n示例1: 单次采集5秒数据")
print("-" * 30)
acquire_and_save_to_csv_and_wav(
device_name="Dev1", # 根据实际情况修改设备名
channel="ai0", # 第一个通道
sample_rate=48000, # 48 kHz采样率
duration=10.0, # 采集5秒
voltage_range=1.0 # ±1V范围,对应WAV的±1振幅范围
)
# # 示例2: 连续采集
# print("\n\n示例2: 连续采集3次数据")
# print("-" * 30)
# continuous_acquisition(
# device_name="Dev1", # 根据实际情况修改设备名
# channel="ai0", # 第一个通道
# sample_rate=48000, # 48 kHz采样率
# duration=2.0, # 每次采集2秒
# num_cycles=3, # 采集3次
# voltage_range=1.0 # ±1V范围,对应WAV的±1振幅范围
# )
if __name__ == "__main__":
main()
设备检测与列表
list_devices()函数可以列出系统中所有可用的NI设备。该函数会显示设备名称、产品类型、序列号和模拟输入通道数,方便用户确认设备连接状态。
数据采集配置
acquire_and_save_to_csv_and_wav()函数是核心功能,支持配置多个参数:设备名称、采集通道、采样率、采集时长、输出电压范围和自定义输出文件名。默认采样率设为48kHz,适合大多数音频采集场景。
数据采集实现
函数内部创建DAQmx任务,配置模拟输入电压通道和采样时钟。使用read()方法执行同步采集,返回采集到的电压数据数组。采集过程中会实时显示进度和参数信息。
数据保存格式
采集数据支持两种格式保存:CSV和WAV。CSV文件包含元数据信息和时间-电压数据对,便于后续分析处理。WAV文件则直接保存为音频格式,方便播放和音频处理软件使用。
CSV文件格式
save_to_csv()函数生成的CSV文件包含完整元数据:设备信息、时间戳、采样率、电压范围和通道信息。数据部分以时间戳和对应电压值的形式存储,时间精度达到微秒级。
WAV文件生成
save_to_wav()函数将电压数据转换为32位浮点格式,符合WAV文件标准。默认使用24位深度保存,确保音频质量。数据范围自动归一化到±1.0范围内,符合WAV格式规范。
错误处理
代码包含完整的异常处理机制,能捕获并显示NI-DAQmx错误和其他异常情况,确保程序稳定运行。
实际应用场景
该代码适合需要高精度音频采集的场合,如:声学测量、振动分析、工业噪声监测等。USB-4431的24位分辨率能准确捕捉微弱信号,48kHz采样率满足大多数音频频段需求。
CSV格式便于导入MATLAB、Excel等工具进行后续分析,而WAV格式可直接用于音频编辑软件。两种格式的结合提供了灵活的数据处理选择。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)