Golang做一个IM即时通信系统
阿巩
今天的内容可有些干哦
最近在复习Golang语法,打算通过一个“IM即时通信系统”来实际动手巩固下之前和B站上的大神学到的知识点。这个系统涵盖了私聊、群聊、上线和下线通知、超时强踢以及在线用户查询等等常用功能,可谓麻雀虽小五脏俱全。代码我放到gitlib仓库了,指路:
https://gitlab.com/893376179/golang_im_system
目前项目还在维护,当前版本仅可通过终端运行。阿巩也希望可以得到各路大神的指教。好了,日拱一卒,让我们开始吧!
项目采用敏捷思想,确保每个版本实现一个功能且能够正常运行,目前更新到了第八个版本。我们先从最简陋的第一个版本开始搞起!
第一个版本:构建基础Server
创建server.go和main.go ,server.go负责服务构建,main.go作为当前进程的主入口,server类型包含ip和端口两个属性,提供创建server对象供main使用、启动server服务做一个网络套接字的listen和accept操作,go一个新的业务和通过一个协程处理链接业务三个方法。
server.go
package main
import (
"fmt"
"net"
)
type Server struct {
Ip string
Port int
}
//创建一个server接口,即返回server对象。NewServer作为一个类对外开放的方法
func NewServer(ip string, port int) *Server {
server := &Server{
Ip: ip,
Port: port,
}
return server
}
func (ser *Server) Handler(conn net.Conn) {
fmt.Println("连接建立成功")
}
// 启动服务器接口
func (ser *Server) Start() { // ser是创建了一个类对象,便于使用类的属性
// socket listen
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ser.Ip, ser.Port)) // Sprintf用于拼接字符串,拼接为"127.0.0.1:8888"
if err != nil {
fmt.Println("net.Listen err:", err)
return
}
//close listen socket
defer listener.Close() // 为了防止遗忘关闭,使用defer关闭
for {
//accept
conn, err := listener.Accept()
if err != nil {
fmt.Println("listener Accept err:", err)
continue
}
//do handler 业务回调
go ser.Handler(conn)
}
}
main.go
package main
func main() {
server := NewServer("127.0.0.1", 8888)
server.Start()
}
第二个版本:广播用户上线功能
创建user.go表示当前用户,通过结构体User封装,包含Name用户和Addr,默认都是用户的ip地址;C表示当前是否有数据回写给对应的客户端;conn表示维护的socket通信的连接。实现两个方法:创建一个user对象为其赋值,并启动一个Listen去监听当前user channel的消息、监听user对象对应的channel消息,一旦C中有消息,将消息发送给对应的客户端。
在server类中新增OnlineMap和Message属性,分别是全部在线用户的Map和消息广播的channel,向NewServer中增加对这两个属性的初始化。
将Handler中只显示“已发送”更改为添加上线用户到OnlineMap中以及调用广播当前用户上线通知的方法。该方法就是将“当前用户上线”做成消息,并将该消息发给Server Message的channel中。那么有发就有取,我们增加ListenMessager来监听当前Message的channel中是否有消息,一旦有消息就会取出OnlineMap中所有在线的user,将消息发送给这些用户的C。每个user通过user.go中的ListenMessage方法,监听当前user channel的方法,一旦有消息,就发送给对端客户端。在Start启动中加入ListenMessager的goroutine启动起来。
user.go
package main
import "net"
type User struct {
Name string
Addr string
C chan string // 跟用户绑定的channel
conn net.Conn // 当前用户和客户端通信的连接句柄
}
// 创建一个用户的API
func NewUser(conn net.Conn) *User {
userAddr := conn.RemoteAddr().String()
user := &User{ // 获取User对象
Name: userAddr, // 获取当前客户端地址
Addr: userAddr,
C: make(chan string),
conn: conn,
}
// 启动监听当前user channel消息的goroutine
go user.ListenMessage()
return user
}
// 监听当前user channel的方法,一旦有消息,就发送给对端客户端
func (u *User) ListenMessage() {
for {
msg := <-u.C // 从channel中读取数据到msg
u.conn.Write([]byte(msg + "\n")) // 使用byte数组来写msg
}
}
server.go
package main
import (
"fmt"
"net"
"sync"
)
type Server struct {
Ip string
Port int
// 在线用户的map表
OnlineMap map[string]*User // key放当前用户名,value存放user对象
// 由于map是全局的,需要加一个读写锁
mapLock sync.RWMutex // 同步的全部机制都在sync包中
// 消息广播的channel
Message chan string
}
//创建一个server接口,即返回server对象。NewServer作为一个类对外开放的方法
func NewServer(ip string, port int) *Server {
server := &Server{
Ip: ip,
Port: port,
OnlineMap: make(map[string]*User),
Message: make(chan string),
}
return server
}
//监听Message广播消息channel的goroutine,一旦有消息就发送给全部在线的User
func (ser *Server) ListenMessager() {
for {
msg := <-ser.Message // 不断尝试从Message channel中读数据
// 将msg发送给全部在线的User
ser.mapLock.Lock()
for _, cli := range ser.OnlineMap { // 从map值中拿到user对象
cli.C <- msg
}
ser.mapLock.Unlock()
}
}
// 广播消息的方法,参数包括:由哪个用户发起、消息内容是什么
func (ser *Server) BroadCast(user *User, msg string) {
sendMsg := "[" + user.Addr + "]" + user.Name + ":" + msg // 消息内容 做一个字符串拼接
ser.Message <- sendMsg // 将消息发给Message的channel中
}
func (ser *Server) Handler(conn net.Conn) {
//fmt.Println("连接建立成功")
user := NewUser(conn)
// 用户上线,将用户加入到OnlineMap中
ser.mapLock.Lock() // 加锁
ser.OnlineMap[user.Name] = user // key是用户名,value是用户对象
ser.mapLock.Unlock() // 释放锁
// 广播当前用户上线消息
ser.BroadCast(user, "已上线")
// 发送完消息先阻塞Handler,保证goroutine存活不死亡
select {}
}
// 启动服务器接口
func (ser *Server) Start() { // ser是创建了一个类对象,便于使用类的属性
// socket listen
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ser.Ip, ser.Port)) // Sprintf用于拼接字符串,拼接为"127.0.0.1:8888"
if err != nil {
fmt.Println("net.Listen err:", err)
return
}
//close listen socket
defer listener.Close() // 为了防止遗忘关闭,使用defer关闭
//启动监听Message的goroutine
go ser.ListenMessager()
for {
//accept
conn, err := listener.Accept()
if err != nil {
fmt.Println("listener Accept err:", err)
continue
}
//do handler 业务回调
go ser.Handler(conn)
}
}
版本三:用户消息广播功能
在server.go中增加handler处理读业务的方法,启动一个针对当前客户端的读goroutine。如果读到的n是0即进程已关闭则广播该用户已下线,对于正常发出的消息,获取去掉换行之后的消息,并将得到的消息进行广播。
func (ser *Server) Handler(conn net.Conn) {
//fmt.Println("连接建立成功")
user := NewUser(conn)
// 用户上线,将用户加入到OnlineMap中
ser.mapLock.Lock() // 加锁
ser.OnlineMap[user.Name] = user // key是用户名,value是用户对象
ser.mapLock.Unlock() // 释放锁
// 广播当前用户上线消息
ser.BroadCast(user, "已上线")
// 启动goroutine来接受客户端发送的消息
go func() {
buf := make([]byte, 4096)
for {
n, err := conn.Read(buf)
if n == 0 {
ser.BroadCast(user, "已下线")
return
}
if err != nil && err != io.EOF {
fmt.Println("Conn Read err:", err)
return
}
// 提取用户的消息(去除结尾的'\n')
msg := string(buf[:n-1])
// 将得到的消息进行广播
ser.BroadCast(user, msg)
}
}()
// 发送完消息先阻塞Handler,保证goroutine存活不死亡
select {}
}
版本四:对用户业务层进行封装
在之前的版本中,用户上线、下线以及消息处理是在server.go中完成的,为了逻辑更好维护,我们将用户业务层做了封装。首先在user类型中新增server关联 server *Server,在NewServer初始化时增添对server的赋值,增加用户上线Online()、下线Offline()、处理消息DoMessage()方法。将之前server.go中的逻辑进行替换,在Handler中将conn和server都和user进行绑定,然后做上线的调用;当客户端关闭,做下线的调用;当成功获取到用户消息做一个处理当前用户消息的调用。
user.go
package main
import "net"
type User struct {
Name string
Addr string
C chan string // 跟用户绑定的channel
conn net.Conn // 当前用户和客户端通信的连接句柄
server *Server
}
// 创建一个用户的API
func NewUser(conn net.Conn, server *Server) *User {
userAddr := conn.RemoteAddr().String()
user := &User{ // 获取User对象
Name: userAddr, // 获取当前客户端地址
Addr: userAddr,
C: make(chan string),
conn: conn,
server: server}
// 启动监听当前user channel消息的goroutine
go user.ListenMessage()
return user
}
// 用户上线业务
func (u *User) Online() {
u.server.mapLock.Lock() // 加锁
u.server.OnlineMap[u.Name] = u // key是用户名,value是用户对象
u.server.mapLock.Unlock() // 释放锁
// 广播当前用户上线消息
u.server.BroadCast(u, "已上线")
}
// 用户下线业务
func (u *User) Offline() {
u.server.mapLock.Lock() // 加锁
delete(u.server.OnlineMap, u.Name)
u.server.mapLock.Unlock() // 释放锁
// 广播当前用户上线消息
u.server.BroadCast(u, "已下线")
}
// 用户处理消息的业务
func (u *User) DoMessage(msg string) {
u.server.BroadCast(u, msg)
}
// 监听当前user channel的方法,一旦有消息,就发送给对端客户端
func (u *User) ListenMessage() {
for {
msg := <-u.C // 从channel中读取数据到msg
u.conn.Write([]byte(msg + "\n")) // 使用byte数组来写msg
}
}
版本五、六:查询在线用户及修改用户名
查询在线用户列表:在user.go中DoMessage方法中添加逻辑,当msg发送过来的指令是"who"表示查询在线用户,返回在线用户信息;另外谁查询的需要将查询结果发送给这个指定的user,提供SendMsg方法指定user.conn去做msg的写入。
// 给当前user的客户端发送消息
func (u *User) SendMsg(msg string) {
u.conn.Write([]byte(msg))
}
// 用户处理消息的业务
func (u *User) DoMessage(msg string) {
if msg == "who" { // 查询当前所有在线用户
u.server.mapLock.Lock()
for _, user := range u.server.OnlineMap {
onlineMsg := "[" + user.Addr + "]" + user.Name + ":" + "在线...\n"
u.SendMsg(onlineMsg)
}
u.server.mapLock.Unlock()
} else {
u.server.BroadCast(u, msg)
}
}
修改用户名:当消息格式为“rename|”时表示重命名,在user.go中DoMessage方法中添加逻辑,增加一个分支判断新用户名是否合法以及是否已被在线用户占用。
// 用户处理消息的业务
func (u *User) DoMessage(msg string) {
if msg == "who" { // 查询当前所有在线用户的消息格式是"who"
...
} else if len(msg) > 7 && msg[:7] == "rename|" {
// 重命名的消息格式是"rename|张三"
newName := strings.Split(msg, "|")[1]
// 判断新用户名是否已被占用
_, ok := u.server.OnlineMap[newName]
if ok { // 被占用
u.SendMsg("当前用户名已被占用\n")
} else {
u.server.mapLock.Lock()
delete(u.server.OnlineMap, u.Name) // 删除旧用户名
u.server.OnlineMap[newName] = u // 将新用户名及对象存储到map
u.server.mapLock.Unlock()
u.Name = newName
u.SendMsg("您已修改用户名为:" + u.Name + "\n")
}
} else {
u.server.BroadCast(u, msg)
}
}
版本七:超时强踢功能
在server.go中的Handler方法的goroutine中,添加用户活跃channel isAlive,一旦用户本人发送消息就会使用户处于活跃状态,就向该channel发送true。
在循环中监听isAlive和time.After()定时器两个管道,如果case<-isAlive触发就重置定时器,如果case<-isAlive超过定时器时间没有触发的话,select阻塞,按超时处理关闭管道和连接以及退出Handler goroutine。
func (ser *Server) Handler(conn net.Conn) {
//fmt.Println("连接建立成功")
user := NewUser(conn, ser)
user.Online()
// 监听用户是否活跃的channel
isAlive := make(chan bool)
// 启动goroutine来接受客户端发送的消息
go func() {...
// 用户的任意消息,都会使用户处于活跃
isAlive <- true
}
}()
// 发送完消息先阻塞Handler,保证goroutine存活不死亡
for {
select {
// 定时器
case <-isAlive:
// 当前用户活跃,无需处理,激活select更新下面case中的定时器
case <-time.After(time.Second * 10):
// 已超时,将当前user客户端强制关闭
user.SendMsg("您已超时,请重新登录")
// 销毁channel资源
close(user.C)
// 关闭绑定Handler的连接
conn.Close()
// 退出Handler goroutine
runtime.Goexit() // 或 return
}
}
}
版本八:私聊功能
在user.go中DoMessage方法中添加逻辑,增加对消息格式为“to|张三|你好啊”时的处理,获取对方的用户名,判断用户是否存在,如果存在根据用户名得到对方user对象,然后获取消息内容,如果消息内容不为空,将消息内容发送给对应的user对象。
func (u *User) DoMessage(msg string) {
if msg == "who" { ...
} else if len(msg) > 7 && msg[:7] == "rename|" {...
}
} else if len(msg) > 4 && msg[:3] == "to|" {
// 消息格式:to|张三|你好啊
//1 获取对方的用户名
remoteName := strings.Split(msg, "|")[1]
if remoteName == "" {
u.SendMsg("消息格式不正确,请使用\"to|张三|你好啊\"格式\n")
return
}
//2 根据用户名得到对方user对象
remoteUser, ok := u.server.OnlineMap[remoteName]
if !ok { // 该用户名不存在/不在线
u.SendMsg("该用户名不存在/不在线\n")
return
}
//3 获取消息内容,通过对方的user对象将消息内容发送过去
content := strings.Split(msg, "|")[2]
if content == "" {
u.SendMsg("无消息内容,请重发\n")
}
remoteUser.SendMsg(u.Name + "对您说:" + content)
} else {
u.server.BroadCast(u, msg)
}
}
至此,我们看到对于一个即时通信系统已经实现了它的基本功能。关于其中的知识点之后会陆续出文深入,这个系统也会继续更新和维护。感谢阅读!
相关文章
- 天冷了,任务栏养只猫吧「GitHub 热点速览 v.21.46」
- GitHub 开源的小工具「GitHub 热点速览 v.21.45」
- 盘点 GitHub 年度盛会|附视频
- 它说你的代码有 Bug「GitHub 热点速览 v.21.44」
- 《HelloGitHub》第 67 期
- 代码混淆保安全「GitHub 热点速览 v.21.43」
- 每个男孩的机械梦「GitHub 热点速览 v.21.41」
- 《HelloGitHub》第 66 期
- 大型项目源码集合「GitHub 热点速览 v.21.39」
- 有备无患「GitHub 热点速览 v.21.38」
- 用 Java 写个塔防游戏「GitHub 热点速览 v.21.37」
- 这款打怪升级的小游戏,7 年前出生于 GitHub 社区,如今在谷歌商店有 8 万人打了满分
- 开源的 Web 框架哪个快?我在 GitHub 找到了答案
- 人生重开模拟器「GitHub 热点速览 v.21.36」
- 自建纯净谷歌搜索「GitHub 热点速览 v.21.35」
- 早产的《HelloGitHub》第 65 期
- 5 秒克隆声音「GitHub 热点速览 v.21.34」
- 那些 Unix 命令替代品们「GitHub 热点速览 v.21.32」
- 承载童年的游戏机,已停产!但我在 GitHub 找到了它们
- 自制车速记录仪「GitHub 热点速览 v.21.31」