swift中創建對象,即PUT object,根據選定的存儲策略將對象內容寫入至相應的服務器(object server)。我們重點關注object controller和object servers之間的通信過程,其它從略。
在proxy-server上對client發送來的HTTP請求進行解析、wsgi環境變量進行設置、認證及相應錯誤處理過程從略。唯一需要說明的是,對外部client 通過HTTP請求發送來的對象(object),swift會將其具體內容存放於環境變量env中,即:
self.environ['wsgi.input'] = WsgiStringIO(value)
對象創建最終會定位到BaseObjectController類中的PUT方法:
@public
@cors_validation
@delay_denial
def PUT(self, req):
"""HTTP PUT request handler."""
if req.if_none_match is not None and '*' not in req.if_none_match:
# Sending an etag with if-none-match isn't currently supported
return HTTPBadRequest(request=req, content_type='text/plain',
body='If-None-Match only supports *')
container_info = self.container_info(
self.account_name, self.container_name, req)
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
container_nodes = container_info['nodes']
container_partition = container_info['partition']
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
# pass the policy index to storage nodes via req header
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
req.acl = container_info['write_acl']
req.environ['swift_sync_key'] = container_info['sync_key']
# is request authorized
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
if not container_info['nodes']:
return HTTPNotFound(request=req)
# update content type in case it is missing
self._update_content_type(req)
# check constraints on object name and request headers
error_response = check_object_creation(req, self.object_name) or \
check_content_type(req)
if error_response:
return error_response
self._update_x_timestamp(req)
# check if versioning is enabled and handle copying previous version
self._handle_object_versions(req)
# check if request is a COPY of an existing object
source_header = req.headers.get('X-Copy-From')
if source_header:
error_response, req, data_source, update_response = \
self._handle_copy_request(req)
if error_response:
return error_response
else:
reader = req.environ['wsgi.input'].read
data_source = iter(lambda: reader(self.app.client_chunk_size), '')
update_response = lambda req, resp: resp
# check if object is set to be automaticaly deleted (i.e. expired)
req, delete_at_container, delete_at_part, \
delete_at_nodes = self._config_obj_expiration(req)
# add special headers to be handled by storage nodes
outgoing_headers = self._backend_requests(
req, len(nodes), container_partition, container_nodes,
delete_at_container, delete_at_part, delete_at_nodes)
# send object to storage nodes
resp = self._store_object(
req, data_source, nodes, partition, outgoing_headers)
return update_response(req, resp)
下面我們就從這七十來行代碼來詳細探討此PUT操作究竟做了什么。其中錯誤處理和有些有明顯注釋的代碼不深究,專注於主要流程。
1.獲取node及partition信息
要執行PUT object,首先便要獲取關於此object的必要信息,它從哪里來(前文提到過,env中直接暫存對象內容)、它要放到哪里去,同時也要知道它所在container的相應信息,因為PUT object會增加container內容,影響其元數據信息,故需要對相應存放container信息的數據表進行更新,這必然要涉及到存放container信息數據表partition及具體存放結點信息。
10-18行,是這一過程的具體代碼。我們可以看到,繼承自Controller類的container_info方法被首先調用,此函數相當於讀取container元數據信息緩存,返回一個字典,包含相應container的partition、nodes及policy_index信息。即某一個具體container(容器)的所在區、存放結點列表、所使用的存儲策略(副本or糾刪碼)。
還會根據object所在container具體存儲策略,獲得其object ring,然后根據ring中定義的映射規則,取得該object所在區及存放結點列表(line. 17-18)。
2. 認證、更新、檢查
認證(line. 26-29),根據env中'swift.authorize'存放的認證函數,校驗token,確認請求的合法性。
更新請求信息(line. 21-23,35,43),更新HTTP請求中相應信息,包括content_type(內容類型)、timestamp(時間戳)等等。
檢查,包括檢查object命名與請求是否合法(line. 38-41),檢查object版本信息(line. 46),檢查是否從已有對象拷貝(line. 49-58),檢查是否設定自動刪除時間(line. 61-62)。
雖然沒有研究過關於對象自動刪除的代碼,但我猜想對象的自動刪除要額外啟動服務進程(類似於swift-object-server之類),定期對所有servers上object delete_time進行輪詢,到達相應截止時間后刪除相應對象。
還有一點需要注意的便是,若要創建的object不是從已有對象拷貝,將定義一個對object內容進行讀取的迭代器data_source(line. 56-57)。
3.生成請求頭
雖然生成請求頭和接下來的上傳數據在上面的PUT代碼中,均只封裝成一個函數,但它們會調用其它許多函數。這兩個流程也是PUT object操作的核心。
先附上65行_backend_request()
函數的源碼:
def _backend_requests(self, req, n_outgoing,
container_partition, containers,
delete_at_container=None, delete_at_partition=None,
delete_at_nodes=None):
headers = [self.generate_request_headers(req,additional=req.headers)
for _junk in range(n_outgoing)]
for i, container in enumerate(containers):
i = i % len(headers)
headers[i]['X-Container-Partition'] = container_partition
headers[i]['X-Container-Host'] = csv_append(
headers[i].get('X-Container-Host'),
'%(ip)s:%(port)s' % container)
headers[i]['X-Container-Device'] = csv_append(
headers[i].get('X-Container-Device'),
container['device'])
for i, node in enumerate(delete_at_nodes or []):
i = i % len(headers)
headers[i]['X-Delete-At-Container'] = delete_at_container
headers[i]['X-Delete-At-Partition'] = delete_at_partition
headers[i]['X-Delete-At-Host'] = csv_append(
headers[i].get('X-Delete-At-Host'),
'%(ip)s:%(port)s' % node)
headers[i]['X-Delete-At-Device'] = csv_append(
headers[i].get('X-Delete-At-Device'),
node['device'])
return headers
這個函數的目的在於,針對要上傳的object所在container的存放信息,生成若干個HTTP請求頭,用於object上傳成功后container信息的更新。object存放在多少個node上,便生成多少相應的HTTP請求頭,即len(nodes)
。可以預料,對container信息的更新,將會在接收PUT object的若干個object-server上並發。
在_backend_request()
函數中,首先生成最基本的HTTP請求頭列表headers
, headers = [header_for_node1, header_for_node2,......]
,其中的每一個元素都是一個完整的HTTP請求頭字典,最初的請求頭僅包括時間戳、x-trans-id、user-agent等信息。
接下來的兩個for循環,完成的工作類似,第一個for循環將container更新所需要的信息平均分配給headers列表中的header;第二個for循環將定時刪除所需要的信息平均分配給headers中的header。我們重點講講第一個for循環,第二個,類比即可。
line. 9的取余操作,保證了將container信息更新平均分配至存放該object的結點進行並發,若container信息存放的結點數小於存放object的結點,那么headers列表中后面的一些header將只有基本初始化信息,接收這些header的node不用負責container信息的更新。
line. 11進行container partition信息的設定,每個container的partition信息均相同,所以不用擔心它們會打架(被覆蓋)。
line. 12-14和line. 15-17分別設定header中需要更新的container信息所在的主機地址(X-Container-Host
)和設備名(X-Container-Device
)。container信息是通過數據表存放的,所以container信息的更新最終會化為對相應服務器相應設備上數據表的update操作。csv_append
函數會將不同Host和Device用逗號隔開,相應node接收到這樣的header后,將會利用兩者信息定位到所有相應數據表,完成update。
通過這整個for循環,將每個存放object的node更新container信息的進行了分配和設定,簡單明了。
4.上傳數據
_store_object()
是PUT流程中最復雜也最重要的函數,直接負責向相應的存儲結點上傳數據。值得一提的是,使用糾刪碼作為存儲策略的話,將會重定義_store_object()
函數。這里僅就原有_store_object()
進行探討,重定義的_store_object()
留待以后討論。_store_object()
執行具體流程如下:
def _store_object(self, req, data_source, nodes, partition,
outgoing_headers):
policy_index = req.headers.get('X-Backend-Storage-Policy-Index')
policy = POLICIES.get_by_index(policy_index)
if not nodes:
return HTTPNotFound()
# RFC2616:8.2.3 disallows 100-continue without a body
if (req.content_length > 0) or req.is_chunked:
expect = True
else:
expect = False
conns = self._get_put_connections(req, nodes, partition,
outgoing_headers, policy, expect)
min_conns = quorum_size(len(nodes))
try:
# check that a minimum number of connections were established and
# meet all the correct conditions set in the request
self._check_failure_put_connections(conns, req, nodes, min_conns)
# transfer data
self._transfer_data(req, data_source, conns, nodes)
# get responses
statuses, reasons, bodies, etags = self._get_put_responses(
req, conns, nodes)
except HTTPException as resp:
return resp
finally:
for conn in conns:
conn.close()
if len(etags) > 1:
self.app.logger.error(
_('Object servers returned %s mismatched etags'), len(etags))
return HTTPServerError(request=req)
etag = etags.pop() if len(etags) else None
resp = self.best_response(req, statuses, reasons, bodies,
_('Object PUT'), etag=etag)
resp.last_modified = math.ceil(
float(Timestamp(req.headers['X-Timestamp'])))
return resp
其中除第一個操作獲取存儲策略之外,其它每個流程都對應一個具體的函數。_get_put_connections
建立從proxy-server到存放object的node之間的http連接,其中利用eventlet中的GreenPile實現並發:
pile = GreenPile(len(nodes))
for nheaders in outgoing_headers:
if expect:
nheaders['Expect'] = '100-continue'
pile.spawn(self._connect_put_node, node_iter, partition,
req.swift_entity_path, nheaders,
self.app.logger.thread_locals)
具體connection的建立,在_connect_put_node()
中:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'PUT', path, headers)
其中,node_iter迭代器在不同的nodes之間進行遍歷,創建鏈接失敗即跳過,保證min_conns個鏈接建立成功即可PUT數據。但這里有兩個問題:(1)node_iter如何在多個_get_put_node間起作用,難道是和協程的實際順序執行有關?(2)創建鏈接數滿足min_conns可傳輸數據,但要保證數據指定冗余,數據必須存放在所鏈接所有結點上,那么鏈接不成功那些結點上數據什么時候以何種方式上傳?(關於這個問題,研究了半天之后,發現它和swift中采用的NWR策略相關,至於剩余冗余的上傳初步估計是額外的數據同步服務直接執行對象的拷貝)
接下來對建立的connections進行檢查,查看其返回狀態,及_check_min_conn()
驗證成功創建的連接是否滿足最低需求,錯誤則直接返回異常狀態碼。
_transfer_data()
在上傳數據的過程中直接使用了繼承自GreenPool類的ContextPool類,ContextPool類增添了退出時殺死正在運行的協程的方法。_transfer_data()
中使用隊列(Queue)定義緩沖區暫存上傳的數據,其中使用綠色線程池(ContextPool)定義多個協程執行_send_file()
將Queue中chunk傳輸至建立的HTTP鏈接,每個storage _transfer_data()
后面的過程便是將上傳的object數據寫入conns隊列中。node和proxy之間建立的HTTP鏈接conn都擁有獨立的隊列,副本模式下數據同時寫入活動的諸多conns中隊列。
for conn in list(conns):
if not conn.failed:
conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk)
if req.is_chunked else chunk)
最后獲得響應,從獲得的諸多response中選取一定數量的(quorum_size)最優響應,生成對用戶的響應返回,PUT object至此終結。
通過對PUT Object這一過程的學習,發現自己對ring部分代碼非常陌生,有些地方完全弄不明白,映射關系和node管理完全靠ring的原理猜測。比如三副本存取且整個系統僅有一個storage node,那么系統會報錯嗎?如果正常運行node及node_iter又是何種狀態?
特別是后期,我還打算對ring進行功能擴展,代碼都沒弄懂就談擴展,無異痴人說夢。
還有,NWR策略相關內容還要進一步研究,整理出博文。