Golang搭建即時通信系統
1、基本功能
主要是包括用戶上線,用戶私聊,用戶公聊,超時強踢,查詢在線用戶,修改用戶名等基本socket通信功能。
2、簡要介紹
2.1系統結構如下
主要包括兩個部分:
- Client:負責客戶端命令解析,請求與服務器的連接,發送消息等
- Server:監聽,連接創建,主要業務邏輯的處理等。
2.2目錄結構
IM-System
——client.go # 客戶端相關邏輯代碼
client.exe # 客戶端編譯的可執行文件
main.go # 服務端主程序入口
server.go # 服務器相關邏輯代碼
user.go # 用戶相關業務邏輯
server.exe # 服務器代碼編譯的可執行文件
2.3其他說明
內部對象簡要說明:
Server內部創建和維護server和user對象,其中每次有客戶端嘗試與服務器建立連接,都會創建一個新的user對象。
server中除了一些基本的屬性包含兩個主要的屬性,OnlineMap和message_channel,OnlineMap是一個對象的字典,保存當前與服務器建立連接之后創建的對象。channel是一個通信管道,主要將客戶端發送到服務端的message發送給其他的所有用戶的管道來進行廣播。
user除了一些基本的屬性外包含兩個主要屬性,Conn和message_channel,Conn為客戶端與服務器建立的TCP鏈接,channel表示通信管道,每次channel管道中有message都會回顯給客戶端達到客戶端與服務器通信的效果。
協程的使用:
系統會創建多個go程。
user對象內部會創建go程來阻塞監聽user.message_channel,一旦有消息則會回顯客戶端。
server對象內部也會創建go程阻塞監聽server.message_channel,一旦管道有消息則會發送給每個在線用戶的管道。
server在啟動之后會監聽連接,一旦有新的連接,為了防止阻塞主go程,就會生成新的go程處理該連接而不影響其他連接的處理。
系統是基於讀寫分離模型的,因此分別使用不同的go程去接收客戶端寫入的字節流和往客戶端寫入,保證了用戶在寫入消息的時候同時能夠接收到其他消息。
3、代碼
3.1 server.go
點擊查看代碼
package main
import (
"fmt"
"io"
"net"
"sync"
"time"
)
type Server struct {
Ip string
Port int
//用戶在線列表
OnlineMap map[string]*User
//給全局變量map加同步鎖
mapLock sync.RWMutex
// 消息廣播的channel
Message chan string
}
//創建一個普通的server接口(函數)
//開頭大寫表示對外開放
func NewServer(ip string, port int) *Server {
server := &Server{
Ip: ip,
Port: port,
OnlineMap: make(map[string]*User),
Message: make(chan string),
}
return server
}
//監聽Message廣播消息channel的goroutine,一旦有消息就發送給全部在線的user
func (this *Server) ListenMessage() {
//服務器端仍舊需要時刻監聽
for {
msg := <-this.Message
this.mapLock.Lock()
//將消息發給所有用戶
for _, u := range this.OnlineMap {
u.C <- msg
}
this.mapLock.Unlock()
}
}
//廣播消息
func (this *Server) BroadCast(user *User, msg string) {
sendMessage := "[" + user.Addr + "]" + user.Name + ":" + msg
this.Message <- sendMessage
}
func (this *Server) Handler(conn net.Conn) {
//當前連接的業務
user := NewUser(conn, this)
fmt.Printf("new connection %s:%s established...\n", user.Addr, user.Name)
//用戶在線消息封裝
user.Online()
//監聽用戶是否活躍的channel
isLive := make(chan bool)
//接受客戶端發送的消息
//每個客戶端都有一個go程處理客戶端讀的業務
go func() {
// 4K大小
buf := make([]byte, 4096)
for {
n, err := conn.Read(buf)
//fmt.Println("n=", n, ",buf=", string(buf[0:n]))
//n為成功讀取到的字節數
if n == 0{
user.Offline()
return
}
if err != nil && err != io.EOF{
fmt.Println("Conn Read err:", err)
return
}
//正常獲取消息
//用戶輸入消息是以\n結尾的,需要去掉最后一個字節
msg := string(buf[:n-1])
//fmt.Println("handler msg: ", msg)
//接收到的消息進行廣播
user.DoMessage(msg)
//用戶的任意消息,代表用戶是一個活躍的用戶,激活管道
isLive <- true
}
}()
for {
//當前handler阻塞監聽管道的消息,一旦兩個管道有一個有值,就會執行select
select {
//這里一旦當isLive為True,那么就會進入select,執行完case<-isLive之后
//會接着更新After管道,但是因為還沒到時間不會進入case<-After之后的語句
case <- isLive:
// 當前用戶被激活,
// 這里為了重置定時器,把case<-isLive 放到了上邊
//設置定時器,如果定時觸發,則強踢,如果發消息,則重新激活定時器
//After本身是一個channel,如果發生超時,那么該channel中就能讀取到數據
case <- time.After(time.Second * 60):
//進入case表示超時,重置定時器
//將當前的User強制關閉
//發出下線消息
user.SendMessage("you are forced offline")
//銷毀管道資源
close(user.C)
//關閉用戶連接
conn.Close()
//可能是因為管道資源以及conn連接關閉之后
//OnlineMap中值被回收之后,自動刪除鍵值對???
//退出handler
return // 或者runtime.Goexit()
}
}
}
//啟動服務器的接口
func (this *Server) Start() {
//socket listen
fmt.Println("server is starting...")
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", this.Ip, this.Port))
if err != nil{
fmt.Println("listener err:", err)
return
}
//為了防止遺忘關閉連接,加上defer保證在接口結束之后close
//close listen socket
defer listener.Close()
//啟動監聽Message的goroutine
go this.ListenMessage()
//listener一直監聽連接
for {
//accept
conn, err := listener.Accept()
if err != nil{
fmt.Println("listener accept err:", err)
continue
}
//go程處理連接
go this.Handler(conn)
}
}
3.2 user.go
點擊查看代碼
package main
import (
"net"
"strings"
)
type User struct {
Name string
Addr string
C chan string // channel數據為string
conn net.Conn
server *Server // 當前用戶屬於的服務器
}
//創建用戶的接口
func NewUser(conn net.Conn, server *Server) *User {
userAddr := conn.RemoteAddr().String()
user := &User{
Name: userAddr,
Addr: userAddr,
C: make(chan string),
conn: conn,
server: server,
}
//每一個新用戶都綁定一個go程監聽當前用戶的channel消息
go user.ListenMessage()
return user
}
//處理用戶上線
func (this *User) Online() {
//用戶上線,將用戶加入onlinemap中
this.server.mapLock.Lock()
this.server.OnlineMap[this.Name] = this
this.server.mapLock.Unlock()
//廣播用戶上線的消息
this.server.BroadCast(this, "user is online")
}
//處理用戶下線
func (this *User) Offline() {
//用戶下線,將用戶從online map刪除
this.server.mapLock.Lock()
delete(this.server.OnlineMap, this.Name)
this.server.mapLock.Unlock()
//廣播用戶下線
this.server.BroadCast(this, "user offline")
}
//給當前user對應的客戶端發送消息
func (this *User) SendMessage(msg string) {
this.conn.Write([]byte(msg))
}
//處理消息
func (this *User) DoMessage(msg string) {
if msg == "who"{
//查詢當前都有哪些用戶在線
this.server.mapLock.Lock()
for _, u := range this.server.OnlineMap{
onlineMsg := "[" + u.Addr + "]" + u.Name + ":" + "online...\n"
//只發送給當前發送查詢指令的用戶
this.SendMessage(onlineMsg)
}
this.server.mapLock.Unlock()
} else if len(msg) > 7 && msg[:7] == "rename|" {
//新的用戶名
newName := strings.Split(msg, "|")[1]
//判斷新的用戶名是否存在
_, ok := this.server.OnlineMap[newName]
if ok{
//服務器中已經存在該用戶名
this.SendMessage(newName + "has existed...")
} else {
//修改OnlineMap
this.server.mapLock.Lock()
delete(this.server.OnlineMap, this.Name)
this.server.OnlineMap[newName] = this
this.server.mapLock.Unlock()
this.Name = newName
this.SendMessage("you have updated user name: " + this.Name + "\n")
}
} else if len(msg) > 4 && msg[:3] == "to|" {
// 消息格式 ”to|alex|hello“
//1 獲取用戶名
remoteName := strings.Split(msg, "|")[1]
if remoteName == ""{
//用戶名無效
this.SendMessage("invalid, user \"to|alex|hello\"")
return
}
//2 查詢對象
remoteUser, ok := this.server.OnlineMap[remoteName]
if !ok {
//用戶不存在
this.SendMessage("username not exist")
return
}
//3 獲取通信消息
content := strings.Split(msg, "|")[2]
if content == ""{
this.SendMessage("invalid message")
return
}
//4 發送消息
remoteUser.SendMessage(this.Name + " say: " + content)
} else {
this.server.BroadCast(this, msg)
}
}
//監聽當前User channel的方法,一旦有消息,就直接發送給對端客戶端
func (this *User) ListenMessage() {
for {
msg := <-this.C
//連接寫回msg,轉換為二進制
//fmt.Println("user listen msg:", msg)
this.conn.Write([]byte(msg + "\n"))
}
}
3.3 main.go
點擊查看代碼
package main
func main() {
server := NewServer("127.0.0.1", 8888)
server.Start()
}
3.4 client.go
點擊查看代碼
package main
import (
"flag"
"fmt"
"io"
"net"
"os"
)
type Client struct {
ServerIp string
ServerPort int
Name string
conn net.Conn
flag int //判斷當前client的模式
}
func newClient(serverIp string, serverPort int) *Client {
client := &Client{
ServerIp: serverIp,
ServerPort: serverPort,
flag: 999, // 設置flay默認值,否則flag默認為int整型
}
//創建鏈接
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", serverIp, serverPort))
if err != nil {
fmt.Println("net.Dial error:", err)
return nil
}
client.conn = conn
//返回客戶端
return client
}
//client菜單欄的輸出,並獲取flag輸入
func (client *Client) menu() bool {
var flag int
fmt.Println("input 1 into public chat")
fmt.Println("input 2 into private chat")
fmt.Println("input 3 into rename")
fmt.Println("input 0 into exit")
fmt.Scanln(&flag)
if flag >= 0 && flag <=3{
client.flag = flag
return true
} else {
fmt.Println("invalid input integer")
return false
}
}
// 監聽server回應的消息,直接顯示到標准輸出
func (client *Client) DealResponse() {
io.Copy(os.Stdout, client.conn) // 永久阻塞監聽
/*
上面一句相當於如下for循環一直從conn中讀取,然后輸出到終端
//for {
// buf := make([]byte, 4096)
// client.conn.Read(buf)
// fmt.Println(string(buf))
//}
*/
}
//查詢在線用戶
func (client *Client) QueryUsers() {
sendMsg := "who\n" // 直接查詢
_, err := client.conn.Write([]byte(sendMsg))
if err != nil{
fmt.Println("conn write err: ", err)
return
}
}
//私聊模式
func (client *Client) PrivateChat() {
var remoteName string
var chatMsg string
client.QueryUsers()
fmt.Println("please chat username, exit for stop")
fmt.Scanln(&remoteName)
for remoteName != "exit"{
fmt.Println("please input private chat content, exit for stop")
fmt.Scanln(&chatMsg)
for chatMsg != "exit"{
//發送給服務器
if len(chatMsg) != 0{
//消息不為空
sendMsg := "to|" + remoteName + "|" + chatMsg + "\n\n"
_, err := client.conn.Write([]byte(sendMsg))
if err != nil{
fmt.Println("conn write err:", err)
break
}
}
//寫入下一條消息
chatMsg = ""
fmt.Println("please input private chat content, exit for stop")
fmt.Scanln(&chatMsg)
}
//給一個用戶發送消息之后,可能還會給其他用戶發送
client.QueryUsers()
fmt.Println("please chat username, exit for stop")
fmt.Scanln(&remoteName)
}
}
func (client *Client) PublicChat() {
//公聊模式
var chatMsg string
fmt.Println("please input public chat content, exit for stop")
fmt.Scanln(&chatMsg)
for chatMsg != "exit"{
//發送給服務器
if len(chatMsg) != 0{
sendMsg := chatMsg + "\n"
_, err := client.conn.Write([]byte(sendMsg))
if err != nil{
fmt.Println("conn write err:", err)
break
}
}
//重新接受下一條消息
chatMsg = ""
fmt.Println("please input public chat content, exit for stop")
fmt.Scanln(&chatMsg)
}
}
func (client *Client) UpdateName() bool {
fmt.Println("please input username")
//接收輸入的用戶名
fmt.Scanln(&client.Name)
sendMsg := "rename|" + client.Name + "\n"
//按照格式寫入連接
_, err := client.conn.Write([]byte(sendMsg))
if err != nil{
fmt.Println("conn write error:", err)
return false
}
return true
}
func (client *Client) Run() {
for client.flag != 0{
for client.menu() != true {
}
//根據不同的模式處理不同的業務
switch client.flag {
case 1:
//公聊模式
fmt.Println("under public chat mode...")
client.PublicChat()
break
case 2:
//私聊模式
fmt.Println("under private chat mode...")
client.PrivateChat()
break
case 3:
// 改名
fmt.Println("under rename mode...")
client.UpdateName()
break
case 0:
//退出
fmt.Println("ready to exit")
break
}
}
}
//嘗試從終端命令行解析IP和Port創建客戶端
var serverIp string
var serverPort int
//文件的初始化函數
//命令的格式 ./client.exe -ip 127.0.0.1 -port 8888
func init() {
//屬於初始化工作,一般放在init中
flag.StringVar(&serverIp, "ip", "127.0.0.1", "set server ip(default:127.0.0.1)")
flag.IntVar(&serverPort, "port", 8888, "set server port(default:8888)")
}
func main() {
//通過命令行解析
flag.Parse()
client := newClient(serverIp, serverPort)
//client := newClient("127.0.0.1", 8888)
if client == nil{
fmt.Println("------- connect server error------")
return
}
fmt.Println("-------- connect server success ------")
//按理說啟動client.Run()方法之后,服務器返回相應的處理結果,
//主go程會阻塞在Run方法,如果使用主go程中的Run方法接受返回消息,就會變成串行執行
//無法同一時刻滿足其他的業務,而run應該跟dealResponse應該是並行的
//所以提供一個新的go程只處理server回應的信息
go client.DealResponse()
// 啟動客戶端業務,主go程阻塞在Run方法
fmt.Println("ready to process transactions......")
client.Run()
}
4、效果演示
4.1 源碼編譯
cd IM-System # 進入目錄
在當前文件目錄下生成可執行文件
go build -o server.exe .\server.go .\user.go .\main.go # 編譯服務端代碼生成server.exe
go build -o client.exe .\client.go # 編譯客戶端代碼生成可執行文件client.exe
4.2 啟動
先啟動server.exe,再啟動client.exe,進入可執行文件存放目錄
.\server.exe
.\client.exe
或者直接打開
server:
client1:
client2:
4.3 超時強踢
當一個用戶一直在線,無響應則會被超時重踢。這里主要使用一個定時器來記錄時間,一個select來阻塞監聽兩個channel。
對應接口
func (this *Server) Handler(conn net.Conn)
4.4 公聊
輸入1進入公聊模式,輸入who查詢在線用戶
client發送公聊信息
client2接收到公聊消息
4.5 改名
輸入3進入改名模式
4.6 私聊
輸入2進入私聊模式,查看到在線用戶的姓名
輸入要私聊的用戶和私聊的內容
對應用戶收到私聊信息
5、其他
5.1 踩坑
原來用python用的多,剛接觸go,有一些要踩的坑。
- 跨平台:go屬於編譯型語言,linux下,go build -o server,但是在window下需要編譯成server.exe文件
- 包管理:剛開始建議直接把項目放在GOPATH下的src目錄下,暫時避免go mode的配置
- netcat與telnet:在沒寫client.go客戶端的時候,使用cmd終端模擬客戶端測試,telnet可以短暫測試服務器是否存在,建立鏈接,但是無法大量數據傳輸,想要模擬通信還要使用netcat進行網絡的讀寫數據,但是windows下沒有nc命令需要重新安裝。
- 亂碼:默認的cmd下運行go程序可能會出現亂碼,因為Go編碼是UTF-8,而CMD默認的是GBK。可以在cmd窗口中使用
chcp 65001
改一下活動頁,或者格式化直接使用英文更方便。
5.2 說明
本文僅作為項目學習筆記,項目為劉丹冰Aceld編寫,b站Golang學習視頻中的P37到P52節,講的很好,很受用。