Data types
Unlike Python, D-Bus is statically typed - each method has a certain signature representing the types of its arguments, and will not accept arguments of other types.
See the official tutorial for the details; they are not repeated here.
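To make the idea of a signature a little more concrete, here is a minimal sketch that splits a signature string into its complete types (one per argument). `split_signature` is a hypothetical helper written for this note, not part of dbus-python, and it does not validate every rule of the D-Bus specification.

```python
# Single-character type codes that form a complete type on their own.
BASIC_TYPES = set("ybnqiuxtdsogvh")


def _complete_type_end(sig, i):
    """Return the index just past the complete type that starts at sig[i]."""
    if i >= len(sig):
        raise ValueError("truncated signature: %r" % sig)
    c = sig[i]
    if c in BASIC_TYPES:
        return i + 1
    if c == "a":
        # An array is 'a' followed by exactly one complete type.
        return _complete_type_end(sig, i + 1)
    if c in "({":
        # Structs '(...)' and dict entries '{...}' run to the matching bracket.
        close = ")" if c == "(" else "}"
        j = i + 1
        while j < len(sig) and sig[j] != close:
            j = _complete_type_end(sig, j)
        if j >= len(sig):
            raise ValueError("unterminated container in %r" % sig)
        return j + 1
    raise ValueError("unexpected character %r in %r" % (c, sig))


def split_signature(sig):
    """Split a D-Bus signature string into a list of complete types."""
    types = []
    i = 0
    while i < len(sig):
        end = _complete_type_end(sig, i)
        types.append(sig[i:end])
        i = end
    return types


print(split_signature("us"))          # ['u', 's']
print(split_signature("a{sv}i(ii)"))  # ['a{sv}', 'i', '(ii)']
```

A method whose in_signature is 'us' therefore takes two arguments: an unsigned 32-bit integer and a string.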
Connecting to the Bus
    import dbus

    session_bus = dbus.SessionBus()
    system_bus = dbus.SystemBus()
Making method calls (synchronous calls)
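To call a method on an object exported by another application, you need to know:
- The bus name. This identifies which application you want to communicate with.
- The object path. An application can export many objects; the object path identifies which one you want to interact with.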
Proxy objects
    # NetworkManager lives on the system bus
    bus = dbus.SystemBus()
    proxy = bus.get_object('org.freedesktop.NetworkManager',
                           '/org/freedesktop/NetworkManager/Devices/0')
Interfaces and methods
    # Here the proxy object is used directly: call the method on it and name
    # the interface with dbus_interface. Introspect() takes no arguments.
    props = proxy.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable')
    # props is a dbus.String holding the object's introspection XML
As a short cut, if you’re going to be calling many methods with the same interface, you can construct a dbus.Interface object and call methods on that, without needing to specify the interface again:
    # Call the method indirectly, through an Interface object
    eth0_dev_iface = dbus.Interface(proxy,
                                    dbus_interface='org.freedesktop.DBus.Introspectable')
    props2 = eth0_dev_iface.Introspect()
    # props2 is the same as before
You can print the reply directly with print(props). However, type(props) reports <class 'dbus.String'> rather than Python's built-in str; see the Data types section at the start of this article.
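Since the reply of Introspect() is an XML document, it can be inspected with the standard library. The sketch below lists the methods of each interface; `list_methods` is a hypothetical helper and `SAMPLE_XML` is a made-up stand-in for the string a real service would return.

```python
import xml.etree.ElementTree as ET

# Made-up introspection data, standing in for the value of props above.
SAMPLE_XML = """<node>
  <interface name="org.freedesktop.DBus.Introspectable">
    <method name="Introspect">
      <arg name="data" type="s" direction="out"/>
    </method>
  </interface>
  <interface name="com.example.Sample">
    <method name="StringifyVariant">
      <arg name="variant" type="v" direction="in"/>
      <arg name="result" type="s" direction="out"/>
    </method>
    <signal name="NumberOfBottlesChanged">
      <arg type="u"/>
      <arg type="s"/>
    </signal>
  </interface>
</node>"""


def list_methods(introspection_xml):
    """Map each interface name to the list of method names it declares."""
    # str() turns a dbus.String into a plain Python string before parsing.
    root = ET.fromstring(str(introspection_xml))
    result = {}
    for iface in root.findall("interface"):
        result[iface.get("name")] = [m.get("name") for m in iface.findall("method")]
    return result


for name, methods in list_methods(SAMPLE_XML).items():
    print(name, methods)
```

With a real proxy, the call would be list_methods(props).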
Making asynchronous calls
Setting up an event loop
To make asynchronous calls, you first need an event loop or “main loop”.
    from dbus.mainloop.glib import DBusGMainLoop

    DBusGMainLoop(set_as_default=True)
    # You must do this before connecting to the bus.

    # D-Bus is now attached to the GLib main loop, so run that loop through PyGObject
    from gi.repository import GLib

    loop = GLib.MainLoop()
    loop.run()

    # Alternatively, pass the main loop to each connection as an argument
    import dbus
    from dbus.mainloop.glib import DBusGMainLoop

    dbus_loop = DBusGMainLoop()
    bus = dbus.SessionBus(mainloop=dbus_loop)
PyQt v4.2 and later includes support for integrating dbus-python with the Qt event loop. To connect D-Bus to this main loop, call dbus.mainloop.qt.DBusQtMainLoop instead of dbus.mainloop.glib.DBusGMainLoop. Otherwise the Qt loop is used in exactly the same way as the GLib loop.
Asynchronous method calls
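To make a call asynchronous, pass two callables as the keyword arguments reply_handler and error_handler. The call returns immediately; later, while the event loop is running, one of two things happens:
- the reply_handler will be called with the method's return values as arguments; or
- the error_handler will be called with one argument, an instance of DBusException representing a remote exception.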
    # To make an async call, use the reply_handler and error_handler kwargs
    remote_object.HelloWorld("Hello from example-async-client.py!",
                             dbus_interface='com.example.SampleInterface',
                             reply_handler=handle_hello_reply,
                             error_handler=handle_hello_error)

    # Interface objects also support async calls
    iface = dbus.Interface(remote_object, 'com.example.SampleInterface')
    iface.RaiseException(reply_handler=handle_raise_reply,
                         error_handler=handle_raise_error)
Implement the reply_handler and error_handler functions:
    def handle_hello_reply(r_args):
        print(str(r_args))

    def handle_hello_error(err_args):
        print("HelloWorld raised an exception! That's not meant to happen...")
        print("\t", str(err_args))
        if True:  # ...
            loop.quit()
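The condition that guards loop.quit() is elided in the handlers above. One possible arrangement, sketched here as an assumption rather than as what the original example does, is to count the calls that are still outstanding and quit once every one of them has produced a reply or an error. `PendingCalls` and its `handlers` method are hypothetical names.

```python
class PendingCalls:
    """Count outstanding async calls; run a callback when all have finished."""

    def __init__(self, on_all_done):
        self._pending = 0
        self._on_all_done = on_all_done
        self.failed = False

    def handlers(self, on_reply):
        """Return a (reply_handler, error_handler) pair for one async call."""
        self._pending += 1

        def reply_handler(*args):
            on_reply(*args)
            self._finish()

        def error_handler(exc):
            self.failed = True
            print("call failed:", exc)
            self._finish()

        return reply_handler, error_handler

    def _finish(self):
        self._pending -= 1
        if self._pending == 0:
            self._on_all_done()


# Stand-alone demonstration: the handlers are invoked by hand here,
# whereas in a real client the event loop would invoke them.
pending = PendingCalls(lambda: print("all calls finished"))
reply, error = pending.handlers(print)
reply("Hello")
```

In a real client the callback would be loop.quit, and the pair returned by handlers() would be passed as reply_handler and error_handler to the proxy method.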
Receiving signals
Signal matching
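To respond to signals, use the add_signal_receiver() method on Bus objects. It arranges for a callback to be called when a matching signal is received, and takes the following arguments:
- a callable (the handler_function) which will be called by the event loop when the signal is received
- the signal name, signal_name
- the D-Bus interface, dbus_interface
- a sender bus name (well-known or unique), bus_name
- a sender object path, path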
add_signal_receiver() returns a SignalMatch object. Its only useful public API at the moment is a remove() method with no arguments, which removes the signal match from the connection.
    def catchall_signal_handler(*args, **kwargs):
        print("Caught signal (in catchall handler) "
              + kwargs['dbus_interface'] + "." + kwargs['member'])
        for arg in args:
            print("    " + str(arg))

    def catchall_hello_signals_handler(hello_string):
        print("Received a hello signal and it says " + hello_string)

    def catchall_testservice_interface_handler(hello_string, dbus_message):
        print("com.example.TestService interface says " + hello_string
              + " when it sent signal " + dbus_message.get_member())

    # catch the signal
    bus = dbus.SessionBus()
    bus.add_signal_receiver(catchall_signal_handler,
                            interface_keyword='dbus_interface',
                            member_keyword='member')
    bus.add_signal_receiver(catchall_hello_signals_handler,
                            dbus_interface="com.example.TestService",
                            signal_name="HelloSignal")
    bus.add_signal_receiver(catchall_testservice_interface_handler,
                            dbus_interface="com.example.TestService",
                            message_keyword='dbus_message')
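The remove() method mentioned above is useful when a handler should only run once. The following sketch wraps add_signal_receiver() so that the match is removed as soon as the first signal arrives. `receive_once` is a hypothetical helper; the bus is passed in as an argument, so the function itself does not import dbus.

```python
def receive_once(bus, handler, **match_kwargs):
    """Call handler for the first matching signal only, then stop listening.

    bus is expected to be a dbus Bus object (for example dbus.SessionBus());
    match_kwargs are passed through to add_signal_receiver() unchanged.
    """
    state = {}

    def one_shot(*args, **kwargs):
        # Remove the match first, so the handler cannot run a second time.
        state["match"].remove()
        handler(*args, **kwargs)

    state["match"] = bus.add_signal_receiver(one_shot, **match_kwargs)
    return state["match"]
```

Assuming the service from the example above, a call could look like receive_once(bus, catchall_hello_signals_handler, dbus_interface="com.example.TestService", signal_name="HelloSignal").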
Receiving signals from a proxy object
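Proxy objects have a connect_to_signal() method, which arranges for a callback to be called when a signal is received from the corresponding remote object. Its parameters are:
- the name of the signal
- the handler function, a callable which will be called by the event loop when the signal is received
- the keyword argument dbus_interface, which qualifies the name with its interface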
Getting more information from a signal
You can also arrange for more information to be passed to the handler function. If you pass the keyword arguments sender_keyword, destination_keyword, interface_keyword, member_keyword or path_keyword to the connect_to_signal() method, the appropriate part of the signal message will be passed to the handler function as a keyword argument. For instance, if you use:
    def handler(sender=None):
        print("got signal from %r" % sender)

    iface.connect_to_signal("Hello", handler, sender_keyword='sender')
and a signal Hello with no arguments is received from "com.example.Foo", the handler() function will be called with sender='com.example.Foo'.
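Several of these keywords can be combined. The sketch below asks for the sender, path, interface and member at once and formats them into one line. `describe_signal` and `log_signal` are hypothetical helpers; `iface` is assumed to be a dbus.Interface (or proxy object) created elsewhere.

```python
def describe_signal(*args, sender=None, path=None, interface=None, member=None):
    """Format the metadata and arguments of one signal as a single line."""
    return "%s.%s from %s at %s with args %r" % (
        interface, member, sender, path, args)


def log_signal(iface, signal_name):
    """Print a description of every signal_name signal received via iface."""

    def handler(*args, **kwargs):
        print(describe_signal(*args, **kwargs))

    # Each *_keyword argument names the keyword under which that part of
    # the message is handed to the handler.
    return iface.connect_to_signal(
        signal_name, handler,
        sender_keyword="sender",
        path_keyword="path",
        interface_keyword="interface",
        member_keyword="member")


# What the handler would print for one signal (values made up):
print(describe_signal("hi", sender=":1.5", path="/com/example/Foo",
                      interface="com.example.Foo", member="Hello"))
```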
String argument matching
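If you pass keyword arguments of the form argN, where N is a small non-negative integer (for example arg0='Hello'), the handler will only be called if argument N of the signal (numbered from zero) is a D-Bus string (in particular, not an object-path or a signature) with that value.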
    def hello_signal_handler(hello_string):
        print("Received signal (by connecting using remote object) and it says: "
              + hello_string)

    proxy = bus.get_object("com.example.TestService",
                           "/com/example/TestService/object")
    proxy.connect_to_signal("HelloSignal", hello_signal_handler,
                            dbus_interface="com.example.TestService",
                            arg0="Hello")  # String argument matching
Exporting objects
Inheriting from dbus.service.Object
Object expects either a BusName or a Bus object, and an object-path, to be passed to its constructor: arrange for this information to be available. For example:
    class Example(dbus.service.Object):
        def __init__(self, object_path):
            dbus.service.Object.__init__(self, dbus.SessionBus(), object_path)
This object will automatically support introspection, but won’t do anything particularly interesting. To fix that, you’ll need to export some methods and signals too.
Exporting methods with dbus.service.method
To export a method, use the decorator dbus.service.method. For example:
    class Example(dbus.service.Object):
        def __init__(self, object_path):
            dbus.service.Object.__init__(self, dbus.SessionBus(), object_path)

        @dbus.service.method(dbus_interface='com.example.Sample',
                             in_signature='v', out_signature='s')
        def StringifyVariant(self, variant):
            return str(variant)
The in_signature and out_signature are D-Bus signature strings as described in Data Types.
Finding out the caller’s bus name
The method decorator accepts a sender_keyword keyword argument. If you set that to a string, the unique bus name of the sender will be passed to the decorated method as a keyword argument of that name:
    class Example(dbus.service.Object):
        def __init__(self, object_path):
            dbus.service.Object.__init__(self, dbus.SessionBus(), object_path)

        @dbus.service.method(dbus_interface='com.example.Sample',
                             in_signature='', out_signature='s',
                             sender_keyword='sender')
        def SayHello(self, sender=None):
            return 'Hello, %s!' % sender
            # -> something like 'Hello, :1.1!'
Asynchronous method implementations
    class SomeObject(dbus.service.Object):

        @dbus.service.method("com.example.SampleInterface",
                             in_signature='s', out_signature='as')
        def HelloWorld(self, hello_message):
            print(str(hello_message))
            return ["Hello", " from example-service.py", "with unique name",
                    session_bus.get_unique_name()]
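The HelloWorld method above returns its result directly. For a reply that is only available later, dbus.service.method accepts an async_callbacks argument naming two keyword parameters: the success callback and the error callback. The method then returns without a value and arranges for one of the two to be called. The sketch below is an illustration under assumptions: `SlowService`, `SlowEcho`, `make_reply` and the one-second delay are all made up, and the class is built inside a function only so that the snippet can be loaded without dbus-python and PyGObject installed.

```python
def make_reply(message):
    """Compute the reply; kept separate so it can be tried without a bus."""
    if not message:
        raise ValueError("message must not be empty")
    return "echo: " + message


def build_slow_service_class():
    """Return a service class whose SlowEcho method replies asynchronously."""
    # Imported here so that defining this function does not require
    # dbus-python or PyGObject (an assumption made for this sketch only).
    import dbus.service
    from gi.repository import GLib

    class SlowService(dbus.service.Object):

        @dbus.service.method("com.example.SampleInterface",
                             in_signature="s", out_signature="s",
                             async_callbacks=("reply_cb", "error_cb"))
        def SlowEcho(self, message, reply_cb, error_cb):
            def finish():
                try:
                    result = make_reply(message)
                except Exception as exc:
                    error_cb(exc)      # reported to the caller as an error
                else:
                    reply_cb(result)   # sent to the caller as the reply
                return False           # do not repeat the timeout

            # Reply about one second later, from the GLib main loop.
            GLib.timeout_add_seconds(1, finish)

    return SlowService
```

The return value of SlowEcho itself is ignored; the caller receives whatever is passed to reply_cb, which has to match out_signature.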
Emitting signals with dbus.service.signal
To export a signal, use the decorator dbus.service.signal; to emit that signal, call the decorated method. The decorated method can also contain code which will be run when called, as usual. For example:
    class Example(dbus.service.Object):
        def __init__(self, object_path):
            dbus.service.Object.__init__(self, dbus.SessionBus(), object_path)

        @dbus.service.signal(dbus_interface='com.example.Sample',
                             signature='us')
        def NumberOfBottlesChanged(self, number, contents):
            print("%d bottles of %s on the wall" % (number, contents))

    e = Example('/bottle-counter')
    e.NumberOfBottlesChanged(100, 'beer')
    # -> emits com.example.Sample.NumberOfBottlesChanged(100, 'beer')
    #    and prints "100 bottles of beer on the wall"
The signal will be queued for sending when the decorated method returns - you can prevent the signal from being sent by raising an exception from the decorated method (for instance, if the parameters are inappropriate). The signal will only actually be sent when the event loop next runs.
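As a sketch of the veto described above, the decorated method below checks its parameters and raises ValueError when they are inappropriate, so that no signal is queued. `validate_contents`, `build_counter_class` and `BottleCounter` are hypothetical names, and the class is built inside a function only so that the snippet can be loaded without dbus-python installed.

```python
def validate_contents(contents):
    """Raise ValueError if contents is not a usable description."""
    if not contents.strip():
        raise ValueError("contents must not be empty")


def build_counter_class():
    """Return a service class whose signal is vetoed for bad parameters."""
    # Imported here so that defining this function does not require
    # dbus-python (an assumption made for this sketch only).
    import dbus.service

    class BottleCounter(dbus.service.Object):
        def __init__(self, bus, object_path):
            dbus.service.Object.__init__(self, bus, object_path)

        @dbus.service.signal(dbus_interface="com.example.Sample",
                             signature="us")
        def NumberOfBottlesChanged(self, number, contents):
            # Raising here prevents the signal from being sent.
            validate_contents(contents)

    return BottleCounter
```

With a connected bus, BottleCounter(bus, '/bottle-counter').NumberOfBottlesChanged(3, '') would raise ValueError instead of emitting the signal.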
Further thoughts
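- Isn't calling a service through a proxy the standard pattern? Why does the official tutorial, in the section "Receiving signals from a proxy object", advise against creating a proxy just to listen for signals? What is the extra overhead?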
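Answer: according to the official tutorial, creating a proxy object may activate (start) the service that owns the object. That leaves bus.add_signal_receiver() as the way to listen without that side effect, which suggests that its implementation does not create a proxy at all and simply listens for matching signals on the bus.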