Tkinter 吐槽之一:多線程與 UI 交互


背景

最近想簡單粗暴的用 Python 寫一個 GUI 的小程序。因為 Tkinter 是 Python 自帶的 GUI 解決方案,為了部署方便,就直接選擇了 Tkinter。
本來覺得 GUI 發展這么多年以來,就算功能簡陋,但也應該大差不差才對,何況我的需求本就十分簡單。但事實上 Tkinter 的簡陋仍然超出了我的想象。因此想寫幾篇文章,記錄一下踩到的坑,權當吐槽。

問題

眾所周知,任何 GUI 程序的主線程中,都是不能執行一些比較耗時的操作的,因為這樣的操作會阻塞主線程,造成 GUI 卡頓無響應。因此需要將耗時操作放在其他線程中執行。而在使用多線程機制時,需要能夠確保:

  1. 對數據的修改是需要線程安全的
  2. 鎖或者數據等待不會阻塞主 UI 線程
  3. 只在主 GUI 線程中修改 GUI

然而 Tkinter 默認是沒有提供這樣的機制的。它並不能像很多其他框架那樣,將線程的 callback 變成主線程的事件,進而在主線程中執行。這就導致了,主線程的 UI 邏輯很難與耗時操作的代碼做到良好的解耦。

方案

參考其他 GUI 框架的解決方案,從編碼簡單的角度來說,多線程與 UI 線程最直觀的使用方法,應該是使用 callback 的形式,類似這樣的:

def run_in_thread():
  time.sleep(1)
  return 1

def on_data_received(num):
  # change ui
  pass

ttk.Button(frame, command=lambda: thread.submit(run_in_thread, callback=on_data_received))

如果忽略 thread 的話,這樣的寫法和傳統的同步代碼寫法並沒有什么區別,類似於 button.on_click = lambda: do_something()

其實,Python 中 concurrent.futures 的 Future 就有類似的方法:

def run_in_thread(n):
    time.sleep(n)
    return n * 2

def on_data_received(future):
    print(future.result())

thread_pool = ThreadPoolExecutor()
future = thread_pool.submit(run_in_thread, 3)
future.add_done_callback(on_data_received)

雖然 Future 是可以 add_callback 的,但這個回調是在子線程中執行的,並非 UI 主線程。這就會打破 Tkinter 的約束:只有主 GUI 線程可以修改 GUI。

其實,參考很多其他的 GUI 的設計來看,解決多線程與 UI 線程交互的問題,歸根結底其實只有一種方案:主線程輪詢是否有回調事件或者數據改變。不論是 Pub/Sub 的方案,還是 Event 的方案,本質上都是這樣一種 loop。因此,我們可以在主 UI 線程中檢查其他線程是否完成,進而主動觸發回調。

Tkinter 中的 root.after() 正是做這種輪詢的最佳方式,類似於:

def check_event(callback: Callable):
    if future.done():
        callback(future.result())
    else:
        root.after(100, check_event, callback)

在這段代碼里,使用 root.after 通過遞歸的方式每 100ms 檢查一次 future 是否完成,如果執行完成,則直接調用 callback 函數。因為 root.after 一定是在主 UI 線程中執行的,因此 callback 也是在主 UI 線程中執行。

但是每次這么寫會比較麻煩,可以參考 js 中 promise 的方法做一些封裝,就可以得到一些相對比較通用的工具類:

"""
The module for asynchronous running tasks in Tk GUI.
"""
import tkinter as tk
from concurrent import futures
from typing import Callable, Generic, List, TypeVar


_EVENT_PERIOD_MS = 100
_THREAD_POOL = futures.ThreadPoolExecutor(5, 'pool')

T = TypeVar('T')


class _Promise(Generic[T]):
    def __init__(self, future: futures.Future[T]) -> None:
        self._future = future
        self._on_success = None
        self._on_failure = None

    def then(self, on_success: Callable[[T], None]):
        """ Do something when task is finished. """
        self._on_success = on_success
        return self

    def catch(self, on_failure: Callable[[BaseException], None]):
        """ Do something when task is failed. """
        self._on_failure = on_failure
        return self


class AsyncEvent:
    """
    Used for asynchronous tasks in Tk GUI. It takes use of tk.after to check the
    event and do the callback in the GUI thread, so we can use it just like
    traditional "callback" way.

    The class is singleton, so it's shared in the process.

    """
    def __init__(self, master: tk.Misc) -> None:
        """ Initialize the singleton with Tk.
        Args:
            master: Same in Tk.
        """
        self._master: tk.Misc = master
        self._promise_list: List[_Promise] = []

    def submit(self, task: Callable[..., T], /, *args) -> _Promise[T]:
        """
        Adds an asynchronous task, and return a `Promise` for this task.
        We can add callback by the `Promise`.

        Args:
            task: A function which will be called asynchronously in a thread-pool.
            *args: The arguments for the function.
        Return: Promise object then you can add callback to it.
        """
        if not getattr(self, '_master', None):
            raise RuntimeError('Not initialized. Please call init() at first.')

        future = _THREAD_POOL.submit(task, *args)
        promise: _Promise[T] = _Promise(future)
        self._promise_list.append(promise)

        # If the len of event list is 1, means that it's not running.
        if len(self._promise_list) == 1:
            self._master.after(_EVENT_PERIOD_MS, self._handle_event)

        return promise

    def _handle_event(self):
        """ Works as event loop to do the callback. """
        for promise in self._promise_list:
            future = promise._future
            on_success = promise._on_success
            on_failure = promise._on_failure

            if future.done():
                if future.exception():
                    if on_failure:
                        on_failure(future.exception() or BaseException())
                    else:
                        # add log for the exception.
                elif on_success:
                    on_success(future.result())
                self._promise_list.remove(promise)

        # Try to handle events in next cycle.
        if len(self._promise_list) > 0:
            self._master.after(_EVENT_PERIOD_MS, self._handle_event)

這樣的話,使用起來就會非常簡單:

def run_in_thread(n):
    time.sleep(n)
    return n * 2

def print_result(n2):
    print(n2)

async_executor = AsyncEvent(root)
async_executor.submit(run_in_thread, 3).then(print_result)

這樣,整個 coding 比較易讀,也滿足異步執行的所有要求。

總結

其實類似於 future 或者 promise 的概念在很多語言中都是支持的。在 Tkinter 中,主要的區別在於需要將異步線程的回調運行在主線程當中。因此,可以仿照其他語言的 future 或者 promise 的語法方式來進行封裝,從而達到易用且符合要求的目的。

語法糖雖然不是必須的,但好看的代碼,終歸是賞心悅目的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM