Python寫ROS 訂閱與發布程序


 

1. 編寫talker代碼

vim ..../src/talker.py

#!/usr/bin/env python # license removed for brevity import rospy from std_msgs.msg import String def talker(): pub = rospy.Publisher('chatter', String, queue_size=10) #調用publisher函數 創建發布節點 ,定義數據類型, #queue_size參數是當其接受者接受不夠快造成信號堵塞中隊列信息的數量限制 rospy.init_node('talker', anonymous=True) #啟動節點同時為節點命名, 若anoymous為真則節點會自動補充名字,實際名字以talker_322345等表示 #若為假,則系統不會補充名字,采用用戶命名。但是一次只能有一個同名節點,若后面有一個相同listener #名字的節點則后面的節點啟動會注銷前面的相同節點名。 rate = rospy.Rate(10) # 10hz #延時的時間變量賦值,通過rate.sleep()實現延時 while not rospy.is_shutdown(): # 判定開始方式,循環發送,以服務程序跳出為終止點 一般ctrl+c也可 hello_str = "hello world %s" % rospy.get_time() # 數據變量的內容 rospy.get_time() 是指ros系統時間,精確到0.01s # 也可以使用 import time time.strftime('%Y%m%d%H%M%S') rospy.loginfo(hello_str) #在運行的terminal界面info 出信息,可不加,可隨意改 pub.publish(hello_str) #發布數據 必須發布 rate.sleep() #ros中的延時表示,也可用系統的延時 import time time.sleep(1) #ros系統中的延時應該比ubuntu自帶好好,盡量用ros的 if __name__ == '__main__': try: talker() except rospy.ROSInterruptException: pass 

2. 編寫listener代碼

vim ..../src/listener.py

#!/usr/bin/env python import rospy from std_msgs.msg import String def callback(data): rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data) #回調函數 收到的參數.data是通信的數據 默認通過這樣的 def callback(data) 取出data.data數據 def listener(): # In ROS, nodes are uniquely named. If two nodes with the same # node are launched, the previous one is kicked off. The # anonymous=True flag means that rospy will choose a unique # name for our 'listener' node so that multiple listeners can # run simultaneously. rospy.init_node('listener', anonymous=True) #啟動節點同時為節點命名, 若anoymous為真則節點會自動補充名字,實際名字以 listener_322345等表示 #若為假,則系統不會補充名字,采用用戶命名。但是一次只能有一個同名節點,若后面有一個相同listener #名字的節點則后面的節點啟動會注銷前面的相同節點名。 rospy.Subscriber("chatter", String, callback) #啟動訂閱,訂閱主題,及標准字符串格式,同時調用回調函數,當有數據時調用函數,取出數據 # spin() simply keeps python from exiting until this node is stopped #循環程序 rospy.spin() if __name__ == '__main__': listener() #函數不被用作模塊調用 

3. 添加權限

chmod +x ..../src/talker.py |chmod +x .../src/listener.py

4. 編譯

catkin_make

5. 運行talker

rosrun exam talker.py

6. 運行listener

rosrun exam listener.py

注釋:最簡單的訂閱與發布程序:

  • 訂閱

      #!/usr/bin/env python import rospy from std_msgs.msg import String rospy.init_node('talker',anonymous=0) pub=rospy.Publiser('chatter',String) rate=rospy.Rate(1) while not rospy.is_shutdown(): rospy.loginfo('telll myself') pub.publish('hello') rate.sleep() 
  • 發布
    #!/usr/bin/env python
    import rospy
    from std_msgs.msg import String
    rospy.init_node('listener',anonymous=0)
    lis=rospy.Subscriber('chatter',String,callback)
    rospy.spin()
    def callback(data):
    rospy.loginfo(data.data)




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM