簡明扼要的說下, 就兩個線程,一個 負責收數據,一個負責發心跳包。
步驟如下,
進程1,調用 發包函數,發送連接請求,然后再發送 獲取彈幕類型請求,就一直循環接收數據。
進程2,循環函數,每隔45秒向服務器發一次心跳包。
因為斗魚自己定義了 包頭,,所以來在發包之前,先發送包數據。12個字節,
消息頭部:消息長度 4字節 +消息類型4字節+加密字段2字節(默認為0)+保留字段2字節(默認為0)
然后就要把要發的內容 加上 “\0”,utf-8 編碼后就能發送了
完整的 消息是:包頭 + 內容 +”\0”;
上Python代碼:
main.py
import socket import time import threading import multiprocessing from barrage_func import * # 導入自定義方法 SERVER_DOMAIN = "openbarrage.douyutv.com" # 彈幕服務器 域名 SERVER_PORT = 8601; # 彈幕服務器 端口 ROOM_ID = 288016; #房間ID global FIX_TAIL #拼接處理后被丟棄的數據,防止彈幕丟失 FIX_TAIL = "" global gl_client #全局socket gl_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def init_socket(): global gl_client host_ip = socket.gethostbyname(SERVER_DOMAIN) gl_client.connect((host_ip, SERVER_PORT)) def sendDate(client,data): data = data + '\0' #斗魚獨創序列化文本數據,結尾必須為'\0' data_length = length = len(data)+8 #斗魚協議在尾部加了 消長度4字節,消息類型2字節(689),加密字段1字節,保留字段1字節, code = 689 # 消息類型 # 消息頭部:消息長度+消息類型+加密字段(默認為0)+保留字段(默認為0) head = data_length.to_bytes(4, 'little') + data_length.to_bytes(4, 'little') + code.to_bytes(2,'little')+ (0).to_bytes(2,'little') # head = int.to_bytes(data_length, 4, 'little') + int.to_bytes(data_length, 4, 'little') + int.to_bytes(code, 4,'little') client.sendall(head) # 發送頭部部分 msg = (data).encode('utf-8') # 使用utf-8編碼 數據部分 client.sendall(bytes(msg)) # 發送數據部分 def getdanmu(client): login = 'type@=loginreq/roomid@=%s/' % ROOM_ID sendDate(client,login) joingroup = 'type@=joingroup/rid@=%s/gid@=-9999/' % ROOM_ID sendDate(client,joingroup) while True: try: part_body = client.recv(1024,socket.MSG_WAITALL) if not part_body: #如果 服務器發送終止連接b'',則終止會話 break msg_str = part_body.decode(encoding="utf-8", errors="ignore") get_type(msg_str) except Exception as e: print("getdanmu未知錯誤: %s" % e) continue def get_type(msg_str): global FIX_TAIL msg_str = FIX_TAIL + msg_str msg_arr = msg_str.split("type@=") FIX_TAIL = msg_arr.pop() for value in msg_arr: type_temp = value.split("/") if len(type_temp) >= 2: type_name = type_temp[0] if type_name == "chatmsg": chatmsg =BRRAGE_FUC.get_chatmsg(value) #獲取彈幕類 print("["+chatmsg.nn+"]: "+chatmsg.txt) # pass elif type_name == "dgb": dgb = BRRAGE_FUC.get_Dbg(value) #獲取禮物類 print("感謝[{}] ,贈送的 {} 個 '{}'".format(dgb.nn,int(dgb.gfcnt) * int(dgb.hits),dgb.gfid)) # pass elif type_name == "uenter": uenter=BRRAGE_FUC.get_uenter(value) #獲取進入房間類 print("歡迎 ["+ uenter.nn+"] " + "進入直播間") # pass elif type_name == "spbc": spbc = BRRAGE_FUC.get_spbc(value) # 獲取房間廣播類 print("{} 房間,[{}]贈送給[{}] {} 個 '{}'".format(spbc.drid,spbc.sn,spbc.dn,spbc.gc, spbc.gn)) def keep_alive(client): ''' 客戶端每隔 45 秒發送心跳信息給彈幕服務器 ''' while True: alive_msg = "type@=mrkl/" #新版本 # alive_msg = "type@=keeplive/tick@=%s/" % int(time.time()) #舊版本 sendDate(client,alive_msg) time.sleep(20) if __name__ == '__main__': init_socket() p1 = multiprocessing.Process(target=getdanmu, args=(gl_client,)) p2 = multiprocessing.Process(target=keep_alive, args=(gl_client,)) p1.start() p2.start()
這里引用了 2個文件,
1個是定義了4個類發言彈幕Brrage_Msg(),贈送禮物Brrage_Dgb(),用戶進入房間Brrage_Enter(),廣播消息Brrage_Spbc ()
1 個是 寫了靜態方法BRRAGE_FUC 對上面的類進行 賦值
barrage_func.py
import time from barrage_info import * class BRRAGE_FUC(object): ''' 常被調用的靜態方法 ''' #提取發言彈幕 @staticmethod def get_chatmsg(msg): brrage_msg =Brrage_Msg() #獲取當時時間 eg: '2019-02-16 18:50:02' brrage_msg.time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) key_value_list = msg.split("/") for key_value_temp in key_value_list: key_value=key_value_temp.split("@=",1) if len(key_value)==2: if key_value[0]=="rid": brrage_msg.rid =str(key_value[1]) elif key_value[0]=="uid": brrage_msg.uid =str(key_value[1]) elif key_value[0]=="nn": brrage_msg.nn =str(key_value[1]) elif key_value[0]=="txt": brrage_msg.txt=str(key_value[1]) elif key_value[0]=="cid": brrage_msg.cid =str(key_value[1]) elif key_value[0]=="nl": brrage_msg.nl =str(key_value[1]) elif key_value[0]=="level": brrage_msg.level =str(key_value[1]) elif key_value[0]=="bnn": brrage_msg.bnn =str(key_value[1]) elif key_value[0]=="bl": brrage_msg.bl =str(key_value[1]) elif key_value[0]=="brid": brrage_msg.brid =str(key_value[1]) return brrage_msg #提取送禮物彈幕 @staticmethod def get_Dbg(msg): brrage_dgb = Brrage_Dgb() # 獲取當時時間 eg: '2019-02-16 18:50:02' brrage_dgb.time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) key_value_list = msg.split("/") for key_value_temp in key_value_list: key_value = key_value_temp.split("@=",1) if len(key_value) == 2: if key_value[0] == "rid": brrage_dgb.rid = key_value[1] elif key_value[0] == "uid": brrage_dgb.uid = key_value[1] elif key_value[0] == "nn": brrage_dgb.nn = key_value[1] elif key_value[0] == "sn": brrage_dgb.sn = key_value[1] elif key_value[0] == "gfid": brrage_dgb.gfid = key_value[1] elif key_value[0] == "gfcnt": brrage_dgb.gfcnt = key_value[1] elif key_value[0] == "hits": brrage_dgb.hits = key_value[1] return brrage_dgb #提取用戶進房通知彈幕 @staticmethod def get_uenter(msg): brrage_enter = Brrage_Enter() key_value_list = msg.split("/") for key_value_temp in key_value_list: key_value = key_value_temp.split("@=",1) if len(key_value) == 2: if key_value[0] == "rid": brrage_enter.rid = key_value[1] if key_value[0] == "uid": brrage_enter.uid = key_value[1] if key_value[0] == "nn": brrage_enter.nn = key_value[1] if key_value[0] == "nl": brrage_enter.nl = key_value[1] return brrage_enter #飛機、火箭 廣播消息 @staticmethod def get_spbc(msg): brrage_spbc = Brrage_Spbc() key_value_list = msg.split("/") for key_value_temp in key_value_list: key_value = key_value_temp.split("@=",1) if len(key_value) == 2: if key_value[0] == "rid": brrage_spbc.id = key_value[1] if key_value[0] == "drid": brrage_spbc.drid = key_value[1] if key_value[0] == "uid": brrage_spbc.uid = key_value[1] if key_value[0] == "sn": brrage_spbc.sn = key_value[1] if key_value[0] == "dn": brrage_spbc.dn = key_value[1] if key_value[0] == "gn": brrage_spbc.gn = key_value[1] if key_value[0] == "gc": brrage_spbc.gc = key_value[1] if key_value[0] == "gb": brrage_spbc.gb = key_value[1] if key_value[0] == "gfid": brrage_spbc.gfid = key_value[1] return brrage_spbc
barrage_info.py
class Brrage_Base(object): rid = "0" # 房間號 uid = "0" # 用戶id nn = "nn" # 用戶昵稱 time = "0000-00-00 00:00:00" # 時間 class Brrage_Msg(Brrage_Base): """表示為“彈幕”消息,type固定為 chatmsg""" def __init__(self): self.txt="txt" #彈幕文本內容 self.cid="" #彈幕唯一 ID self.nl=0 #貴族等級 self.level =0 #用戶等級 self.bnn = "" # 徽章昵稱 self.bl = 0 # 徽章等級 self.brid=0 #徽章房間 id class Brrage_Dgb(Brrage_Base): '''表示為“贈送禮物”消息,type固定為 dgb ''' def __init__(self): self.gfid=0 #禮物 id self.gfcnt =1 #禮物個數:默認值 1 self.hits=1 #禮物連擊次數:默認值 1(表示 1 連擊) class Brrage_Enter(Brrage_Base): ''' 表示為“用戶進房通知”消息,type固定為 uenter ''' def __init__(self): self.nl = 0 # 貴族等級 class Brrage_Spbc(Brrage_Base): ''' 房間內禮物廣播,type固定為 spbc''' def __init__(self): self.drid = 0 #贈送房間 rid ,默認為0 self.sn = "" # 贈送者昵稱 self.dn = "" # 受贈者昵稱 self.gn = "" # 禮物名稱 self.gc = 1 # 禮物數量 # self.gs = "" # 廣播樣式 self.gb = 1 # 是否有禮包(0-無禮包,1-有禮包) # self.es = 1 # 廣播展現樣式(1-火箭,2-飛機) self.gfid = 1 #禮物 id
運行 main.py 看效果,,比 官方彈幕 還 全,,官方應該 對 tcp 粘包沒處理好。
下面的是C#的代碼,原理都一樣
using System; using System.Net; using System.Net.Sockets; using System.Text; using System.Text.RegularExpressions; using System.Threading; namespace danmu { class Program { private static string SERVER_DOMAIN = "openbarrage.douyutv.com"; private static int SERVER_PORT = 8601; private static int ROOM_ID = 288016; private static string FIX_TAIL = String.Empty; //拼接處理后被丟棄的數據,防止彈幕丟失 class BrrageMsg { public string Name = String.Empty; public string Txt = String.Empty; } static void Main(string[] args) { try { Socket tcpClient = InitTcp(SERVER_DOMAIN, SERVER_PORT); Thread getDanmuThread = new Thread(GetDanmu); getDanmuThread.Start(tcpClient); Thread keepAliveThread = new Thread(KeepAlive); keepAliveThread.Start(tcpClient); } catch (Exception ex) { Console.WriteLine(ex.ToString()); } } static Socket InitTcp(string host, int port) { IPHostEntry hostInfo = Dns.GetHostEntry(host); IPAddress ipAddress = hostInfo.AddressList[0]; //域名轉IP IPEndPoint ipe = new IPEndPoint(ipAddress, port); Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); s.Connect(ipe); return s; } static byte[] DataToBytes(string data) { string dantaNew = data + "\0"; byte[] bodyDataByte = Encoding.UTF8.GetBytes(dantaNew); byte[] cType = BitConverter.GetBytes(689); int dataLength = dantaNew.Length + cType.Length + 8; byte[] dataLengthByte = BitConverter.GetBytes(dataLength); byte[] dataLengthByte2 = BitConverter.GetBytes(dataLength); byte[] result = new byte[dataLength + 4]; Array.Copy(dataLengthByte, 0, result, 0, 4); Array.Copy(dataLengthByte2, 0, result, 4, 4); Array.Copy(cType, 0, result, 8, 4); Array.Copy(bodyDataByte, 0, result, 12, bodyDataByte.Length); byte[] source = new byte[result.Length]; Array.Copy(result, 0, source, 0, result.Length); return result; } static void GetDanmu(object obj) { Socket tcpClient = (Socket)obj; string login = "type@=loginreq/roomid@=" + ROOM_ID + "/"; byte[] loginBytes = DataToBytes(login); tcpClient.Send(loginBytes); string joingroup = "type@=joingroup/rid@=" + ROOM_ID + "/gid@=-9999/"; byte[] joingroupBytes = DataToBytes(joingroup); tcpClient.Send(joingroupBytes); string recvStr = ""; byte[] recvBytes = new byte[1024]; int bytes; while (true) { bytes = tcpClient.Receive(recvBytes, recvBytes.Length, 0);//從服務器端接受返回信息 recvStr = Encoding.UTF8.GetString(recvBytes, 0, bytes); ShowMsg(recvStr); } } static BrrageMsg GetMsgType(string[] msgType) { BrrageMsg brrageMsg = new BrrageMsg(); foreach (string keyValueTemp in msgType) { string[] keyValue = Regex.Split(keyValueTemp, "@=", RegexOptions.IgnoreCase); if (keyValue.Length >= 2) { string key = keyValue[0]; string[] textArr = new string[keyValue.Length - 1]; Array.Copy(keyValue, 1, textArr, 0, keyValue.Length - 1); string value = String.Join("@", textArr); if (key =="nn") { brrageMsg.Name = value; } if ((key == "txt")) { brrageMsg.Txt = value; } } } return brrageMsg; } static void ShowMsg(string msg) { msg = FIX_TAIL + msg; string[] chatmsgArray = Regex.Split(msg, "type@=", RegexOptions.IgnoreCase); FIX_TAIL = chatmsgArray[chatmsgArray.Length - 1]; //截取最后的丟棄數據,放在下個包的開頭,防止數據丟失 string[] newChatmsgArrayArr = new string[chatmsgArray.Length - 1]; Array.Copy(chatmsgArray, 0, newChatmsgArrayArr, 0, chatmsgArray.Length - 1); foreach (string t in newChatmsgArrayArr) { string[] msgType = t.Split('/'); if (msgType.Length >= 2) { string type = msgType[0]; if (type == "chatmsg") { BrrageMsg brrageMsg=GetMsgType(msgType); string result = String.Format("[{0}]: {1}", brrageMsg.Name, brrageMsg.Txt); Console.WriteLine(result); } } } } static void KeepAlive(object obj) { Socket tcpClient = (Socket)obj; byte[] aliveMsg = DataToBytes("type@=mrkl/"); while (true) { tcpClient.Send(aliveMsg); Thread.Sleep(40000); } } } }