1. Linux provides several IPC mechanisms: shared memory, pipes, message queues, semaphores, and so on. At their core they all work the same way: the kernel sets aside a region of memory, and both sides of the communication read and write data through that region. The overall flow is illustrated below:

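As a concrete reference point for the copy cost discussed next, here is a minimal pipe-based sketch (my own illustration, not taken from the original figure): write() copies the sender's user buffer into the kernel, and read() copies it back out into the receiver's user buffer, i.e. two copies per message.

/* Minimal pipe IPC demo: the payload is copied user->kernel on write()
 * and kernel->user on read(), i.e. two copies for one message. */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>

int main(void)
{
    int fds[2];
    if (pipe(fds) < 0) {
        perror("pipe");
        return 1;
    }

    if (fork() == 0) {                     /* child: process A (sender) */
        const char msg[] = "hello from A";
        close(fds[0]);
        write(fds[1], msg, sizeof(msg));   /* copy #1: user -> kernel   */
        close(fds[1]);
        return 0;
    }

    /* parent: process B (receiver) */
    char buf[64];
    close(fds[1]);
    ssize_t n = read(fds[0], buf, sizeof(buf)); /* copy #2: kernel -> user */
    close(fds[0]);
    wait(NULL);

    printf("B received %zd bytes: %s\n", n, buf);
    return 0;
}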
Can you see the inherent flaw in this style of communication? Process A copies its data into the kernel, and process B copies it back out of the kernel; the same payload may end up stored in memory three times and gets copied twice along the way. That feels about as inefficient as an app reading file data from disk with plain read()/write()! And since an app can speed up file I/O with mmap, can't IPC be sped up with mmap as well? The answer is yes, and that is exactly the background against which Android's binder was born. Compared with traditional IPC, binder needs only one copy; the overall principle and flow are shown in the figure below:

Process A still writes its data from user space into the kernel buffer, so nothing changes on the producer side. The big change is on the consumer side: the kernel first sets up a data-receive buffer, but that buffer is mapped onto the kernel buffer, in other words they share the same physical memory. Then the receiving process's address space is also mapped onto the pages of that data-receive buffer, again backed by the same physical memory. The net result is that the kernel uses only one block of physical memory, but maps it at two different virtual addresses (sneaky, isn't it?).
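The "one physical region, two virtual mappings" idea is easy to reproduce in plain user space with a shared anonymous mapping: after fork(), parent and child see the same physical pages through two different virtual addresses, so whatever one side writes, the other reads without an extra copy. A minimal sketch of that analogy (my own illustration, not binder itself):

/* One physical region, two virtual mappings: a MAP_SHARED anonymous
 * mapping is inherited across fork(), so parent and child address the
 * same physical pages through their own virtual addresses. */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/wait.h>

int main(void)
{
    char *shared = mmap(NULL, 4096, PROT_READ | PROT_WRITE,
                        MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (shared == MAP_FAILED) {
        perror("mmap");
        return 1;
    }

    if (fork() == 0) {                       /* "sender": writes in place  */
        strcpy(shared, "written once, visible to both mappings");
        return 0;
    }

    wait(NULL);                              /* "receiver": no copy needed */
    printf("parent reads: %s\n", shared);
    munmap(shared, 4096);
    return 0;
}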
2. The principle is really quite simple: the two communicating parties share one block of physical memory, and since both depend on it, managing that memory properly is what matters most. All the operations around this memory, in other words all the binder operations, are defined in the binder_fops structure:
static const struct file_operations binder_fops = {
    .owner = THIS_MODULE,
    .poll = binder_poll,
    .unlocked_ioctl = binder_ioctl,
    .compat_ioctl = binder_ioctl,
    .mmap = binder_mmap,
    .open = binder_open,
    .flush = binder_flush,
    .release = binder_release,
};
The names speak for themselves: poll, ioctl, mmap, open, flush and release; through these entry points the shared physical memory can be fully operated on (a short user-space sketch of how they are driven follows the structure overview below). As usual, before digging into the concrete implementations, let's look at the important structures first; their fields already give a good hint of how things are actually implemented. The main fields and how they relate to each other are shown below:

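As promised above, here is a minimal user-space sketch of how these three fops entry points are exercised: it just opens /dev/binder, queries the protocol version and maps the shared buffer. It assumes the binder UAPI header <linux/android/binder.h> (BINDER_VERSION and struct binder_version with its protocol_version field) and a 1 MB mapping size picked arbitrarily for the demo; it is an illustration, not libbinder.

/* Driving binder_open / binder_ioctl / binder_mmap from user space. */
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <linux/android/binder.h>

#define DEMO_VM_SIZE (1024 * 1024)   /* demo choice; the driver caps mappings at 4 MB */

int main(void)
{
    int fd = open("/dev/binder", O_RDWR | O_CLOEXEC);   /* -> binder_open()  */
    if (fd < 0) {
        perror("open /dev/binder");
        return 1;
    }

    struct binder_version ver = {0};
    if (ioctl(fd, BINDER_VERSION, &ver) == 0)           /* -> binder_ioctl() */
        printf("binder protocol version: %d\n", (int)ver.protocol_version);

    /* The driver rejects writable mappings (FORBIDDEN_MMAP_FLAGS), so the
     * buffer is mapped read-only; the kernel fills it.  -> binder_mmap()    */
    void *buf = mmap(NULL, DEMO_VM_SIZE, PROT_READ,
                     MAP_PRIVATE | MAP_NORESERVE, fd, 0);
    if (buf == MAP_FAILED)
        perror("mmap /dev/binder");
    else
        printf("binder buffer mapped at %p\n", buf);

    if (buf != MAP_FAILED)
        munmap(buf, DEMO_VM_SIZE);
    close(fd);                                          /* -> binder_flush/release */
    return 0;
}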
(2) To implement IPC, the very first step is to set aside memory in the kernel, otherwise there is nothing to exchange data through! Looking at the structures above, the data-exchange space lives in binder_buffer, which is linked to the buffers field of binder_proc. So the binder driver first creates a binder_proc instance (essentially the manager of the shared memory used for data exchange), initializes it and adds it to a global list; all of this happens inside binder_open:
/*
 * 1. Allocate and initialize a binder_proc instance and add it to the global
 *    binder_procs list.
 * 2. Initialize the todo list and the wait queue.
 */
static int binder_open(struct inode *nodp, struct file *filp)
{
    struct binder_proc *proc;

    binder_debug(BINDER_DEBUG_OPEN_CLOSE, "binder_open: %d:%d\n",
             current->group_leader->pid, current->pid);

    /* allocate the binder_proc structure */
    proc = kzalloc(sizeof(*proc), GFP_KERNEL);
    if (proc == NULL)
        return -ENOMEM;
    /* take a reference on the task_struct */
    get_task_struct(current);
    proc->tsk = current;
    proc->vma_vm_mm = current->mm;
    INIT_LIST_HEAD(&proc->todo);          /* init the pending-work list head */
    init_waitqueue_head(&proc->wait);     /* init the wait queue head */
    proc->default_priority = task_nice(current);

    /* enter the critical section */
    binder_lock(__func__);

    /* bump the BINDER_STAT_PROC object counter */
    binder_stats_created(BINDER_STAT_PROC);
    /*
     * Add the new proc_node to the global binder_procs list, so any process
     * can reach every other process's binder_proc object.
     */
    hlist_add_head(&proc->proc_node, &binder_procs);
    proc->pid = current->group_leader->pid;
    INIT_LIST_HEAD(&proc->delivered_death);
    filp->private_data = proc;

    /* leave the critical section */
    binder_unlock(__func__);

    if (binder_debugfs_dir_entry_proc) {
        char strbuf[11];

        snprintf(strbuf, sizeof(strbuf), "%u", proc->pid);
        proc->debugfs_entry = debugfs_create_file(strbuf, S_IRUGO,
            binder_debugfs_dir_entry_proc, proc, &binder_proc_fops);
    }

    return 0;
}
Since binder_open creates the binder_proc instance, it also has to be released and recycled once it is no longer needed, to avoid leaking memory. For that the binder driver provides binder_release:
static int binder_release(struct inode *nodp, struct file *filp)
{
    struct binder_proc *proc = filp->private_data;

    debugfs_remove(proc->debugfs_entry);
    binder_defer_work(proc, BINDER_DEFERRED_RELEASE);

    return 0;
}

static void binder_defer_work(struct binder_proc *proc, enum binder_deferred_state defer)
{
    mutex_lock(&binder_deferred_lock);
    proc->deferred_work |= defer;
    if (hlist_unhashed(&proc->deferred_work_node)) {
        /* queue the binder_proc instance onto the deferred-release list */
        hlist_add_head(&proc->deferred_work_node,
                   &binder_deferred_list);
        schedule_work(&binder_deferred_work);
    }
    mutex_unlock(&binder_deferred_lock);
}
(3) binder_proc is essentially the structure that manages the shared memory. Once it has been instantiated, the most important step follows: reserving memory in the process's virtual address space and mapping it onto physical memory owned by the kernel. This is done in binder_mmap:
/*
 * Map the kernel's physical memory into the user process's address space, so
 * that kernel memory can be operated on the same way as user memory.
 */
static int binder_mmap(struct file *filp, struct vm_area_struct *vma)
{
    int ret;
    struct vm_struct *area;
    /* the proc instance holding the memory shared by both sides */
    struct binder_proc *proc = filp->private_data;
    const char *failure_string;
    /* the shared buffer itself */
    struct binder_buffer *buffer;

    if (proc->tsk != current)
        return -EINVAL;

    /* the shared area is capped at 4 MB, no more */
    if ((vma->vm_end - vma->vm_start) > SZ_4M)
        vma->vm_end = vma->vm_start + SZ_4M;

    binder_debug(BINDER_DEBUG_OPEN_CLOSE,
             "binder_mmap: %d %lx-%lx (%ld K) vma %lx pagep %lx\n",
             proc->pid, vma->vm_start, vma->vm_end,
             (vma->vm_end - vma->vm_start) / SZ_1K, vma->vm_flags,
             (unsigned long)pgprot_val(vma->vm_page_prot));

    /* reject illegal flags (e.g. writable mappings) */
    if (vma->vm_flags & FORBIDDEN_MMAP_FLAGS) {
        ret = -EPERM;
        failure_string = "bad vm_flags";
        goto err_bad_arg;
    }
    vma->vm_flags = (vma->vm_flags | VM_DONTCOPY) & ~VM_MAYWRITE;

    /* lock the critical section so that different processes do not
     * request virtual memory at the same time */
    mutex_lock(&binder_mmap_lock);
    if (proc->buffer) {             /* this area has already been mapped */
        ret = -EBUSY;
        failure_string = "already mapped";
        goto err_already_mapped;
    }

    /*
     * Reserve an unused range of kernel virtual address space of the same
     * size as the user range described by vma.
     */
    area = get_vm_area(vma->vm_end - vma->vm_start, VM_IOREMAP);
    if (area == NULL) {
        ret = -ENOMEM;
        failure_string = "get_vm_area";
        goto err_get_vm_area_failed;
    }
    /* record the reserved address in the binder_proc object */
    proc->buffer = area->addr;
    proc->user_buffer_offset = vma->vm_start - (uintptr_t)proc->buffer;
    mutex_unlock(&binder_mmap_lock);

#ifdef CONFIG_CPU_CACHE_VIPT
    if (cache_is_vipt_aliasing()) {
        while (CACHE_COLOUR((vma->vm_start ^ (uint32_t)proc->buffer))) {
            pr_info("binder_mmap: %d %lx-%lx maps %p bad alignment\n",
                proc->pid, vma->vm_start, vma->vm_end, proc->buffer);
            vma->vm_start += PAGE_SIZE;
        }
    }
#endif
    /* allocate the pages array sized to the requested range; it holds the
     * pointers to the physical pages */
    proc->pages = kzalloc(sizeof(proc->pages[0]) *
                ((vma->vm_end - vma->vm_start) / PAGE_SIZE),
                  GFP_KERNEL);
    if (proc->pages == NULL) {
        ret = -ENOMEM;
        failure_string = "alloc page array";
        goto err_alloc_pages_failed;
    }
    /* size of the shared buffer, at most 4 MB */
    proc->buffer_size = vma->vm_end - vma->vm_start;

    vma->vm_ops = &binder_vm_ops;
    vma->vm_private_data = proc;

    /* allocate one physical page to start with */
    if (binder_update_page_range(proc, 1, proc->buffer,
                     proc->buffer + PAGE_SIZE, vma)) {
        ret = -ENOMEM;
        failure_string = "alloc small buf";
        goto err_alloc_small_buf_failed;
    }
    buffer = proc->buffer;
    INIT_LIST_HEAD(&proc->buffers);
    /* put the binder_buffer object on the proc->buffers list so that all
     * buffers are managed in one place */
    list_add(&buffer->entry, &proc->buffers);
    buffer->free = 1;
    /* the new free buffer is also tracked in a red-black tree */
    binder_insert_free_buffer(proc, buffer);
    proc->free_async_space = proc->buffer_size / 2;
    /* memory barrier, prevents reordering */
    barrier();
    proc->files = get_files_struct(current);
    proc->vma = vma;
    proc->vma_vm_mm = vma->vm_mm;

    /*pr_info("binder_mmap: %d %lx-%lx maps %p\n",
         proc->pid, vma->vm_start, vma->vm_end, proc->buffer);*/
    return 0;

err_alloc_small_buf_failed:
    kfree(proc->pages);
    proc->pages = NULL;
err_alloc_pages_failed:
    mutex_lock(&binder_mmap_lock);
    vfree(proc->buffer);
    proc->buffer = NULL;
err_get_vm_area_failed:
err_already_mapped:
    mutex_unlock(&binder_mmap_lock);
err_bad_arg:
    pr_err("binder_mmap: %d %lx-%lx %s failed %d\n",
           proc->pid, vma->vm_start, vma->vm_end, failure_string, ret);
    return ret;
}
The most important function here is binder_update_page_range, which either frees physical pages or allocates them; when allocating, it also establishes the mappings into the virtual address spaces. The two calls that actually wire virtual addresses to physical pages are map_kernel_range_noflush and vm_insert_page: the former maps kernel virtual addresses onto the page, the latter maps the process's user virtual addresses onto the same page (a toy driver sketch of the same trick follows the listing below). The full code:
/*
 * Allocate or free physical pages; when allocating, also establish the
 * mapping into the process's virtual address space.
 */
static int binder_update_page_range(struct binder_proc *proc, int allocate,
                    void *start, void *end,
                    struct vm_area_struct *vma)
{
    void *page_addr;
    unsigned long user_page_addr;
    struct page **page;
    struct mm_struct *mm;

    binder_debug(BINDER_DEBUG_BUFFER_ALLOC,
             "%d: %s pages %p-%p\n", proc->pid,
             allocate ? "allocate" : "free", start, end);

    if (end <= start)
        return 0;

    trace_binder_update_page_range(proc, allocate, start, end);

    if (vma)
        mm = NULL;
    else
        /*
         * Take the process's memory descriptor (mm_struct) and bump its
         * mm_users count so the mm_struct cannot be freed under us.
         */
        mm = get_task_mm(proc->tsk);

    if (mm) {
        /* take the write lock */
        down_write(&mm->mmap_sem);
        vma = proc->vma;
        if (vma && mm != proc->vma_vm_mm) {
            pr_err("%d: vma mm and task mm mismatch\n", proc->pid);
            vma = NULL;
        }
    }

    /* allocate == 0 means we are freeing the pages */
    if (allocate == 0)
        goto free_range;

    if (vma == NULL) {
        pr_err("%d: binder_alloc_buf failed to map pages in userspace, no vma\n",
               proc->pid);
        goto err_no_vma;
    }

    /* allocate the physical pages and map them, one page per iteration */
    for (page_addr = start; page_addr < end; page_addr += PAGE_SIZE) {
        int ret;

        /* slot in the pages array, ordered by kernel virtual address */
        page = &proc->pages[(page_addr - proc->buffer) / PAGE_SIZE];

        BUG_ON(*page);
        /* the core step: allocate one physical page */
        *page = alloc_page(GFP_KERNEL | __GFP_HIGHMEM | __GFP_ZERO);
        if (*page == NULL) {
            pr_err("%d: binder_alloc_buf failed for page at %p\n",
                proc->pid, page_addr);
            goto err_alloc_page_failed;
        }
        /*
         * Map the kernel virtual address onto this physical page. This ends
         * up in mm/vmalloc.c: vmap_page_range_noflush, which walks down
         * through vmap_pud_range, vmap_pmd_range, vmap_pte_range and
         * set_pte_at to fill in the page-table levels.
         * Note: this is the kernel virtual address space; the user virtual
         * address space is mapped below via vm_insert_page.
         */
        ret = map_kernel_range_noflush((unsigned long)page_addr,
                    PAGE_SIZE, PAGE_KERNEL, page);
        /* flush the CPU caches for the freshly updated mapping */
        flush_cache_vmap((unsigned long)page_addr,
                (unsigned long)page_addr + PAGE_SIZE);
        if (ret != 1) {
            pr_err("%d: binder_alloc_buf failed to map page at %p in kernel\n",
                   proc->pid, page_addr);
            goto err_map_kernel_failed;
        }
        /* compute the corresponding user-space virtual address */
        user_page_addr =
            (uintptr_t)page_addr + proc->user_buffer_offset;
        /* map the user virtual address onto the same physical page */
        ret = vm_insert_page(vma, user_page_addr, page[0]);
        if (ret) {
            pr_err("%d: binder_alloc_buf failed to map page at %lx in userspace\n",
                   proc->pid, user_page_addr);
            goto err_vm_insert_page_failed;
        }
        /* vm_insert_page does not seem to increment the refcount */
    }
    if (mm) {
        /* drop the write lock */
        up_write(&mm->mmap_sem);
        /* drop the mm_users count taken by get_task_mm() */
        mmput(mm);
    }
    return 0;

free_range:
    for (page_addr = end - PAGE_SIZE; page_addr >= start;
         page_addr -= PAGE_SIZE) {
        page = &proc->pages[(page_addr - proc->buffer) / PAGE_SIZE];
        if (vma)
            zap_page_range(vma, (uintptr_t)page_addr +
                proc->user_buffer_offset, PAGE_SIZE, NULL);
err_vm_insert_page_failed:
        unmap_kernel_range((unsigned long)page_addr, PAGE_SIZE);
err_map_kernel_failed:
        __free_page(*page);
        *page = NULL;
err_alloc_page_failed:
        ;
    }
err_no_vma:
    if (mm) {
        up_write(&mm->mmap_sem);
        mmput(mm);
    }
    return -ENOMEM;
}
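To make the "one physical page, two virtual addresses" trick tangible outside of binder, here is a minimal, self-contained kernel-module sketch of a toy misc device (all names such as onepage_mmap and /dev/onepage are invented for this illustration, it is not binder code): the module allocates a single page, writes into it through its kernel mapping, and lets user space mmap the very same page via vm_insert_page, just as binder_update_page_range does for the receive buffer.

#include <linux/module.h>
#include <linux/miscdevice.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/gfp.h>
#include <linux/string.h>

static struct page *shared_page;    /* the single physical page we export */

static int onepage_mmap(struct file *filp, struct vm_area_struct *vma)
{
    if (vma->vm_end - vma->vm_start != PAGE_SIZE)
        return -EINVAL;
    /* map the caller's user virtual address onto our physical page,
     * the same call binder_update_page_range() uses */
    return vm_insert_page(vma, vma->vm_start, shared_page);
}

static const struct file_operations onepage_fops = {
    .owner = THIS_MODULE,
    .mmap  = onepage_mmap,
};

static struct miscdevice onepage_dev = {
    .minor = MISC_DYNAMIC_MINOR,
    .name  = "onepage",
    .fops  = &onepage_fops,
};

static int __init onepage_init(void)
{
    shared_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
    if (!shared_page)
        return -ENOMEM;
    /* write through the kernel-side mapping of the page */
    strcpy(page_address(shared_page), "hello from the kernel mapping");
    return misc_register(&onepage_dev);
}

static void __exit onepage_exit(void)
{
    misc_deregister(&onepage_dev);
    __free_page(shared_page);
}

module_init(onepage_init);
module_exit(onepage_exit);
MODULE_LICENSE("GPL");

A user program can then open /dev/onepage, mmap one page with PROT_READ and MAP_SHARED, and read the string the kernel wrote, with no copy_to_user in between: one physical page, two virtual addresses.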
(4) With the memory mapped, everything is ready except the data itself. Both sides can now read and write through the kernel buffer. The call chain is fairly long: binder_ioctl() -> binder_get_thread() -> binder_ioctl_write_read() -> binder_thread_write()/binder_thread_read(). The real work ends up in binder_thread_write and binder_thread_read, and the core idea of both is simple: loop over the commands and take a different action for each cmd; when data actually has to move between the kernel and a user process, it still comes down to copy_from_user and copy_to_user. The functions are long, so only a few core lines are excerpted here (a user-space sketch of driving this ioctl follows the two excerpts):
/*
 * Loop over the commands and take a different action for each cmd; moving the
 * actual data between the kernel and the user process still comes down to
 * copy_from_user / copy_to_user.
 */
static int binder_thread_write(struct binder_proc *proc,
            struct binder_thread *thread,
            binder_uintptr_t binder_buffer, size_t size,
            binder_size_t *consumed)
{
    uint32_t cmd;
    void __user *buffer = (void __user *)(uintptr_t)binder_buffer; /* start of the data */
    void __user *ptr = buffer + *consumed;   /* current position: start + already consumed */
    void __user *end = buffer + size;        /* end of the data */

    /* there may be several commands/payloads to handle, hence the loop */
    while (ptr < end && thread->return_error == BR_OK) {
        /* read one cmd */
        if (get_user(cmd, (uint32_t __user *)ptr))
            return -EFAULT;
        /* skip past the space taken by the cmd itself */
        ptr += sizeof(uint32_t);
        .................
        switch (cmd) {
        case BC_TRANSACTION:
        case BC_REPLY: {
            struct binder_transaction_data tr;

            /* writing data from ring-3 user space into the kernel always
             * goes through copy_from_user, no exceptions */
            if (copy_from_user(&tr, ptr, sizeof(tr)))
                return -EFAULT;
            ptr += sizeof(tr);
            binder_transaction(proc, thread, &tr, cmd == BC_REPLY);
            break;
        }
    }
The core idea of binder_thread_read: check whether the current thread has been woken up with work to do. If so, the essential step is copy_to_user, letting the ring-3 process take the data away; if not, it goes back to the retry label and keeps waiting to be woken. The core code below shows the pattern: an endless while loop probes for pending work, and when there is none it jumps back to retry and sleeps:
retry:
    /* work out what kind of work we will wait for */
    wait_for_proc_work = thread->transaction_stack == NULL &&
                list_empty(&thread->todo);
    if (wait_for_proc_work) {
        if (!(thread->looper & (BINDER_LOOPER_STATE_REGISTERED |
                    BINDER_LOOPER_STATE_ENTERED))) {
            binder_user_error("binder: %d:%d ERROR: Thread waiting "
                "for process work before calling BC_REGISTER_"
                "LOOPER or BC_ENTER_LOOPER (state %x)\n",
                proc->pid, thread->pid, thread->looper);
            wait_event_interruptible(binder_user_error_wait,
                         binder_stop_on_user_error < 2);
        }
        binder_set_nice(proc->default_priority);
        if (non_block) {
            /* non-blocking and no work: return EAGAIN */
            if (!binder_has_proc_work(proc, thread))
                ret = -EAGAIN;
        } else
            /* blocking: go to sleep until there is work to handle */
            ret = wait_event_freezable_exclusive(proc->wait,
                    binder_has_proc_work(proc, thread));
    } else {
        if (non_block) {
            if (!binder_has_thread_work(thread))
                ret = -EAGAIN;
        } else
            ret = wait_event_freezable(thread->wait,
                    binder_has_thread_work(thread));
    }

    binder_lock(__func__);

    if (wait_for_proc_work)
        proc->ready_threads--;
    thread->looper &= ~BINDER_LOOPER_STATE_WAITING;

    if (ret)
        return ret;

    while (1) {
        uint32_t cmd;
        struct binder_transaction_data tr;
        struct binder_work *w;
        struct binder_transaction *t = NULL;

        /* pick the next binder_work object */
        if (!list_empty(&thread->todo))
            w = list_first_entry(&thread->todo, struct binder_work, entry);
        else if (!list_empty(&proc->todo) && wait_for_proc_work)
            w = list_first_entry(&proc->todo, struct binder_work, entry);
        else {
            if (ptr - buffer == 4 &&
                !(thread->looper & BINDER_LOOPER_STATE_NEED_RETURN))
                /* no data added: go back to retry and keep waiting */
                goto retry;
            break;
        }
        ..................
    }
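As mentioned above, from user space both paths are driven through a single BINDER_WRITE_READ ioctl carrying a struct binder_write_read. Here is a minimal sketch of that call, assuming the UAPI header <linux/android/binder.h>; it only tells the driver that the thread is entering its loop (BC_ENTER_LOOPER) and offers a read buffer, so it is an illustration rather than a complete binder client:

/* Driving binder_thread_write()/binder_thread_read() from user space through
 * one BINDER_WRITE_READ ioctl: write_buffer feeds binder_thread_write(),
 * read_buffer is filled by binder_thread_read() via copy_to_user(). */
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <linux/android/binder.h>

int main(void)
{
    int fd = open("/dev/binder", O_RDWR | O_CLOEXEC);
    if (fd < 0) {
        perror("open /dev/binder");
        return 1;
    }

    uint32_t write_cmds[] = { BC_ENTER_LOOPER };   /* one cmd for the write loop  */
    uint32_t read_buf[32];                         /* room for returned BR_* work */

    struct binder_write_read bwr;
    memset(&bwr, 0, sizeof(bwr));
    bwr.write_buffer = (uintptr_t)write_cmds;      /* consumed by binder_thread_write() */
    bwr.write_size   = sizeof(write_cmds);
    bwr.read_buffer  = (uintptr_t)read_buf;        /* filled by binder_thread_read() */
    bwr.read_size    = sizeof(read_buf);

    if (ioctl(fd, BINDER_WRITE_READ, &bwr) < 0)
        perror("BINDER_WRITE_READ");
    else
        printf("driver consumed %llu bytes, returned %llu bytes of commands\n",
               (unsigned long long)bwr.write_consumed,
               (unsigned long long)bwr.read_consumed);

    close(fd);
    return 0;
}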
(5) Communicating through shared memory as described above is simple in principle, but there is one practical detail left: remember poll from the earlier post on Linux IPC? Binder faces the same question: how does the reading process know that data has arrived in the shared memory? This is solved in binder_transaction: after binder_thread_write finishes it calls binder_transaction, whose last two lines wake up the target thread, closing the loop perfectly with the probing and waiting in binder_thread_read:
	if (target_wait)
		/* wake up the target thread */
		wake_up_interruptible(target_wait);
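The sleep-then-wake handshake between binder_thread_read and binder_transaction is just the standard Linux wait-queue pattern. A minimal, self-contained module sketch of that pattern (names like wake_demo_* are invented for this illustration; it is not binder code):

#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/wait.h>
#include <linux/delay.h>

static DECLARE_WAIT_QUEUE_HEAD(wake_demo_wq);
static int data_ready;                  /* the "work is pending" condition */
static struct task_struct *waiter;

static int waiter_thread(void *unused)
{
    while (!kthread_should_stop()) {
        /* sleep until data_ready is set or we are asked to stop,
         * analogous to binder_thread_read() waiting for todo work */
        wait_event_interruptible(wake_demo_wq,
                                 data_ready || kthread_should_stop());
        if (data_ready) {
            pr_info("wake_demo: consumer woke up, handling work\n");
            data_ready = 0;
        }
    }
    return 0;
}

static int __init wake_demo_init(void)
{
    waiter = kthread_run(waiter_thread, NULL, "wake_demo_waiter");
    if (IS_ERR(waiter))
        return PTR_ERR(waiter);

    msleep(100);                        /* give the consumer time to sleep */
    /* producer side: publish the work, then wake the sleeping consumer,
     * analogous to binder_transaction() calling wake_up_interruptible() */
    data_ready = 1;
    wake_up_interruptible(&wake_demo_wq);
    return 0;
}

static void __exit wake_demo_exit(void)
{
    kthread_stop(waiter);               /* kthread_stop() also wakes the thread */
}

module_init(wake_demo_init);
module_exit(wake_demo_exit);
MODULE_LICENSE("GPL");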
Summary:
1. The whole thing essentially boils down to editing page tables so that different virtual addresses map to the same physical address, exactly the same principle as using shadow walker to bypass PatchGuard (PG) on Windows!
2. Copying data between ring-3 processes and the kernel still goes through copy_from_user and copy_to_user; that part never changes!
