通过update_engine-整体结构(一),(二),(三)对update_engine整体的运行机制有了一定的认识之后,接下来开始逐个分析重要的Action,先从DownloadAction开始。
开始分析DownloadAction
src/system/update_engine/payload_consumer/download_action.cc
// Constructs a DownloadAction: stores the collaborator pointers and wraps the supplied http_fetcher in a newly allocated MultiRangeHttpFetcher.
1 DownloadAction::DownloadAction(PrefsInterface* prefs, 2 BootControlInterface* boot_control, 3 HardwareInterface* hardware, 4 SystemState* system_state, 5 HttpFetcher* http_fetcher) 6 : prefs_(prefs), 7 boot_control_(boot_control), 8 hardware_(hardware), 9 system_state_(system_state), 10 http_fetcher_(new MultiRangeHttpFetcher(http_fetcher)), // MultiRangeHttpFetcher also derives from HttpFetcher and implements HttpFetcherDelegate
11 writer_(nullptr), code_(ErrorCode::kSuccess), delegate_(nullptr), p2p_sharing_fd_(-1), p2p_visible_(true) {}
在本文分析的流程中,构造DownloadAction时传入的system_state为nullptr,因此成员system_state_也是nullptr。
接着看PerformAction()
// Entry point of the action: reads the InstallPlan from the input pipe, totals the payload sizes, restores resume state, marks the target slot unbootable and calls StartDownloading().
1 void DownloadAction::PerformAction() { 2 http_fetcher_->set_delegate(this); // http_fetcher_ is the MultiRangeHttpFetcher created in the constructor; this action becomes its delegate
3
4 // Get the InstallPlan and read it
5 CHECK(HasInputObject()); // an input pipe object must be present
6 install_plan_ = GetInputObject(); // take the install plan from the input pipe (produced by InstallPlanAction according to the article)
7 install_plan_.Dump(); 8
9 bytes_received_ = 0; 10 bytes_total_ = 0; 11 for (const auto& payload : install_plan_.payloads) 12 bytes_total_ += payload.size; // sum the sizes of all payloads
13
14 if (install_plan_.is_resume) { // is this a resumed update?
15 int64_t payload_index = 0; 16 if (prefs_->GetInt64(kPrefsUpdateStatePayloadIndex, &payload_index) && // load the index of the payload to resume from
17 static_cast<size_t>(payload_index) < install_plan_.payloads.size()) { 18 // Save the index for the resume payload before downloading any previous
19 // payload, otherwise it will be overwritten.
20 resume_payload_index_ = payload_index; 21 for (int i = 0; i < payload_index; i++) 22 install_plan_.payloads[i].already_applied = true; // every payload before the resume index is flagged as already applied; NOTE(review): i is int while payload_index is int64_t
23 } 24 } 25 // TODO(senj): check that install plan has at least one payload.
26 if (!payload_) // payload_ is a pointer to an InstallPlan::Payload
27 payload_ = &install_plan_.payloads[0]; // default to the first payload; NOTE(review): payloads is indexed without an emptiness check (see the TODO above)
28
29 LOG(INFO) << "Marking new slot as unbootable"; // mark the target slot as unbootable before writing to it
30 if (!boot_control_->MarkSlotUnbootable(install_plan_.target_slot)) { 31 LOG(WARNING) << "Unable to mark new slot "
32 << BootControlInterface::SlotName(install_plan_.target_slot) 33 << ". Proceeding with the update anyway."; 34 } 35
36 StartDownloading(); // start the download
37 }
在PerformAction()中,首先获取InstallPlan并统计payload的总大小,如果是恢复更新则恢复resume_payload_index_并把该索引之前的payload标记为已应用,然后设置默认的payload_,之后把target_slot标记为unbootable,最后调用StartDownloading()开始下载。
// Configures the byte ranges to fetch (resume-aware), sets up writer_, applies the p2p settings when system_state_ is set, then begins the transfer of install_plan_.download_url.
1 void DownloadAction::StartDownloading() { 2 download_active_ = true; 3 http_fetcher_->ClearRanges(); 4 if (install_plan_.is_resume && // resume path: only taken for the payload recorded as the resume payload
5 payload_ == &install_plan_.payloads[resume_payload_index_]) { 6 // Resuming an update so fetch the update manifest metadata first.
7 int64_t manifest_metadata_size = 0; 8 int64_t manifest_signature_size = 0; 9 prefs_->GetInt64(kPrefsManifestMetadataSize, &manifest_metadata_size); 10 prefs_->GetInt64(kPrefsManifestSignatureSize, &manifest_signature_size); 11 http_fetcher_->AddRange(base_offset_, 12 manifest_metadata_size + manifest_signature_size); 13 // If there're remaining unprocessed data blobs, fetch them. Be careful not
14 // to request data beyond the end of the payload to avoid 416 HTTP response
15 // error codes.
16 int64_t next_data_offset = 0; 17 prefs_->GetInt64(kPrefsUpdateStateNextDataOffset, &next_data_offset); 18 uint64_t resume_offset =
19 manifest_metadata_size + manifest_signature_size + next_data_offset; 20 if (!payload_->size) { 21 http_fetcher_->AddRange(base_offset_ + resume_offset); 22 } else if (resume_offset < payload_->size) { 23 http_fetcher_->AddRange(base_offset_ + resume_offset, 24 payload_->size - resume_offset); 25 } 26 } else { 27 if (payload_->size) { // payload size is known (non-zero)
28 http_fetcher_->AddRange(base_offset_, payload_->size); // fetch a bounded range: offset plus length
29 } else { 30 // If no payload size is passed we assume we read until the end of the
31 // stream.
32 http_fetcher_->AddRange(base_offset_); // fetch an open-ended range: offset only
33 } 34 } 35
36 if (writer_ && writer_ != delta_performer_.get()) { // keep a writer_ that was supplied from outside (test path according to the log message); otherwise a DeltaPerformer is created below and becomes writer_
37 LOG(INFO) << "Using writer for test."; 38 } else { 39 delta_performer_.reset(new DeltaPerformer( 40 prefs_, boot_control_, hardware_, delegate_, &install_plan_, payload_)); 41 writer_ = delta_performer_.get(); 42 } 43 if (system_state_ != nullptr) { // system_state_ is nullptr in the flow analysed by this article, so this branch is not analysed further
44 const PayloadStateInterface* payload_state = system_state_->payload_state(); 45 string file_id = utils::CalculateP2PFileId(payload_->hash, payload_->size); 46 if (payload_state->GetUsingP2PForSharing()) { 47 // If we're sharing the update, store the file_id to convey
48 // that we should write to the file.
49 p2p_file_id_ = file_id; 50 LOG(INFO) << "p2p file id: " << p2p_file_id_; 51 } else { 52 // Even if we're not sharing the update, it could be that
53 // there's a partial file from a previous attempt with the same
54 // hash. If this is the case, we NEED to clean it up otherwise
55 // we're essentially timing out other peers downloading from us
56 // (since we're never going to complete the file).
57 FilePath path = system_state_->p2p_manager()->FileGetPath(file_id); 58 if (!path.empty()) { 59 if (unlink(path.value().c_str()) != 0) { 60 PLOG(ERROR) << "Error deleting p2p file " << path.value(); 61 } else { 62 LOG(INFO) << "Deleting partial p2p file " << path.value() 63 << " since we're not using p2p to share."; 64 } 65 } 66 } 67
68 // Tweak timeouts on the HTTP fetcher if we're downloading from a
69 // local peer.
70 if (payload_state->GetUsingP2PForDownloading() &&
71 payload_state->GetP2PUrl() == install_plan_.download_url) { 72 LOG(INFO) << "Tweaking HTTP fetcher since we're downloading via p2p"; 73 http_fetcher_->set_low_speed_limit(kDownloadP2PLowSpeedLimitBps, 74 kDownloadP2PLowSpeedTimeSeconds); 75 http_fetcher_->set_max_retry_count(kDownloadP2PMaxRetryCount); 76 http_fetcher_->set_connect_timeout(kDownloadP2PConnectTimeoutSeconds); 77 } 78 } 79
80 http_fetcher_->BeginTransfer(install_plan_.download_url); // kick off the transfer through the MultiRangeHttpFetcher
81 }
先不分析恢复更新这部分的操作,可以看到里面的操作几乎都是在读取更新中断时保存的数据,只有先明白这些数据的含义,才能理解这部分逻辑,因此需要先从头分析完整的流程。另外,system_state_在这里是nullptr,所以这部分也暂时不进行分析。排除这些之后,我们需要关注的就是http_fetcher_和writer_,先对这两个类进行一个简单的分析。先看MultiRangeHttpFetcher,下面是部分代码
src/system/update_engine/common/multi_range_http_fetcher.h
// Excerpt of MultiRangeHttpFetcher: an HttpFetcher that is also an HttpFetcherDelegate, wraps a base fetcher and keeps a list of byte ranges (ranges_) to fetch.
1 class MultiRangeHttpFetcher : public HttpFetcher, public HttpFetcherDelegate { 2 public: 3 // Takes ownership of the passed in fetcher.
4 explicit MultiRangeHttpFetcher(HttpFetcher* base_fetcher) 5 : HttpFetcher(base_fetcher->proxy_resolver()), 6 base_fetcher_(base_fetcher), 7 base_fetcher_active_(false), 8 pending_transfer_ended_(false), 9 terminating_(false), 10 current_index_(0), 11 bytes_received_this_range_(0) {} 12 ~MultiRangeHttpFetcher() override {} 13
14 void ClearRanges() { ranges_.clear(); } 15
16 void AddRange(off_t offset, size_t size) { 17 CHECK_GT(size, static_cast<size_t>(0)); 18 ranges_.push_back(Range(offset, size)); 19 } 20 // bounded range: size must be greater than zero
21 void AddRange(off_t offset) { 22 ranges_.push_back(Range(offset)); 23 } 24 // open-ended range: stored with length 0
25
26 private: 27 class Range { 28 public: 29 Range(off_t offset, size_t length) : offset_(offset), length_(length) {} 30 explicit Range(off_t offset) : offset_(offset), length_(0) {} 31
32 inline off_t offset() const { return offset_; } 33 inline size_t length() const { return length_; } 34
35 inline bool HasLength() const { return (length_ > 0); } 36 // a length of 0 means no explicit length
37 std::string ToString() const; 38
39 private: 40 off_t offset_; 41 size_t length_; 42 }; 43 };
可以看到MultiRangeHttpFetcher继承了HttpFetcher,实现了HttpFetcherDelegate,并且还保存了一个base_fetcher_。在它的内部还定义了一个内部类Range,用来表示所要下载数据的偏移量和长度,其中长度为0表示没有指定长度,即一直读到数据末尾。
而DeltaPerformer继承自FileWriter,同时重写了Write方法,这个方法比较重要,可以说是升级的核心方法,在后面会做详细的介绍。
接着看http_fetcher_->BeginTransfer(install_plan_.download_url)。注意是 MultiRangeHttpFetcher的BeginTransfer方法。
src/system/update_engine/common/multi_range_http_fetcher.cc
// Starts a multi-range transfer of url; when no ranges are queued it reports success to the delegate immediately and returns.
1 void MultiRangeHttpFetcher::BeginTransfer(const std::string& url) { 2 CHECK(!base_fetcher_active_) << "BeginTransfer but already active."; 3 CHECK(!pending_transfer_ended_) << "BeginTransfer but pending."; 4 CHECK(!terminating_) << "BeginTransfer but terminating."; 5
6 if (ranges_.empty()) { 7 // Note that after the callback returns this object may be destroyed.
8 if (delegate_) 9 delegate_->TransferComplete(this, true); // delegate_ is the DownloadAction in this flow, so this calls DownloadAction::TransferComplete
10 return; 11 } 12 url_ = url; 13 current_index_ = 0; 14 bytes_received_this_range_ = 0; 15 LOG(INFO) << "starting first transfer"; 16 base_fetcher_->set_delegate(this); // register this object as the delegate of the base fetcher (a FileFetcher in the flow analysed by the article)
17 StartTransfer(); 18 } 19
// Programs base_fetcher_ with the offset and length of ranges_[current_index_] and starts it; returns without doing anything when no range is left.
20 // State change: Stopped or Downloading -> Downloading
21 void MultiRangeHttpFetcher::StartTransfer() { 22 if (current_index_ >= ranges_.size()) { 23 return; 24 } 25
26 Range range = ranges_[current_index_]; 27 LOG(INFO) << "starting transfer of range " << range.ToString(); 28
29 bytes_received_this_range_ = 0; 30 base_fetcher_->SetOffset(range.offset()); 31 if (range.HasLength()) 32 base_fetcher_->SetLength(range.length()); 33 else
34 base_fetcher_->UnsetLength(); 35 if (delegate_) 36 delegate_->SeekToOffset(range.offset()); 37 base_fetcher_active_ = true; 38 base_fetcher_->BeginTransfer(url_); 39 } // the delegate is told the new offset before the base fetcher starts transferring
在 BeginTransfer这个方法中:
1. 判断ranges_是否为空,如果为空则认为已经下载完成,回调DownloadAction的TransferComplete后直接返回
2. 为base_fetcher_设置delegate
3. 调用StartTransfer()
在StartTransfer()中,根据current_index_获取到Range,再设置base_fetcher_的offset和length,之后调用base_fetcher_->BeginTransfer(url_);开始进行正式的下载。那么FileFetcher的BeginTransfer都做了些什么?
src/system/update_engine/common/file_fetcher.cc
// Opens the file named by the URL, positions the stream at offset_ and schedules the first read; each failure path reports TransferComplete(this, false) to the delegate.
1 void FileFetcher::BeginTransfer(const string& url) { 2 CHECK(!transfer_in_progress_); 3
4 if (!SupportedUrl(url)) { // reject URLs that SupportedUrl() does not accept (a file:// URL check according to the article)
5 LOG(ERROR) << "Unsupported file URL: " << url; 6 // No HTTP error code when the URL is not supported.
7 http_response_code_ = 0; 8 CleanUp(); 9 if (delegate_) 10 delegate_->TransferComplete(this, false); // delegate_ is the MultiRangeHttpFetcher that registered itself before starting this fetcher
11 return; 12 } 13
14 string file_path = url.substr(strlen("file://")); 15 stream_ =
16 brillo::FileStream::Open(base::FilePath(file_path), // open the update file that file_path points to, read-only
17 brillo::Stream::AccessMode::READ, 18 brillo::FileStream::Disposition::OPEN_EXISTING, 19 nullptr); 20
21 if (!stream_) { 22 LOG(ERROR) << "Couldn't open " << file_path; 23 http_response_code_ = kHttpResponseNotFound; 24 CleanUp(); 25 if (delegate_) 26 delegate_->TransferComplete(this, false); 27 return; 28 } 29 http_response_code_ = kHttpResponseOk; 30
31 if (offset_) // seek to the requested start position when it is non-zero
32 stream_->SetPosition(offset_, nullptr); 33 bytes_copied_ = 0; // bytes copied so far, i.e. how much has been transferred; NOTE(review): the result of SetPosition is not checked
34 transfer_in_progress_ = true; 35 ScheduleRead(); 36 } 37
// Issues one asynchronous read of at most kReadBufferSize bytes, clamped to the bytes still expected when data_length_ is non-negative; does nothing while paused, already reading or not in progress.
38 void FileFetcher::ScheduleRead() { 39 if (transfer_paused_ || ongoing_read_ || !transfer_in_progress_) 40 return; 41
42 buffer_.resize(kReadBufferSize); // size the read buffer
43 size_t bytes_to_read = buffer_.size(); // by default read a full buffer
44 if (data_length_ >= 0) { 45 bytes_to_read = std::min(static_cast<uint64_t>(bytes_to_read), 46 data_length_ - bytes_copied_); // read the smaller of the buffer size and the bytes remaining in the requested length
47 } 48
49 if (!bytes_to_read) { // nothing left to read: the transfer is finished
50 OnReadDoneCallback(0); 51 return; 52 } 53
54 ongoing_read_ = stream_->ReadAsync( // start the asynchronous read into buffer_
55 buffer_.data(), 56 bytes_to_read, 57 base::Bind(&FileFetcher::OnReadDoneCallback, base::Unretained(this)), 58 base::Bind(&FileFetcher::OnReadErrorCallback, base::Unretained(this)), 59 nullptr); 60
61 if (!ongoing_read_) { 62 LOG(ERROR) << "Unable to schedule an asynchronous read from the stream."; 63 CleanUp(); 64 if (delegate_) 65 delegate_->TransferComplete(this, false); 66 } 67 } 68
// Read completion handler: zero bytes read ends the transfer successfully; otherwise the chunk is forwarded to the delegate and the next read is scheduled.
69 void FileFetcher::OnReadDoneCallback(size_t bytes_read) { 70 ongoing_read_ = false; 71 if (bytes_read == 0) { // a read of zero bytes means all data has been transferred
72 CleanUp(); 73 if (delegate_) 74 delegate_->TransferComplete(this, true); 75 } else { 76 bytes_copied_ += bytes_read; 77 if (delegate_) 78 delegate_->ReceivedBytes(this, buffer_.data(), bytes_read); // hands the chunk to MultiRangeHttpFetcher::ReceivedBytes in this flow
79 ScheduleRead(); 80 } 81 }
这几个方法比较简单,主要就是打开url指向的本地文件,所谓下载数据就是把文件内容分块读取到buffer_中,再通过回调ReceivedBytes向外传递数据,直到读取完成后回调TransferComplete。
接下来看MultiRangeHttpFetcher的ReceivedBytes
// Forwards bytes from the base fetcher to the delegate, truncated to what remains of the current range, and terminates the base fetcher once the range has been fully received.
1 void MultiRangeHttpFetcher::ReceivedBytes(HttpFetcher* fetcher, 2 const void* bytes, 3 size_t length) { 4 CHECK_LT(current_index_, ranges_.size()); 5 CHECK_EQ(fetcher, base_fetcher_.get()); 6 CHECK(!pending_transfer_ended_); 7 size_t next_size = length; 8 Range range = ranges_[current_index_]; 9 if (range.HasLength()) { 10 next_size = std::min(next_size, 11 range.length() - bytes_received_this_range_); // clamp the forwarded size to the bytes still missing from this range
12 } 13 LOG_IF(WARNING, next_size <= 0) << "Asked to write length <= 0"; 14 if (delegate_) { 15 delegate_->ReceivedBytes(this, bytes, next_size); // delegate_ is the DownloadAction in this flow
16 } 17 bytes_received_this_range_ += length; // account for everything received, using length rather than the clamped next_size
18 if (range.HasLength() && bytes_received_this_range_ >= range.length()) { // the bounded range has been fully received
19 // Terminates the current fetcher. Waits for its TransferTerminated
20 // callback before starting the next range so that we don't end up
21 // signalling the delegate that the whole multi-transfer is complete
22 // before all fetchers are really done and cleaned up.
23 pending_transfer_ended_ = true; 24 LOG(INFO) << "terminating transfer"; 25 fetcher->TerminateTransfer(); 26 } 27 }
这个方法主要就是把数据继续向外传递,交给DownloadAction的ReceivedBytes来处理,同时统计当前range已经接收的字节数,当前range接收完成后终止base_fetcher_的传输。
// Handles one chunk of downloaded data: optional p2p file write, progress notification to the delegate, then writer_->Write(); a failed write terminates processing.
1 void DownloadAction::ReceivedBytes(HttpFetcher* fetcher, 2 const void* bytes, 3 size_t length) { 4 // Note that bytes_received_ is the current offset.
5 if (!p2p_file_id_.empty()) { 6 WriteToP2PFile(bytes, length, bytes_received_); 7 } 8
9 bytes_received_ += length; 10 if (delegate_ && download_active_) { 11 delegate_->BytesReceived(length, bytes_received_, bytes_total_); // progress callback; delegate_ is UpdateAttempterAndroid according to the article
12 } 13 if (writer_ && !writer_->Write(bytes, length, &code_)) { 14 if (code_ != ErrorCode::kSuccess) { 15 LOG(ERROR) << "Error " << utils::ErrorCodeToString(code_) << " (" << code_ 16 << ") in DeltaPerformer's Write method when "
17 << "processing the received payload -- Terminating processing"; 18 } 19 // Delete p2p file, if applicable.
20 if (!p2p_file_id_.empty()) 21 CloseP2PSharingFd(true); 22 // Don't tell the action processor that the action is complete until we get 23 // the TransferTerminated callback. Otherwise, this and the HTTP fetcher 24 // objects may get destroyed before all callbacks are complete.
25 TerminateProcessing(); 26 return; 27 } 28
29 // Call p2p_manager_->FileMakeVisible() when we've successfully 30 // verified the manifest!
31 if (!p2p_visible_ && system_state_ && delta_performer_.get() &&
32 delta_performer_->IsManifestValid()) { 33 LOG(INFO) << "Manifest has been validated. Making p2p file visible."; 34 system_state_->p2p_manager()->FileMakeVisible(p2p_file_id_); 35 p2p_visible_ = true; 36 } 37 }
p2p_file_id_只有在system_state_不为nullptr时才会被赋值,所以这里与p2p_file_id_和system_state_相关的操作目前不用关心。这个方法主要就是把BytesReceived继续向外传递给UpdateAttempterAndroid。UpdateAttempterAndroid中的BytesReceived操作也比较简单,主要就是更新下载数据的进度。更新完进度之后就会调用DeltaPerformer的Write方法。该方法比较重要,将在update_engine-DownloadAction(二)中进行单独的介绍。