NATIONAL INSTRUMENTS 美國 NI-USB6000 數據采集卡,Python編程


一、簡單介紹
- 最近公司一個項目用到這款數據采集卡,這兩天把我頭都搞炸了,特做一個記錄,給需要使用Python采集后面有需要的人少走點坑
  • 我們項目要求以最高采集率運行,10k/s,找到的列子都不太好用。
  • 直接上效果圖:單通道下任意定量采集,測量一枚3V紐扣式鋰電池電壓
1.1 NI-USB6000
NI USB-6000提供USB接口、八個單端模擬(AI)輸入通道、4個數字輸入輸出(DIO)通道和一個32位計數器。
  • 安裝:
    LabView:稍后貼出網盤鏈接

  • 驅動 NI-DAQmx 博主自己網上找的18點幾版本,如需最新版,可去官網(對着官網呵呵)
    網盤鏈接--提取碼:l5dx
    下載解壓后出來后,再解壓使用第一個壓縮包就好

二、如何通過Python與采集卡通信獲取數據
- nidaqmx 模塊,這是官方提供的一個模塊,可與采集卡驅動通信,非常好用 - 安裝:pip install nidaqmx 下載慢的可以 -i 使用國內鏡像源,清華源、阿里源、豆瓣,都非常好用,速度杠桿的

NI DAQmx Python API參考,及使用教程

2.1 簡單使用
- 按照好驅動后,重啟,NI設備監視器會自動運行,如圖點看查看設備 “Dev1”就是設備名稱了,而 “ai0” "ai1" "ai2" 就是設備不同的通道,看自己需要而定了 ![](https://img2018.cnblogs.com/blog/1226829/202001/1226829-20200102165525453-725194365.png)
import nidaqmx
import pprint
import numpy as np 
from matplotlib import pyplot as plt 

pp = pprint.PrettyPrinter(indent=4)


with nidaqmx.Task() as task:
    task.ai_channels.add_ai_voltage_chan("Dev1/ai0")

    print('1 Channel 1 Sample Read: ')
    data = task.read()
    pp.pprint(data)

    data = task.read(number_of_samples_per_channel=1)
    pp.pprint(data)

    print('1 Channel N Samples Read: ')
    data = task.read(number_of_samples_per_channel=10)
    x=np.arange(0,len(data))
    pp.pprint(data)
    plt.plot(x,data)

    task.ai_channels.add_ai_voltage_chan("Dev1/ai1")

    print('N Channel 1 Sample Read: ')
    data = task.read()
    pp.pprint(data)

    print('N Channel N Samples Read: ')
    data = task.read(number_of_samples_per_channel=2)
    pp.pprint(data)

官方提示

2.2 通過stream_readers來提高性能
- 簡單理解,因為博主這個設備是個舊設備,說明、光盤啥的全丟了,然后NI官方只有會員才能享用他們的技術咨詢服務,(坑爹的NI 買了硬件不行,還要買軟件,會員) - 言歸正傳:據我這兩天網絡查找資料和查看源碼,應當是我們創建虛擬通道對應物理通道與驅動通信,給與驅動Buffer,驅動會將數據讀取回來寫入Buffer,如果讀回來的數據大於Buffer,會自動將舊數據覆蓋

nidaqmx 下 stream_readers.py文件下有多個讀取類,
比如 AnalogSingleChannelReader 模擬單路通道讀取
或者 AnalogMultiChannelReader 模擬多通道讀取
這兒博主因為需要需要使用兩個物理通道,所以就使用的第二個,當然你也可以使用第二個類去讀取單通道,只需要在任務注冊添加通道時只添加一個通道,並且緩沖區Buffer設置,給一維數組

我調試使用的一段代碼:

import time
import nidaqmx
import numpy as np
from nidaqmx import constants
from nidaqmx import stream_readers
from nidaqmx import stream_writers

NUM_CHANNELS = 2
RATE = 10000/NUM_CHANNELS



with nidaqmx.Task() as task:
    for i in range(NUM_CHANNELS):
        task.ai_channels.add_ai_voltage_chan("Dev1/ai{}".format(i),name_to_assign_to_channel="AI{}".format(i),max_val=10,min_val=-10)

    task.timing.cfg_samp_clk_timing(RATE,sample_mode=constants.AcquisitionType.CONTINUOUS,samps_per_chan=100000) #一直采,直到停止任務
    #task.timing.cfg_samp_clk_timing(RATE,sample_mode=constants.AcquisitionType.FINITE,samps_per_chan=300000) #采集指定數量的樣本

    non_local_var = {'All samples': []}
    channel1_data = []
    channel2_data = []

    #################使用 stream_readers 來提高性能 ##########################
    # read_task = stream_readers.AnalogMultiChannelReader(task.in_stream)
    # write_task = stream_writers.AnalogMultiChannelWriter(task.out_stream)

    def read_callback(task_handle, every_n_samples_event_type,
                 number_of_samples, callback_data):

        #################使用 stream_readers 來提高性能 ##########################
        # buffer = np.empty((NUM_CHANNELS, number_of_samples), dtype=np.float64,order="C")
        # read_task.read_many_sample(buffer, number_of_samples, timeout=constants.WAIT_INFINITELY)
        # data = buffer.T.astype(np.float64)

        ####################直接使用 read 函數 #############
        data = task.read(number_of_samples_per_channel=number_of_samples)
        # non_local_var['All samples'].extend(data)
        channel1_data.extend(data[0])
        channel2_data.extend(data[1])

        return 0

    task.register_every_n_samples_acquired_into_buffer_event(
        1000, read_callback)

    task.start()
    startTime = time.time()
    while True:
        if len(channel1_data)>=300000:
            break
    print("耗費時間:",time.time()-startTime)
    task.stop()


    print("數據總量:",len(channel1_data))
    print(channel1_data[1:100])
    print(channel1_data[-50:])

2.2.2 幾個函數說明
- task.ai_channels.add_ai_voltage_chan("Dev1/ai0",name_to_assign_to_channel="AI0",max_val=10,min_val=-10) 選擇ai_channels(輸入)添加通道0 也就是"Dev1/ai0",設備名稱不一樣的自己在上面所示的NI監聽設備中去查看哦。 - task.timing.cfg_samp_clk_timing(RATE,sample_mode=constants.AcquisitionType.CONTINUOUS,samps_per_chan=100000) #一直采,直到停止任務 設定采集率,采集模式,每通道采集數量等...,(注意:采集率最大10k,計算方式應該為最大采集率除以你所需的通道數,10k/channels)比如博主使用2通道,最大就只能設置5000/s的采集率了。
  • read_task = stream_readers.AnalogMultiChannelReader(task.in_stream)

  • write_task = stream_writers.AnalogMultiChannelWriter(task.out_stream)
    將任務添加到流讀取中,以提高性能

  • buffer = np.empty((NUM_CHANNELS, number_of_samples), dtype=np.float64,order="C")

  • read_task.read_many_sample(buffer, number_of_samples, timeout=constants.WAIT_INFINITELY)
    設定緩沖區buffer,選用numpy.empty原因是這個初始化所需矩陣比較快,比np.zero快,因為他是隨機值,比如博主需要的就是np.empty((2,300000),dtype=np.float64)
    然后通過將buffer 傳遞給流讀取對象,read_task去讀取

  • data = task.read(number_of_samples_per_channel=number_of_samples)
    不使用stream_readers正常讀取

  • task.register_every_n_samples_acquired_into_buffer_event(1000, read_callback)
    注冊回調函數,就如函數名稱,每采集n個樣品到buffer觸發事件,調用回到函數

  • task.start()
    最后記得開啟任務

  • task.stop()關閉任務
    讀取到所需數據后

一些事列的Git地址
stream_readers
關於NI設備的一些知識


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM