最近在給某個主播開發斗魚直播間輔助工具,為了程序的高效穩定,也搜索了大量的資料,經過大量什么百度,谷歌搜索。。。
雖然有很多Python的腳本及JS腳本實現了拉取斗魚彈幕信息,但是這些年來的開發職業病告訴我,這滿足不了對系統的控制欲望。。
后來,找啊。。。找啊。。。意外間發現這個文檔。。。。廢話不多說了,說正題吧。
斗魚很人性化的提供了一個基於Socket TCP傳輸協議的標准文檔,通過接口我們可以安全穩定高效的獲取斗魚直播間彈幕信息,實現多種多樣化的輔助功能。
一、協議組成
眾所周知,受TCP最大傳輸單(MTU)限制及連包機制影響,應用層協議需自己設計協議頭,以保證不同消息的隔離性和消息完整性。
斗魚后台協議頭設計如下:
字節 | Byte0 | Byte 1 | Byte 2 | Byte 3 |
長度 | 消息長度 | |||
頭部 | 消息長度 | |||
消息類型 | 加密字段 | 保留字段 | ||
數據部 | 數據部分(結尾必須為 '\0') |
斗魚消息協議格式如上所示,其中字段說明如下:
消息長度:4字節小端整數,表示整條消息(包括自身)長度(字節數)。消息長度出現兩遍,二者相同。
消息類型:2字節小端整數,表示消息類型。取值如下:
689 客戶端發送給彈幕服務器的文本格式數據
690 彈幕服務器發送給客戶端的文本格式數據。
加密字段:暫未使用,默認為0。
保留字段:暫未使用,默認為0。
數據部分:斗魚獨創序列化文本數據,結尾必須為 ‘\0’。詳細序列化、反序列化算法見下節。(所有協議內容均為UTF-8編碼)
二、序列化
為增強兼容性、可讀性斗魚后台通訊協議采用文本形式的明文數據。同時針對平台數據特點,斗魚自創序列化、反序列化算法。即STT序列化。下面詳細
介紹STT序列化和反序列化。STT序列化支持鍵值對類型、數組類型(意外發現有的報文還有JSON類型)。規定如下:
1、鍵key和值value直接采用 '@='分割
2、數組采用 '/' 分割
3、如果key或者value中含有字符 '/',則使用 '@S' 轉義
4、如果key或者value中含有字符 '@',則使用 '@A' 轉義
舉例:
(1)多個鍵值對數據:key1@=value1/key2@=value2/key3@=value3/
(2)數組數組:value1/value2/value3/
不同消息有相同的協議頭、序列化方式。
三、客戶端消息格式(部分)
1、登錄請求消息
該消息用於完成登陸授權,完整的數據部分應包含的字段如下:
type@=loginreq/roomid@=58839/
type:表示登陸請求消息,固定為loginreq
roomid:登陸房間的ID
2、客戶端心跳消息
該消息用於維持與后台間的心跳,完整的數據部分應包含的字段如下:
type@=mrkl/
type:表示心跳消息,固定為mrkl
3、加入房間分組消息
該消息用於完成加入房間分組,完整的數據部分應包含的字段如下:
type@=joingroup/rid@=59872/gid@=-9999/
type:表示為加入房間分組消息,固定為joingroup
rid:所登錄的房間號
gid:分組號,第三方平台建議選擇-9999(即海量彈幕模式)
4、登出消息
type@=logout/
該消息用於完成登出后台服務,完整的數據部分應包含的字段如下:
type:表示為登出消息,固定為logout
三、實現斗魚直播彈幕服務器API
現在網上可以輕松找到《斗魚彈幕服務器第三方接入協議v1.6.2》接口文檔,在文檔中有提到兩個重要的數據:
彈幕服務器地址:openbarrage.douyutv.com
彈幕服務器端口:8601
我們可以通過.NET Framework 提供的TcpClient類庫來方便連接SOCKET彈幕服務器。為了實現服務的穩定性,我這里使用了異步SOCKET客戶端完成連接。
1、彈幕服務器報文頭:
/// <summary> /// 彈幕報文頭 /// </summary> [StructLayout(LayoutKind.Sequential, Pack = 1, CharSet = CharSet.Ansi)] public struct BARRAGE_PACKAGE { /// <summary> /// 長度 /// </summary> public int dwLen; /// <summary> /// 長度 /// </summary> public int dwLen2; /// <summary> /// 發送方向 /// </summary> public Int16 bType; /// <summary> /// 加密字段(保留) /// </summary> public byte encrypt; /// <summary> /// 備注字段(保留) /// </summary> public byte reserved; }
2、異步套接字格式
// <summary> /// 套接字數據 /// </summary> public class SOCKET_PACKAGE { /// <summary> /// Socket套接字主對象 /// </summary> public Socket Socket = null; /// <summary> /// 緩沖區大小 /// </summary> public const int BufferSize = 4; // 說明一下,這里由於有的包並不夠1024緩沖區,經過大量測試,緩沖區設置為4最合適了 /// <summary> /// 套接字緩沖區 /// </summary> public byte[] SocketBuffer = new byte[BufferSize]; /// <summary> /// 套接字流緩存 /// </summary> public NetworkStream Stream = null; }
3、SOCKET幫助類
這個類封裝了直接通過NetworkStream對象並格式化報文向斗魚發送報文(僅僅為了提高開發效率)
#region SOCKET幫助類 /// <summary> /// SOCKET幫助類 /// </summary> public static class SocketHelper { /// <summary> /// 發送斗魚報文 /// </summary> /// <param name="message"></param> /// <param name="ms"></param> /// <returns></returns> public static void LiveMessagePush(string message, NetworkStream ms) { #region 斗魚報文 BARRAGE_PACKAGE package = new BARRAGE_PACKAGE(); package.bType = 689; byte[] buffer = Encoding.UTF8.GetBytes(message); package.dwLen = buffer.Length + 8; package.dwLen2 = package.dwLen; package.encrypt = 0x00; package.reserved = 0x00; #endregion #region 發送數據 byte[] block = new byte[buffer.Length + 12]; Array.Copy(StreamSerializationHelper.StructureToBytes(package), 0, block, 0, 12); Array.Copy(buffer, 0, block, 12, buffer.Length); ms.Write(block, 0, block.Length); ms.Flush(); #endregion } } #endregion
這里可能會有人問到 StreamSerializationHelper這個類庫從哪里來的,這個是自己寫的一個實現對struct結構體序列化的方法。下面也提供一下,如果有更好的可自行更換:)
/// <summary> /// 本基類提供和二進制結構體數據處理的相關函數,這里包含的所有方法都是與標准語言二進制結構體操作 /// 相關函數 /// </summary> /// <remarks> /// 本基類提供和二進制結構體數據處理的相關函數。這里采用靜態方法的形式提供出各種數據對象進行互轉 /// 的方法 /// <list type="bullet"> /// <item>二進制文件到結構體的轉換</item> /// <item>結構體文件轉換為二進制數據</item> /// </list> /// </remarks> public static class StreamSerializationHelper { /// <summary> /// 將托管格式結構體轉換為byte數組格式 /// </summary> /// <param name="graph">源數據</param> /// <returns></returns> public static byte[] StructureToBytes(object graph) { // 獲取數據結構體大小(非托管) int dwStructureSize = Marshal.SizeOf(graph); // 從進程的非托管內存中分配內存 IntPtr iter = Marshal.AllocHGlobal(dwStructureSize); // 將數據從托管對象封裝送往非托管內存塊 Marshal.StructureToPtr(graph, iter, true); // 分配指定大小數組塊 byte[] mBytes = new byte[dwStructureSize]; // 將數據從非托管內存復制到托管數組中 Marshal.Copy(iter, mBytes, 0, dwStructureSize); Marshal.FreeHGlobal(iter); return mBytes; } /// <summary> /// 將非托管數組轉換至托管結構體 /// </summary> /// <typeparam name="T">數據類型</typeparam> /// <param name="graph">非托管數組</param> /// <returns></returns> public static T BytesToStructure<T>(byte[] graph) { // 獲取數據結構體大小(托管) int dwStructureSize = Marshal.SizeOf(typeof(T)); // 從進程的非托管內存中分配內存 IntPtr iter = Marshal.AllocHGlobal(dwStructureSize); // 將數據從托管內存數組復制到非托管內存指針 Marshal.Copy(graph, 0, iter, dwStructureSize); // 將數據從非托管內存塊送到新分配並指定類型的托管對象並返回 T obj = (T)Marshal.PtrToStructure(iter, typeof(T)); Marshal.FreeHGlobal(iter); return obj; } /// <summary> /// 通過序列化復制對象 /// </summary> /// <param name="graph"></param> /// <returns></returns> public static object CloneObject(object graph) { ExceptionHelper.FalseThrow<ArgumentNullException>(graph != null, "graph"); using (MemoryStream memoryStream = new MemoryStream(1024)) { BinaryFormatter formatter = new BinaryFormatter(); formatter.Serialize(memoryStream, graph); memoryStream.Position = 0; return formatter.Deserialize(memoryStream); } } }
4、實現登陸彈幕服務器代碼如下:
#region 私有變量 int dwMrkl = Environment.TickCount; // 記錄執行的時間,因為斗魚規定每45秒要向斗魚發送一次心跳消息(否則踢下線) #endregion #region 連接彈幕 TcpClient tcpClient = new TcpClient(); tcpClient.Connect("openbarrage.douyutv.com",8601); #endregion #region 網絡數據 using (NetworkStream ms = tcpClient.GetStream()) { #region 登陸請求 SocketHelper.LiveMessagePush(string.Format("type@=loginreq/roomid@={0}/\0", 99999), ms); #endregion #region 接收數據 while (environment_semaphore && tcpClient.Connected) { #region 發送心跳 if (!ms.DataAvailable && tcpClient.Connected) { // 不管是否有數據,只要SOCKET連接那么就進行心跳判斷 if (Environment.TickCount - dwMrkl >= 45000) { dwMrkl = Environment.TickCount; // 重新計算心跳消息時間 SocketHelper.LiveMessagePush("type@=mrkl/\0", ms); } Thread.Sleep(5); continue; } #endregion #region 發送心跳 if (Environment.TickCount - dwMrkl >= 45000) { dwMrkl = Environment.TickCount; SocketHelper.LiveMessagePush("type@=mrkl/\0", ms); } #region 數據處理 byte[] buffer = new byte[SOCKET_PACKAGE.BufferSize]; ms.Read(buffer, 0, buffer.Length); int dwLen = BitConverter.ToInt32(buffer, 0); int unReadOfBytes = dwLen; #endregion #region 報文處理 using (MemoryStream s = new MemoryStream()) { #region 粘包處理 // 大家都知道TCP有粘包數據,因為不是優雅的一問一答式,所以要自行處理,這是我想到的最簡單處理粘包的辦法咯 do { buffer = new byte[unReadOfBytes >= 1024 ? 1024 : unReadOfBytes]; int dwBytesOfRead = ms.Read(buffer, 0, buffer.Length); s.Write(buffer, 0, dwBytesOfRead); unReadOfBytes -= dwBytesOfRead; } while (unReadOfBytes > 0); s.Position = 0; #endregion #region 報文處理 if (true) { string content = Encoding.UTF8.GetString(s.ToArray(), 8, dwLen - 8); foreach (string target in Regex.Split(content, "/", RegexOptions.IgnoreCase)) { if (!string.IsNullOrWhiteSpace(target)) { string[] items = Regex.Split(target, "@=", RegexOptions.IgnoreCase); if (string.Compare("type", items[0], true) == 0 && string.Compare("loginres", items[1], true) == 0) {
// 當我們收到loginres消息后再發送加入房間分組消息 SocketHelper.LiveMessagePush(string.Format("type@=joingroup/rid@={0}/gif@=-9999/\0", 99999), ms); } if (string.Compare("type", items[0], true) == 0 && string.Compare("loginres", items[1], true) != 0) { string message_type = items[1].Replace("@S", "/").Replace("@A", "@"); if (!string.IsNullOrWhiteSpace(message_type) && string.Compare("mrkl", message_type, true) != 0) { // 這里拿到的content數據就是不含心跳報文的數據,具體要怎么處理看你自己需求了 // TO DO : } } } } } #endregion } #endregion } #endregion } #endregion
好了,上面就是基本全部代碼了,具體的自行研究吧,有空的話提供大家一些報文的詳情數據。