update_engine-DownloadAction(一)


通過update_engine-整體結構(一)、(二)、(三),對update_engine整體的運行機制有了一定的認識之後,開始逐個分析重要的Action。先從DownloadAction開始分析。

開始分析DownloadAction

src/system/update_engine/payload_consumer/download_action.cc

 1 DownloadAction::DownloadAction(PrefsInterface* prefs,  2                                BootControlInterface* boot_control,  3                                HardwareInterface* hardware,  4                                SystemState* system_state,  5                                HttpFetcher* http_fetcher)  6  : prefs_(prefs),  7  boot_control_(boot_control),  8  hardware_(hardware),  9  system_state_(system_state), 10       http_fetcher_(new MultiRangeHttpFetcher(http_fetcher)),               //MultiRangeHttpFetcher也繼承了HttpFetcher,實現了HttpFetcherDelegate
11 writer_(nullptr), code_(ErrorCode::kSuccess), delegate_(nullptr), p2p_sharing_fd_(-1), p2p_visible_(true) {}

 在構造方法中,system_state為nullptr。

接着看PerformAction()

 1 void DownloadAction::PerformAction() { 2   http_fetcher_->set_delegate(this);        //http_fetcher_是MultiRangeHttpFetcher
 3 
 4   // Get the InstallPlan and read it
 5   CHECK(HasInputObject());                   //檢查是否有輸入管道
 6   install_plan_ = GetInputObject();           //獲取InstallPlanAction輸出的install_plan_
 7 install_plan_.Dump(); 8 
 9   bytes_received_ = 0; 10   bytes_total_ = 0; 11   for (const auto& payload : install_plan_.payloads) 12     bytes_total_ += payload.size;              //計算payload的總大小
13 
14   if (install_plan_.is_resume) {                  //檢查是否進行恢復更新
15     int64_t payload_index = 0; 16     if (prefs_->GetInt64(kPrefsUpdateStatePayloadIndex, &payload_index) &&           //獲取需要繼續更新的payload的索引
17         static_cast<size_t>(payload_index) < install_plan_.payloads.size()) { 18       // Save the index for the resume payload before downloading any previous
19       // payload, otherwise it will be overwritten.
20       resume_payload_index_ = payload_index; 21       for (int i = 0; i < payload_index; i++) 22         install_plan_.payloads[i].already_applied = true;                   //獲取到了索引后說明在該索引之前的都已經應用過了
23 } 24 } 25   // TODO(senj): check that install plan has at least one payload.
26   if (!payload_)                                     //payload_為InstallPlan::Payload 
27     payload_ = &install_plan_.payloads[0];           //默認的payload_
28 
29   LOG(INFO) << "Marking new slot as unbootable";                             //將target_slot標記為unboot狀態
30   if (!boot_control_->MarkSlotUnbootable(install_plan_.target_slot)) { 31     LOG(WARNING) << "Unable to mark new slot "
32                  << BootControlInterface::SlotName(install_plan_.target_slot) 33                  << ". Proceeding with the update anyway."; 34 } 35 
36   StartDownloading();                  //開始下載
37 } 

 在PerformAction()中,首先獲取InstallPlan,恢復resume_payload_index_和payload_,之後將target_slot標記為unbootable,最後調用StartDownloading()開始下載。

 1 void DownloadAction::StartDownloading() { 2   download_active_ = true; 3   http_fetcher_->ClearRanges(); 4   if (install_plan_.is_resume &&                                             //判斷是否需要恢復更新
 5       payload_ == &install_plan_.payloads[resume_payload_index_]) { 6     // Resuming an update so fetch the update manifest metadata first.
 7     int64_t manifest_metadata_size = 0; 8     int64_t manifest_signature_size = 0; 9     prefs_->GetInt64(kPrefsManifestMetadataSize, &manifest_metadata_size); 10     prefs_->GetInt64(kPrefsManifestSignatureSize, &manifest_signature_size); 11     http_fetcher_->AddRange(base_offset_, 12                             manifest_metadata_size + manifest_signature_size); 13     // If there're remaining unprocessed data blobs, fetch them. Be careful not
14     // to request data beyond the end of the payload to avoid 416 HTTP response
15     // error codes.
16     int64_t next_data_offset = 0; 17     prefs_->GetInt64(kPrefsUpdateStateNextDataOffset, &next_data_offset); 18     uint64_t resume_offset =
19         manifest_metadata_size + manifest_signature_size + next_data_offset; 20     if (!payload_->size) { 21       http_fetcher_->AddRange(base_offset_ + resume_offset); 22     } else if (resume_offset < payload_->size) { 23       http_fetcher_->AddRange(base_offset_ + resume_offset, 24                               payload_->size - resume_offset); 25 } 26   } else { 27     if (payload_->size) {                                                 //如果payload->size不為0
28       http_fetcher_->AddRange(base_offset_, payload_->size);              //設置下載數據的offset和length。
29     } else { 30       // If no payload size is passed we assume we read until the end of the
31       // stream.
32       http_fetcher_->AddRange(base_offset_);                        //設置下載數據的offset
33 } 34 } 35 
36   if (writer_ && writer_ != delta_performer_.get()) {              //對writer_進行初始化,writer_比較重要后面會進行詳細的介紹
37     LOG(INFO) << "Using writer for test."; 38   } else { 39     delta_performer_.reset(new DeltaPerformer( 40         prefs_, boot_control_, hardware_, delegate_, &install_plan_, payload_)); 41     writer_ = delta_performer_.get(); 42 } 43   if (system_state_ != nullptr) {                                             //在這里system_state為nullptr后面就不在進行分析
44     const PayloadStateInterface* payload_state = system_state_->payload_state(); 45     string file_id = utils::CalculateP2PFileId(payload_->hash, payload_->size); 46     if (payload_state->GetUsingP2PForSharing()) { 47       // If we're sharing the update, store the file_id to convey
48       // that we should write to the file.
49       p2p_file_id_ = file_id; 50       LOG(INFO) << "p2p file id: " << p2p_file_id_; 51     } else { 52       // Even if we're not sharing the update, it could be that
53       // there's a partial file from a previous attempt with the same
54       // hash. If this is the case, we NEED to clean it up otherwise
55       // we're essentially timing out other peers downloading from us
56       // (since we're never going to complete the file).
57       FilePath path = system_state_->p2p_manager()->FileGetPath(file_id); 58       if (!path.empty()) { 59         if (unlink(path.value().c_str()) != 0) { 60           PLOG(ERROR) << "Error deleting p2p file " << path.value(); 61         } else { 62           LOG(INFO) << "Deleting partial p2p file " << path.value() 63                     << " since we're not using p2p to share."; 64 } 65 } 66 } 67 
68     // Tweak timeouts on the HTTP fetcher if we're downloading from a
69     // local peer.
70     if (payload_state->GetUsingP2PForDownloading() &&
71         payload_state->GetP2PUrl() == install_plan_.download_url) { 72       LOG(INFO) << "Tweaking HTTP fetcher since we're downloading via p2p"; 73       http_fetcher_->set_low_speed_limit(kDownloadP2PLowSpeedLimitBps, 74 kDownloadP2PLowSpeedTimeSeconds); 75       http_fetcher_->set_max_retry_count(kDownloadP2PMaxRetryCount); 76       http_fetcher_->set_connect_timeout(kDownloadP2PConnectTimeoutSeconds); 77 } 78 } 79 
80   http_fetcher_->BeginTransfer(install_plan_.download_url);   //開始進行下載
81 }

先不分析恢復更新這部分的操作,可以看到裡面的操作幾乎都是在獲取中斷更新時保存的數據。所以當我們明白了所保存的數據都有什麼意義的時候,也就明白了這部分的操作,因此我們需要從頭開始分析。其次,system_state_其實是nullptr,所以這部分暫時也不進行分析。這樣排除完之後,我們需要關注的就是http_fetcher_和writer_,先對這兩個類進行一個簡單的分析。先看MultiRangeHttpFetcher,下面是部分的代碼

src/system/update_engine/common/multi_range_http_fetcher.h

 1 class MultiRangeHttpFetcher : public HttpFetcher, public HttpFetcherDelegate { 2  public: 3   // Takes ownership of the passed in fetcher.
 4   explicit MultiRangeHttpFetcher(HttpFetcher* base_fetcher) 5       : HttpFetcher(base_fetcher->proxy_resolver()), 6 base_fetcher_(base_fetcher), 7         base_fetcher_active_(false), 8         pending_transfer_ended_(false), 9         terminating_(false), 10         current_index_(0), 11         bytes_received_this_range_(0) {} 12   ~MultiRangeHttpFetcher() override {} 13 
14   void ClearRanges() { ranges_.clear(); } 15 
16   void AddRange(off_t offset, size_t size) { 17     CHECK_GT(size, static_cast<size_t>(0)); 18 ranges_.push_back(Range(offset, size)); 19 } 20 
21   void AddRange(off_t offset) { 22 ranges_.push_back(Range(offset)); 23 } 24 
25   
26  private: 27        class Range { 28    public: 29 Range(off_t offset, size_t length) : offset_(offset), length_(length) {} 30     explicit Range(off_t offset) : offset_(offset), length_(0) {} 31 
32     inline off_t offset() const { return offset_; } 33     inline size_t length() const { return length_; } 34 
35     inline bool HasLength() const { return (length_ > 0); } 36 
37     std::string ToString() const; 38 
39    private: 40 off_t offset_; 41 size_t length_; 42 }; 43 };

可以看到MultiRangeHttpFetcher繼承了HttpFetcher,實現了 HttpFetcherDelegate並且還保存了一個base_fetcher_。在它的內部還定義了一個內部類Range,主要就是表示所要下載數據的偏移量和長度。

而DeltaPerformer繼承自FileWriter,同時重寫了Write方法,這個方法比較重要,可以說是升級的核心方法,在後面會做詳細的介紹。

接着看http_fetcher_->BeginTransfer(install_plan_.download_url)。注意是 MultiRangeHttpFetcher的BeginTransfer方法。

 src/system/update_engine/common/multi_range_http_fetcher.cc

 1 void MultiRangeHttpFetcher::BeginTransfer(const std::string& url) { 2   CHECK(!base_fetcher_active_) << "BeginTransfer but already active."; 3   CHECK(!pending_transfer_ended_) << "BeginTransfer but pending."; 4   CHECK(!terminating_) << "BeginTransfer but terminating."; 5 
 6   if (ranges_.empty()) { 7     // Note that after the callback returns this object may be destroyed.
 8     if (delegate_) 9       delegate_->TransferComplete(this, true);          //DownloadAction的TransferComplete
10     return; 11 } 12   url_ = url; 13   current_index_ = 0; 14   bytes_received_this_range_ = 0; 15   LOG(INFO) << "starting first transfer"; 16   base_fetcher_->set_delegate(this);                 //為FileFetcher設置delegate
17 StartTransfer(); 18 } 19 
20 // State change: Stopped or Downloading -> Downloading
21 void MultiRangeHttpFetcher::StartTransfer() { 22   if (current_index_ >= ranges_.size()) { 23     return; 24 } 25 
26   Range range = ranges_[current_index_]; 27   LOG(INFO) << "starting transfer of range " << range.ToString(); 28 
29   bytes_received_this_range_ = 0; 30   base_fetcher_->SetOffset(range.offset()); 31   if (range.HasLength()) 32     base_fetcher_->SetLength(range.length()); 33   else
34     base_fetcher_->UnsetLength(); 35   if (delegate_) 36     delegate_->SeekToOffset(range.offset()); 37   base_fetcher_active_ = true; 38   base_fetcher_->BeginTransfer(url_); 39 }

在 BeginTransfer這個方法中:

1. 判斷ranges_是否為空,如果為空則認為已經下載完成,回調DownloadAction的TransferComplete

2. 為base_fetcher_設置delegate

3. 調用StartTransfer()

在StartTransfer()中,根據current_index_獲取到Range,再設置base_fetcher_的offset和length,之後調用base_fetcher_->BeginTransfer(url_);開始進行正式的下載。那麼FileFetcher的BeginTransfer都做了些什麼?

src/system/update_engine/common/file_fetcher.cc

 1 void FileFetcher::BeginTransfer(const string& url) { 2   CHECK(!transfer_in_progress_); 3 
 4   if (!SupportedUrl(url)) {                            //檢查是否是file:///協議
 5     LOG(ERROR) << "Unsupported file URL: " << url; 6     // No HTTP error code when the URL is not supported.
 7     http_response_code_ = 0; 8 CleanUp(); 9     if (delegate_) 10       delegate_->TransferComplete(this, false);      //delgate_是MultiRangeHttpFetcher
11     return; 12 } 13 
14   string file_path = url.substr(strlen("file://")); 15   stream_ =
16       brillo::FileStream::Open(base::FilePath(file_path),           //打開file_path指向的升級文件
17 brillo::Stream::AccessMode::READ, 18 brillo::FileStream::Disposition::OPEN_EXISTING, 19 nullptr); 20 
21   if (!stream_) { 22     LOG(ERROR) << "Couldn't open " << file_path; 23     http_response_code_ = kHttpResponseNotFound; 24 CleanUp(); 25     if (delegate_) 26       delegate_->TransferComplete(this, false); 27     return; 28 } 29   http_response_code_ = kHttpResponseOk; 30 
31   if (offset_)                               //設置讀取的位置
32     stream_->SetPosition(offset_, nullptr); 33   bytes_copied_ = 0;                        //已經copy的字節,也就是下載了多少字節了
34   transfer_in_progress_ = true; 35 ScheduleRead(); 36 } 37 
38 void FileFetcher::ScheduleRead() { 39   if (transfer_paused_ || ongoing_read_ || !transfer_in_progress_) 40     return; 41 
42   buffer_.resize(kReadBufferSize);          //設置buffer_緩存區的大小
43   size_t bytes_to_read = buffer_.size();     //設置讀取數據的數量
44   if (data_length_ >= 0) { 45     bytes_to_read = std::min(static_cast<uint64_t>(bytes_to_read), 46                              data_length_ - bytes_copied_);   //剩下的數據量,bytes_to_read哪一個小哪一個就是將要讀取的數據量
47 } 48 
49   if (!bytes_to_read) {          //沒有可讀取的數據了,說明已經下載完了
50     OnReadDoneCallback(0); 51     return; 52 } 53 
54   ongoing_read_ = stream_->ReadAsync(            //開始下載數據
55 buffer_.data(), 56 bytes_to_read, 57       base::Bind(&FileFetcher::OnReadDoneCallback, base::Unretained(this)), 58       base::Bind(&FileFetcher::OnReadErrorCallback, base::Unretained(this)), 59 nullptr); 60 
61   if (!ongoing_read_) { 62     LOG(ERROR) << "Unable to schedule an asynchronous read from the stream."; 63 CleanUp(); 64     if (delegate_) 65       delegate_->TransferComplete(this, false); 66 } 67 } 68 
69 void FileFetcher::OnReadDoneCallback(size_t bytes_read) { 70   ongoing_read_ = false; 71   if (bytes_read == 0) {           //判讀數據是否已經被下載完成了
72 CleanUp(); 73     if (delegate_) 74       delegate_->TransferComplete(this, true); 75   } else { 76     bytes_copied_ += bytes_read; 77     if (delegate_) 78       delegate_->ReceivedBytes(this, buffer_.data(), bytes_read);  //調用MultiRangeHttpFetcher的ReceivedBytes
79 ScheduleRead(); 80 } 81 }

這幾個方法比較簡單:打開文件之後,所謂「下載數據」其實就是把文件內容異步讀取到buffer_中,再通過回調向外傳遞數據,直到全部讀完為止。

接下來看MultiRangeHttpFetcher的ReceivedBytes

 1 void MultiRangeHttpFetcher::ReceivedBytes(HttpFetcher* fetcher, 2                                           const void* bytes, 3 size_t length) { 4 CHECK_LT(current_index_, ranges_.size()); 5   CHECK_EQ(fetcher, base_fetcher_.get()); 6   CHECK(!pending_transfer_ended_); 7   size_t next_size = length; 8   Range range = ranges_[current_index_]; 9   if (range.HasLength()) { 10     next_size = std::min(next_size, 11                          range.length() - bytes_received_this_range_);   //獲取接收到數據的長度
12 } 13   LOG_IF(WARNING, next_size <= 0) << "Asked to write length <= 0"; 14   if (delegate_) { 15     delegate_->ReceivedBytes(this, bytes, next_size);                    //delegate_是DownloadAction
16 } 17   bytes_received_this_range_ += length;                           //更新已經接收到數據的長度
18   if (range.HasLength() && bytes_received_this_range_ >= range.length()) {   //如果已經下載完了
19     // Terminates the current fetcher. Waits for its TransferTerminated
20     // callback before starting the next range so that we don't end up
21     // signalling the delegate that the whole multi-transfer is complete
22     // before all fetchers are really done and cleaned up.
23     pending_transfer_ended_ = true; 24     LOG(INFO) << "terminating transfer"; 25     fetcher->TerminateTransfer(); 26 } 27 }

 這個方法主要是把數據向外傳遞:若當前Range帶有長度,先把本次的長度截斷為該Range尚未接收的字節數,再交由DownloadAction的ReceivedBytes來處理;當該Range的數據全部接收完之後,會調用TerminateTransfer()終止當前的傳輸。

 1 void DownloadAction::ReceivedBytes(HttpFetcher* fetcher,  2                                    const void* bytes,  3  size_t length) {  4   // Note that bytes_received_ is the current offset.
 5   if (!p2p_file_id_.empty()) {  6  WriteToP2PFile(bytes, length, bytes_received_);  7  }  8 
 9   bytes_received_ += length; 10   if (delegate_ && download_active_) { 11     delegate_->BytesReceived(length, bytes_received_, bytes_total_);  //delegate_是UpdateAttempterAndroid
12  } 13   if (writer_ && !writer_->Write(bytes, length, &code_)) { 14     if (code_ != ErrorCode::kSuccess) { 15       LOG(ERROR) << "Error " << utils::ErrorCodeToString(code_) << " (" << code_ 16                  << ") in DeltaPerformer's Write method when "
17                  << "processing the received payload -- Terminating processing"; 18  } 19     // Delete p2p file, if applicable.
20     if (!p2p_file_id_.empty()) 21       CloseP2PSharingFd(true); 22     // Don't tell the action processor that the action is complete until we get 23     // the TransferTerminated callback. Otherwise, this and the HTTP fetcher 24     // objects may get destroyed before all callbacks are complete.
25  TerminateProcessing(); 26     return; 27  } 28 
29   // Call p2p_manager_->FileMakeVisible() when we've successfully 30   // verified the manifest!
31   if (!p2p_visible_ && system_state_ && delta_performer_.get() &&
32       delta_performer_->IsManifestValid()) { 33     LOG(INFO) << "Manifest has been validated. Making p2p file visible."; 34     system_state_->p2p_manager()->FileMakeVisible(p2p_file_id_); 35     p2p_visible_ = true; 36  } 37 }

 p2p_file_id_只有在system_state_不為nullptr時才會被初始化,所以這裡與p2p_file_id_和system_state_相關的操作目前不用關心。在這個方法中,主要就是把BytesReceived繼續向外傳遞給UpdateAttempterAndroid。UpdateAttempterAndroid中的BytesReceived操作也比較簡單,主要就是更新下載數據的進度。更新完進度之後,就會調用DeltaPerformer的Write方法。該方法比較重要,在update_engine-DownloadAction(二)將會進行單獨的介紹。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM