Golang搭建即時通信系統IM-System


Golang搭建即時通信系統

1、基本功能

主要是包括用戶上線,用戶私聊,用戶公聊,超時強踢,查詢在線用戶,修改用戶名等基本socket通信功能。

2、簡要介紹

2.1系統結構如下


主要包括兩個部分:

  • Client:負責客戶端命令解析,請求與服務器的連接,發送消息等
  • Server:監聽,連接創建,主要業務邏輯的處理等。

2.2目錄結構

IM-System
——client.go   # 客戶端相關邏輯代碼  
    client.exe  # 客戶端編譯的可執行文件
    main.go     # 服務端主程序入口
    server.go   # 服務器相關邏輯代碼
    user.go     # 用戶相關業務邏輯
    server.exe  # 服務器代碼編譯的可執行文件

2.3其他說明

內部對象簡要說明:
Server內部創建和維護server和user對象,其中每次有客戶端嘗試與服務器建立連接,都會創建一個新的user對象。
server中除了一些基本的屬性包含兩個主要的屬性,OnlineMap和message_channel,OnlineMap是一個對象的字典,保存當前與服務器建立連接之后創建的對象。channel是一個通信管道,主要將客戶端發送到服務端的message發送給其他的所有用戶的管道來進行廣播。
user除了一些基本的屬性外包含兩個主要屬性,Conn和message_channel,Conn為客戶端與服務器建立的TCP鏈接,channel表示通信管道,每次channel管道中有message都會回顯給客戶端達到客戶端與服務器通信的效果。

協程的使用:
系統會創建多個go程。
user對象內部會創建go程來阻塞監聽user.message_channel,一旦有消息則會回顯客戶端。
server對象內部也會創建go程阻塞監聽server.message_channel,一旦管道有消息則會發送給每個在線用戶的管道。
server在啟動之后會監聽連接,一旦有新的連接,為了防止阻塞主go程,就會生成新的go程處理該連接而不影響其他連接的處理。
系統是基於讀寫分離模型的,因此分別使用不同的go程去接收客戶端寫入的字節流和往客戶端寫入,保證了用戶在寫入消息的時候同時能夠接收到其他消息。

3、代碼

3.1 server.go

點擊查看代碼
package main

import (
	"fmt"
	"io"
	"net"
	"sync"
	"time"
)

type Server struct {
	Ip   string
	Port int

	//用戶在線列表
	OnlineMap map[string]*User
	//給全局變量map加同步鎖
	mapLock sync.RWMutex

	// 消息廣播的channel
	Message chan string
}

//創建一個普通的server接口(函數)
//開頭大寫表示對外開放
func NewServer(ip string, port int) *Server {
	server := &Server{
		Ip: ip,
		Port: port,
		OnlineMap: make(map[string]*User),
		Message: make(chan string),
	}
	return server
}

//監聽Message廣播消息channel的goroutine,一旦有消息就發送給全部在線的user
func (this *Server) ListenMessage()  {
	//服務器端仍舊需要時刻監聽
	for {
		msg := <-this.Message
		this.mapLock.Lock()
		//將消息發給所有用戶
		for _, u := range this.OnlineMap {
			u.C <- msg
		}
		this.mapLock.Unlock()
	}
}

//廣播消息
func (this *Server) BroadCast(user *User, msg string)  {
	sendMessage := "[" + user.Addr + "]" + user.Name + ":" + msg
	this.Message <- sendMessage
}

func (this *Server) Handler(conn net.Conn)  {
	//當前連接的業務

	user := NewUser(conn, this)
        fmt.Printf("new connection %s:%s established...\n", user.Addr, user.Name)

	//用戶在線消息封裝
	user.Online()

	//監聽用戶是否活躍的channel
	isLive := make(chan bool)

	//接受客戶端發送的消息
	//每個客戶端都有一個go程處理客戶端讀的業務
	go func() {
		// 4K大小
		buf := make([]byte, 4096)
		for  {
			n, err := conn.Read(buf)
			//fmt.Println("n=", n, ",buf=", string(buf[0:n]))
			//n為成功讀取到的字節數
			if n == 0{
				user.Offline()
				return
			}
			if err != nil && err != io.EOF{
				fmt.Println("Conn Read err:", err)
				return
			}

			//正常獲取消息
			//用戶輸入消息是以\n結尾的,需要去掉最后一個字節
			msg := string(buf[:n-1])

			//fmt.Println("handler msg: ", msg)
			//接收到的消息進行廣播
			user.DoMessage(msg)

			//用戶的任意消息,代表用戶是一個活躍的用戶,激活管道
			isLive <- true
		}
	}()

	for {
		//當前handler阻塞監聽管道的消息,一旦兩個管道有一個有值,就會執行select
		select {
			//這里一旦當isLive為True,那么就會進入select,執行完case<-isLive之后
			//會接着更新After管道,但是因為還沒到時間不會進入case<-After之后的語句
			case <- isLive:
				//	當前用戶被激活,
				//	這里為了重置定時器,把case<-isLive 放到了上邊

			//設置定時器,如果定時觸發,則強踢,如果發消息,則重新激活定時器
			//After本身是一個channel,如果發生超時,那么該channel中就能讀取到數據
			case <- time.After(time.Second * 60):
				//進入case表示超時,重置定時器
				//將當前的User強制關閉

				//發出下線消息
				user.SendMessage("you are forced offline")

				//銷毀管道資源
				close(user.C)

				//關閉用戶連接
				conn.Close()

				//可能是因為管道資源以及conn連接關閉之后
				//OnlineMap中值被回收之后,自動刪除鍵值對???

				//退出handler
				return  // 或者runtime.Goexit()
		}
	}

}

//啟動服務器的接口
func (this *Server) Start()  {
	//socket listen
        fmt.Println("server is starting...")
	listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", this.Ip, this.Port))
	if err != nil{
		fmt.Println("listener err:", err)
		return
	}
	//為了防止遺忘關閉連接,加上defer保證在接口結束之后close
	//close listen socket
	defer listener.Close()

	//啟動監聽Message的goroutine
	go this.ListenMessage()

	//listener一直監聽連接
	for {
		//accept
		conn, err := listener.Accept()
		if err != nil{
			fmt.Println("listener accept err:", err)
			continue
		}
		//go程處理連接
		go this.Handler(conn)
	}
}

3.2 user.go

點擊查看代碼
package main

import (
	"net"
	"strings"
)

type User struct {
	Name  string
	Addr  string
	C     chan string  // channel數據為string
	conn  net.Conn

	server *Server  // 當前用戶屬於的服務器
}

//創建用戶的接口
func NewUser(conn net.Conn, server *Server) *User {
	userAddr := conn.RemoteAddr().String()
	user := &User{
		Name: userAddr,
		Addr: userAddr,
		C:    make(chan string),
		conn: conn,
		server: server,
	}

	//每一個新用戶都綁定一個go程監聽當前用戶的channel消息
	go user.ListenMessage()

	return user
}

//處理用戶上線
func (this *User) Online()  {

	//用戶上線,將用戶加入onlinemap中
	this.server.mapLock.Lock()
	this.server.OnlineMap[this.Name] = this
	this.server.mapLock.Unlock()

	//廣播用戶上線的消息
	this.server.BroadCast(this, "user is online")
}

//處理用戶下線
func (this *User) Offline()  {

	//用戶下線,將用戶從online map刪除
	this.server.mapLock.Lock()
	delete(this.server.OnlineMap, this.Name)
	this.server.mapLock.Unlock()

	//廣播用戶下線
	this.server.BroadCast(this, "user offline")

}

//給當前user對應的客戶端發送消息
func (this *User) SendMessage(msg string)  {
	this.conn.Write([]byte(msg))
}

//處理消息
func (this *User) DoMessage(msg string)  {
	if msg == "who"{
		//查詢當前都有哪些用戶在線
		this.server.mapLock.Lock()
		for _, u := range this.server.OnlineMap{
			onlineMsg := "[" + u.Addr + "]" + u.Name + ":" + "online...\n"
			//只發送給當前發送查詢指令的用戶
			this.SendMessage(onlineMsg)
		}
		this.server.mapLock.Unlock()

	} else if len(msg) > 7 && msg[:7] == "rename|" {
		//新的用戶名
		newName := strings.Split(msg, "|")[1]
		//判斷新的用戶名是否存在
		_, ok := this.server.OnlineMap[newName]
		if ok{
			//服務器中已經存在該用戶名
			this.SendMessage(newName + "has existed...")
		} else {
			//修改OnlineMap
			this.server.mapLock.Lock()
			delete(this.server.OnlineMap, this.Name)
			this.server.OnlineMap[newName] = this
			this.server.mapLock.Unlock()

			this.Name = newName
			this.SendMessage("you have updated user name: " + this.Name + "\n")
		}
	} else if len(msg) > 4 && msg[:3] == "to|" {
		// 消息格式 ”to|alex|hello“

		//1 獲取用戶名
		remoteName := strings.Split(msg, "|")[1]
		if remoteName == ""{
			//用戶名無效
			this.SendMessage("invalid, user \"to|alex|hello\"")
			return
		}

		//2 查詢對象
		remoteUser, ok := this.server.OnlineMap[remoteName]
		if !ok {
			//用戶不存在
			this.SendMessage("username not exist")
			return
		}

		//3 獲取通信消息
		content := strings.Split(msg, "|")[2]
		if content == ""{
			this.SendMessage("invalid message")
			return
		}

		//4 發送消息
		remoteUser.SendMessage(this.Name + " say: " + content)

	} else {
		this.server.BroadCast(this, msg)
	}
}

//監聽當前User channel的方法,一旦有消息,就直接發送給對端客戶端
func (this *User) ListenMessage()  {
	for  {
		msg := <-this.C
		//連接寫回msg,轉換為二進制
		//fmt.Println("user listen msg:", msg)
		this.conn.Write([]byte(msg + "\n"))
	}
}

3.3 main.go

點擊查看代碼
package main


func main()  {

	server := NewServer("127.0.0.1", 8888)
	server.Start()
}

3.4 client.go

點擊查看代碼
package main

import (
	"flag"
	"fmt"
	"io"
	"net"
	"os"
)

type Client struct {
	ServerIp string
	ServerPort  int
	Name     string
	conn   net.Conn
	flag        int //判斷當前client的模式
}

func newClient(serverIp string, serverPort int) *Client {
	client := &Client{
		ServerIp: serverIp,
		ServerPort: serverPort,
		flag: 999,  // 設置flay默認值,否則flag默認為int整型
	}
	//創建鏈接
	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", serverIp, serverPort))
	if err != nil {
		fmt.Println("net.Dial error:", err)
		return nil
	}
	client.conn = conn
	//返回客戶端
	return client
}

//client菜單欄的輸出,並獲取flag輸入
func (client *Client) menu() bool {
	var flag int

	fmt.Println("input 1 into public chat")
	fmt.Println("input 2 into private chat")
	fmt.Println("input 3 into rename")
	fmt.Println("input 0 into exit")

	fmt.Scanln(&flag)

	if flag >= 0 && flag <=3{
		client.flag = flag
		return true
	} else {
		fmt.Println("invalid input integer")
		return false
	}
}

// 監聽server回應的消息,直接顯示到標准輸出
func (client *Client) DealResponse()  {
	io.Copy(os.Stdout, client.conn) // 永久阻塞監聽
	/*
	上面一句相當於如下for循環一直從conn中讀取,然后輸出到終端
	//for {
	//	buf := make([]byte, 4096)
	//	client.conn.Read(buf)
	//	fmt.Println(string(buf))
	//}
	 */

}

//查詢在線用戶
func (client *Client) QueryUsers() {
	sendMsg := "who\n"  // 直接查詢
	_, err := client.conn.Write([]byte(sendMsg))
	if err != nil{
		fmt.Println("conn write err: ", err)
		return
	}
}

//私聊模式
func (client *Client) PrivateChat()  {
	var remoteName string
	var chatMsg string

	client.QueryUsers()
	fmt.Println("please chat username, exit for stop")
	fmt.Scanln(&remoteName)
	for remoteName != "exit"{
		fmt.Println("please input private chat content, exit for stop")
		fmt.Scanln(&chatMsg)
		for chatMsg != "exit"{
			//發送給服務器
			if len(chatMsg) != 0{
				//消息不為空
				sendMsg := "to|" + remoteName + "|" + chatMsg + "\n\n"
				_, err := client.conn.Write([]byte(sendMsg))
				if err != nil{
					fmt.Println("conn write err:", err)
					break
				}
			}
			//寫入下一條消息
			chatMsg = ""
			fmt.Println("please input private chat content, exit for stop")
			fmt.Scanln(&chatMsg)
		}
		//給一個用戶發送消息之后,可能還會給其他用戶發送
		client.QueryUsers()
		fmt.Println("please chat username, exit for stop")
		fmt.Scanln(&remoteName)
	}

}

func (client *Client) PublicChat() {
	//公聊模式
	var chatMsg string

	fmt.Println("please input public chat content, exit for stop")
	fmt.Scanln(&chatMsg)

	for chatMsg != "exit"{
		//發送給服務器
		if len(chatMsg) != 0{
			sendMsg := chatMsg + "\n"
			_, err := client.conn.Write([]byte(sendMsg))
			if err != nil{
				fmt.Println("conn write err:", err)
				break
			}
		}

		//重新接受下一條消息
		chatMsg = ""
		fmt.Println("please input public chat content, exit for stop")
		fmt.Scanln(&chatMsg)
	}
}

func (client *Client) UpdateName() bool {
	fmt.Println("please input username")
	//接收輸入的用戶名
	fmt.Scanln(&client.Name)

	sendMsg := "rename|" + client.Name + "\n"
	//按照格式寫入連接
	_, err := client.conn.Write([]byte(sendMsg))
	if err != nil{
		fmt.Println("conn write error:", err)
		return false
	}
	return true
}

func (client *Client) Run() {
	for client.flag != 0{
		for client.menu() != true {
		}

		//根據不同的模式處理不同的業務
		switch  client.flag {
			case 1:
				//公聊模式
				fmt.Println("under public chat mode...")
				client.PublicChat()
				break
			case 2:
				//私聊模式
				fmt.Println("under private chat mode...")
				client.PrivateChat()
				break
			case 3:
				//	改名
				fmt.Println("under rename mode...")
				client.UpdateName()
				break
			case 0:
				//退出
				fmt.Println("ready to exit")
				break
		}
	}
}

//嘗試從終端命令行解析IP和Port創建客戶端
var serverIp string
var serverPort int

//文件的初始化函數
//命令的格式  ./client.exe -ip 127.0.0.1 -port 8888
func init() {
	//屬於初始化工作,一般放在init中
	flag.StringVar(&serverIp, "ip", "127.0.0.1", "set server ip(default:127.0.0.1)")
	flag.IntVar(&serverPort, "port", 8888, "set server port(default:8888)")
}

func main()  {
	//通過命令行解析
	flag.Parse()
	client := newClient(serverIp, serverPort)
	//client := newClient("127.0.0.1", 8888)
	if client == nil{
		fmt.Println("------- connect server error------")
		return
	}

	fmt.Println("-------- connect server success ------")

	//按理說啟動client.Run()方法之后,服務器返回相應的處理結果,
	//主go程會阻塞在Run方法,如果使用主go程中的Run方法接受返回消息,就會變成串行執行
	//無法同一時刻滿足其他的業務,而run應該跟dealResponse應該是並行的
	//所以提供一個新的go程只處理server回應的信息
	go client.DealResponse()

	// 啟動客戶端業務,主go程阻塞在Run方法
	fmt.Println("ready to process transactions......")
	client.Run()

}

4、效果演示

4.1 源碼編譯

cd IM-System  # 進入目錄

在當前文件目錄下生成可執行文件

go build -o server.exe .\server.go .\user.go .\main.go  # 編譯服務端代碼生成server.exe
go build -o client.exe .\client.go  # 編譯客戶端代碼生成可執行文件client.exe

4.2 啟動

先啟動server.exe,再啟動client.exe,進入可執行文件存放目錄

.\server.exe 
.\client.exe

或者直接打開
server:

client1:

client2:

4.3 超時強踢

當一個用戶一直在線,無響應則會被超時重踢。這里主要使用一個定時器來記錄時間,一個select來阻塞監聽兩個channel。
對應接口

func (this *Server) Handler(conn net.Conn)

4.4 公聊

輸入1進入公聊模式,輸入who查詢在線用戶

client發送公聊信息

client2接收到公聊消息

4.5 改名

輸入3進入改名模式

4.6 私聊

輸入2進入私聊模式,查看到在線用戶的姓名

輸入要私聊的用戶和私聊的內容

對應用戶收到私聊信息

5、其他

5.1 踩坑

原來用python用的多,剛接觸go,有一些要踩的坑。

  • 跨平台:go屬於編譯型語言,linux下,go build -o server,但是在window下需要編譯成server.exe文件
  • 包管理:剛開始建議直接把項目放在GOPATH下的src目錄下,暫時避免go mode的配置
  • netcat與telnet:在沒寫client.go客戶端的時候,使用cmd終端模擬客戶端測試,telnet可以短暫測試服務器是否存在,建立鏈接,但是無法大量數據傳輸,想要模擬通信還要使用netcat進行網絡的讀寫數據,但是windows下沒有nc命令需要重新安裝。
  • 亂碼:默認的cmd下運行go程序可能會出現亂碼,因為Go編碼是UTF-8,而CMD默認的是GBK。可以在cmd窗口中使用chcp 65001改一下活動頁,或者格式化直接使用英文更方便。

5.2 說明

本文僅作為項目學習筆記,項目為劉丹冰Aceld編寫,b站Golang學習視頻中的P37到P52節,講的很好,很受用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM