好久也沒寫過博客了,距離上一寫的博文到現在也過去了四年。這段時間Urumqi yq突然爆發,單位暫時也不讓回。一個人宅着沒事就刷刷抖音看看短視頻,作為一位有故事的男人【狗頭】,抖音推薦的視頻還是挺符合個人口味的,於是就萌生了把這些好看的視頻全部保存的想法。之前喜歡用一些免費的公眾號小程序去下載無水印的視頻,可是沒過多久這些小程序不是失效就是需要變相付費下載,最為一名資深白嫖黨【狗頭】豈是能忍的,然后就各種查資料,一頓操作猛如虎后,也沒有什么簡便方法。還是自己寫程序把,可是自從工作以來最多就用用office三件套,編程都荒廢了,用python寫個hello world都會把單詞拼錯【汗】,沒關系我們可以學,大不了從頭再來,個人自學、在B站上學、面向github編程學。以下省略千余字學習過程。
PS:本人也是編程新手寫作此文純當個人學習記錄,代碼只是滿足個人需求,以下內容若有不嚴謹之處希望各位看官老爺們不吝嗇賜教。
好了不廢話了開始進入正題
- 獲得抖音首頁推薦視頻信息然后下載視頻(無水印)
- 獲取某抖主發布的全部視頻信息然后下載視頻(無水印)
- 獲取自己或他人喜歡視頻的信息然后下載視頻(無水印)
- windows環境下搭建好python編程環境,本人使用python3.8.3版本
- 好用的編輯器如pycharm、vscode等
- 安卓模擬器、或者實體機也行
- 抖音app本人使用抖音極速版(v10.8.0)
- https抓包工具charles(可以不用)、 mitmproxy [詳情]
- 假設你已經在pc端和模擬器上配置好charles和mitmproxy證書,確保已經可以正常
抓取http及https數據包 - 一顆不厭其煩的心
mitmproxy is a free and open source interactive HTTPS proxy.
安裝:pip install mitmproxy
安裝后有3個命令行工具:mitmproxy, mitmdump, mitmweb
本文只使用mitmdump查看流量數據(請求與響應)、執行自定義腳本
在配置好mitmproxy之后,在控制台上輸入mitmdump並在模擬器上打開抖音app,mitmdump會出現手機上的所有請求。經過一番分析實踐,查找資料發現有如下三個api接口,分別請求用戶發布的視頻信息、
請求首頁推薦視頻信息,請求用戶喜歡的視頻的信息。
post_api = 'https://aweme-lq.snssdk.com/aweme/v1/aweme/post/......' feed_api = 'https://aweme-lq.snssdk.com/aweme/v2/feed/......' favo_api = 'https://aweme-lq.snssdk.com/aweme/v1/aweme/favorite/......'
然后這三個請求對應的response里面就是我們需要的內容,通過分析response我們發現post、favo
返回是json格式的數據包。feed返回的是protobuf的數據包。所以我們只需要在自定義的mitmproxy
腳本中根據app所以請求中的關鍵字獲取相對應的response數據包即可。
通過分析獲取到的數據包、然后問題出現了:json格式的數據包python很方便操作,主要是如何解析protobuf的數據包。
然后又經過一番各種Google,結果發現:protobuf 有一套自己的語法。不了解 Protobuf 協議語法和用法的話也無法反解數據。也就是說在沒有那個抖音自定義的( .proto 文件)情況下,基本上是無法逆序列化解析。經過一番資料查找各種踩坑之后,借助工具,使用google提供的的protoc編譯工具,這個工具提供反解析參數。這樣就能獲取protobuf數據包的大致信息。
protoc –decode_raw < douyin_feed.bin > 1.txt
我們可以對比解析前和解析后的數據對比。
解析前

解析后

諸如此類的url地址。我們大致可以確定這很可能就是視頻的分享地址,通過一番實踐論證后果真如此,
那么,我們直接請求這些分享地址,在瀏覽器中打開按F12打開開發者模式,觀察里面的ajax異步請求
的數據包,復制視頻play_addr,再打開后發現播放的視頻仍然是有水印的。然后又經過一番查找資料,
無果。好吧在網上找了個某第三方抖音分享視頻下載網站,簡單分析了它的接口,照着網站js加密參數 。后來偶然間發現了一篇帖子(不好意思
那部分自己也能正常獲取請求結果,返回視頻無水印下載地址
實在是想不起了。。。)大致意思是把里面鏈接playwm改成play,用手機端UA就能獲得無水印地址了,試了一下果真如此。

經過上面的分析之后,我們開始碼代碼把。
然后問題出現了,由於mitmdump只是加載我們的腳本程序,如果把所有的代碼都堆在腳本程序里
會造成腳本運行緩慢阻塞網絡請求,開始想在腳本里寫個多線程去執行耗時操作,然后就被教育了。
好吧那么把我們需要的信息寫入數據庫中,然后再寫個程序讀取數據庫然后再去下載視頻總沒問題吧。
然后經過一番實踐發現,獲取的視頻鏈接都有時效性,過段時間就會失效。那么就沒有什么辦法一遍
瀏覽視頻,一邊下載視頻嗎?答案是肯定的,后來想到用socket套接字在兩個程序中進行通信。
用我們的腳本程序A獲取視頻信息用套接字發送到視頻下載程序B,然后在程序B中開個線程用於接收
套接字信息,再開幾個線程下載視頻,程序B中不同線程之間用Queue()隊列實現生產消費模式。
ok,主體框架搭好了,我們開始吧。talk is cheap show me the code…
"""
===================================================
-*- coding:utf-8 -*-
Author :GadyPu
E_mail :Gadypy@gmail.com
Time :2020/8/ 0004 下午 12:03
FileName :mitmproxy_douyin_get_url_scripts.py
====================================================
"""
import mitmproxy.http
import json
import time
import struct
from socket import *
post_api = 'https://aweme-lq.snssdk.com/aweme/v1/aweme/post/'
feed_api = 'https://aweme-lq.snssdk.com/aweme/v2/feed/'
favo_api = 'https://aweme-lq.snssdk.com/aweme/v1/aweme/favorite/'
def send_data_to_server(header_dict, type):
'''
:param header_dict 獲取到的數據包字典
:param type 原視頻類型,feed,post,favo
與服務端通信發送數據,使用自定義協議
每次調用就創建一個套接字,用完就關閉
'''
tcp_client_socket = None
host = '127.0.0.1'
port = 9527
address = (host, port)
try:
tcp_client_socket = socket(AF_INET, SOCK_STREAM)
tcp_client_socket.connect(address)
if type == 'post' or type == 'favo':
json_data = json.dumps(header_dict)
json_bytes = json_data.encode('utf-8')
tcp_client_socket.send(struct.pack('i', len(json_bytes)))
tcp_client_socket.send(json_bytes)
#print(header_dict)
elif type == 'feed':
#先發送協議頭用struct打包,包含要發送的數據大小
data_len = header_dict['size']
byte_arr = header_dict['content']
new_dict = {
'type': 'feed',
'size': data_len
}
json_data = json.dumps(new_dict)
json_bytes = json_data.encode('utf-8')
tcp_client_socket.send(struct.pack('i', len(json_bytes)))
tcp_client_socket.send(json_bytes)
chunk_size = 1024
start = 0
end = 1 * chunk_size
#print('new_dict...........................:', new_dict)
#發送protubuf數據,每次發送1024個字節
while True:
if data_len // chunk_size > 0:
read_bytes = byte_arr[start : end]
start = end
end += chunk_size
data_len -= chunk_size
tcp_client_socket.send(read_bytes)
#print(read_bytes)
else:
read_bytes = byte_arr[start : ]
tcp_client_socket.send(read_bytes)
break
except:
pass
if tcp_client_socket:
tcp_client_socket.close()
def get_local_time(create_time):
'''
:param create_time 原視頻的發布時間,linux時間戳
:return: 返回年月日格式的日期
'''
time_local = time.localtime(int(create_time))
pub_date = time.strftime("%Y-%m-%d", time_local)
return pub_date
def response(flow: mitmproxy.http.HTTPFlow):
if flow.request.url.startswith(post_api) or flow.request.url.startswith(favo_api):
if flow.response.status_code == 200:
url_json = json.loads(flow.response.text)
if url_json and url_json['aweme_list']:
for aweme_list in url_json['aweme_list']:
aweme_id = aweme_list['aweme_id']
create_time = aweme_list['create_time']
create_time = get_local_time(create_time)
type = 'post' if flow.request.url.startswith(post_api) else 'favo'
header_dict = {
'type': type,
'aweme_id_create_time': aweme_id + '_' + create_time,
'nickname': aweme_list['author']['nickname'],
'play_url': aweme_list['video']['play_addr']['url_list'][0]
}
send_data_to_server(header_dict, type)
elif flow.request.url.startswith(feed_api):
if flow.response.status_code == 200:
procbuf = flow.response.content
feed_dict = {
'type': "feed",
'content': procbuf,
'size': len(procbuf)
}
#print('procbuf len................', len(procbuf))
send_data_to_server(feed_dict, 'feed')
addons = {
response()
}
4.2 輔助程序
"""
===================================================
-*- coding:utf-8 -*-
Author :GadyPu
E_mail :Gadypy@gmail.com
Time :2020/8/ 0004 下午
FileName :parase_data.py
====================================================
"""
import os
import re
import json
import time
import requests
import random
import hashlib
from lxml import etree
import math
from decimal import Decimal
import warnings
warnings.filterwarnings('ignore')
'''
class Get_real_play_addr(object):
def __init__(self):
self.request_url = 'http://3g.gljlw.com/diy/ttxs_dy2.php?'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36'
}
def parase_play_addr(self, url):
paly_url = ''
r = str(random.random())[2:]
s = hashlib.md5((url + '@&^' + r).encode()).hexdigest()
params = { 'url': url, 'r': r, 's': s }
try:
response = requests.get(url = self.request_url, headers = self.headers, params = params)
if response.status_code == 200:
content = response.content.decode('utf-8')
html = etree.HTML(content)
paly_url = html.xpath('//source/@src')[0]
if paly_url:
return paly_url
except:
print("network error cannot parase play_addr...")
return None
'''
# 打開protobuf文件,用正則表達式匹配出所有的分享鏈接地址
class Get_url_from_protobuf(object):
def __init__(self):
self.pat = r'(?<=\")https://www.iesdouyin.com/share/video/.*(?=\")'
self.command = r' --decode_raw <'
def get_url(self, exe_path, file_path):
try:
fp = os.popen(exe_path + self.command + file_path)
if fp:
src = fp.read()
fp.close()
url_list = re.findall(self.pat, src)
url_list = set(url_list)
return url_list
except:
print('decode protobuf failed...')
return None
def get_local_time(create_time):
time_local = time.localtime(int(create_time))
pub_date = time.strftime("%Y-%m-%d", time_local)
return pub_date
# 獲取分享視頻的下載地址
def Get_real_play_addr_by_web(aweme_id):
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 6.0; ZTE BA520 Build/MRA58K; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/55.0.2883.77 Mobile Safari/537.36'
}
api_url = 'https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids=' + aweme_id
response = requests.get(url = api_url, headers = headers, verify = False)
if response.status_code == 200:
response_json = response.json()
play_addr = response_json['item_list'][0]['video']['play_addr']['url_list'][0]
create_time = response_json['item_list'][0]['create_time']
create_time = get_local_time(create_time)
play_addr = play_addr.replace('playwm', 'play', 1)
# 返回下載地址和視頻的發布時間
return (play_addr, create_time)
return None, None
def Get_file_size(e: int) -> str:
if e <= 0:
return ''
t = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"]
n = math.floor(math.log2(e) / math.log2(1024))
return str(Decimal(e / math.pow(1024, n)).quantize(Decimal("0.00"))) + t[n]
#'https://www.iesdouyin.com/share/video/6854870744690625805/?region=CN&mid=6854870758414781191'
#print(Get_real_play_addr_by_web("6854870744690625805"))
4.3 下載程序
"""
===================================================
-*- coding:utf-8 -*-
Author :GadyPu
E_mail :Gadypy@gmail.com
Time :2020/8/ 0004 下午
FileName :douyin_video_downloads.py
====================================================
"""
import requests
import json
import os
import time
import sys
import threading
import struct
from queue import Queue
from socket import *
from parase_data import Get_url_from_protobuf
from parase_data import Get_real_play_addr_by_web
from parase_data import Get_file_size
import warnings
warnings.filterwarnings("ignore")
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 6.0; ZTE BA520 Build/MRA58K; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/55.0.2883.77 Mobile Safari/537.36'
}
que = Queue()
lock = threading.Lock()
chunk_size = 1024
#下載線程
def Download(path, index):
while True:
global que
if que.empty():
print("No.{} thread is waiting for data...".format(index))
data = que.get()
dir_name = data['type']
file_name = data['aweme_id_create_time']
dir_path = ''
if dir_name == 'feed':
play_url, create_time = Get_real_play_addr_by_web(data['aweme_id_create_time'])
if (not play_url) or (not create_time):
continue
file_name = file_name + '_' + create_time
dir_path = os.path.join(path, dir_name)
else:
dir_path = os.path.join(path, dir_name, data['nickname'])
play_url = data['play_url']
global lock
with lock:
if not os.path.exists(dir_path):
os.makedirs(dir_path)
file_path = os.path.join(dir_path, file_name + '.mp4')
read_size = 0
with lock:
if os.path.exists(file_path):
continue
try:
response = requests.get(url = play_url, headers = headers, verify = False)
if response.status_code == 200:
#print(response.headers)
total_szie = int(response.headers['Content-Length'])
print("NO.{} thread is downloading... {} filesize:{}".format(index, data['aweme_id_create_time'] + '.mp4', Get_file_size(total_szie)))
t_1 = time.time()
with open(file_path, "wb") as fp:
for data in response.iter_content(chunk_size = chunk_size):
if data:
fp.write(data)
read_size += chunk_size
#print('No.{} threading is downloading: {} ...: {}%'.format(index, file_path, str(round(read_size / total_szie * 100, 2))))
print("No.{} thread finshed! total cost: {}s".format(index, str(round(time.time() - t_1, 2))))
time.sleep(0.2)
else:
print("cannot conneted with the servers...")
except:
print("downloading %s failed... network error please try againg"%play_url)
#que.put(data)
# 服務端用於接收mitm腳本發送的數據
def run(exe_path, file_path):
PORT = 9527
HOST = ''
address = (HOST, PORT)
tcp_server_socket = socket(AF_INET, SOCK_STREAM)
tcp_server_socket.bind(address)
print("the server is lunching, listeing the port {}...".format(address[1]))
tcp_server_socket.listen(5)
while True:
try:
client_socket, client_address = tcp_server_socket.accept()
print('the client{} linked:{}'.format(client_address, time.asctime(time.localtime(time.time()))))
data = client_socket.recv(4)
header_size = struct.unpack('i', data)[0]
header_bytes = client_socket.recv(header_size)
header_json = json.loads(header_bytes.decode('utf-8'))
if header_json['type'] == 'post' or header_json['type'] == 'favo':
que.put(header_json)
else:
chunk_size = 1024
read_size = 0
total_size = header_json['size']
with open(file_path, 'wb') as fp:
while read_size < total_size:
data = client_socket.recv(chunk_size)
if data:
fp.write(data)
read_size += len(data)
probuf = Get_url_from_protobuf()
url_list = probuf.get_url(exe_path, file_path)
for url in url_list:
try:
feed_dict = {
'type': 'feed',
'feed_url': url,
'aweme_id_create_time': url[38: 57] #只是視頻的id,並沒有發布時間
}
que.put(feed_dict)
except:
continue
client_socket.close()
except:
tcp_server_socket.close()
print("never run here...")
if __name__ == "__main__":
dir_path = sys.argv[1]
if dir_path.endswith('/'):
dir_path += '/'
if not os.path.exists(dir_path):
os.makedirs(dir_path)
#dir_path = r'C:\Users\Administrator\Desktop\pytho_src\douyin\videos'
thread_list = []
for i in range(4):
if i == 0:
thread_list.append(threading.Thread(target = run, args = (r'.\protobuf\protoc.exe', r'.\probuf.bin', )))
else:
thread_list.append(threading.Thread(target = Download, args = (dir_path , i + 1, )))
thread_list[i].setDaemon = True
[i.start() for i in thread_list]
[i.join() for i in thread_list]
# #run(r'.\protobuf\protoc.exe', r'.\probuf.bin')
# print("finish!")
五、使用方法
5.2 下載程序用法

六、運行效果



七、參考鏈接
1、https://blog.csdn.net/doctor_who2004/article/details/105718889
2、https://www.jianshu.com/p/af381ef134e2
3、https://blog.csdn.net/mp624183768/article/details/80956368
八、下一步計划
差不多就這么多,之前還打算用Appiun實現自動滑動,可又要下一大堆軟件,自己的小電腦跑個模擬器已經不堪重負想想還是算了吧。受條件所限,自己的手機上沒有進行測試(不然躺床上刷視頻pc端程序掛后台運行,手機端瀏覽的視頻基本上都能下載),只是在模擬器上運行,然后就沒然后了。
大家有啥疑問的歡迎在評論區留言。

