Python3+PCAN-USB基於PCAN-Basic二次開發實現上位機功能


一、環境搭建

1.概述

本文主要是通過Python3與CAN總線工具PCAN-USB基於官方PCAN-Basic庫實現CAN總線消息讀寫功能。

 

2.PCANBasic.py和PCANBasic.dll下載地址

https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip

 

3.Python安裝

下載地址:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe

 

二、項目展示

1.文件目錄

2.xmt文件內容

 

3.CAN消息讀取截圖

 

4.CAN消息發送截圖

 

三、完整代碼

#!/usr/bin/python
# _*_ coding:utf-8 _*_
from PCANBasic import *
from queue import *
import threading, time, os


class PcanOperate(PCANBasic, threading.Thread):
    def __init__(self):
        super().__init__()  # 繼承父類的init方法
        result = self.Initialize(PCAN_USBBUS1, PCAN_BAUD_500K)  # 總線初始化
        if result != PCAN_ERROR_OK:
            print(self.GetErrorText(result))  # 發生錯誤,獲取錯誤信息
        else:
            print("PCAN-USB 已初始化")

    def ProcessMessage(self, msg, timestamp):
        """CAN消息處理方法"""
        msg_dict = {}
        msg_id = hex(msg.ID)
        if len(msg_id[2:]) == 7:
            msg_dict["CANID"] = '0' + msg_id[2:].upper()
        else:
            msg_dict["CANID"] = msg_id[2:].upper()
        msg_dict["MSGTYPE"] = msg.MSGTYPE
        msg_dict["LEN"] = msg.LEN
        data = ''
        for i in range(8):
            if len(hex(msg.DATA[i])[2:]) == 1:
                d = ' ' + '0' + hex(msg.DATA[i])[2:].upper()
            else:
                d = ' ' + hex(msg.DATA[i])[2:].upper()
            data += d
        msg_dict["DATA"] = data[1:]

        timestamp_dict = {}
        timestamp_dict['millis'] = timestamp.millis
        timestamp_dict['millis_overflow'] = timestamp.millis_overflow
        timestamp_dict['micros'] = timestamp.micros
        time_sum = timestamp.micros + 1000 * timestamp.millis + 0x100000000 * 1000 * timestamp.millis_overflow
        return msg_dict

    def PutQueue(self):
        """從總線中讀取CAN消息及其時間戳,並放入隊列。"""
        while True:
            """檢查接收隊列是否有新消息"""
            readResult = self.GetStatus(PCAN_USBBUS1)
            if readResult != PCAN_ERROR_OK:
                result = self.GetErrorText(readResult)
                print(result[1])
                event2.set()
            else:
                readResult = self.Read(PCAN_USBBUS1)  # 接收總線報文
                time.sleep(0.01)
                if readResult[0] != PCAN_ERROR_QRCVEMPTY:
                    print("消息入隊")
                    q.put(self.ProcessMessage(readResult[1], readResult[2]))  # 消息入隊
                    event1.set()
                else:
                    result = self.GetErrorText(readResult[0])
                    print(result[1])
                    event2.set()
                    break

    def GetQueue(self):
        """從隊列中獲取信息"""
        while True:
            if event2.is_set():
                break
            if not q.empty():
                event1.wait()
                print(q.get())  # 消息出隊

    def GetXmtMsg(self):
        """獲取xmt文件中需要發送的消息"""
        with open(os.path.join(os.getcwd(), 'zuidazhi.xmt'), 'r+') as f:
            fo = f.readlines()
        allcandata = []
        for i, j in enumerate(fo, start=1):
            onecandata = {}
            if i > 14:
                f1 = j.strip().split()
                if len(f1) == 14:  # 獲取長度為8的data
                    onecandata['cycle'] = (int(f1[3]))
                    msg = TPCANMsg()
                    msg.ID = int(f1[1][:-1], 16)
                    msg.MSGTYPE = PCAN_MESSAGE_EXTENDED
                    msg.LEN = int(f1[4])
                    msg.DATA[0] = int(f1[6][:-1], 16)
                    msg.DATA[1] = int(f1[7][:-1], 16)
                    msg.DATA[2] = int(f1[8][:-1], 16)
                    msg.DATA[3] = int(f1[9][:-1], 16)
                    msg.DATA[4] = int(f1[10][:-1], 16)
                    msg.DATA[5] = int(f1[11][:-1], 16)
                    msg.DATA[6] = int(f1[12][:-1], 16)
                    msg.DATA[7] = int(f1[13][:-1], 16)
                    onecandata['msg'] = msg
                    allcandata.append(onecandata)
        return allcandata

    def CanWrite(self):
        """發送CAN消息"""
        allcandata = self.GetXmtMsg()
        print(allcandata)
        if len(allcandata) > 0:
            for i in allcandata:
                result = self.Write(PCAN_USBBUS1, i['msg'])
                currenttime = int(round(time.time() * 1000))
                i['currenttime'] = currenttime
                if result != PCAN_ERROR_OK:
                    result = self.GetErrorText(result)
                    print(result)
                else:
                    print("時間戳記錄成功")
        time.sleep(1)
        while True:
            currenttime = int(round(time.time() * 1000))
            for i in allcandata:
                interval = currenttime - i['currenttime']
                if interval >= i['cycle']:
                    result = self.Write(PCAN_USBBUS1, i['msg'])  #
                    if result != PCAN_ERROR_OK:
                        result = self.GetErrorText(result)
                        print(result)
                    else:
                        print("消息發送成功")
                        i['currenttime'] = currenttime

    def __del__(self):
        result = self.Uninitialize(PCAN_USBBUS1)  # 總線釋放
        if result != PCAN_ERROR_OK:
            result = self.GetErrorText(result)
            print(result[1])
        else:
            print("PCAN-USB 已釋放")


if __name__ == '__main__':
    q = Queue()
    event1 = threading.Event()
    event2 = threading.Event()
    pcan = PcanOperate()
    # s1 = threading.Thread(target=pcan.CanWrite, name="發送CAN消息線程")
    # s1.start()
    s2 = threading.Thread(target=pcan.PutQueue, name="讀取CAN消息並放入隊列線程")
    s2.start()
    s3 = threading.Thread(target=pcan.GetQueue, name="從隊列中讀取CAN消息線程")
    s3.start()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM