Python實現CAN總線J1939報文接收、發送


一、環境搭建

1.概述

本文主要是通過Python3實現CAN總線上J1939報文接收、發送等功能,通過模擬單幀、多幀實現周期性發送報文等模擬場景。

 

2.CAN工具

本案例采用的是PCAN-USB工具

PCAN-USB驅動:https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip

 

3.Python安裝

下載地址:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe

 庫:pip install can-j1939

 

二、項目演示

1.J1939報文接收

 

2.J1939報文發送

 

 用另一台CAN工具查看報文,接收正常:

 

 

三、完整代碼

#!/usr/bin/python
# _*_ coding:utf-8 _*_

import time, j1939, threading


def on_message(priority, pgn, sa, timestamp, data):
    """接收來自總線的消息"""
    print(priority, pgn, sa, timestamp, [data[i] for i in range(len(data))])  # 打印消息


def recv_j1939(read_time):
    """訂閱總線消息"""
    ecu.subscribe(on_message)
    time.sleep(read_time)
    ecu.unsubscribe(on_message)  # 停止訂閱消息


def cycle_send_j1939():
    """發送j1939報文"""
    j1939_loog_data = [{'cycle': 1000, "pgn": 65251, "dp": 0, "pf": 254, "ps": 227, "p": 6, "sa": 0,
                        "data": [0, 25, 132, 112, 63, 137, 64, 31, 133, 128, 37, 133, 32, 53, 136, 155, 66, 255, 255,
                                 164, 3, 0, 0,60, 70, 213, 125, 133, 155, 66, 134, 0, 255, 255, 255, 255, 255, 255, 255,255]}]  # j1939長幀報文
    j1939_data = [{'cycle': 250, "can_id": 0x18FEDF00, 'pgn': 65247, "data": [130, 128, 37, 125, 251, 255, 255, 240]},
                  {'cycle': 50, "can_id": 0x18F00400, "pgn": 61444, "data": [14, 189, 130, 0, 0, 0, 0, 125]},
                  {'cycle': 100, "can_id": 0x18FE9200, "pgn": 65170,"data": [255, 255, 254, 255, 255, 255, 0, 0]}]  # j1939短幀報文
    for i in j1939_loog_data:
        i['currenttime'] = int(round(time.time() * 1000))
    for i in j1939_data:
        i['currenttime'] = int(round(time.time() * 1000))
    while True:
        currenttime = int(round(time.time() * 1000))
        for i in j1939_loog_data:
            interval = currenttime - i['currenttime']
            if interval >= i['cycle']:
                ecu.send_pgn(data_page=i["dp"], pdu_format=i["pf"], pdu_specific=i["ps"], priority=i["p"],
                             src_address=i["sa"], data=i["data"])  # 發送長幀報文
                print('PGN%d:長幀報文發送成功' % i['pgn'])
                i['currenttime'] = currenttime  # 更新發送時間
        for i in j1939_data:
            interval = currenttime - i['currenttime']
            if interval >= i['cycle']:
                ecu.send_message(can_id=i["can_id"], data=i["data"])  # 發送短幀報文
                print('PGN%d:短幀報文發送成功' % i['pgn'])
                i['currenttime'] = currenttime  # 更新發送時間


if __name__ == '__main__':
    print("初始化總線")
    ecu = j1939.ElectronicControlUnit()  # 創建ECU
    ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=500000)  # 連接CAN總線
    # s1 = threading.Thread(target=recv_j1939, name="接收J1939消息線程",args=(10,))
    # s1.start()
    s2 = threading.Thread(target=cycle_send_j1939, name="發送J1939消息線程")
    s2.start()
    # print('取消初始化')
    # ecu.disconnect()   #斷開總線連接

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM