背景:
在前面兩個博文中已經提及到,我們打算做一個UWB 結合運動傳感器 融合定位,這篇博文實現上位機代碼,上位機使用我們之前開源Python版本TWR上位機,代碼可以在末尾論壇鏈接下載
我們的上位機實現基礎是之前的《[開源項目] 藍點無限 UWB Python版本上位機》,參考鏈接
https://www.cnblogs.com/tuzhuke/p/15170193.html
再此基礎上將《[藍點無限] UWB 定位數據融合 之 固件實現》
https://www.cnblogs.com/tuzhuke/p/15212574.html 實現完整UWB數據融合項目。
代碼全部已經開源,歡迎大家使用並幫忙改進。
直接上代碼,代碼主要是在《[開源項目] 藍點無限 UWB Python版本上位機》基礎上修改,這里列出代碼更改部分。
1 解析數據包中的運動變量,並存放到字典中
result_dict = {'tag': 0x1005, 'acc':0, 'seq': 7, 'time': 1234, 'anthor_count': 4,'anthor': []} # 數據包以&&& 開頭 res = re.findall(r'&&&', string) flag = 1 if len(res) > 0: # step1 print message length,ex 76 temp_string = string.split("$")[0] # &&&:80$ data_len = int(temp_string.split(":")[1], 16) # tag info temp_string = string.split("$")[1] # 000A:20 tag_id = int(temp_string.split(":")[0], 16) # 000A tag_acc = int(temp_string.split(":")[1], 16) tag_seq = int(temp_string.split(":")[2], 16) # 20 # print("標簽ID: %02X Seq: %X" % (tag_id, tag_seq)) result_dict['tag'] = tag_id result_dict['acc'] = tag_acc result_dict['seq'] = tag_seq
2 在返回結果中,將運動信息一並返回給上層
def twr_main(input_string): print(input_string) error_flag, result_dic = Process_String_Before_Udp(input_string) if error_flag == 0: [location_result, location_seq, location_addr, location_x, location_y] = Compute_Location(result_dic) return location_result, location_seq, location_addr, location_x, location_y return 0, 0, 0, 0, 0
3 頂層收到定位結果和運動信息,打印結果,並發送給處理函數
[location_result, location_seq, location_addr, location_x, location_y] = twr_main(msg) if location_result == 1: self.data_result.emit( '%d %d %0.2f %0.2f' % (location_seq, location_addr, location_x, location_y)) # # bphero_dispose(str(data))
4 UWB和運動信息進行簡單融合,當模塊靜止,不更新坐標信息
def insert_result(self, input_str): strlist = input_str.split(' ') location_addr = int(strlist[1]) location_x = float(strlist[2]) location_y = float(strlist[3]) tag_acc = int(strlist[4]) print("acc = %d"%tag_acc) print("insert result") if tag_acc == 0:#只有模塊移動的時候更新坐標 self.Insert_Tag_Result(location_addr, {"x": location_x, "y": location_y, "z": 0, "qt": QGraphicsEllipseItem(-10, -10, 10, 10)})
其他代碼,上位機增加了串口接收功能
class ComThread(QtCore.QThread): data_result = QtCore.pyqtSignal(object) data_draf = QtCore.pyqtSignal(object) def __init__(self): super(ComThread, self).__init__() self.l_serial = None self.alive = False self.waitEnd = None self.ID = None self.data = None self.port = None def set_port(self,port): self.port = port print(self.port) def waiting(self): if not self.waitEnd is None: self.waitEnd.wait() def SetStopEvent(self): if not self.waitEnd is None: self.waitEnd.set() self.alive = False self.stop() def start(self): self.l_serial = serial.Serial() self.l_serial.port = self.port self.l_serial.baudrate = 115200 self.l_serial.timeout = 2 self.l_serial.open() if self.l_serial.isOpen(): self.waitEnd = threading.Event() self.alive = True self.thread_read = None self.thread_read = threading.Thread(target=self.FirstReader) self.thread_read.setDaemon(1) self.thread_read.start() return True else: return False def SendDate(self, i_msg, send): lmsg = '' isOK = False if isinstance(i_msg): lmsg = i_msg.encode('gb18030') else: lmsg = i_msg try: # 發送數據到相應的處理組件 self.l_serial.write(send) except Exception as ex: pass; return isOK def FirstReader(self): while self.alive: data = '' data = data.encode('utf-8') n = self.l_serial.inWaiting() if n: data = self.l_serial.readline() print(data) msg =str(data, encoding="utf-8") self.data_draf.emit(msg) # for debug only [location_result, location_seq, location_addr, location_x, location_y, tag_acc] = twr_main(msg) print(tag_acc) if location_result == 1: self.data_result.emit( '%d %d %0.2f %0.2f %d' % (location_seq, location_addr, location_x, location_y, tag_acc)) # # bphero_dispose(str(data)) self.waitEnd.set() self.alive = False def stop(self): self.alive = False self.thread_read.join() if self.l_serial.isOpen(): self.l_serial.close()
以上完成了整個近期打算開源的工程項目,源碼請到www.51uwb.cn 下載