MySQL插件實現淺析——插件的調用


一、 MySQL中的動態插件

最初想到這個問題是在學習mysql半同步復制相關問題的時候,為何在mysql運行時install半同步插件並開啟后就能起到作用,他是如何讓事務停下來等待的。安裝插件的時候加載的是一個.so動態庫,這個庫里是插件的實現。那么MySQL源碼中應該需要對應的框架去以調用這些插件,這個框架是如何運作的呢?

二、從源碼中尋找答案

首先,我們需要要知道插件從何處調用的。以半同步插件為例:眾所周知,開啟半同步插件后通過設置rpl_semi_sync_master_wait_point 的值可以決定mysql主庫在何處等待從庫的ack,為after_sync則在binlog刷新到磁盤后,為after_commit則是在事務提交以后。那么在mysql事務刷新到binlog之后和事務提交之后應該需要各有一個地方調用半同步插件的接口進行等待。

首先,進入mysql的事務提交函數,當開啟binlog的時候mysql由MYSQL_BIN_LOG::ordered_commit進行提交操作。為了保證不丟數據,我們通常使用after_sync同步點,如果要尋找在這個同步點半同步插件的調用,那么就需要在這個函數中刷新binlog之后與事務正式提交之前尋找。事實上在刷新binlog后,可以找到這樣一個函數調用:

int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
{
  ···
  if (flush_error == 0 && sync_error == 0)
      sync_error= call_after_sync_hook(commit_queue);
  ···
}

似乎看其函數名字就和半同步有關,那么進入這個函數中:

static inline int call_after_sync_hook(THD *queue_head)
{
  const char *log_file= NULL;
  my_off_t pos= 0;

  if (NO_HOOK(binlog_storage))
    return 0;

  DBUG_ASSERT(queue_head != NULL);
  for (THD *thd= queue_head; thd != NULL; thd= thd->next_to_commit)
    if (likely(thd->commit_error == THD::CE_NONE))
      thd->get_trans_fixed_pos(&log_file, &pos);

  if (DBUG_EVALUATE_IF("simulate_after_sync_hook_error", 1, 0) ||
      RUN_HOOK(binlog_storage, after_sync, (queue_head, log_file, pos)))
  {
    sql_print_error("Failed to run 'after_sync' hooks");
    return ER_ERROR_ON_WRITE;
  }
  return 0;
}

這個函數接受一隊待提交的線程的隊列作為參數,讓這些線程等待從庫的ack,那么具體是那個調用呢?就是這個RUN_HOOK:

RUN_HOOK(binlog_storage, after_sync, (queue_head, log_file, pos))

查看RUN_HOOK的源碼發現這是一個宏:

#define RUN_HOOK(group, hook, args)             \
  (group ##_delegate->is_empty() ?              \
   0 : group ##_delegate->hook args)

將上面的調用按照這個宏的定義進行文本替換后得到實際上的調用為:

binlog_storage_delegate->is_empty() ? 0 : binlog_storage_delegate->after_sync(queue_head, log_file, pos)

也就是說實際上是調用了binlog_storage_delegate指向的對象的函數來實現半同步的功能,如果is_empty()的返回值為真則不做任何操作,否則調用after_sync函數來進行等待。查找binlog_storage_delegate這個指針的定義,可以在rpl_handler.cc中找到它,這是一個全局指針變量,類型為Binlog_storage_delegate *:

//rpl_handler.cc
Binlog_storage_delegate *binlog_storage_delegate;

進一步查找這個變量在何處初始化可以找到delegates_init函數:

int delegates_init()
{
  ······
  transaction_delegate= new (place_trans_mem) Trans_delegate;

  if (!transaction_delegate->is_inited())
  {
    sql_print_error("Initialization of transaction delegates failed. "
                    "Please report a bug.");
    return 1;
  }

  binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate;

  if (!binlog_storage_delegate->is_inited())
  {
    sql_print_error("Initialization binlog storage delegates failed. "
                    "Please report a bug.");
    return 1;
  }

  server_state_delegate= new (place_state_mem) Server_state_delegate;
  ······
}

這個函數專門初始化各種xxx_delegate類型的指針,為他們分配對象,實際上繼續追蹤調用棧可以發現如下函數調用

mysqld_main()
    |
    |
init_server_components()
    |
    |
delegates_init()

所以可以看出,實際上在mysql啟動的時候這個這些指針就已經得到了初始化。接下來,依然以半同步插件的同步等待點為例子來看看到底是如何實現調用的。

三、 插件中的觀察者模式

依然以上面等待從庫ack的調用為例,已經知道了實際上是以Binlog_storage_delegate類來實現的,那么接下來看Binlog_storage_delegate的代碼:

class Binlog_storage_delegate
  :public Delegate {
public:

  Binlog_storage_delegate()
  : Delegate(
#ifdef HAVE_PSI_INTERFACE
             key_rwlock_Binlog_storage_delegate_lock
#endif
             )
  {}

  typedef Binlog_storage_observer Observer;
  int after_flush(THD *thd, const char *log_file,
                  my_off_t log_pos);
  int after_sync(THD *thd, const char *log_file,
                 my_off_t log_pos);
};

這個類中有一個after_sync函數,也就是說我們上面的RUN_HOOK實際上調用的是這個函數。Binlog_storage_delegate繼承了一個基類Delegate:

class Delegate {
public:
  typedef List<Observer_info> Observer_info_list;
  typedef List_iterator<Observer_info> Observer_info_iterator;

  int add_observer(void *observer, st_plugin_int *plugin)
  {
    ······
  }

  int remove_observer(void *observer, st_plugin_int *plugin)
  {
    ···
  }

  inline Observer_info_iterator observer_info_iter()
  {
    return Observer_info_iterator(observer_info_list);
  }

  inline bool is_empty()
  {
    DBUG_PRINT("debug", ("is_empty: %d", observer_info_list.is_empty()));
    return observer_info_list.is_empty();
  }

  inline int read_lock()
  {
    if (!inited)
      return TRUE;
    return mysql_rwlock_rdlock(&lock);
  }

  inline int write_lock()
  {
    if (!inited)
      return TRUE;
    return mysql_rwlock_wrlock(&lock);
  }

  inline int unlock()
  {
    if (!inited)
      return TRUE;
    return mysql_rwlock_unlock(&lock);
  }

  inline bool is_inited()
  {
    return inited;
  }

  Delegate(
#ifdef HAVE_PSI_INTERFACE
           PSI_rwlock_key key
#endif
           )
  {
    inited= FALSE;
#ifdef HAVE_PSI_INTERFACE
    if (mysql_rwlock_init(key, &lock))
      return;
#else
    if (mysql_rwlock_init(0, &lock))
      return;
#endif
    init_sql_alloc(key_memory_delegate, &memroot, 1024, 0);
    inited= TRUE;
  }
  ~Delegate()
  {
    inited= FALSE;
    mysql_rwlock_destroy(&lock);
    free_root(&memroot, MYF(0));
  }

private:
  Observer_info_list observer_info_list;
  mysql_rwlock_t lock;
  MEM_ROOT memroot;
  bool inited;
};

上面略去了一些函數的細節,先來看里面的主要成員。observer_info_list是一個鏈表,里面存儲了Observer_info類型的成員,同時還有一把讀寫鎖lock進行保護。其他成員函數為對這個鏈表的操作函數(例如add_observer為添加成員)以及加鎖解鎖的函數。注意到其中有個is_empty函數,這個函數用來判斷鏈表是否為空。在RUN_HOOK中調用的is_empty就是這個函數。

接下來繼續回到子類Binlog_storage_delegate中,我們已經知道after_sync的等待實際上就是靠Binlog_storage_delegate的after_sync函數實現的。接下來進入這個函數的源代碼,來看這個函數如何實現的

int Binlog_storage_delegate::after_sync(THD *thd,
                                        const char *log_file,
                                        my_off_t log_pos)
{
  DBUG_ENTER("Binlog_storage_delegate::after_sync");
  DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu",
                       log_file, (ulonglong) log_pos));
  Binlog_storage_param param;
  param.server_id= thd->server_id;

  DBUG_ASSERT(log_pos != 0);
  int ret= 0;
  FOREACH_OBSERVER(ret, after_sync, thd, (&param, log_file, log_pos));

  DEBUG_SYNC(thd, "after_call_after_sync_observer");
  DBUG_RETURN(ret);
}

除去各種調試信息外,這個函數實際上只是調用了FOREACH_OBSERVER這個宏:

FOREACH_OBSERVER(ret, after_sync, thd, (&param, log_file, log_pos));

繼續查看這個宏的定義:

#define FOREACH_OBSERVER(r, f, thd, args)                               \
  /*
     Use a struct to make sure that they are allocated adjacent, check
     delete_dynamic().
  */                                                                    \
  Prealloced_array<plugin_ref, 8> plugins(PSI_NOT_INSTRUMENTED);        \
  read_lock();                                                          \
  Observer_info_iterator iter= observer_info_iter();                    \
  Observer_info *info= iter++;                                          \
  for (; info; info= iter++)                                            \
  {                                                                     \
    plugin_ref plugin=                                                  \
      my_plugin_lock(0, &info->plugin);                                 \
    if (!plugin)                                                        \
    {                                                                   \
      /* plugin is not intialized or deleted, this is not an error */   \
      r= 0;                                                             \
      break;                                                            \
    }                                                                   \
    plugins.push_back(plugin);                                          \
    if (((Observer *)info->observer)->f                                 \
        && ((Observer *)info->observer)->f args)                        \
    {                                                                   \
      r= 1;                                                             \
      sql_print_error("Run function '" #f "' in plugin '%s' failed",    \
                      info->plugin_int->name.str);                      \
      break;                                                            \
    }                                                                   \
  }                                                                     \
  unlock();                                                             \
  /*
     Unlock plugins should be done after we released the Delegate lock
     to avoid possible deadlock when this is the last user of the
     plugin, and when we unlock the plugin, it will try to
     deinitialize the plugin, which will try to lock the Delegate in
     order to remove the observers.
  */                                                                    \
  if (!plugins.empty())                                                 \
    plugin_unlock_list(0, &plugins[0], plugins.size());

這個宏比較長,我們先忽略其他對於插件管理的代碼,注意力放在遍歷這個鏈表的for循環中。我們將這個循環進行簡化:

Observer_info_iterator iter= observer_info_iter();                    \
  Observer_info *info= iter++;                                          \
  for (; info; info= iter++)                                            \
  {                                                 
    ······                                                                  \
    if (((Observer *)info->observer)->f                                 \
        && ((Observer *)info->observer)->f args)                        \
    {                                                                   \
      r= 1;                                                             \
      sql_print_error("Run function '" #f "' in plugin '%s' failed",    \
                      info->plugin_int->name.str);                      \
      break;                                                            \
     }                                                                   \
   }                                                                     \
   ······

從中可以看出,這個for循環遍歷整個鏈表,取出每個元素的指針並轉換為Observer 類型的指針,再以傳入的args作為參數調用其名字為f成員函數。我們可以依然將宏進行文本替換,以Binlog_storage_delegate::after_sync中調用的FOREACH_OBSERVER為例,外部的調用為:

FOREACH_OBSERVER(ret, after_sync, thd, (&param, log_file, log_pos));

則內部for循環中,使用每個元素的指針封裝出的調用可以翻譯為:

if (((Observer *)info->observer)->after_sync && 
    ((Observer *)info->observer)->after_sync ((&param, log_file, log_pos)))

假設info指針指向的對象的after_sync函數成員存在,則以args作為這個函數的參數調用它。那么也就是說Binlog_storage_delegate::after_sync的作用實際上是用來挨個調用它鏈表中保存的各個Observer對象的after_sync的成員函數。而半同步插件中等待ack的動作實際上再進一步由observer對象實現的,在Binlog_storage_delegate類中可以看到它的定義:

