多線程迭代器


自己hack的迭代器總覺得卡,可能是兩個處理器之間工作不連貫,batch size高了會使CPU上下起伏,卡頓(看着流水圖也心塞),低了GPU的Utilization不高。所以最好的方案應該就是多線程了。


后續評論(正文請忽略)
后面再來估計,似乎官方的版本也是沒有多線程的(比如mx.img.ImageIter?),只是計算引擎在那里使人產生錯覺,或者是在調用某些特別的函數時,在某處調用了多線程。(待驗證)


看了下,似乎正好有個通用interface。

 class mxnet.io.PrefetchingIter(iters, rename_data=None, rename_label=None)

    Performs pre-fetch for other data iterators.

    This iterator will create another thread to perform iter_next and then store the data in memory. It potentially accelerates the data read, at the cost of more memory usage.
    Parameters:	

        iters (DataIter or list of DataIter) – The data iterators to be pre-fetched.
        rename_data (None or list of dict) – The i-th element is a renaming map for the i-th iter, in the form of {‘original_name’ : ‘new_name’}. Should have one entry for each entry in iter[i].provide_data.
        rename_label (None or list of dict) – Similar to rename_data.

    Examples

>>> iter1 = mx.io.NDArrayIter({'data':mx.nd.ones((100,10))}, batch_size=25)
>>> iter2 = mx.io.NDArrayIter({'data':mx.nd.ones((100,10))}, batch_size=25)
>>> piter = mx.io.PrefetchingIter([iter1, iter2],
...                               rename_data=[{'data': 'data_1'}, {'data': 'data_2'}])
>>> print(piter.provide_data)
[DataDesc[data_1,(25, 10L),<type 'numpy.float32'>,NCHW],
 DataDesc[data_2,(25, 10L),<type 'numpy.float32'>,NCHW]]

如果要更多的線程,估計可以將迭代器反復迭代。

NOTE
上面的結論還沒試。


試了下,CPU的波動問題似乎沒有得到解決,不過從GPU的utilization來看,這部分應該是有效的。


21 Jun, 2017 再記
前面寫了幾段發現還不適合發布,最近發現些其他問題,想到這還沒完結,正好放這了。

Previous Note

前面提到的PrefetchingIter最好單獨開一個變量來存儲返回值:

  1. 后續內部自定義調用更靈活
  2. 線程相關

2點還沒有詳細的試驗,但這樣做肯定是安全的,並且至少第一點是一個優勢。

Concern

談談新的問題。
通常多線程成為噩夢的重要原因是同步is prone to bugs,這在以傳遞ref為特色的設計里面尤為使人憂慮。比如這段測試:

import mxnet as mx
import numpy as np
m_  = np.random.randint(-10,10,(4,5,6,7))
m= mx.nd.array(m_)

m_it = mx.io.PrefetchingIter(mx.io.NDArrayIter(m))
d=m_it.next().data[0]

m[:]=0  #-> 0....
d.asnumpy().sum() #  dangerous !!!!
# 0

Solution

顯然,使用強制拷貝是最合理的:

import mxnet as mx
import numpy as np
m_  = np.random.randint(-10,10,(4,5,6,7))
m= mx.nd.array(m_)
#m_it = mx.io.NDArrayIter(m)
m_it = mx.io.PrefetchingIter(mx.io.NDArrayIter(m))

d=m_it.next().data[0].copy()
m[:]=0  #-> 0....
d.asnumpy().sum() 
# -118.0

Followup

另一個附帶的問題是,拷貝的目的地在哪:

import mxnet as mx
import numpy as np
m_  = np.random.randint(-10,10,(4,5,6,7))
m= mx.nd.array(m_,mx.cpu(1))
m.copy().context
#  cpu(1)  

一個稱心的返回值 😃


17 July, 2017 記

內存空間釋放重利用

在空間管理上還存在一些問題。看github上的討論,意思是mxnet將內存作為池來管理,所以即使釋放了內存也不會從nvidia上看到變化。(其中一個問題是怎樣釋放掉一個變量,發現直接賦值為None可以解決問題)
但還是遇到一些問題,比如下面這段:

import mxnet as mx
def test():
    m=mx.nd.zeros((999,999,550),mx.gpu())  # 4GB 的 total dedicated memory
    return mx.io.NDArrayIter(m,batch_size=1)

用下面這個這段可以順利運行:

from test import test, mx
import time
it_ = test()
it_ = None
time.sleep(2)  # 似乎要這樣運作一下
it_ = test() # 沒問題

但這段就不行了:

from test import test, mx
import time
it_ = test()
it = mx.io.PrefetchingIter(it_)

it_ = None
it =None
time.sleep(2)
it_ = test()
it = mx.io.PrefetchingIter(it_)  # 提示 Out of Memory

檢查io.py時發現一個可能行的方案:

from test import test, mx
import time
it_ = test()
it = mx.io.PrefetchingIter(it_)

it.__del__()    # 加入這個
it_ = None
it =None
time.sleep(2)
it_ = test()
it = mx.io.PrefetchingIter(it_)  # 沒有問題!

21 Jul, 2017 記

關於 __del__()與內存

另一最近遇到的一個例子是example/ssd里面的。
需要把圖片里面的目標檢測出來。使用的是for來一次次讀,然后檢測的結構。過了一會發現內存上去了,后面就被killed掉了,於是只好過一段時間自己中斷掉,然后重新啟動。后面發現可能要長期使用,這樣手工中斷就有些吃不消了,又不想跑到里面去改迭代器。跟着里面走了一會兒,發現出現了PrefetchingIter(於是乎,陡然間想起多年前的寒假,在某kernel里面講到出現系統kill的情況),於是在這段函數結束前,調用__del__(),內存搞定。(函數結束沒有結束線程應該是沒問題的)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM