zl程序教程

您现在的位置是:首页 >  其他

当前栏目

Golang做一个IM即时通信系统

2023-02-18 16:23:55 时间

阿巩

今天的内容可有些干哦

最近在复习Golang语法,打算通过一个“IM即时通信系统”来实际动手巩固下之前和B站上的大神学到的知识点。这个系统涵盖了私聊、群聊、上线和下线通知、超时强踢以及在线用户查询等等常用功能,可谓麻雀虽小五脏俱全。代码我放到gitlib仓库了,指路:

https://gitlab.com/893376179/golang_im_system

目前项目还在维护,当前版本仅可通过终端运行。阿巩也希望可以得到各路大神的指教。好了,日拱一卒,让我们开始吧!

项目采用敏捷思想,确保每个版本实现一个功能且能够正常运行,目前更新到了第八个版本。我们先从最简陋的第一个版本开始搞起!

第一个版本:构建基础Server

创建server.go和main.go ,server.go负责服务构建,main.go作为当前进程的主入口,server类型包含ip和端口两个属性,提供创建server对象供main使用、启动server服务做一个网络套接字的listen和accept操作,go一个新的业务和通过一个协程处理链接业务三个方法。

server.go

package main

import (
  "fmt"
  "net"
)

type Server struct {
  Ip   string
  Port int
}

//创建一个server接口,即返回server对象。NewServer作为一个类对外开放的方法
func NewServer(ip string, port int) *Server {
  server := &Server{
    Ip:   ip,
    Port: port,
  }
  return server
}

func (ser *Server) Handler(conn net.Conn) {
  fmt.Println("连接建立成功")
}

// 启动服务器接口
func (ser *Server) Start() { // ser是创建了一个类对象,便于使用类的属性
  // socket listen
  listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ser.Ip, ser.Port)) // Sprintf用于拼接字符串,拼接为"127.0.0.1:8888"
  if err != nil {
    fmt.Println("net.Listen err:", err)
    return
  }
  //close listen socket
  defer listener.Close() // 为了防止遗忘关闭,使用defer关闭
  for {
    //accept
    conn, err := listener.Accept()
    if err != nil {
      fmt.Println("listener Accept err:", err)
      continue
    }
    //do handler 业务回调
    go ser.Handler(conn)
  }

}

main.go

package main

func main() {
  server := NewServer("127.0.0.1", 8888)
  server.Start()
}

第二个版本:广播用户上线功能

创建user.go表示当前用户,通过结构体User封装,包含Name用户和Addr,默认都是用户的ip地址;C表示当前是否有数据回写给对应的客户端;conn表示维护的socket通信的连接。实现两个方法:创建一个user对象为其赋值,并启动一个Listen去监听当前user channel的消息、监听user对象对应的channel消息,一旦C中有消息,将消息发送给对应的客户端。

在server类中新增OnlineMap和Message属性,分别是全部在线用户的Map和消息广播的channel,向NewServer中增加对这两个属性的初始化。

将Handler中只显示“已发送”更改为添加上线用户到OnlineMap中以及调用广播当前用户上线通知的方法。该方法就是将“当前用户上线”做成消息,并将该消息发给Server Message的channel中。那么有发就有取,我们增加ListenMessager来监听当前Message的channel中是否有消息,一旦有消息就会取出OnlineMap中所有在线的user,将消息发送给这些用户的C。每个user通过user.go中的ListenMessage方法,监听当前user channel的方法,一旦有消息,就发送给对端客户端。在Start启动中加入ListenMessager的goroutine启动起来。

user.go

package main

import "net"

type User struct {
  Name string
  Addr string
  C    chan string // 跟用户绑定的channel
  conn net.Conn    // 当前用户和客户端通信的连接句柄
}

// 创建一个用户的API
func NewUser(conn net.Conn) *User {
  userAddr := conn.RemoteAddr().String()
  user := &User{ // 获取User对象
    Name: userAddr, // 获取当前客户端地址
    Addr: userAddr,
    C:    make(chan string),
    conn: conn,
  }
  // 启动监听当前user channel消息的goroutine
  go user.ListenMessage()

  return user
}

// 监听当前user channel的方法,一旦有消息,就发送给对端客户端
func (u *User) ListenMessage() {
  for {
    msg := <-u.C                     // 从channel中读取数据到msg
    u.conn.Write([]byte(msg + "\n")) // 使用byte数组来写msg
  }
}

server.go

package main

import (
  "fmt"
  "net"
  "sync"
)

type Server struct {
  Ip   string
  Port int
  // 在线用户的map表
  OnlineMap map[string]*User // key放当前用户名,value存放user对象
  // 由于map是全局的,需要加一个读写锁
  mapLock sync.RWMutex // 同步的全部机制都在sync包中
  // 消息广播的channel
  Message chan string
}

//创建一个server接口,即返回server对象。NewServer作为一个类对外开放的方法
func NewServer(ip string, port int) *Server {
  server := &Server{
    Ip:        ip,
    Port:      port,
    OnlineMap: make(map[string]*User),
    Message:   make(chan string),
  }
  return server
}

//监听Message广播消息channel的goroutine,一旦有消息就发送给全部在线的User
func (ser *Server) ListenMessager() {
  for {
    msg := <-ser.Message // 不断尝试从Message channel中读数据
    // 将msg发送给全部在线的User
    ser.mapLock.Lock()
    for _, cli := range ser.OnlineMap { // 从map值中拿到user对象
      cli.C <- msg
    }
    ser.mapLock.Unlock()
  }
}

// 广播消息的方法,参数包括:由哪个用户发起、消息内容是什么
func (ser *Server) BroadCast(user *User, msg string) {
  sendMsg := "[" + user.Addr + "]" + user.Name + ":" + msg // 消息内容 做一个字符串拼接
  ser.Message <- sendMsg                                   // 将消息发给Message的channel中
}

func (ser *Server) Handler(conn net.Conn) {
  //fmt.Println("连接建立成功")
  user := NewUser(conn)
  // 用户上线,将用户加入到OnlineMap中
  ser.mapLock.Lock()              // 加锁
  ser.OnlineMap[user.Name] = user // key是用户名,value是用户对象
  ser.mapLock.Unlock()            // 释放锁
  // 广播当前用户上线消息
  ser.BroadCast(user, "已上线")
  // 发送完消息先阻塞Handler,保证goroutine存活不死亡
  select {}
}

// 启动服务器接口
func (ser *Server) Start() { // ser是创建了一个类对象,便于使用类的属性
  // socket listen
  listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ser.Ip, ser.Port)) // Sprintf用于拼接字符串,拼接为"127.0.0.1:8888"
  if err != nil {
    fmt.Println("net.Listen err:", err)
    return
  }
  //close listen socket
  defer listener.Close() // 为了防止遗忘关闭,使用defer关闭

  //启动监听Message的goroutine
  go ser.ListenMessager()

  for {
    //accept
    conn, err := listener.Accept()
    if err != nil {
      fmt.Println("listener Accept err:", err)
      continue
    }
    //do handler 业务回调
    go ser.Handler(conn)
  }
}

版本三:用户消息广播功能

在server.go中增加handler处理读业务的方法,启动一个针对当前客户端的读goroutine。如果读到的n是0即进程已关闭则广播该用户已下线,对于正常发出的消息,获取去掉换行之后的消息,并将得到的消息进行广播。

func (ser *Server) Handler(conn net.Conn) {
  //fmt.Println("连接建立成功")
  user := NewUser(conn)
  // 用户上线,将用户加入到OnlineMap中
  ser.mapLock.Lock()              // 加锁
  ser.OnlineMap[user.Name] = user // key是用户名,value是用户对象
  ser.mapLock.Unlock()            // 释放锁
  // 广播当前用户上线消息
  ser.BroadCast(user, "已上线")

  // 启动goroutine来接受客户端发送的消息
  go func() {
    buf := make([]byte, 4096)
    for {
      n, err := conn.Read(buf)
      if n == 0 {
        ser.BroadCast(user, "已下线")
        return
      }
      if err != nil && err != io.EOF {
        fmt.Println("Conn Read err:", err)
        return
      }
      // 提取用户的消息(去除结尾的'\n')
      msg := string(buf[:n-1])

      // 将得到的消息进行广播
      ser.BroadCast(user, msg)
    }
  }()

  // 发送完消息先阻塞Handler,保证goroutine存活不死亡
  select {}
}

版本四:对用户业务层进行封装

在之前的版本中,用户上线、下线以及消息处理是在server.go中完成的,为了逻辑更好维护,我们将用户业务层做了封装。首先在user类型中新增server关联 server *Server,在NewServer初始化时增添对server的赋值,增加用户上线Online()、下线Offline()、处理消息DoMessage()方法。将之前server.go中的逻辑进行替换,在Handler中将conn和server都和user进行绑定,然后做上线的调用;当客户端关闭,做下线的调用;当成功获取到用户消息做一个处理当前用户消息的调用。

user.go

package main

import "net"

type User struct {
  Name string
  Addr string
  C    chan string // 跟用户绑定的channel
  conn net.Conn    // 当前用户和客户端通信的连接句柄

  server *Server
}

// 创建一个用户的API
func NewUser(conn net.Conn, server *Server) *User {
  userAddr := conn.RemoteAddr().String()
  user := &User{ // 获取User对象
    Name:   userAddr, // 获取当前客户端地址
    Addr:   userAddr,
    C:      make(chan string),
    conn:   conn,
    server: server}
  // 启动监听当前user channel消息的goroutine
  go user.ListenMessage()

  return user
}

// 用户上线业务
func (u *User) Online() {
  u.server.mapLock.Lock()        // 加锁
  u.server.OnlineMap[u.Name] = u // key是用户名,value是用户对象
  u.server.mapLock.Unlock()      // 释放锁
  // 广播当前用户上线消息
  u.server.BroadCast(u, "已上线")
}

// 用户下线业务
func (u *User) Offline() {
  u.server.mapLock.Lock() // 加锁
  delete(u.server.OnlineMap, u.Name)
  u.server.mapLock.Unlock() // 释放锁
  // 广播当前用户上线消息
  u.server.BroadCast(u, "已下线")
}

// 用户处理消息的业务
func (u *User) DoMessage(msg string) {
  u.server.BroadCast(u, msg)
}

// 监听当前user channel的方法,一旦有消息,就发送给对端客户端
func (u *User) ListenMessage() {
  for {
    msg := <-u.C                     // 从channel中读取数据到msg
    u.conn.Write([]byte(msg + "\n")) // 使用byte数组来写msg
  }
}

版本五、六:查询在线用户及修改用户名

查询在线用户列表:在user.go中DoMessage方法中添加逻辑,当msg发送过来的指令是"who"表示查询在线用户,返回在线用户信息;另外谁查询的需要将查询结果发送给这个指定的user,提供SendMsg方法指定user.conn去做msg的写入。

// 给当前user的客户端发送消息
func (u *User) SendMsg(msg string) {
  u.conn.Write([]byte(msg))
}

// 用户处理消息的业务
func (u *User) DoMessage(msg string) {
  if msg == "who" { // 查询当前所有在线用户
    u.server.mapLock.Lock()
    for _, user := range u.server.OnlineMap {
      onlineMsg := "[" + user.Addr + "]" + user.Name + ":" + "在线...\n"
      u.SendMsg(onlineMsg)
    }
    u.server.mapLock.Unlock()
  } else {
    u.server.BroadCast(u, msg)
  }
}

修改用户名:当消息格式为“rename|”时表示重命名,在user.go中DoMessage方法中添加逻辑,增加一个分支判断新用户名是否合法以及是否已被在线用户占用。

// 用户处理消息的业务
func (u *User) DoMessage(msg string) {
  if msg == "who" { // 查询当前所有在线用户的消息格式是"who"
   ...
  } else if len(msg) > 7 && msg[:7] == "rename|" {
    // 重命名的消息格式是"rename|张三"
    newName := strings.Split(msg, "|")[1]
    // 判断新用户名是否已被占用
    _, ok := u.server.OnlineMap[newName]
    if ok { // 被占用
      u.SendMsg("当前用户名已被占用\n")
    } else {
      u.server.mapLock.Lock()
      delete(u.server.OnlineMap, u.Name) // 删除旧用户名
      u.server.OnlineMap[newName] = u    // 将新用户名及对象存储到map
      u.server.mapLock.Unlock()

      u.Name = newName
      u.SendMsg("您已修改用户名为:" + u.Name + "\n")

    }
  } else {
    u.server.BroadCast(u, msg)
  }
}

版本七:超时强踢功能

在server.go中的Handler方法的goroutine中,添加用户活跃channel isAlive,一旦用户本人发送消息就会使用户处于活跃状态,就向该channel发送true。

在循环中监听isAlive和time.After()定时器两个管道,如果case<-isAlive触发就重置定时器,如果case<-isAlive超过定时器时间没有触发的话,select阻塞,按超时处理关闭管道和连接以及退出Handler goroutine。

func (ser *Server) Handler(conn net.Conn) {
  //fmt.Println("连接建立成功")
  user := NewUser(conn, ser)
  user.Online()

  // 监听用户是否活跃的channel
  isAlive := make(chan bool)

  // 启动goroutine来接受客户端发送的消息
  go func() {...
      // 用户的任意消息,都会使用户处于活跃
      isAlive <- true
    }
  }()

  // 发送完消息先阻塞Handler,保证goroutine存活不死亡
  for {
    select {
    // 定时器
    case <-isAlive:
      // 当前用户活跃,无需处理,激活select更新下面case中的定时器
    case <-time.After(time.Second * 10):
      // 已超时,将当前user客户端强制关闭
      user.SendMsg("您已超时,请重新登录")
      // 销毁channel资源
      close(user.C)
      // 关闭绑定Handler的连接
      conn.Close()
      // 退出Handler goroutine
      runtime.Goexit() // 或 return

    }
  }
}

版本八:私聊功能

在user.go中DoMessage方法中添加逻辑,增加对消息格式为“to|张三|你好啊”时的处理,获取对方的用户名,判断用户是否存在,如果存在根据用户名得到对方user对象,然后获取消息内容,如果消息内容不为空,将消息内容发送给对应的user对象。

func (u *User) DoMessage(msg string) {
  if msg == "who" { ...
  } else if len(msg) > 7 && msg[:7] == "rename|" {...
    }
  } else if len(msg) > 4 && msg[:3] == "to|" {
    // 消息格式:to|张三|你好啊
    //1 获取对方的用户名
    remoteName := strings.Split(msg, "|")[1]
    if remoteName == "" {
      u.SendMsg("消息格式不正确,请使用\"to|张三|你好啊\"格式\n")
      return
    }
    //2 根据用户名得到对方user对象
    remoteUser, ok := u.server.OnlineMap[remoteName]
    if !ok { // 该用户名不存在/不在线
      u.SendMsg("该用户名不存在/不在线\n")
      return
    }
    //3 获取消息内容,通过对方的user对象将消息内容发送过去
    content := strings.Split(msg, "|")[2]
    if content == "" {
      u.SendMsg("无消息内容,请重发\n")
    }
    remoteUser.SendMsg(u.Name + "对您说:" + content)
  } else {
    u.server.BroadCast(u, msg)
  }
}

至此,我们看到对于一个即时通信系统已经实现了它的基本功能。关于其中的知识点之后会陆续出文深入,这个系统也会继续更新和维护。感谢阅读!