typedef Binlog_storage_observer Observer;

這里這樣定義的原因是為了FOREACH_OBSERVER這個宏統一使用Observer 這個名字。那么實際上就是調用的Binlog_storage_observer的成員函數了。繼續查看Binlog_storage_observer的定義:

typedef struct Binlog_storage_observer {
  uint32 len;

  /**
     This callback is called after binlog has been flushed

     This callback is called after cached events have been flushed to
     binary log file but not yet synced.

     @param param Observer common parameter
     @param log_file Binlog file name been updated
     @param log_pos Binlog position after update

     @retval 0 Sucess
     @retval 1 Failure
  */
  int (*after_flush)(Binlog_storage_param *param,
                     const char *log_file, my_off_t log_pos);
  int (*after_sync)(Binlog_storage_param *param,
                     const char *log_file, my_off_t log_pos);
} Binlog_storage_observer;

/**
   Binlog storage observer parameters
 */
typedef struct Binlog_storage_param {
  uint32 server_id;
} Binlog_storage_param;

從上面可以看出,Binlog_storage_observer其實就是一個含有兩個函數指針的class,最終實現等待從庫ack的是這里面的函數指針。

那么問題接下來就變成了到底Binlog_storage_delegate中保存Binlog_storage_observer實例是從哪里來的?Binlog_storage_observer中的函數指針具體又是指向哪個函數呢?

回到Binlog_storage_delegate類中,這個類有一個向鏈表中添加對象的add_observer函數,要向其中添加則應該會調用這個函數。查找對它的調用可以找到添加Binlog_storage_observer到鏈表中的函數register_binlog_storage_observer:

//rpl_handler
int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
{
  DBUG_ENTER("register_binlog_storage_observer");
  int result= binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
  DBUG_RETURN(result);
}

而向其中添加的observer則又是register_binlog_storage_observer的參數,那么順着調用鏈繼續向上找,最終可以在半同步插件的源碼中找到semi_sync_master_plugin_init函數:

static int semi_sync_master_plugin_init(void *p)
{
#ifdef HAVE_PSI_INTERFACE
  init_semisync_psi_keys();
#endif

  my_create_thread_local_key(&THR_RPL_SEMI_SYNC_DUMP, NULL);

  if (repl_semisync.initObject())
    return 1;
  if (ack_receiver.init())
    return 1;
  if (register_trans_observer(&trans_observer, p))
    return 1;
  if (register_binlog_storage_observer(&storage_observer, p))
    return 1;
  if (register_binlog_transmit_observer(&transmit_observer, p))
    return 1;
  return 0;
}

這個函數注冊半同步插件的各個xxxx_obeserver,將其添加到xxxx_delegate的鏈表中以供調用。依然關注我們之前的Binlog_storage_observe,上文提到的register_binlog_storage_observer函數參數來自於一個叫storage_observer的變量,這個變量就是Binlog_storage_observe的一個實例。我們繼續追蹤最后可以發現這是一個在半同步復制插件源碼中定義的全局變量:

//semisync_master_plugin.cc
Binlog_storage_observer storage_observer = {
  sizeof(Binlog_storage_observer), // len

  repl_semi_report_binlog_update, // report_update
  repl_semi_report_binlog_sync,   // after_sync
};

after_sync指針指向的repl_semi_report_binlog_sync就是半同步插件關於等待從庫ack的具體實現了,這里不在具體深入半同步的實現。

跟着源碼追了了一大圈,這里來做一個回顧。其實從插件被調用的過程(以及叫xxxx_observer的名字)就可以看出這是一個典型的觀察者模式,RUN_HOOK調用的時候,如果對應的xxxx_delegate中有對應的xxxx_observer觀察者,就挨個調用這些觀察者的回調函數,如果沒有就什么都不做。

最后,我們以半同步插件等待從庫ack的功能總結一個插件調用過程中的時序圖:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM