基于Go语言和Redis的实时聊天室项目详解
项目概述
在这个项目中,我们实现了一个基于Go语言和Redis的实时聊天室系统。该系统允许用户通过客户端连接到服务器,进行实时聊天,并且支持以下功能:
- 用户网名注册和验证
- 消息广播和接收
- 心跳检测和自动重连
- 用户活跃度统计和排名
- 消息存储到Redis
技术栈
- Go语言:用于实现客户端和服务器逻辑
- Redis:用于存储用户活跃度和聊天记录
- TCP协议:用于客户端和服务器之间的通信
项目结构
项目由三个主要文件组成:
- client.go:客户端逻辑
- server.go:服务器逻辑
- utils.go:工具函数(消息发送和读取)
详细实现
客户端实现(client.go)
1. 连接到服务器
客户端通过TCP协议连接到服务器:
conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {log.Fatal("连接服务器时出错:", err)
}
defer conn.Close()
2. 输入网名
用户输入网名并发送到服务器:
fmt.Print("请输入你的网名: ")
nameInput, err := reader.ReadString('\n')
name = strings.TrimSpace(nameInput)
err = utils.SendMessage(conn, []byte(name))
3. 消息发送和接收
用户输入消息后,通过utils.SendMessage
发送到服务器:
message, err := reader.ReadString('\n')
err = utils.SendMessage(conn, []byte(message))
服务器发送的消息通过handleServerMessages
函数接收并打印:
func handleServerMessages(conn *net.Conn) {reader := bufio.NewReader(*conn)for {message, err := utils.ReadMessage(reader)if err != nil {// 处理错误和重连逻辑}fmt.Println(string(message))}
}
4. 心跳检测和自动重连
客户端会接收服务器的心跳检测消息(PING
),并发送PONG
响应:
if string(message) == "PING" {log.Printf("【心跳检测】接收到服务端 PING,发送 PONG 响应")utils.SendMessage(*conn, []byte("PONG"))
}
如果连接断开,客户端会尝试重新连接:
func reconnect(oldConn net.Conn) (net.Conn, bool) {for i := 0; i < 3; i++ {newConn, err := net.Dial("tcp", "localhost:8080")if err == nil {oldConn.Close()return newConn, true}time.Sleep(time.Duration(2<<uint(i)) * time.Second)}return nil, false
}
服务器实现(server.go)
1. 初始化Redis
服务器使用Redis存储用户活跃度和聊天记录:
redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379",Password: "",DB: 0,
})
2. 客户端管理
服务器维护一个ChatRoom
结构体,管理所有在线客户端:
type ChatRoom struct {Clients map[*Client]boolJoin chan *ClientLeave chan *ClientMessage chan []byteRedis *redis.Clientmu sync.RWMutex
}
3. 消息广播
服务器接收客户端的消息,并广播到所有在线客户端:
func (cr *ChatRoom) Run() {for {select {case message := <-cr.Message:cr.mu.RLock()for client := range cr.Clients {client.Send <- message}cr.mu.RUnlock()batchStoreToRedis(ctx, cr.Redis, message)}}
}
4. 心跳检测
服务器定期向客户端发送PING
消息,并等待PONG
响应:
ticker := time.NewTicker(10 * time.Second)
heartbeatTimeout := time.NewTimer(15 * time.Second)
for {select {case <-ticker.C:utils.SendMessage(client.Conn, []byte("PING"))heartbeatTimeout.Reset(15 * time.Second)case <-heartbeatTimeout.C:cr.Leave <- client}
}
5. Redis存储
服务器将消息批量存储到Redis,并更新用户活跃度:
func batchStoreToRedis(ctx context.Context, redisClient *redis.Client, message []byte) {queue := redisClient.LRange(ctx, "chat_messages_queue", 0, -1).Val()queue = append(queue, string(message))if len(queue) >= 10 {pipe := redisClient.Pipeline()for _, msg := range queue {pipe.RPush(ctx, "chat_messages", msg)pipe.ZIncrBy(ctx, "user_activity", 1, extractUsername(msg))}pipe.Exec(ctx)redisClient.Del(ctx, "chat_messages_queue")} else {redisClient.RPush(ctx, "chat_messages_queue", string(message))}
}
工具函数(utils.go)
工具函数提供了消息的发送和读取功能:
func SendMessage(conn net.Conn, message []byte) error {length := uint32(len(message))err := binary.Write(conn, binary.BigEndian, length)if err != nil {return err}_, err = conn.Write(message)return err
}func ReadMessage(reader *bufio.Reader) ([]byte, error) {var length uint32err := binary.Read(reader, binary.BigEndian, &length)if err != nil {return nil, err}message := make([]byte, length)_, err = io.ReadFull(reader, message)return message, err
}
重点问题分析
问题 1:心跳检测机制与消息处理分支的逻辑冲突导致服务端无法接收客户端消息
1.1 原始代码结构的问题
在未修复的代码中,服务端的 HandleClient
函数使用了如下逻辑:
for {select {case <-ticker.C: // 心跳检测分支(每10秒触发一次)sendPing()default: // 默认分支(非阻塞)message := readMessage()processMessage(message)}
}
1.2 关键问题分析
default
分支的局限性
default
分支仅在 所有其他case
未就绪时触发。
当ticker.C
每隔 10 秒触发一次心跳检测时,select
会优先执行case <-ticker.C
,而default
分支仅在心跳未触发时才会执行。
这会导致:- 消息读取延迟:客户端发送的消息可能堆积在
bufio.Reader
的缓冲区中,但服务端因default
分支未及时执行而无法读取。 - 竞争条件:如果客户端在心跳触发时发送消息,服务端会优先处理心跳,消息可能被跳过。
- 消息读取延迟:客户端发送的消息可能堆积在
1.3 具体场景模拟
假设客户端连续发送两条消息 消息A
和 消息B
,时间线如下:
时间点 0ms: 服务端开始循环
时间点 5ms: 客户端发送消息A
时间点 10ms: ticker.C 触发心跳检测(发送PING)
时间点 15ms: 客户端发送消息B
- 服务端行为:
- 在
10ms
时,case <-ticker.C
触发,发送 PING。 default
分支在10ms
后才有机会执行,但此时bufio.Reader
中可能已有消息A
和消息B
。- 由于
default
分支是非阻塞的,服务端可能只读取到部分消息,甚至因心跳频繁触发而完全跳过消息处理。
- 在
1.4 Go 的 select
调度机制
-
随机选择原则
当多个case
同时就绪时,select
会随机选择一个执行。
但若某个case
(如ticker.C
)周期性触发,它会频繁占用执行机会,导致其他分支(如消息读取)被“饿死”。 -
default
分支的陷阱
default
分支的设计初衷是避免阻塞,但它不适合需要持续监听的操作。
在您的场景中,消息读取需要主动检查缓冲区,而default
分支无法保证这一点。
1.5 修复方案的核心逻辑
修改后的代码通过以下方式解决问题:
for {select {case <-ticker.C: // 心跳检测sendPing()case <-msgTicker.C: // 专用消息读取分支(每50ms触发一次)message := readMessage()processMessage(message)}
}
1.6 为何有效?
-
独立的定时器 (
msgTicker
)
添加了一个专用的msgTicker
,每隔 50ms 触发一次消息读取。- 即使心跳检测占用
select
的执行机会,消息读取仍有独立的触发窗口。 - 避免了心跳和消息处理的竞争。
- 即使心跳检测占用
-
消除
default
分支的不可靠性
用显式的case <-msgTicker.C
替代default
,确保消息读取按固定频率执行。
1.7 总结:冲突的本质
- 心跳机制干扰:周期性的心跳检测(
ticker.C
)占用了select
的执行机会,导致default
分支无法及时处理消息。 - 修复思路:为消息读取分配独立的触发通道(
msgTicker
),与心跳检测解耦,确保两者互不阻塞。
1.8 类比解释
想象一个餐厅服务员(服务端)需要同时做两件事:
- 定时检查厨房温度(心跳检测):每10分钟一次。
- 接待顾客点餐(消息处理):需要随时响应。
- 原始方案:服务员大部分时间站在厨房门口检查温度,只有偶尔看一眼大堂(
default
分支),导致顾客长时间无人接待。 - 修复方案:服务员每隔5分钟主动巡视大堂一次(
msgTicker
),同时定期检查厨房,两者互不干扰。
问题 2:网名重复导致身份冲突
2.1 前因后果
- 问题本质:多个用户使用相同网名加入聊天室,导致消息归属混乱、活跃度统计错误。
- 根本原因:服务端未验证网名唯一性,直接接受客户端提交的名称,未检查是否已被占用。
- 具体表现:
- 用户A和用户B使用相同网名“小明”加入后,服务端无法区分两者消息。
- Redis中
user_activity
的活跃度分数会被错误累加到同一用户名下。
2.2 具体场景模拟
假设用户A和用户B同时尝试使用网名“小明”连接:
时间点 0ms: 用户A发送网名“小明” → 服务端接受并加入。
时间点 50ms: 用户B发送网名“小明” → 服务端未检查唯一性,直接加入。
时间点 100ms: 用户A发送消息“你好”,用户B发送消息“大家好”。
- 服务端行为:
- 用户A和用户B的消息均被标记为“小明: 消息内容”。
- Redis中用户“小明”的活跃度分数错误累加为2(实际应为两个独立用户)。
2.3 Go 语言机制分析
- 并发写入冲突:多个协程同时操作Redis集合时,若未加锁可能导致数据竞争。
- 集合操作的原子性:Redis的
SADD
和SISMEMBER
命令是原子操作,但Go代码中需确保逻辑顺序正确。
2.4 解决方案
- Redis 集合管理在线用户:
- 用户加入时,检查网名是否存在于集合
online_users
:exists, err := cr.Redis.SIsMember(ctx, "online_users", clientName).Result() if exists {utils.SendMessage(conn, []byte("ERROR: 网名已被占用"))return }
- 网名合法后添加到集合:
cr.Redis.SAdd(ctx, "online_users", clientName)
- 用户加入时,检查网名是否存在于集合
- 退出时清理网名:
- 使用
defer
确保客户端断开时从集合中移除:defer cr.Redis.SRem(ctx, "online_users", clientName)
- 使用
2.5 总结类比
- 问题类比:多人共用同一身份证号登记入住酒店,前台无法区分客人。
- 修复思路:前台要求每位客人提供唯一身份证号,登记前查重,退房时注销。
问题 3:空消息和空网名导致无效数据
3.1 前因后果
- 问题本质:用户输入空白内容作为网名或消息,导致系统处理无效数据。
- 根本原因:客户端和服务端未对输入内容做非空验证。
- 具体表现:
- 空网名:用户直接回车加入,服务端记录空名称,广播消息时无法标识来源。
- 空消息:用户发送空格或回车,占用网络带宽和存储资源。
3.2 具体场景模拟
假设用户执行以下操作:
时间点 0ms: 用户输入空网名(直接回车) → 服务端允许加入。
时间点 100ms: 用户发送空消息(空格 + 回车) → 服务端广播“: ”。
- 服务端行为:
- 网名为空的客户端加入,广播“系统广播: 加入了聊天室”。
- 空消息被广播到所有客户端,消息内容无意义。
3.3 Go 语言机制分析
- 字符串处理:
strings.TrimSpace
可过滤首尾空白字符,但需主动调用。 - 输入流阻塞:客户端的
ReadString
可能读取到换行符,需显式检查内容是否为空。
3.4 解决方案
- 客户端验证:
- 循环读取网名直到非空:
for {fmt.Print("请输入网名: ")name = strings.TrimSpace(reader.ReadString())if name != "" {break}fmt.Println("网名不能为空!") }
- 发送消息前检查内容:
message = strings.TrimSpace(input) if message == "" {fmt.Println("消息不能为空!")continue }
- 循环读取网名直到非空:
- 服务端二次过滤:
- 网名非空检查:
if clientName == "" {utils.SendMessage(conn, []byte("ERROR: 网名不能为空"))return }
- 消息内容过滤:
msgContent := strings.TrimSpace(string(message)) if msgContent == "" {log.Printf("客户端 %s 发送了空消息,已忽略", client.Name)continue }
- 网名非空检查:
3.5 总结类比
- 问题类比:用户向邮箱发送空白信件,邮局仍派送,浪费资源。
- 修复思路:邮局拒绝投递无内容信件,并要求寄件人填写有效地址。
问题 4:心跳超时导致僵尸连接
4.1 前因后果
- 问题本质:客户端异常退出后,服务端未检测到离线状态,维持无效连接。
- 根本原因:服务端仅依赖客户端显式退出信号,缺乏被动检测机制。
- 具体表现:
- 客户端断网后,服务端持续等待消息,占用内存和连接资源。
- Redis中
online_users
集合保留无效网名,影响新用户注册。
4.2 具体场景模拟
假设客户端因网络故障断开:
时间点 0ms: 服务端发送PING。
时间点 10ms: 客户端未响应(已断开)。
时间点 25ms: 服务端仍认为客户端在线,未清理资源。
- 服务端行为:
- 客户端连接残留,占用
Clients
集合和TCP端口。 - 新用户无法使用相同网名注册。
- 客户端连接残留,占用
4.3 Go 语言机制分析
- 计时器管理:
time.Timer
需手动重置,避免误触发。 - 协程泄漏风险:未关闭的协程可能持续占用内存。
4.4 解决方案
- 心跳检测与超时机制:
- 服务端每10秒发送PING:
ticker := time.NewTicker(10 * time.Second) defer ticker.Stop()
- 客户端响应PONG后重置超时计时器:
case <-msgTicker.C:if message == "PONG" {heartbeatTimeout.Reset(15 * time.Second)}
- 服务端每10秒发送PING:
- 超时强制断开:
- 若15秒未收到PONG,判定客户端离线:
heartbeatTimeout := time.NewTimer(15 * time.Second) defer heartbeatTimeout.Stop()select { case <-heartbeatTimeout.C:cr.Leave <- clientreturn }
- 若15秒未收到PONG,判定客户端离线:
4.5 总结类比
- 问题类比:电话通话中对方突然静默,但未挂断,导致线路一直被占用。
- 修复思路:运营商设定“无响应超时”,若一段时间无声音,自动挂断释放线路。
总结
通过上述措施,项目实现了:
- 身份唯一性:Redis集合保障网名全局唯一。
- 数据有效性:双重验证过滤空输入。
- 连接健康性:心跳超时机制自动清理僵尸连接。
完整代码
client.go(客户端)
package mainimport ("bufio""fmt""log""net""os""strings""time""Learn/kaohe/redis2/utils"
)// handleServerMessages 负责处理从服务器接收到的消息
func handleServerMessages(conn *net.Conn) {reader := bufio.NewReader(*conn) // 创建一个读取器,用于从连接中读取消息for {message, err := utils.ReadMessage(reader) // 从服务器读取消息if err != nil {log.Println("接收服务器消息时出错:", err)// 如果连接断开,尝试重新连接newConn, ok := reconnect(*conn)if !ok {return // 如果重新连接失败,退出处理}*conn = newConn // 更新连接reader = bufio.NewReader(*conn) // 重新创建读取器continue}log.Printf("接收到消息: %s", string(message))fmt.Println(string(message)) // 打印消息到控制台// 如果收到的是 PING 消息,发送 PONG 响应if string(message) == "PING" {log.Printf("【心跳检测】接收到服务端 PING,发送 PONG 响应")err := utils.SendMessage(*conn, []byte("PONG"))if err != nil {log.Println("发送心跳响应时出错:", err)}}}
}// reconnect 尝试重新连接到服务器
func reconnect(oldConn net.Conn) (net.Conn, bool) {for i := 0; i < 3; i++ { // 最多尝试 3 次log.Printf("尝试第 %d 次重新连接服务器...", i+1)newConn, err := net.Dial("tcp", "localhost:8080") // 尝试连接服务器if err == nil {log.Println("重新连接服务器成功")oldConn.Close() // 关闭旧连接return newConn, true // 返回新连接}time.Sleep(time.Duration(2<<uint(i)) * time.Second) // 指数退避等待}log.Println("多次尝试重新连接服务器失败,退出程序")return nil, false // 重新连接失败
}func main() {conn, err := net.Dial("tcp", "localhost:8080") // 连接到服务器if err != nil {log.Fatal("连接服务器时出错:", err)}defer conn.Close() // 程序结束时关闭连接go handleServerMessages(&conn) // 启动一个 goroutine 处理服务器消息reader := bufio.NewReader(os.Stdin) // 创建一个读取器,用于读取用户输入// 循环读取网名直到输入有效var name stringfor {fmt.Print("请输入你的网名: ")nameInput, err := reader.ReadString('\n') // 读取用户输入的网名if err != nil {log.Fatal("读取网名时出错:", err)}name = strings.TrimSpace(nameInput) // 去除多余空格if name == "" {fmt.Println("网名不能为空,请重新输入!")continue}break}err = utils.SendMessage(conn, []byte(name)) // 向服务器发送网名if err != nil {log.Fatal("发送网名时出错:", err)}fmt.Println("你已成功加入聊天室,可以开始聊天了!")for {message, err := reader.ReadString('\n') // 读取用户输入的消息if err != nil {log.Println("读取用户输入时出错:", err)break}message = strings.TrimSpace(message) // 去除多余空格if message == "" {fmt.Println("消息不能为空,请重新输入!")continue}if message == "/quit" { // 如果输入是 "/quit",退出聊天室fmt.Println("你已退出聊天室")utils.SendMessage(conn, []byte("/quit"))break}err = utils.SendMessage(conn, []byte(message)) // 向服务器发送消息log.Printf("DEBUG: 客户端尝试发送消息: %s, 错误: %v", message, err)if err != nil {log.Println("发送消息到服务器时出错:", err)// 如果发送失败,尝试重新连接newConn, ok := reconnect(conn)if !ok {return}conn = newConnerr = utils.SendMessage(conn, []byte(name)) // 重新发送网名if err != nil {log.Fatal("重新连接后发送网名时出错:", err)}}}
}
server.go(服务端)
package mainimport ("bufio""context""fmt""io""log""net""strings""sync""time""Learn/kaohe/redis2/utils""github.com/redis/go-redis/v9"
)// Client 表示一个客户端连接
type Client struct {Name string // 客户端的网名Conn net.Conn // 客户端的连接Send chan []byte // 用于向客户端发送消息的通道
}// ChatRoom 表示聊天室,管理所有客户端连接
type ChatRoom struct {Clients map[*Client]bool // 当前在线的客户端Join chan *Client // 新客户端加入的通道Leave chan *Client // 客户端离开的通道Message chan []byte // 广播消息的通道Redis *redis.Client // Redis 客户端mu sync.RWMutex // 用于并发控制的互斥锁
}// NewChatRoom 创建一个新的聊天室
func NewChatRoom(redisClient *redis.Client) *ChatRoom {return &ChatRoom{Clients: make(map[*Client]bool), // 初始化客户端列表Join: make(chan *Client), // 初始化加入通道Leave: make(chan *Client), // 初始化离开通道Message: make(chan []byte, 100), // 初始化消息通道,缓冲大小为 100Redis: redisClient, // 初始化 Redis 客户端}
}// Run 聊天室的主循环,处理客户端的加入、离开和消息广播
func (cr *ChatRoom) Run() {for {select {case client := <-cr.Join: // 处理新客户端加入cr.mu.Lock()cr.Clients[client] = true // 将客户端添加到列表cr.mu.Unlock()message := fmt.Sprintf("系统广播: %s 加入了聊天室", client.Name)cr.Message <- []byte(message) // 广播加入消息case client := <-cr.Leave: // 处理客户端离开cr.mu.Lock()if _, ok := cr.Clients[client]; ok {delete(cr.Clients, client) // 从列表中删除客户端close(client.Send) // 关闭客户端的发送通道}cr.mu.Unlock()if client.Name != "" {message := fmt.Sprintf("系统广播: %s 离开了聊天室", client.Name)cr.Message <- []byte(message) // 广播离开消息}case message := <-cr.Message: // 处理广播消息cr.mu.RLock()log.Printf("准备广播消息: %s", string(message))for client := range cr.Clients { // 向所有客户端发送消息select {case client.Send <- message: // 发送消息log.Printf("已发送消息到客户端 %s", client.Name)default:log.Printf("客户端 %s 的发送通道已满", client.Name)}}cr.mu.RUnlock()// 将消息存储到 Redisctx := context.Background()batchStoreToRedis(ctx, cr.Redis, message)}}
}// batchStoreToRedis 将消息批量存储到 Redis
func batchStoreToRedis(ctx context.Context, redisClient *redis.Client, message []byte) {const batchSize = 10 // 批量存储的大小// 从 Redis 获取当前队列中的消息queue := redisClient.LRange(ctx, "chat_messages_queue", 0, -1).Val()queue = append(queue, string(message)) // 将新消息添加到队列if len(queue) >= batchSize { // 如果队列达到批量大小pipe := redisClient.Pipeline() // 创建 Redis 管道for _, msg := range queue {// 将消息存储到 chat_messages 列表pipe.RPush(ctx, "chat_messages", msg)// 更新用户活跃度pipe.ZIncrBy(ctx, "user_activity", 1, extractUsername(msg))}_, _ = pipe.Exec(ctx) // 执行管道redisClient.Del(ctx, "chat_messages_queue") // 删除队列} else {// 如果队列未达到批量大小,将消息存储到队列redisClient.RPush(ctx, "chat_messages_queue", string(message))}
}// extractUsername 从消息中提取用户名
func extractUsername(message string) string {parts := strings.SplitN(message, ": ", 2)if len(parts) > 0 {return parts[0]}return ""
}// HandleClient 处理单个客户端的连接
func (cr *ChatRoom) HandleClient(conn net.Conn) {var clientName stringdefer func() {conn.Close() // 关闭连接cr.Leave <- &Client{Name: clientName, Conn: conn, Send: nil} // 通知聊天室客户端离开log.Printf("客户端连接已关闭: %s", conn.RemoteAddr())}()reader := bufio.NewReader(conn) // 创建读取器nameBytes, err := utils.ReadMessage(reader) // 读取客户端的网名if err != nil {log.Println("读取网名时出错:", err)return}clientName = strings.TrimSpace(string(nameBytes))// 检查网名是否为空if clientName == "" {log.Println("客户端尝试使用空网名连接")utils.SendMessage(conn, []byte("ERROR: 网名不能为空"))return}// 检查网名是否已存在ctx := context.Background()exists, err := cr.Redis.SIsMember(ctx, "online_users", clientName).Result()if err != nil {log.Printf("检查网名 %s 时 Redis 出错: %v", clientName, err)utils.SendMessage(conn, []byte("ERROR: 服务器内部错误"))return}if exists {log.Printf("网名 %s 已存在,拒绝连接", clientName)utils.SendMessage(conn, []byte("ERROR: 网名已被占用,请更换其他名称"))return}// 添加网名到在线用户集合if _, err := cr.Redis.SAdd(ctx, "online_users", clientName).Result(); err != nil {log.Printf("存储网名 %s 到 Redis 失败: %v", clientName, err)utils.SendMessage(conn, []byte("ERROR: 服务器内部错误"))return}defer cr.Redis.SRem(ctx, "online_users", clientName)client := &Client{Name: clientName,Conn: conn,Send: make(chan []byte, 10), // 初始化发送通道}cr.Join <- client // 通知聊天室客户端加入// 启动一个 goroutine 处理发送消息go func() {for message := range client.Send {err := utils.SendMessage(client.Conn, message)if err != nil {log.Println("发送消息给客户端时出错:", err)cr.Leave <- client // 通知聊天室客户端离开break}}}()// 定时发送心跳检测ticker := time.NewTicker(10 * time.Second)// 定时读取消息msgTicker := time.NewTicker(50 * time.Millisecond)// 心跳超时检测heartbeatTimeout := time.NewTimer(15 * time.Second)defer func() {ticker.Stop()msgTicker.Stop()heartbeatTimeout.Stop()}()for {select {case <-ticker.C: // 心跳检测log.Printf("【心跳检测】向客户端 %s 发送 PING", client.Name)if err := utils.SendMessage(client.Conn, []byte("PING")); err != nil {log.Printf("【心跳异常】客户端 %s 心跳发送失败: %v", client.Name, err)cr.Leave <- clientreturn}heartbeatTimeout.Reset(15 * time.Second)case <-heartbeatTimeout.C:log.Printf("【心跳超时】客户端 %s 未响应 PONG,强制断开", client.Name)cr.Leave <- clientreturncase <-msgTicker.C: // 读取消息message, err := utils.ReadMessage(reader)if err != nil {if err == io.EOF {log.Printf("客户端 %s 主动断开连接", client.Name)} else {log.Printf("读取消息时出错: %v", err)}cr.Leave <- clientreturn}if string(message) == "PONG" { // 处理心跳响应log.Printf("【心跳响应】客户端 %s 返回 PONG", client.Name)heartbeatTimeout.Reset(15 * time.Second)continue}msgContent := strings.TrimSpace(string(message))if msgContent == "" {log.Printf("客户端 %s 发送了空消息,已忽略", client.Name)continue}log.Printf("接收到来自 %s 的消息: %s", client.Name, msgContent)// 构造完整消息并广播fullMessage := fmt.Sprintf("%s: %s", client.Name, msgContent)cr.Message <- []byte(fullMessage)}}
}// printActivityRankings 打印用户活跃度排名
func printActivityRankings(redisClient *redis.Client) {ctx := context.Background()topUsers, err := redisClient.ZRevRangeWithScores(ctx, "user_activity", 0, 9).Result()if err != nil {log.Println("获取活跃度排名时出错:", err)return}fmt.Println("活跃度排名:")for i, user := range topUsers {fmt.Printf("%d. %s: %.0f\n", i+1, user.Member.(string), user.Score)}
}// PrintConnectedClients 打印当前在线客户端
func (cr *ChatRoom) PrintConnectedClients() {cr.mu.RLock()defer cr.mu.RUnlock()log.Println("【活跃客户端】当前在线客户端列表:")for client := range cr.Clients {log.Printf(" - %s (IP: %s)", client.Name, client.Conn.RemoteAddr())}
}func main() {// 初始化 Redis 客户端redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379",Password: "",DB: 0,})ctx := context.Background()_, err := redisClient.Ping(ctx).Result()if err != nil {log.Fatal("连接 Redis 时出错:", err)}// 监听端口listener, err := net.Listen("tcp", ":8080")if err != nil {log.Fatal("监听端口时出错:", err)}defer listener.Close()fmt.Println("聊天室服务器已启动,等待客户端连接...")// 创建聊天室chatRoom := NewChatRoom(redisClient)go chatRoom.Run() // 启动聊天室// 定期打印活跃度排名和在线客户端go func() {ticker := time.NewTicker(30 * time.Second)defer ticker.Stop()for range ticker.C {printActivityRankings(redisClient)chatRoom.PrintConnectedClients()}}()// 接受客户端连接for {conn, err := listener.Accept()if err != nil {log.Println("接受客户端连接时出错:", err)continue}go chatRoom.HandleClient(conn) // 处理客户端连接}
}
utils.go(工具函数,发送和接收消息)
package utilsimport ("bufio""encoding/binary""fmt""io""log""net"
)const maxMessageLength = 1 << 20 // 1MB,最大消息长度限制// SendMessage 向连接发送消息
func SendMessage(conn net.Conn, message []byte) error {length := uint32(len(message)) // 消息长度if length > maxMessageLength {log.Println("消息长度超出限制:", length)return fmt.Errorf("message too long")}// 写入消息长度err := binary.Write(conn, binary.BigEndian, length)if err != nil {log.Printf("写入消息长度时出错: %v", err)return err}// 写入消息内容_, err = conn.Write(message)if err != nil {log.Printf("写入消息内容时出错: %v", err)}return err
}// ReadMessage 从连接读取消息
func ReadMessage(reader *bufio.Reader) ([]byte, error) {var length uint32 // 消息长度err := binary.Read(reader, binary.BigEndian, &length)if err != nil {log.Printf("读取消息长度时出错: %v", err)return nil, err}if length > maxMessageLength {log.Println("消息长度超出限制:", length)return nil, fmt.Errorf("message too long")}// 读取消息内容message := make([]byte, length)_, err = io.ReadFull(reader, message)if err != nil {log.Printf("读取消息内容时出错: %v", err)return nil, err}return message, nil
}
如果对项目有任何问题或建议,欢迎在评论区留言!