- 我們項目要求以最高采集率運行,10k/s,找到的列子都不太好用。
- 直接上效果圖:單通道下任意定量采集,測量一枚3V紐扣式鋰電池電壓
-
安裝:
LabView:稍后貼出網盤鏈接 -
驅動 NI-DAQmx 博主自己網上找的18點幾版本,如需最新版,可去官網(對着官網呵呵)
網盤鏈接--提取碼:l5dx
下載解壓后出來后,再解壓使用第一個壓縮包就好
import nidaqmx
import pprint
import numpy as np
from matplotlib import pyplot as plt
pp = pprint.PrettyPrinter(indent=4)
with nidaqmx.Task() as task:
task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
print('1 Channel 1 Sample Read: ')
data = task.read()
pp.pprint(data)
data = task.read(number_of_samples_per_channel=1)
pp.pprint(data)
print('1 Channel N Samples Read: ')
data = task.read(number_of_samples_per_channel=10)
x=np.arange(0,len(data))
pp.pprint(data)
plt.plot(x,data)
task.ai_channels.add_ai_voltage_chan("Dev1/ai1")
print('N Channel 1 Sample Read: ')
data = task.read()
pp.pprint(data)
print('N Channel N Samples Read: ')
data = task.read(number_of_samples_per_channel=2)
pp.pprint(data)
nidaqmx 下 stream_readers.py文件下有多個讀取類,
比如 AnalogSingleChannelReader 模擬單路通道讀取
或者 AnalogMultiChannelReader 模擬多通道讀取
這兒博主因為需要需要使用兩個物理通道,所以就使用的第二個,當然你也可以使用第二個類去讀取單通道,只需要在任務注冊添加通道時只添加一個通道,並且緩沖區Buffer設置,給一維數組
我調試使用的一段代碼:
import time
import nidaqmx
import numpy as np
from nidaqmx import constants
from nidaqmx import stream_readers
from nidaqmx import stream_writers
NUM_CHANNELS = 2
RATE = 10000/NUM_CHANNELS
with nidaqmx.Task() as task:
for i in range(NUM_CHANNELS):
task.ai_channels.add_ai_voltage_chan("Dev1/ai{}".format(i),name_to_assign_to_channel="AI{}".format(i),max_val=10,min_val=-10)
task.timing.cfg_samp_clk_timing(RATE,sample_mode=constants.AcquisitionType.CONTINUOUS,samps_per_chan=100000) #一直采,直到停止任務
#task.timing.cfg_samp_clk_timing(RATE,sample_mode=constants.AcquisitionType.FINITE,samps_per_chan=300000) #采集指定數量的樣本
non_local_var = {'All samples': []}
channel1_data = []
channel2_data = []
#################使用 stream_readers 來提高性能 ##########################
# read_task = stream_readers.AnalogMultiChannelReader(task.in_stream)
# write_task = stream_writers.AnalogMultiChannelWriter(task.out_stream)
def read_callback(task_handle, every_n_samples_event_type,
number_of_samples, callback_data):
#################使用 stream_readers 來提高性能 ##########################
# buffer = np.empty((NUM_CHANNELS, number_of_samples), dtype=np.float64,order="C")
# read_task.read_many_sample(buffer, number_of_samples, timeout=constants.WAIT_INFINITELY)
# data = buffer.T.astype(np.float64)
####################直接使用 read 函數 #############
data = task.read(number_of_samples_per_channel=number_of_samples)
# non_local_var['All samples'].extend(data)
channel1_data.extend(data[0])
channel2_data.extend(data[1])
return 0
task.register_every_n_samples_acquired_into_buffer_event(
1000, read_callback)
task.start()
startTime = time.time()
while True:
if len(channel1_data)>=300000:
break
print("耗費時間:",time.time()-startTime)
task.stop()
print("數據總量:",len(channel1_data))
print(channel1_data[1:100])
print(channel1_data[-50:])
-
read_task = stream_readers.AnalogMultiChannelReader(task.in_stream)
-
write_task = stream_writers.AnalogMultiChannelWriter(task.out_stream)
將任務添加到流讀取中,以提高性能 -
buffer = np.empty((NUM_CHANNELS, number_of_samples), dtype=np.float64,order="C")
-
read_task.read_many_sample(buffer, number_of_samples, timeout=constants.WAIT_INFINITELY)
設定緩沖區buffer,選用numpy.empty原因是這個初始化所需矩陣比較快,比np.zero快,因為他是隨機值,比如博主需要的就是np.empty((2,300000),dtype=np.float64)
然后通過將buffer 傳遞給流讀取對象,read_task去讀取 -
data = task.read(number_of_samples_per_channel=number_of_samples)
不使用stream_readers正常讀取 -
task.register_every_n_samples_acquired_into_buffer_event(1000, read_callback)
注冊回調函數,就如函數名稱,每采集n個樣品到buffer觸發事件,調用回到函數 -
task.start()
最后記得開啟任務 -
task.stop()關閉任務
讀取到所需數據后