GoLang 海量用戶聊天系統(TCP-Socket網絡編程+Redis數據庫+協程)


 GO語言綜合項目

  包含:

    1:GO語言基礎知識

    2:TCP-Socket網絡編程

    3:Redis數據庫

  已實現:

    登錄

      查看在線用戶

      群聊

         私聊(未實現)

      歷史消息(未實現)

    注冊

    退出

 

 

 

整體結構

客戶端client

     

 

    

 

 

 1 package main
 2 
 3 import (
 4     "ChatRoom/client/process"
 5     "fmt"
 6     "os"
 7 )
 8 
 9 var (
10     userID   int
11     userPsw  string
12     userName string
13 )
14 
15 func main() {
16     var key int
17     for {
18         fmt.Println("------/ 多人互動聊天系統 /------")
19         fmt.Printf("1\t\t登錄系統\n2\t\t注冊賬戶\n3\t\t退出系統\n請輸入(1-3)數字進行操作\n")
20         fmt.Scanln(&key)
21         switch key {
22         case 1:
23             fmt.Println("登錄聊天室")
24             fmt.Println("請輸入ID號")
25             fmt.Scanln(&userID)
26             fmt.Println("請輸入密碼")
27             fmt.Scanln(&userPsw)
28 
29             //驗證成功后
30             /*mvs后,使用process下的UserProcess結構體實例
31             調用綁定的Login登錄方法
32             */
33             up := &process.UserProcess{}
34             up.Login(userID, userPsw)
35 
36         case 2:
37             //注冊界面
38             fmt.Println("請輸入 ID號")
39             fmt.Scanln(&userID)
40             fmt.Println("請輸入 密碼")
41             fmt.Scanln(&userPsw)
42             fmt.Println("請輸入 聊天昵稱")
43             fmt.Scanln(&userName)
44             //調用UserProcess完成注冊
45             up := &process.UserProcess{}
46             up.Register(userID, userPsw, userName)
47         case 3:
48             os.Exit(1)
49         default:
50             fmt.Println("輸入有誤,請重新輸入(1-3)")
51         }
52     }
53 }
main.go

 1 package model
 2 
 3 import (
 4     "ChatRoom/conmon/message"
 5     "net"
 6 )
 7 
 8 /*
 9 CurrentUser 當前用戶
10 
11 Conn    連接
12 
13 message.User    用戶信息
14 */
15 type CurrentUser struct {
16     Conn net.Conn
17     message.User
18 }
currentUser.go

 1 package process
 2 
 3 import (
 4     "ChatRoom/client/utils"
 5     "ChatRoom/conmon/message"
 6     "encoding/json"
 7     "fmt"
 8     "net"
 9     "os"
10 )
11 
12 //ShouMenu 顯示登錄后的界面菜單
13 func ShouMenu() {
14     for {
15 
16         var inputKey int
17         var content string
18 
19         /*因為會經常用到SmsProcess,所以我們定義在Switch外部,減少創建實例化的次數*/
20         SmsProcess := &SmsProcess{}
21         fmt.Println(
22             `
23         1      顯示在線用戶
24         2      聊天
25         3      查看消息
26         4      退出系統
27 
28         請輸入(1-4)數字進行操作!
29 
30     `)
31         fmt.Scanln(&inputKey)
32         switch inputKey {
33         case 1:
34 
35             /*調用顯示在線用戶的方法*/
36             ShowOnLineUser()
37 
38         case 2:
39 
40             fmt.Println("請輸入發送內容...")
41             fmt.Scanln(&content)
42             /*調用SmsProcess的發送消息方法*/
43             SmsProcess.EnterMsg(content)
44 
45         case 3:
46             fmt.Println("查看歷史消息")
47         case 4:
48             fmt.Println("選擇了退出系統...")
49             os.Exit(1)
50         default:
51             fmt.Println("輸入有誤,請輸入(1-4)")
52         }
53     }
54 }
55 
56 //KeepConnection 保持客戶端 與 服務端 之間的通訊,顯示服務端推送的信息給客戶端顯示
57 func KeepConnection(conn net.Conn) {
58     //創建Tranfer實例,循環讀取服務器發送的消息
59     tf := &utils.Transfer{
60         Conn: conn,
61     }
62     for {
63         fmt.Println("")
64         msg, err := tf.ReadPkg()
65         if err != nil {
66             fmt.Println("客戶端讀取服務器信息錯誤!\t", err)
67         }
68         /*讀取到消息,做進一步處理...*/
69         switch msg.Type {
70         case message.PushUserStatusMsgType:
71             //有人上線了
72 
73             /*先實例化PushUserStatusMsg*/
74             var pushUserStatusMsg message.PushUserStatusMsg
75 
76             /* 將 msg 反序列化*/
77             json.Unmarshal([]byte(msg.Data), &pushUserStatusMsg)
78 
79             /* 將用戶信息保存到 客戶端維護的 map集合中 */
80             UpdateUserStatus(&pushUserStatusMsg)
81 
82         case message.SmsMsgType:
83             //有群發消息
84 
85             PinrtGroupMessage(&msg)
86         default:
87             fmt.Printf("返回的消息類型,暫時無法處理..\n")
88         }
89 
90     }
91 }
server.go
 1 package process
 2 
 3 import (
 4     "ChatRoom/conmon/message"
 5     "encoding/json"
 6     "fmt"
 7 )
 8 
 9 //PinrtGroupMessage 輸出群發消息
10 func PinrtGroupMessage(msg *message.Message) {
11     /*反序列化msg.data*/
12     var smsmsg message.SmsMsg
13     err := json.Unmarshal([]byte(msg.Data), &smsmsg)
14     if err != nil {
15         fmt.Println("反序列化 msg.data 失敗...")
16         return
17     }
18 
19     //顯示信息
20     talking := fmt.Sprintf("ID:%v\t%v", smsmsg.UserID, smsmsg.Content)
21     fmt.Println(talking)
22     fmt.Println()
23 }
smsManger.go
 1 package process
 2 
 3 import (
 4     "ChatRoom/client/utils"
 5     "ChatRoom/conmon/message"
 6     "encoding/json"
 7     "fmt"
 8 )
 9 
10 //SmsProcess 發送消息結構體
11 type SmsProcess struct {
12 }
13 
14 //EnterMsg 發送消息
15 func (sp *SmsProcess) EnterMsg(content string) (err error) {
16     /* 1 創建一個父類信息結構體實例化對象 */
17     var msg message.Message
18     msg.Type = message.SmsMsgType
19 
20     /* 2  創建一個 子類信息 結構體實例化對象 */
21     var smsMsg message.SmsMsg
22     smsMsg.Content = content
23     smsMsg.UserID = cu.UserID
24     smsMsg.UserStatus = cu.UserStatus
25 
26     /*3    將 smsMsg 序列化 */
27     data, err := json.Marshal(smsMsg)
28     if err != nil {
29         fmt.Println("smsMsg 序列化失敗!")
30         return
31     }
32 
33     /*4    將data 復制給 msg.Data */
34     msg.Data = string(data)
35 
36     /*5    將 msg 序列化  */
37     data, err = json.Marshal(msg)
38     if err != nil {
39         fmt.Println("msg 序列化 失敗!")
40         return
41     }
42 
43     /*6    發送消息*/
44     tf := &utils.Transfer{
45         Conn: cu.Conn,
46     }
47 
48     err = tf.WritePkg(data)
49     if err != nil {
50         fmt.Println("發送信息失敗!")
51         return
52     }
53     return
54 }
smsprocess.go
 1 package process
 2 
 3 import (
 4     "ChatRoom/client/model"
 5     "ChatRoom/conmon/message"
 6     "fmt"
 7 )
 8 
 9 //OnLineUsers 客戶端維護的map
10 var OnLineUsers map[int]*message.User = make(map[int]*message.User, 10)
11 
12 //CurrentUser 當前用戶結構體
13 var cu model.CurrentUser
14 
15 //ShowOnLineUser 在客戶端顯示所有的在線用戶
16 func ShowOnLineUser() {
17     fmt.Println("↓當前在線用戶:↓")
18     for id := range OnLineUsers {
19         fmt.Printf("id:\t%v\n", id)
20     }
21 }
22 
23 //UpdateUserStatus 處理服務器返回的 PushUserStatusMsg信息
24 func UpdateUserStatus(pushUserStatusMsg *message.PushUserStatusMsg) {
25 
26     user, ok := OnLineUsers[pushUserStatusMsg.UserID]
27     if !ok {
28         user = &message.User{
29             UserID: pushUserStatusMsg.UserID,
30         }
31     }
32 
33     user.UserStatus = pushUserStatusMsg.Status
34     OnLineUsers[pushUserStatusMsg.UserID] = user
35 
36     /* 調用顯示在線用戶方法 */
37     ShowOnLineUser()
38 }
userManger.go
  1 package process
  2 
  3 import (
  4     "ChatRoom/client/utils"
  5     "ChatRoom/conmon/message"
  6     "encoding/binary"
  7     "encoding/json"
  8     "fmt"
  9     "net"
 10     "os"
 11 )
 12 
 13 //UserProcess 用戶處理器 結構體
 14 type UserProcess struct {
 15 }
 16 
 17 //Register 注冊聊天賬戶
 18 func (up *UserProcess) Register(userID int, userPsw string, userName string) (err error) {
 19     //1 連接服務器
 20     conn, err := net.Dial("tcp", "127.0.0.1:9000")
 21     if err != nil {
 22         fmt.Println("連接服務器失敗\t", err)
 23         return
 24     }
 25 
 26     //延遲關閉數據庫通道
 27     defer conn.Close()
 28 
 29     //2    連接服務端成功,准備發送數據
 30     var msg message.Message
 31     msg.Type = message.RegisterMsgType
 32 
 33     //3    創建 RegisterMsg 結構體
 34     var registerMsg message.RegisterMsg
 35     registerMsg.User.UserID = userID
 36     registerMsg.User.UserPsw = userPsw
 37     registerMsg.User.UserName = userName
 38 
 39     //4    將registerMsg 結構體序列化為json
 40     data, err := json.Marshal(registerMsg)
 41     if err != nil {
 42         fmt.Println("序列化失敗! \t", err)
 43         return
 44     }
 45 
 46     //5    將data數據 賦值給 msg.Data
 47     msg.Data = string(data)
 48 
 49     //6    將msg 序列化 為json
 50     data, err = json.Marshal(msg)
 51     if err != nil {
 52         fmt.Println("序列化失敗! \t", err)
 53         return
 54     }
 55 
 56     //7    創建一個Transfer實例
 57 
 58     tf := &utils.Transfer{
 59         Conn: conn,
 60     }
 61     //7.1    發送dada給服務器
 62     err = tf.WritePkg(data)
 63     if err != nil {
 64         fmt.Println("客戶端:注冊時發送數據錯誤! \t", err)
 65         os.Exit(0)
 66 
 67     }
 68     //7.2    讀取消息
 69     msg, err = tf.ReadPkg()
 70     if err != nil {
 71         fmt.Println("ReadPkg(conn) 錯誤", err)
 72         os.Exit(0)
 73     }
 74 
 75     //7.3    反序列化
 76 
 77     //將m反序列化為 RegisterResMsg
 78     var registerResMsg message.RegisterResMsg
 79     err = json.Unmarshal([]byte(msg.Data), &registerResMsg)
 80 
 81     //判斷返回值狀態碼
 82     if registerResMsg.Code == 200 {
 83         fmt.Println("注冊成功,請登錄!")
 84 
 85         /*登陸成功后...
 86         1    為客戶端啟動一個協程,該協程能確保客戶端與服務器之間的通訊,
 87             若服務器有數據推送給客戶端,則接受並顯示在客戶端終端上
 88         2    循環顯示登陸成功后的菜單
 89         */
 90         go KeepConnection(conn)
 91 
 92         ShouMenu()
 93 
 94     } else {
 95         fmt.Println("注冊失敗,錯誤:")
 96         fmt.Println(registerResMsg.Error)
 97     }
 98     return
 99 
100 }
101 
102 //Login 登錄校驗
103 func (up *UserProcess) Login(userID int, userPsw string) (err error) {
104 
105     //1 連接服務器
106     conn, err := net.Dial("tcp", "127.0.0.1:9000")
107     if err != nil {
108         fmt.Println("連接服務器失敗\t", err)
109         return
110     }
111 
112     //延遲關閉數據庫通道
113     defer conn.Close()
114 
115     //2    連接服務端成功,准備發送數據
116     var msg message.Message
117     msg.Type = message.LoginMsgType
118 
119     //3    創建 一個LoginMsg 結構體
120     var loginMsg message.LoginMsg
121     loginMsg.UserID = userID
122     loginMsg.UserPsw = userPsw
123 
124     //4    將loginMsg 結構體序列化為json
125     data, err := json.Marshal(loginMsg)
126     if err != nil {
127         fmt.Println("序列化失敗! \t", err)
128         return
129     }
130 
131     //5    將data數據 賦值給 msg.Data
132     msg.Data = string(data)
133 
134     //6    將msg 序列化 為json
135     data2, err := json.Marshal(msg)
136     if err != nil {
137         fmt.Println("序列化失敗! \t", err)
138         return
139     }
140 
141     //7 此時 data 就是我們客戶端 → 服務端 發送的消息封裝結構體
142     var msgLen uint32 = uint32(len(data2))
143     var buf [4]byte
144     //將 信息長度msgLen 轉換為 byte[]切片
145     binary.BigEndian.PutUint32(buf[0:4], msgLen)
146 
147     //發送信息長度
148     r, err := conn.Write(buf[:4])
149     if r != 4 || err != nil {
150         fmt.Println("消息長度發送失敗!\t", err)
151         return
152     }
153     //fmt.Printf("發送長度成功,長度:%v\t內容:%v", len(data2), string(data2))
154 
155     // 發送消息數據本身
156     _, err = conn.Write(data2)
157     if err != nil {
158         fmt.Println("消息長度發送失敗!\t", err)
159         return
160     }
161 
162     //休眠10秒
163     // time.Sleep(time.Second * 10)
164     // fmt.Println("休眠10秒...")
165     //這里處理服務器返回的消息
166     //創建一個Transfer實例
167 
168     tf := &utils.Transfer{
169         Conn: conn,
170     }
171     m, err := tf.ReadPkg()
172     if err != nil {
173         fmt.Println("ReadPkg(conn) 錯誤", err)
174     }
175 
176     //將m反序列化為 LoginResMsg
177     var loginresmsg message.LoginResMsg
178     err = json.Unmarshal([]byte(m.Data), &loginresmsg)
179 
180     //判斷返回值狀態碼
181     if loginresmsg.Code == 200 {
182 
183         /* 初始化 CurrentUser */
184         cu.Conn = conn
185         cu.UserID = userID
186         cu.UserStatus = message.OnLine
187 
188         fmt.Println("登陸成功,代碼200")
189         fmt.Println(loginresmsg.Error)
190         /*顯示當前在線用戶列表,遍歷loginresmsg.UserId*/
191         for _, v := range loginresmsg.UsersID {
192             //不顯示自己
193             if v == userID {
194                 continue
195             }
196             fmt.Println("↓在線用戶為:↓")
197             fmt.Printf("ID:%v", v)
198 
199             /*完成對OnLineUser map的初始化*/
200             user := &message.User{
201                 UserID:     v,
202                 UserStatus: message.OnLine,
203             }
204             OnLineUsers[v] = user
205         }
206         fmt.Printf("\n")
207         /*登陸成功后...
208         1    為客戶端啟動一個協程,該協程能確保客戶端與服務器之間的通訊,
209             若服務器有數據推送給客戶端,則接受並顯示在客戶端終端上
210         2    循環顯示登陸成功后的菜單
211         */
212         go KeepConnection(conn)
213 
214         ShouMenu()
215 
216     } else {
217         fmt.Println("登陸失敗,錯誤:")
218         fmt.Println(loginresmsg.Error)
219     }
220     return
221 }
userprocess.go

 1 package utils
 2 
 3 import (
 4     "ChatRoom/conmon/message"
 5     "encoding/binary"
 6     "encoding/json"
 7     "fmt"
 8     "net"
 9 )
10 
11 //Transfer 將下列方法關聯到結構體中
12 type Transfer struct {
13     //分析有哪些字段
14     Conn net.Conn   //
15     Buf  [8096]byte //緩沖區
16 }
17 
18 //ReadPkg 讀取封包數據
19 func (tf *Transfer) ReadPkg() (msg message.Message, err error) {
20 
21     //buf := make([]byte, 8096)
22 
23     _, err = tf.Conn.Read(tf.Buf[:4])
24 
25     if err != nil {
26         //自定義錯誤
27         //err = errors.New("讀取數據 頭部 錯誤")
28         return
29     }
30     //根據buf[:4] 轉換成一個uint32
31     var pkgLen uint32
32     pkgLen = binary.BigEndian.Uint32(tf.Buf[0:4])
33 
34     //根據 pkgLen的長度讀取內容 到 buf中
35     n, err := tf.Conn.Read(tf.Buf[:pkgLen])
36     if n != int(pkgLen) || err != nil {
37         //自定義錯誤
38         //err = errors.New("讀取數據 內容 錯誤")
39         return
40     }
41 
42     //將pkglen 反序列化為 message.Massage  一定+&
43     err = json.Unmarshal(tf.Buf[:pkgLen], &msg)
44     if err != nil {
45         fmt.Println("反序列化失敗!\t", err)
46         return
47     }
48     return
49 
50 }
51 
52 //WritePkg 發送數據
53 func (tf *Transfer) WritePkg(data []byte) (err error) {
54     //1 發送信息長度
55     var msgLen uint32 = uint32(len(data))
56     //  var buf [4]byte
57     //將 信息長度msgLen 轉換為 byte[]切片
58     binary.BigEndian.PutUint32(tf.Buf[0:4], msgLen)
59 
60     //發送信息長度
61     r, err := tf.Conn.Write(tf.Buf[:4])
62     if r != 4 || err != nil {
63         fmt.Println("消息長度發送失敗!\t", err)
64         return
65     }
66     //2    發送信息內容
67     r, err = tf.Conn.Write(data)
68     if r != int(msgLen) || err != nil {
69         fmt.Println("消息長度發送失敗!\t", err)
70         return
71     }
72     return
73 }
utils.go

共用項common

  1 package message
  2 
  3 /*
  4 LoginMsgType 登錄消息類型
  5 
  6 LoginResMsgType 登錄返回消息類型
  7 
  8 RegisterMsgType 注冊消息類型
  9 
 10 RegisterResMsgType 注冊返回值信息類型
 11 
 12 PushUserStatusMsgType 服務器推送的用戶狀態信息類型
 13 
 14 SmsMsgType 發送的消息類型
 15 */
 16 const (
 17     LoginMsgType = "LoginMsg"
 18 
 19     LoginResMsgType = "LoginResMsg"
 20 
 21     RegisterMsgType = "Register"
 22 
 23     RegisterResMsgType = "RegisterResMsg"
 24 
 25     PushUserStatusMsgType = "PushUserStatusMsg"
 26 
 27     SmsMsgType = "SmsMsg"
 28 )
 29 
 30 /*用戶狀態常量
 31 
 32 OnLine    在線
 33 
 34 OffLine    離線
 35 
 36 Busy    繁忙
 37 */
 38 const (
 39     OnLine = iota
 40     OffLine
 41     Busy
 42 )
 43 
 44 //Message 消息結構體
 45 type Message struct {
 46     //Type 消息類型
 47     Type string `json:"type"`
 48 
 49     //Data 消息的數據
 50     Data string `json:"data"`
 51 }
 52 
 53 //LoginMsg 登錄消息結構體
 54 type LoginMsg struct {
 55     //用戶ID
 56     UserID int `json:"userid"`
 57     //用戶名
 58     UserName string `json:"username"`
 59     //用戶密碼
 60     UserPsw string `json:"userpsw"`
 61 }
 62 
 63 //LoginResMsg 登錄返回信息結構體
 64 /*
 65     狀態值
 66     200        登陸成功
 67     404        未注冊
 68     300        賬號密碼錯誤
 69     505        服務器內部錯誤
 70     ...
 71 
 72 */
 73 type LoginResMsg struct {
 74     Code int `json:"code"`
 75     //錯誤信息
 76     Error string `json:"error"`
 77     //存放在線所有用的id
 78     UsersID []int `json:"usersid"`
 79 }
 80 
 81 //RegisterMsg 注冊
 82 type RegisterMsg struct {
 83     User User `json:"user"`
 84 }
 85 
 86 //RegisterResMsg 注冊返回信息
 87 //Code    400表示 用戶已存在 200 表示注冊成功
 88 type RegisterResMsg struct {
 89     Code int `json:"code"`
 90     //錯誤信息
 91     Error string `json:"error"`
 92 }
 93 
 94 /*PushUserStatusMsg 推送用戶狀態信息
 95 
 96 UserID    用戶ID
 97 
 98 Status    用戶狀態信息值
 99 */
100 type PushUserStatusMsg struct {
101     UserID int `json:"userid"`
102 
103     Status int `json:"status"`
104 }
105 
106 /*
107 SmsMsg 發送的消息結構體
108 
109 Content    發送的內容
110 
111 User 匿名結構體 繼承關系 user.go 下的 User 結構體
112 */
113 type SmsMsg struct {
114     Content string `json:"content"`
115     User           //匿名結構體 繼承關系 user.go 下的 User 結構體
116 }
message.go
1 package message
2 
3 //User 用戶結構體
4 type User struct {
5     UserID     int    `json:"userid"`     //用戶通訊ID
6     UserName   string `json:"username"`   //用戶名稱昵稱
7     UserPsw    string `json:"userpsw"`    //用戶密碼
8     UserStatus int    `json:"userstatus"` //用戶狀態值
9 }
user.go

 

服務端sever

 1 package main
 2 
 3 import (
 4     "ChatRoom/sever/model"
 5     "ChatRoom/sever/processor"
 6 
 7     "fmt"
 8     "net"
 9     "time"
10 )
11 
12 //Process 處理客戶端-服務端之間的通信
13 func Process(conn net.Conn) {
14 
15     defer conn.Close()
16     //創建主控
17     processor := &processor.Processor{
18         Conn: conn,
19     }
20     err := processor.Process2()
21     if err != nil {
22         fmt.Println("客戶端 與 服務端之間的協程 出錯", err)
23     }
24 }
25 
26 //InitUserDao 初始化UserDao
27 func InitUserDao() {
28     fmt.Println("初始化UserDao...")
29     model.MyUserDao = model.NewUserDao(model.Pool)
30 }
31 
32 func main() {
33 
34     //當服務器啟動時,初始化連接池
35     model.InitPool("127.0.0.1:6379", 16, 0, 100*time.Second)
36 
37     //初始化pool
38     InitUserDao()
39 
40     //提示信息
41     fmt.Println("服務端正在使用新結構[MVC布局],使用9000端口監聽")
42     /* net.Listen("tcp","服務器監聽IP:端口")
43     可以把這里的IP改成自己的公網IP,客戶端連接處改成公網IP就可以實現聯網聊天了
44     客戶端和服務端的端口和設置的IP一致才可以
45     */
46     l, err := net.Listen("tcp", "127.0.0.1:9000")
47     defer l.Close()
48     if err != nil {
49         fmt.Println("連接服務器出錯\t", err)
50         return
51     }
52 
53     //監聽成功,等待客戶端連接...
54     for {
55         fmt.Println("等待客戶端連接...")
56         conn, err := l.Accept()
57         if err != nil {
58             fmt.Println("l.Accept 錯誤\t", err)
59         }
60 
61         //連接服務器成功,起協程服務器客戶端-服務端的通信
62         go Process(conn)
63     }
64 }
main.go

 1 package model
 2 
 3 import "errors"
 4 
 5 //自定義錯誤
 6 /*
 7 ErrorUserIsNotFound    用戶不存在
 8 
 9 ErrorUserExist    用戶已存在
10 
11 ErrorUserPsw    用戶名或密碼錯誤
12 */
13 var (
14     ErrorUserIsNotFound = errors.New("用戶不存在")
15     ErrorUserExist      = errors.New("用戶已存在")
16     ErrorUserPsw        = errors.New("用戶名或密碼錯誤")
17 )
error.go
 1 package model
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 
 7     "github.com/gomodule/redigo/redis"
 8 )
 9 
10 //Pool 連接池
11 var Pool *redis.Pool
12 
13 //InitPool 初始化連接池
14 func InitPool(ip string, maxIdle int, maxActive int, idleTimeout time.Duration) {
15     fmt.Println("初始化pool...")
16     Pool = &redis.Pool{
17         MaxIdle:     maxIdle,     //最大空閑連接數
18         MaxActive:   maxActive,   //和數據庫最大連接數 0 無限制
19         IdleTimeout: idleTimeout, //最大空閑時間
20         Dial: func() (redis.Conn, error) {
21             return redis.Dial("tcp", ip)
22         },
23     }
24 }
redis
1 package model
2 
3 //User 用戶結構體
4 type User struct {
5     UserID   int    `json:"userid"`
6     UserName string `json:"username"`
7     UserPsw  string `json:"userpsw"`
8 }
user.go
  1 package model
  2 
  3 import (
  4     "ChatRoom/conmon/message"
  5     "encoding/json"
  6     "fmt"
  7 
  8     "github.com/gomodule/redigo/redis"
  9 )
 10 
 11 //MyUserDao 全局變量
 12 var MyUserDao *UserDao
 13 
 14 //UserDao 操作結構體
 15 type UserDao struct {
 16     pool *redis.Pool
 17 }
 18 
 19 //NewUserDao UserDao工廠模式(構造函數)
 20 func NewUserDao(pool *redis.Pool) (userdao *UserDao) {
 21     userdao = &UserDao{
 22         pool: pool,
 23     }
 24     return
 25 }
 26 
 27 //GetUserID 獲取用戶ID
 28 //根據用戶提供的ID,返回一個用戶實例+err錯誤信息
 29 func (ud *UserDao) GetUserID(conn redis.Conn, id int) (user *User, err error) {
 30     //fmt.Println("進入 GetUserID方法")
 31     //查詢用戶信息
 32     res, err := redis.String(conn.Do("hget", "user", id))
 33     //fmt.Println("=================", err)
 34     if err != nil {
 35         fmt.Println("redis.String(conn.Do 進入!=nil")
 36         //如果錯誤=nil  表示在數據庫redis中未找到這個用戶
 37         if err == redis.ErrNil {
 38             fmt.Println("進入了edis.ErrNil  ")
 39             err = ErrorUserIsNotFound
 40             return
 41         }
 42         return
 43     }
 44     user = &User{}
 45     //將res反序列化為 User實例
 46     err = json.Unmarshal([]byte(res), user)
 47     if err != nil {
 48         fmt.Println("將res反序列化為 User實例失敗", err)
 49         return
 50     }
 51 
 52     return
 53 }
 54 
 55 //Login 登錄驗證
 56 /*
 57 若 帳號密碼不對 則返回 空結構體,err = ErrorUserPsw
 58 若 ID未找到        則返回 空結構體,err = ErrorUserIsNotFound
 59 */
 60 func (ud *UserDao) Login(userID int, userPsw string) (user *User, err error) {
 61     /* 獲取連接池中的連接 */
 62     conn := ud.pool.Get()
 63     defer conn.Close()
 64 
 65     /* 調用獲取信息方法 */
 66     user, err = ud.GetUserID(conn, userID)
 67     if err != nil {
 68         fmt.Println("d.GetUserID err", err)
 69         return
 70     }
 71 
 72     /* 成功獲取用戶信息 */
 73     /* 校驗密碼 */
 74     if user.UserPsw != userPsw {
 75         err = ErrorUserPsw
 76         return
 77     }
 78     return
 79 }
 80 
 81 //Register 注冊
 82 func (ud *UserDao) Register(user *message.User) (err error) {
 83     fmt.Println("進入注冊方法...")
 84     /* 獲取連接池中的連接 */
 85     conn := ud.pool.Get()
 86     defer conn.Close()
 87 
 88     /* 調用獲取信息方法 */
 89     _, err = ud.GetUserID(conn, user.UserID)
 90     fmt.Println("***************", err)
 91     if err == nil {
 92         fmt.Println("進入用戶已存在...")
 93         //說明用戶已存在
 94         err = ErrorUserExist
 95         return
 96     }
 97 
 98     /*執行到這里,表示ID在數據庫中不存在,可以注冊*/
 99     data, err := json.Marshal(user)
100     if err != nil {
101         fmt.Println("進入序列化失敗 ...")
102         return
103     }
104 
105     /* 將ID做key ,data做value 存入數據庫user中 */
106     _, err = conn.Do("hset", "user", user.UserID, string(data))
107     if err != nil {
108         fmt.Println("將注冊的用戶信息存入數據庫失敗...", err)
109         return
110     }
111     return
112 }
userdao.go

 1 package process
 2 
 3 import (
 4     "ChatRoom/conmon/message"
 5     "ChatRoom/sever/utils"
 6     "encoding/json"
 7     "fmt"
 8     "net"
 9 )
10 
11 //SmsProcess 服務端消息發送結構體
12 type SmsProcess struct {
13 }
14 
15 //PushMsg 轉發信息
16 func (sp *SmsProcess) PushMsg(msg *message.Message) {
17 
18     /*實例化smsMsg*/
19     var smsMsg message.SmsMsg
20     /*反序列化msg.Data*/
21     err := json.Unmarshal([]byte(msg.Data), &smsMsg)
22     if err != nil {
23         fmt.Println("反序列化 msg.Data失敗!")
24         return
25     }
26 
27     /*序列化msg*/
28     data, err := json.Marshal(msg)
29     if err != nil {
30         fmt.Println("反序列化msg 失敗!")
31         return
32     }
33 
34     //遍歷服務器端的map,將信息轉發給在線用戶
35     for id, up := range userManager.OnLineUsers {
36 
37         //過濾掉自己,不要發送消息給自己
38         if id == smsMsg.UserID {
39             continue
40         }
41         sp.PushMsgToAllOnLineUser(data, up.Conn)
42     }
43 
44 }
45 
46 //PushMsgToAllOnLineUser 推送轉發消息給在線的所有用戶
47 func (sp *SmsProcess) PushMsgToAllOnLineUser(data []byte, conn net.Conn) {
48 
49     tf := &utils.Transfer{
50         Conn: conn,
51     }
52 
53     err := tf.WritePkg(data)
54     if err != nil {
55         fmt.Println("發送消息失敗...\t", err)
56     }
57 }
smsProcess.go
 1 package process
 2 
 3 import "fmt"
 4 
 5 var (
 6     userManager *UserManager
 7 )
 8 
 9 //UserManager 在線用戶結構體
10 type UserManager struct {
11     OnLineUsers map[int]*UserProcess
12 }
13 
14 func init() {
15     userManager = &UserManager{
16         OnLineUsers: make(map[int]*UserProcess, 1024),
17     }
18 }
19 
20 //AddOnLineUserS 添加在線用戶
21 func (um *UserManager) AddOnLineUserS(up *UserProcess) {
22     um.OnLineUsers[up.UserID] = up
23 }
24 
25 //DelOnLineUsers 刪除在線用戶
26 func (um *UserManager) DelOnLineUsers(userid int) {
27     delete(um.OnLineUsers, userid)
28 }
29 
30 //GetAllOnLineUsers 獲取所有在線用戶
31 func (um *UserManager) GetAllOnLineUsers() map[int]*UserProcess {
32     return um.OnLineUsers
33 }
34 
35 //GetOnLineUserToID 根據用戶ID查找現在用戶
36 func (um *UserManager) GetOnLineUserToID(userid int) (up *UserProcess, err error) {
37     up, ok := um.OnLineUsers[userid]
38     if ok {
39         return
40     }
41     //自定義格式化錯誤
42     err = fmt.Errorf("ID:%d 不在線", userid)
43 
44     return
45 
46 }
userManager.go
  1 package process
  2 
  3 import (
  4     "ChatRoom/conmon/message"
  5     "ChatRoom/sever/model"
  6     "ChatRoom/sever/utils"
  7     "encoding/json"
  8     "fmt"
  9     "net"
 10 )
 11 
 12 //UserProcess 用戶處理器
 13 type UserProcess struct {
 14     Conn net.Conn
 15     //UserId 該字段表示是哪一個用戶
 16     UserID int
 17 }
 18 
 19 //NoticeOnLineUsers 通知在線的用戶 userid 上線了
 20 func (thisUP *UserProcess) NoticeOnLineUsers(userid int) {
 21     for id, up := range userManager.OnLineUsers {
 22         //過濾掉自己
 23         if id == userid {
 24             continue
 25         }
 26         //開始通知其他人
 27         up.NoticeMeOnLine(userid)
 28     }
 29 }
 30 
 31 //NoticeMeOnLine 通知我上線了
 32 func (thisUP *UserProcess) NoticeMeOnLine(userid int) {
 33     /*實例化父消息*/
 34     var msg message.Message
 35     msg.Type = message.PushUserStatusMsgType
 36 
 37     /*實例化子消息*/
 38     var pushUserStatusMsg message.PushUserStatusMsg
 39     pushUserStatusMsg.UserID = userid
 40     pushUserStatusMsg.Status = message.OnLine
 41 
 42     /*序列化 子消息結構體*/
 43     data, err := json.Marshal(pushUserStatusMsg)
 44     if err != nil {
 45         fmt.Println("序列化子消息結構體(pushUserStatusMsg)失敗!")
 46         return
 47     }
 48 
 49     /*將子消息序列化后的封裝體data 復制給 父消息結構體中的 data*/
 50     msg.Data = string(data)
 51 
 52     /*序列化 父消息結構體*/
 53     data, err = json.Marshal(msg)
 54     if err != nil {
 55         fmt.Println("父消息結構體(msg)序列化失敗!")
 56         return
 57     }
 58 
 59     /*創建 Tranfer實例*/
 60     tf := utils.Transfer{
 61         Conn: thisUP.Conn,
 62     }
 63     /*發送 父消息 */
 64     err = tf.WritePkg(data)
 65     if err != nil {
 66         fmt.Println("NoticeMeOnLine發送通知失敗!")
 67         return
 68     }
 69 
 70 }
 71 
 72 //SeverProcessRegister 只處理用戶注冊請求
 73 func (thisUP *UserProcess) SeverProcessRegister(msg *message.Message) (err error) {
 74 
 75     /*1    先從msg中取出msg.data,並進行反序列化為registerMsg操作*/
 76     var registerMsg message.RegisterMsg
 77     err = json.Unmarshal([]byte(msg.Data), &registerMsg)
 78     if err != nil {
 79         fmt.Println("反序列化 registerMsg失敗...")
 80         return
 81     }
 82 
 83     //2    聲明 resMsg
 84     var resMsg message.Message
 85     resMsg.Type = message.RegisterResMsgType
 86     //3    聲明 registerResMsg
 87     var registerResMsg message.RegisterResMsg
 88 
 89     //3    反序列化成功,去數據庫完成注冊
 90     err = model.MyUserDao.Register(&registerMsg.User)
 91 
 92     if err != nil {
 93         if err == model.ErrorUserExist {
 94             registerResMsg.Code = 500
 95             registerResMsg.Error = model.ErrorUserExist.Error()
 96         } else if err == model.ErrorUserIsNotFound {
 97             registerResMsg.Code = 404
 98             registerResMsg.Error = model.ErrorUserIsNotFound.Error()
 99         } else {
100             fmt.Println("注冊發生錯誤...")
101         }
102     } else {
103         registerResMsg.Code = 200
104         fmt.Println("注冊成功!")
105     }
106 
107     //4    序列化 registerResMsg
108     data, err := json.Marshal(registerResMsg)
109     if err != nil {
110         fmt.Println("序列化registerResMsg失敗.", err)
111         return
112     }
113 
114     //5    將data 賦值給 resMsg(message.Message)
115     resMsg.Data = string(data)
116 
117     //6 將resMsg進行序列化
118     data, err = json.Marshal(resMsg)
119     if err != nil {
120         fmt.Println("序列化resMsg失敗...", err)
121         return
122     }
123 
124     //7    發送 數據
125     //因為使用分布式mvc,我們先創建一個Transfer實例,然后讀取
126     tf := &utils.Transfer{
127         Conn: thisUP.Conn,
128     }
129     err = tf.WritePkg(data)
130 
131     return
132 }
133 
134 //SeverProcessLogin 只處理登錄請求
135 func (thisUP *UserProcess) SeverProcessLogin(msg *message.Message) (err error) {
136 
137     /*先從msg中取出msg.data,並進行反序列化操作*/
138     var loginmsg message.LoginMsg
139     err = json.Unmarshal([]byte(msg.Data), &loginmsg)
140     if err != nil {
141         fmt.Println("反序列化&loginmsg失敗...")
142         return
143     }
144 
145     //1    聲明 resMsg
146     var resMsg message.Message
147     resMsg.Type = message.LoginResMsgType
148     //2    聲明 loginResMsg
149     var loginResMsg message.LoginResMsg
150 
151     //3    反序列化成功登錄驗證
152     user, err := model.MyUserDao.Login(loginmsg.UserID, loginmsg.UserPsw)
153     if err != nil {
154 
155         if err == model.ErrorUserIsNotFound {
156             loginResMsg.Code = 404
157             loginResMsg.Error = err.Error()
158         } else if err == model.ErrorUserPsw {
159             loginResMsg.Code = 300
160             loginResMsg.Error = err.Error()
161 
162         } else {
163             loginResMsg.Code = 505
164             loginResMsg.Error = err.Error()
165         }
166 
167     } else {
168         loginResMsg.Code = 200
169         fmt.Printf("%v 登陸成功!", user.UserName)
170 
171         /*登陸成功后,將登陸成功的用戶放到userManager中*/
172         thisUP.UserID = loginmsg.UserID
173         userManager.AddOnLineUserS(thisUP)
174 
175         /*調用通知在線用戶,ID上線了的方法,將登陸ID傳入*/
176         thisUP.NoticeOnLineUsers(loginmsg.UserID)
177 
178         /*循環將在線用戶的ID添加到LoginResMsg.UsersID切片中*/
179         for id := range userManager.OnLineUsers {
180             loginResMsg.UsersID = append(loginResMsg.UsersID, id)
181         }
182     }
183 
184     //3    序列化 loginResMsg
185     data, err := json.Marshal(loginResMsg)
186     if err != nil {
187         fmt.Println("序列化loginResMsg失敗.", err)
188         return
189     }
190 
191     //4    將data 賦值給 resMsg(message.Message)
192     resMsg.Data = string(data)
193 
194     //5 將resMsg進行序列化
195     data, err = json.Marshal(resMsg)
196     if err != nil {
197         fmt.Println("序列化resMsg失敗...", err)
198         return
199     }
200 
201     //6    發送 數據
202     //因為使用分布式mvc,我們先創建一個Transfer實例,然后讀取
203     tf := &utils.Transfer{
204         Conn: thisUP.Conn,
205     }
206     err = tf.WritePkg(data)
207 
208     return
209 }
userProcess.go

 1 package processor
 2 
 3 import (
 4     "ChatRoom/conmon/message"
 5     "ChatRoom/sever/process"
 6     "ChatRoom/sever/utils"
 7     "fmt"
 8     "io"
 9     "net"
10 )
11 
12 //Processor 結構體
13 type Processor struct {
14     Conn net.Conn
15 }
16 
17 //SeverProcessMsg 根據客戶端發送消息的不同,決定調用哪個函數來處理
18 func (pcs *Processor) SeverProcessMsg(msg *message.Message) (err error) {
19 
20     //測試一下是否能接收到來自客戶端的群發消息
21     fmt.Println("msg:", msg)
22 
23     switch msg.Type {
24     case message.LoginMsgType:
25         //處理登錄邏輯...
26         //創建一個UserProcess 實例
27         up := &process.UserProcess{
28             Conn: pcs.Conn,
29         }
30         err = up.SeverProcessLogin(msg)
31     case message.RegisterMsgType:
32         //處理注冊邏輯...
33         //創建一個UserProcess 實例
34         up := &process.UserProcess{
35             Conn: pcs.Conn,
36         }
37         err = up.SeverProcessRegister(msg)
38     case message.SmsMsgType:
39         //創建一個SmsMsg實例
40 
41         smsProcess := &process.SmsProcess{}
42         smsProcess.PushMsg(msg)
43     default:
44         fmt.Println("消息類型不存在,無法處理...")
45     }
46     return
47 }
48 
49 //Process2 第二層處理
50 func (pcs *Processor) Process2() (err error) {
51     //讀取客戶端發送的信息
52 
53     for {
54 
55         tf := utils.Transfer{
56             Conn: pcs.Conn,
57         }
58 
59         fmt.Println("讀取來自客戶端的數據中...")
60         msg, err := tf.ReadPkg()
61 
62         if err != nil {
63             if err == io.EOF {
64                 fmt.Println("客戶端斷開連接,服務器退出...")
65                 return err
66             }
67             fmt.Println("ReadPkg()錯誤", err)
68             return err
69 
70         }
71         //fmt.Println("msg", msgs)
72         err = pcs.SeverProcessMsg(&msg)
73         if err != nil {
74             return err
75         }
76     }
77 }
processor.go

 1 package utils
 2 
 3 import (
 4     "ChatRoom/conmon/message"
 5     "encoding/binary"
 6     "encoding/json"
 7     "fmt"
 8     "net"
 9 )
10 
11 //Transfer 將下列方法關聯到結構體中
12 type Transfer struct {
13     //分析有哪些字段
14     Conn net.Conn   //
15     Buf  [8096]byte //緩沖區
16 }
17 
18 //ReadPkg 讀取封包數據
19 func (tf *Transfer) ReadPkg() (msg message.Message, err error) {
20 
21     //buf := make([]byte, 8096)
22 
23     _, err = tf.Conn.Read(tf.Buf[:4])
24 
25     if err != nil {
26         //自定義錯誤
27         //err = errors.New("讀取數據 頭部 錯誤")
28         return
29     }
30     //根據buf[:4] 轉換成一個uint32
31     var pkgLen uint32
32     pkgLen = binary.BigEndian.Uint32(tf.Buf[0:4])
33 
34     //根據 pkgLen的長度讀取內容 到 buf中
35     n, err := tf.Conn.Read(tf.Buf[:pkgLen])
36     if n != int(pkgLen) || err != nil {
37         //自定義錯誤
38         //err = errors.New("讀取數據 內容 錯誤")
39         return
40     }
41 
42     //將pkglen 反序列化為 message.Massage  一定+&
43     err = json.Unmarshal(tf.Buf[:pkgLen], &msg)
44     if err != nil {
45         fmt.Println("反序列化失敗!\t", err)
46         return
47     }
48     return
49 
50 }
51 
52 //WritePkg 發送數據
53 func (tf *Transfer) WritePkg(data []byte) (err error) {
54     //1 發送信息長度
55     var msgLen uint32 = uint32(len(data))
56     //  var buf [4]byte
57     //將 信息長度msgLen 轉換為 byte[]切片
58     binary.BigEndian.PutUint32(tf.Buf[0:4], msgLen)
59 
60     //發送信息長度
61     r, err := tf.Conn.Write(tf.Buf[:4])
62     if r != 4 || err != nil {
63         fmt.Println("消息長度發送失敗!\t", err)
64         return
65     }
66     //2    發送信息內容
67     r, err = tf.Conn.Write(data)
68     if r != int(msgLen) || err != nil {
69         fmt.Println("消息長度發送失敗!\t", err)
70         return
71     }
72     return
73 }
utils.go

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM