最近在項目中,涉及到對行為和狀態進行建模的需求,嘗試用有限狀態機(Finite-state machine, FSM)來實現。
1. 概念介紹
1.1 運行機制
基於對有限狀態機的粗淺理解,大體的運行機制為:
- 系統所處的狀態是明確並且有限的,必定屬於狀態全集中的某一種;
- 系統接受輸入,根據判定條件,決定是維持當前狀態,還是切換到某一個新的狀態;
- 在維持或切換的過程中,執行一些預設的操作。
可以認為有限狀態機是一個離散系統,每接受一次輸入,進行一次判斷和切換。
1.2 所含要素
一個有限狀態機包含如下幾個要素:
-
狀態:系統所處的狀態,在運行過程中又可以分為當前狀態和下一階段狀態;
-
事件:也可以理解為每一次運行的輸入;
-
條件:根據輸入事件執行的判定條件,條件是基於狀態的,當前所處的每一種狀態,都可以有自己對應的一套判定條件,來決定下一步進入哪一種狀態;
-
動作:確定切換路徑后,執行的附加操作。
以一個共3種狀態的FSM為例,共有3套判定條件,根據當前所處的狀態來確定使用哪一種判定條件,共有3*3=9種動作,決定每一種狀態切換過程中需要執行的動作。
1.3 分析方法
通常可以用一個表格來對所處理的FSM進行分析,防止情況的遺漏。

在表格中分析清楚每一種狀態切換的判定條件和執行動作,再用代碼實現,可以最大程度地減輕思考的難度,減少錯誤的概率。
2. 代碼實現
以OOP的方式,做了一個基礎的Python實現。
FSM基類:
class StateMachine:
def __init__(self, cfg, states, events_handler, actions_handler):
# config information for an instance
self.cfg = cfg
# define the states and the initial state
self.states = [s.lower() for s in states]
self.state = self.states[0]
# process the inputs according to current state
self.events = dict()
# actions according to current transfer
self.actions = {state: dict() for state in self.states}
# cached data for temporary use
self.records = dict()
# add events and actions
for i, state in enumerate(self.states):
self._add_event(state, events_handler[i])
for j, n_state in enumerate(self.states):
self._add_action(state, n_state, actions_handler[i][j])
def _add_event(self, state, handler):
self.events[state] = handler
def _add_action(self, cur_state, next_state, handler):
self.actions[cur_state][next_state] = handler
def run(self, inputs):
# decide the state-transfer according to the inputs
new_state, outputs = self.events[self.state](inputs, self.states, self.records, self.cfg)
# do the actions related with the transfer
self.actions[self.state][new_state](outputs, self.records, self.cfg)
# do the state transfer
self.state = new_state
return new_state
def reset(self):
self.state = self.states[0]
self.records = dict()
return
# handlers for events and actions, event_X and action_XX are all specific functions
events_handlers = [event_A, event_B]
actions_handlers = [[action_AA, action_AB],
[action_BA, action_BB]]
# define an instance of StateMachine
state_machine = StateMachine(cfg, states, events_handlers, actions_handlers)
如果對於狀態機有具體的要求,可以繼承這個基類進行派生。
比如,有對狀態機分層嵌套的需求。
class StateGeneral(StateMachine):
def __init__(self, cfg, states):
super(StateGeneral, self).__init__(cfg, states, events_handler, actions_handler)
self.sub_state_machines = dict()
def add_sub_fsm(self, name, fsm):
self.sub_state_machines[name] = fsm
def run(self, inputs):
new_state, outputs = self.events[self.state](inputs, self.states, self.records, self.cfg)
# operate the sub_state_machines in actions
self.actions[self.state][new_state](outputs, self.records, self.cfg, \
self.sub_state_machines)
self.state = new_state
return new_state
def reset(self):
self.state = self.states[0]
self.records = dict()
for _, sub_fsm in self.sub_state_machines.items():
sub_fsm.reset()
return
