之前利用Python進行Modbus_TCP進行數據接收,本次需要利用串口進行數據傳輸,學習Modbus_RTU的簡單實現
首先要在創建兩個虛擬串口,利用VSPD工具即可。在一台電腦上實現數據的發送和接收
進入Python IDE進行slave端的編寫
import serial import modbus_tk import modbus_tk.defines as cst from modbus_tk import modbus_rtu import time
def ModbusRTU_Slave(): try: # 設定串口為從站 # 外置參數包括端口 port = "COM3" 波特率:9600 server= modbus_rtu.RtuServer(serial.Serial(port="com3",baudrate=9600, bytesize=8, parity='N', stopbits=1)) server.start() print("runing...") SLAVE1 = server.add_slave(1) SLAVE1.add_block('A', cst.HOLDING_REGISTERS, 0, 4) # 地址0,長度4 increase_num = 17000 for i in range(300): SLAVE1.set_values('A', 0, [17000, i + 1, i + 2]) # 改變在地址0處的寄存器的值 time.sleep(0.1) for i in range(1500): increase_num = increase_num - 1 SLAVE1.set_values('A', 0, [increase_num, i + 1, i + 2]) # 改變在地址0處的寄存器的值 time.sleep(0.1) for i in range(800): SLAVE1.set_values('A', 0, [10000, i + 1, i + 2]) # 改變在地址0處的寄存器的值 time.sleep(0.1) for i in range(800): SLAVE1.set_values('A', 0, [17000, i + 1, i + 2]) # 改變在地址0處的寄存器的值 time.sleep(0.1) except Exception as exc: print(str(exc))
再進行master端的編寫
# 數據接收端 def ModbusRTU_Master(): try: # 設定串口為從站 # 外置參數包括端口 port = "COM3" 波特率:9600 master = modbus_rtu.RtuMaster(serial.Serial(port="com4",baudrate=9600, bytesize=8, parity='N', stopbits=1)) master.set_timeout(1.0) master.set_verbose(True) # 讀保持寄存器 read = master.execute(1, cst.HOLDING_REGISTERS, 0, 4) # 這里可以修改需要讀取的功能碼 print(red) except Exception as exc: print(str(exc))
補充功能碼
功能代碼cst 1~255中1~21是常用的,以下是讀寫專用的功能碼 READ_COILS = 01 讀線圈,位操作 READ_DISCRETE_INPUTS = 02 讀離散輸入狀態,位操作 READ_HOLDING_REGISTERS = 03 讀保持寄存器,字操作 READ_INPUT_REGISTERS = 04 讀輸入寄存器,字操作 WRITE_SINGLE_COIL = 05 寫單線圈,位操作 WRITE_SINGLE_REGISTER = 06 寫單一寄存器,字操作 WRITE_MULTIPLE_COILS = 15 寫多個線圈【強制多點線圈】,位操作 WRITE_MULTIPLE_REGISTERS = 16 寫多寄存器【寫乘法寄存器】,字操作
演示Demo 創建一個項目 1. 通過Modbus_RTU讀取 數據 在通過Modbus_RTU將數據發出
首先創建四個虛擬串口
利用Modbus 工具 模擬數據發送 和 接收 【注意】:modbus poll 設置中的數據長度一定要和程序中推送的長度一致 ,在這個問題上浪費了好長時間 哎
Python程序,其中包含了tcp 和 rtu的內容 可以通過JSON配置文件實現協議切換

import time from datetime import datetime import modbus_tk import modbus_tk.defines as cst import modbus_tk.modbus_tcp as modbus_tcp from modbus_tk import modbus_rtu import serial import json # JSON 文件初始化 def InitJSON(): global filejson try: with open('20191219_1443.json', 'r') as f: filejson = json.load(f) except FileNotFoundError: print('無法打開指定的文件!') except LookupError: print('指定了未知的編碼!') except UnicodeDecodeError: print('讀取文件時解碼錯誤!') # modbus 相關參數初始化 def ModbusInit(): # 首先判斷數據傳輸協議 Modbus_TCP 還是 RTU if filejson['ModbusInit']['Modbus_Mode'] == "Modbus_TCP": print("Modbus_Mode = Modbus_TCP") Modbus_master_IP = filejson['ModbusInit']['Modbus_master_IP'] Modbus_master_port = filejson['ModbusInit']['Modbus_master_port'] MASTER = modbus_tcp.TcpMaster(host=Modbus_master_IP, port=Modbus_master_port) MASTER.set_timeout(5.0) Modbus_slave_IP = filejson['ModbusInit']['Modbus_slave_IP'] Modbus_slave_port = filejson['ModbusInit']['Modbus_slave_port'] SLAVE = modbus_tcp.TcpServer(address=Modbus_slave_IP, port=Modbus_slave_port) # SLAVE.set_timeout(5.0) else: print("Modbus_Mode = Modbus_RTU") Modbus_master_PORT = filejson['ModbusInit']['Modbus_master_PORT'] Modbus_master_baudrate = filejson['ModbusInit']['Modbus_master_baudrate'] print(Modbus_master_PORT) MASTER = modbus_rtu.RtuMaster(serial.Serial(port=Modbus_master_PORT, baudrate=Modbus_master_baudrate)) MASTER.set_timeout(5.0) Modbus_slave_PORT = filejson['ModbusInit']['Modbus_slave_PORT'] print(Modbus_slave_PORT) Modbus_slave_baudrate = filejson['ModbusInit']['Modbus_slave_baudrate'] SLAVE = modbus_rtu.RtuServer(serial.Serial(port=Modbus_slave_PORT, baudrate=Modbus_slave_baudrate, bytesize=8, parity='N', stopbits=1)) # SLAVE.set_timeout(5.0) SLAVE.start() print("runing...") SLAVE1 = SLAVE.add_slave(1) SLAVE1.add_block('warning', cst.HOLDING_REGISTERS, 0, 4) # 地址0,長度1 return MASTER,SLAVE1 def main(): # 初始化JSON 文件 InitJSON() # modbus 初始化 MASTER, SLAVE1 = ModbusInit() a = filejson['DistanceParam']['EquationParamA'] b = filejson['DistanceParam']['EquationParamB'] print("方程參數a = {},b= {}\n".format(a, b)) while(1): # 測試 用來獲取數據 num = MASTER.execute(1, cst.READ_HOLDING_REGISTERS, 0, 1)[0] # 獲取數據 # num = MASTER.execute(1, cst.READ_INPUT_REGISTERS, 0, 1)[0] print("GetV={}".format(num)) SLAVE1.set_values('warning', 0, num) # 改變在地址0處的寄存器的值 time.sleep(filejson['WhileTime']) if __name__ == '__main__': main()