因未測試其他作者的算法時間和效率,本文不敢自稱是最快的,但是速度也可以肯定說是相當快的,在一台I5機器上占用單核的資源處理 3000 * 2000的灰度數據用時約 20ms,並且算法和核心的大小是無關的,即所謂的o(1)算法。
在實現本算法之前,也曾經參考何凱明在暗通道去霧時提出的一篇參考論文中的算法: STREAMING MAXIMUM-MINIMUM FILTER USING NO MORE THAN THREE COMPARISONS PER ELEMENT ,這篇文章所描述的過程也是o(1)的,速度也相當快,不過還是不夠快。
我曾經自己構思了一個想法,也是基於行列分離的,在速度上比上文的代碼又要快,並且也是o(1)算法,但是算法速度和圖片的內容有關,比如對一個圖進行了一次算法后,再次對結果執行相同的算法,可能后一次就要慢很多,這和我改進的算法本身有關系,但是這種情況是很少見的。
本文所要介紹的算法也是在很久以前就看到過的,但是一直沒有引起我的重視,其對應的參考論文是 A fast algorithm for local minimum and maximum filters on rectangular and octagonal kernels ,當時覺得論文的實現起來可能沒有我自己構思的快,也就沒有去深究。
論文的實現步驟也是基於行列分離的,即先進行行方向的一維運算,然后再對結果進行列方向的一維計算,具體的理論描述大家去研究論文吧。
那其實論文的核心就是下面這個圖。

In表示一維行方向的輸入數據,g/h分別運算過程中的兩個中間數據,和輸入數據大小一樣。
如上圖所示,我們假定需要進行計算的核大小為R,那么將一行分為多個大小為 D =(2R+1) 的分段,例如圖中R=2, D=5 ,對每一個分段進行預處理,其中 x 號位置存放的是箭頭所在直線段上的點中的最大值(最小值),如此處理得到 g 和 h 兩個數組,那么對於某個點(索引為I),其半徑R內的最大(小)值為:Max/ Min(g(I+R),h(I-R))。
過程好簡單。
我們拿一組數據來說明上述過程,假如一行數據如下,我們要進行膨脹操作(最大值),核半徑為2:
In: 20 12 35 9 10 7 32 15 20 45 28
對應的g/h為:
g: 20 20 35 35 35 7 32 32 32 45 45
h: 35 35 35 10 9 45 45 45 45 45 28
如果我們要計算第4個點的半徑為2的最大值,則對應 g(I+R) = g(4+2) = 7, h(I-R)=h(4-2)=35, 得到結果為max(7,35) = 35;
如果我們要計算第6個點的半徑為2的最大值,則對應 g(I+R) = g(6+2) = 32, h(I-R)=h(6-2)=10, 得到結果為max(32,10) = 32;
注意上面索引是以1位小標計數起點的。
邊緣處理:
注意到在邊緣處,比如左側邊緣,當要計算的點的索引小於R時,這時h值是無效的,而在右側邊緣,g值是無法取到的,但是仔細觀察,問題很簡單,還拿上述數據為例,如果要計算索引為2處半徑為2的最值,由於h(2-2)是超出索引的(前面說了本例以1為下標起點),因此不能用前述的方式,那我們回歸到問題的本質,計算2處半徑為2的最值,就是計算max(In(1), In(2), In(3), In(4)), 而這個值不就是g數據中索引為2+2處的數據嗎,在這之前就已經幫我們算法,直接取值就可以了。
在數據尾部(右側),情況有所不同,我們應該從H中取值。
從上述的分析可知,這個算法有個特性,就是半徑越大,算法的耗時會越小,因為對邊緣的處理只需要拷貝數據,而沒有了更多的計算,可能很多人不信吧。
算法實現:
有了上面的描述,要實現一個快速的腐蝕或膨脹算法相信對部分來說應該是一件非常容易的事情,先行方向處理,在列方向,好簡單。
最近我是迷上了SSE算法優化,於是就思考了這個算法的SSE優化,以前在看SSE的函數時,就一直在想_mm_max_epi8/_mm_min_epi8這種一次性能獲取16個字節數據的最值的函數是否能用在腐蝕和膨脹上,不過由於他們是對兩個連續數據的比較,在行方向上是難以運用的,但是如果數據比較是列方向上,那不就可以使用了嗎。
我們上述算法就有列方向的比較,不就有了用武之地了。
首先,我們給出在列方向更新g值/h值在每個分段范圍內的C語言實現代碼,比如獲取g值大概的代碼如下:
memcpy(G + StartY * ValidDataLength, Src + StartY * Stride, ValidDataLength); for (int Y = StartY + 1; Y < EndY; Y++) { unsigned char *LinePS = Src + Y * Stride; unsigned char *LinePD = G + Y * ValidDataLength; unsigned char *LinePD = G + (Y - 1) * ValidDataLength; for (int X = 0; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePS[X], LinePL[X]); } }
StartY為計算好的分段范圍的起點,EndY為分段范圍的終點,我們觀察g數據的規律,知道在分段范圍內第一行的最大值就是數據本身,而后面的則是和前一行比較得到結果。
很明顯:
for (int X = 0; X < Width * Channel; X++) { LinePD[X] = IM_Max(LinePS[X], LinePL[X]); }
這個代碼很容易向量化,如果這里是浮點運算,編譯器會直接幫我們向量處理的,但是對於字節,似乎編譯器還沒有那么智能,我們自己手動來向量化,代碼如下:
memcpy(G + StartY * ValidDataLength, Dest + StartY * ValidDataLength, ValidDataLength); // 每段G數據第一行就是原始的數據 for (int Y = StartY + 1; Y < EndY; Y++) { unsigned char *LinePS = Dest + Y * ValidDataLength; unsigned char *LinePD = G + Y * ValidDataLength; unsigned char *LinePL = G + (Y - 1) * ValidDataLength; for (int X = 0; X < Block * BlockSize; X += BlockSize) { _mm_storeu_si128((__m128i *)(LinePD + X), _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS + X)), _mm_loadu_si128((__m128i *)(LinePL + X)))); } for (int X = Block * BlockSize; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePS[X], LinePL[X]); } }
其中BlockSize = 16, Block = ValidDataLength / BlockSize;
這段代碼很簡單,對於h的處理也是類似的道理。
當我們獲得了g,h的數據后,后面的處理過程的C代碼也很簡單:
for (int Y = Radius; Y < IM_Min(BlockV * Size - Radius, Width); Y++) // 這些位於中間的數據是符合 G+Radius 和 R - Radius 取大的要求的 { unsigned char *LinePG = G + IM_Min(Y + Radius, Width - 1) * Width; unsigned char *LinePH = H + (Y - Radius) * ValidDataLength; unsigned char *LinePD = T + Y * ValidDataLength;for (int X = 0; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePG[X], LinePH[X]); } }
又是同樣的道理,內部的for循環可直接用SSE代替。
但是這里只是對列方向進行處理,行方向有沒有可能用SSE做處理呢,可以肯定的說,絕對可以,但是除非你非常有耐心,以為其中各種pack和unpack或者shuffle會讓你瘋掉,而且最后的效率也許還不如直接使用普通的C語言。
那么如何處理呢,我想大家肯定能想到轉置,確實,對數據進行轉置后再進行列方向的處理,然后再轉置回來就相當於對原數據的行方向處理。
關於轉置,一直也是個耗時的過程,但是我在圖像轉置的SSE優化(支持8位、24位、32位),提速4-6倍 一文中提到了利用SSE實現高速的轉置操作,利用它去實現本文的流程則非常靠譜。
那么我們貼出整體的大部分處理代碼:
垂直方向的核心:
int IM_Vert_MaxFilter(unsigned char *Src, unsigned char *Dest, int Width, int Height, int Stride, int Radius) { int Channel = Stride / Width; if ((Src == NULL) || (Dest == NULL)) return IM_STATUS_NULLREFRENCE; if ((Width <= 0) || (Height <= 0)) return IM_STATUS_INVALIDPARAMETER; if ((Channel != 1) && (Channel != 3) && (Channel != 4)) return IM_STATUS_INVALIDPARAMETER; // 從節省內存的角度考慮,可以只需要兩倍額外的內存 unsigned char *G = (unsigned char *)malloc(Height * Width * Channel * sizeof(unsigned char)); unsigned char *H = (unsigned char *)malloc(Height * Width * Channel * sizeof(unsigned char)); if ((G == NULL) || (H == NULL)) { if (G != NULL) free(G); if (H != NULL) free(H); return IM_STATUS_OUTOFMEMORY; } // 垂直方向處理 int Size = Radius * 2 + 1, ValidDataLength = Width * Channel; int BlockSize = 16, Block = ValidDataLength / BlockSize; int BlockV = ((Height % Size) == 0 ? Height / Size : (Height / Size) + 1); for (int Index = 0; Index < BlockV; Index++) { int StartY = Index * Size, EndY = IM_Min(Index * Size + Size, Height); memcpy(G + StartY * ValidDataLength, Src + StartY * Stride, ValidDataLength); // 每段G數據第一行就是原始的數據 for (int Y = StartY + 1; Y < EndY; Y++) { unsigned char *LinePS = Src + Y * Stride; unsigned char *LinePD = G + Y * ValidDataLength; unsigned char *LinePL = G + (Y - 1) * ValidDataLength; for (int X = 0; X < Block * BlockSize; X += BlockSize) { _mm_storeu_si128((__m128i *)(LinePD + X), _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS + X)), _mm_loadu_si128((__m128i *)(LinePL + X)))); } for (int X = Block * BlockSize; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePS[X], LinePL[X]); } } memcpy(H + StartY * ValidDataLength, G + (EndY - 1) * ValidDataLength, ValidDataLength); // 每段H數據的第一行就是對應G數據的最后一行 memcpy(H + (EndY - 1) * ValidDataLength, Src + (EndY - 1) * Stride, ValidDataLength); // 每段H數據的最后一行就是原始的數據 for (int Y = EndY - 2; Y > StartY; Y--) // 注意循環次數的改變 { unsigned char *LinePS = Src + Y * Stride; unsigned char *LinePD = H + Y * ValidDataLength; unsigned char *LinePL = H + (Y + 1) * ValidDataLength; for (int X = 0; X < Block * BlockSize; X += BlockSize) { _mm_storeu_si128((__m128i *)(LinePD + X), _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS + X)), _mm_loadu_si128((__m128i *)(LinePL + X)))); } for (int X = Block * BlockSize; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePS[X], LinePL[X]); } } // 針對最值算法,在列方向最后一塊不是Size大小時,后面的數據只能是重復邊緣像素,這樣后面跟的G/H值和Height - 1大小是相同的 } // 整個的數據分為三個部分,[0, Radius]為第一組,[Radius, BlockV * Size - Radius]為第二組,[BlockV * Size - Radius, BlockV * Size]為第三組, // 第一組數據的結果取G中[Radius, 2 * Radius]的值,第二組數據取G + Radius和H - Radius中的小值,第三組取H - Radius的值。 // 最頂部的一半數據,此時的H數據無效 // // 此處刪除若干代碼 // for (int Y = Radius; Y < IM_Min(BlockV * Size - Radius, Height); Y++) // 這些位於中間的數據是符合 G + Radius 和 R - Radius 取大的要求的 { unsigned char *LinePG = G + IM_Min(Y + Radius, Height - 1) * ValidDataLength; // 有可能超出范圍的 unsigned char *LinePH = H + (Y - Radius) * ValidDataLength; unsigned char *LinePD = Dest + Y * Stride; for (int X = 0; X < Block * BlockSize; X += BlockSize) { _mm_storeu_si128((__m128i *)(LinePD + X), _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePG + X)), _mm_loadu_si128((__m128i *)(LinePH + X)))); } for (int X = Block * BlockSize; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePG[X], LinePH[X]); } } // 最底部的一半數據,此時的G數據無用 // // 此處刪除若干代碼 // free(G); free(H); return IM_STATUS_OK; }
綜合的調用:
int IM_MaxFilter(unsigned char *Src, unsigned char *Dest, int Width, int Height, int Stride, int Radius) { int Channel = Stride / Width; if ((Src == NULL) || (Dest == NULL)) return IM_STATUS_NULLREFRENCE; if ((Width <= 0) || (Height <= 0)) return IM_STATUS_INVALIDPARAMETER; if ((Channel != 1) && (Channel != 3) && (Channel != 4)) return IM_STATUS_INVALIDPARAMETER; int Status = IM_STATUS_OK; unsigned char *T = (unsigned char *)malloc(Height * Stride * sizeof(unsigned char)); if (T == NULL) return IM_STATUS_OUTOFMEMORY; Status = IM_Vert_MaxFilter(Src, T, Width, Height, Stride, Radius); if (Status != IM_STATUS_OK) goto FreeMemory; Status = IM_Transpose(T, Dest, Width, Height, Stride, Height, Width, Height * Channel); // 轉置,注意Dest我只用了Height * Channel的數據 if (Status != IM_STATUS_OK) goto FreeMemory; Status = IM_Vert_MaxFilter(Dest, T, Height, Width, Height * Channel, Radius); if (Status != IM_STATUS_OK) goto FreeMemory; Status = IM_Transpose(T, Dest, Height, Width, Height * Channel, Width, Height, Stride); FreeMemory: free(T); return Status; }
上面代碼中還有很多細節,包括分塊尾部不完整數據的處理,大家可以自己理解。
有兩處刪除了部分代碼,刪除的代碼是很容易補上去的,因為我不喜歡我的代碼被別人直接復制黏貼。
進一步的分析:
由上面的代碼可以看到,要實現整個的過程,我們需要原圖3倍大小的額外內存,那么是否有降低這個的可能性呢,是有的,在處理列方向時,我們可以一次性只處理16列或32列,這樣g/h數據只各需要Height * 16(32) * sizeof(unsigned char)大小的內存,而且這樣做還有一個優勢就是在每個分段內部比較時,由於更新的內容較少,可以用一個xmm寄存器保存最值得臨時結果,這樣就不用去加載上一行的內存數據,少了很多次內存讀寫的過程,一個簡單的示例代碼如下所示:
unsigned char *LinePS = Src + StartY * Stride + X; unsigned char *LinePD = G + StartY * 32; __m128i Max1 = _mm_setzero_si128(), Max2 = _mm_setzero_si128(); // 這樣寫能減少一次內存加載 for (int Y = StartY; Y < EndY; Y++) { Max1 = _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS + 0)), Max1); // 或者使用一條AVX指令 Max2 = _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS + 16)), Max2); _mm_store_si128((__m128i *)(LinePD + 0), Max1); _mm_store_si128((__m128i *)(LinePD + 16), Max2); LinePS += Stride; LinePD += 32; }
在我的筆記本中測試,這個的速度要比上面的版本還快一點,並且有占用了更少的內存,一舉兩得啊。
歡迎大家能提供更快速的算法的實現思路。
本文Demo下載地址: http://files.cnblogs.com/files/Imageshop/SSE_Optimization_Demo.rar,里面的所有算法都是基於SSE實現的。

如果覺得本文對你有幫助,請為本文點個贊。
2019.4.26日更新:
在本文的評論里有網友提出該文的速度還是不夠快,比起商業軟件還有很大的差距,一直未找到良方,最近有網友說他做的比我這個速度要快,其提到了不需要完全轉置。雖未提到代碼層面的東西,但是這種事情是一點就通的,要實現起來還是很容易的。
比起全局轉置,局部轉置可以只需要分配很少的內存,從速度方面考慮,我們在進行了垂直方向的優化后,就要進行水平方向處理,此時,我每次轉置32行(或其他16的倍數),然后利用垂直方向的處理技巧處理轉置后數據的垂直方向最值,處理完成后在轉置到水平方向對應的32行數據中,至於不能被32行整除的那一部分,就用普通的方式處理了。
此種優化后,我們驚喜的發現,速度較之前有2到3倍的提高,如下圖所示:

其實,整體來看,修改后的代碼計算量上並沒有什么減少,那主要耗時降低了,其核心在於減少了內存的Cache Miss,這種技巧在很多算法中也可以加以推廣。
最后,我們還注意到一個問題,表面上看垂直方向的代碼更為簡介,流程也少那些轉置的過程,但是最后實測垂直處理的時間和水平處理的時間的占比約為內存·6:4,分析認為這個的主要原因是在垂直方向處理時圖比較大,連續訪問垂直方向的內存,Cache miss比較多,而水平方向因為是分塊處理的,中間用到的臨時內存訪問時基本無啥Cache miss(寬度為32的臨時區都是連續訪問的)。
修改后的算法對於評論區里博友提到的4096X8192大小的的灰度圖,其耗時大概在70ms,比其說的商業軟件的速度還是要慢一倍的。在Opencv中,使用cvErode函數,發現他也非常非常的快,和我這里的速度旗鼓相當,有的時候還快點,查看其代碼,發現他核心的地方是調用hal:morph實現的,但是HAL硬件加速層到底是個啥家伙,實在是搞不清楚。

