目录
一、sync包简介
二、Mutex(互斥锁)
功能特点
常见用法
示例代码
三、RWMutex(读写互斥锁)
功能特点
常见用法
示例代码
四、WaitGroup(等待组)
功能特点
常见用法
示例代码
五、Cond(条件变量)
功能特点
常见用法
示例代码
六、Map(同步的并发安全的map)
功能特点
常见用法
示例代码
七、Pool(对象池)
功能特点
常见用法
示例代码
八、总结
综合应用示例:在线商店系统
功能概述
代码实现
代码解释
运行结果示例
Go语言提供了多种同步机制和锁,用于控制多个goroutine之间的协作和共享资源的访问,确保数据的安全和一致性。以下是对Go语言中sync
包中的主要同步原语和锁的详细讲解,以及完整的代码示例。
一、sync包简介
sync包提供了基本的同步原语,包括:
- Mutex:互斥锁,用于保护临界区,防止多个goroutine同时访问共享资源。
- RWMutex:读写互斥锁,允许多个读者同时访问共享资源,而写者必须独占。
- WaitGroup:用于等待一组goroutine完成。
- Cond:条件变量,用于在goroutine之间协调事件发生的顺序。
- Map:一个并发安全的map,适合频繁的读写场景。
- Pool:对象池,用于管理一组可重用的对象,减少内存分配和垃圾回收的开销。
二、Mutex(互斥锁)
功能特点
- 互斥性:同时只能有一个goroutine持有Mutex。
- 阻塞与等待:如果Mutex已被锁定,尝试获取Mutex的goroutine将被阻塞,直到Mutex被释放。
- 手动控制:需要显式调用
Lock()
和Unlock()
方法来获取和释放锁。
常见用法
- 保护共享变量:防止多个goroutine同时修改共享变量导致数据竞态。
- 控制对资源的并发访问:如文件句柄、网络连接等。
示例代码
package main import ( "fmt" "sync" "time"
) var ( mu sync.Mutex counter int
) func increment() { for i := 0; i < 5; i++ { // 获取Mutex mu.Lock() // 保证只有一个goroutine可以执行这段代码 fmt.Printf("goroutine %d: counter = %d\n", i, counter) counter++ // 释放Mutex mu.Unlock() // 模拟其他任务 time.Sleep(100 * time.Millisecond) }
} func main() { var wg sync.WaitGroup // 启动5个goroutine for i := 0; i < 5; i++ { wg.Add(1) go func(i int) { defer wg.Done() increment() }(i) } // 等待所有goroutine完成 wg.Wait() fmt.Println("Final counter:", counter)
}
三、RWMutex(读写互斥锁)
功能特点
- 读优先:允许多个读者同时访问共享资源。
- 写独占:当有写者需要修改共享资源时,必须等待所有读者完成后才能获得锁。
- 高效并发:适用于读多写少的场景,提升性能。
常见用法
- 保护读多写少的共享数据:如配置信息、缓存等。
- 减少写操作对读操作的影响。
示例代码
package main import ( "fmt" "sync" "time"
) var ( mu sync.RWMutex cache = map[string]string{"key": "value"}
) func readData(key string) { mu.RLock() // 获取读锁 defer mu.RUnlock() // 确保锁能被释放 fmt.Printf("Reading %s: value = %s\n", key, cache[key]) time.Sleep(500 * time.Millisecond)
} func writeData(key, value string) { mu.Lock() // 获取写锁 defer mu.Unlock() // 确保锁能被释放 cache[key] = value fmt.Printf("Updating %s to %s\n", key, value) time.Sleep(500 * time.Millisecond)
} func main() { var wg sync.WaitGroup // 启动5个读者 for i := 0; i < 5; i++ { wg.Add(1) go func(i int) { defer wg.Done() readData(fmt.Sprintf("key-%d", i)) }(i) } // 启动一个写者 go func() { writeData("commonKey", "newValue") }() // 等待所有goroutine完成 wg.Wait() fmt.Println("Final cache:", cache)
}
四、WaitGroup(等待组)
功能特点
- 等待多个goroutine完成:用于父goroutine等待子goroutine全部完成。
- 安全地管理goroutine的完成状态。
常见用法
- 协调多个任务:确保所有的子任务完成后再继续执行主任务。
- 父goroutine等待子goroutine:避免主程序过早退出。
示例代码
package main import ( "fmt" "sync" "time"
) func task(id int) { defer fmt.Printf("Task %d completed\n", id) time.Sleep(2 * time.Second) fmt.Printf("Task %d is running\n", id)
} func main() { var wg sync.WaitGroup fmt.Println("Starting tasks...") for i := 1; i <= 3; i++ { wg.Add(1) go task(i) } fmt.Println("Waiting for tasks to complete...") wg.Wait() fmt.Println("All tasks completed!")
}
五、Cond(条件变量)
功能特点
- 协调goroutine之间的事件发生顺序。
- 类似于其他语言中的条件变量,用于在线程之间协调事件发生的顺序。
常见用法
- 等待特定条件满足:如等待某个事件发生。
- 在生产者-消费者模型中:用来协调生产者和消费者之间的数据传递。
示例代码
package main import ( "fmt" "sync" "time"
) var cond *sync.Cond func producer() { for i := 0; i < 5; i++ { fmt.Println("Producing", i) cond.Broadcast() // 通知所有等待的goroutine time.Sleep(500 * time.Millisecond) } cond.Broadcast()
} func consumer(id int) { for { fmt.Printf("Consumer %d waiting...\n", id) cond.Wait() // 等待条件被满足 fmt.Printf("Consumer %d received\n", id) }
} func main() { var mu sync.Mutex cond = sync.NewCond(&mu) var wg sync.WaitGroup // 启动5个消费者 for i := 1; i <= 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() consumer(id) }(i) } // 启动生产者 go producer() // 等待所有消费者完成 wg.Wait() fmt.Println("All consumers completed!")
}
六、Map(同步的并发安全的map)
功能特点
- 并发安全:适用于多个goroutine同时读写的场景。
- 高效:比使用Mutex保护一个map更高效。
- 便利:不需要自己加锁解锁,使用起来更为方便。
常见用法
- 高频率的读写操作:适用于需要频繁访问的数据结构。
- 多goroutine同时操作共享的键值对。
示例代码
package main import ( "fmt" "sync" "time"
) var safeMap = &sync.Map{} func main() { var wg sync.WaitGroup // 写入数据 for i := 0; i < 10; i++ { wg.Add(1) go func(i int) { defer wg.Done() key := fmt.Sprintf("key%d", i) safeMap.Store(key, i) fmt.Printf("Stored %s -> %d\n", key, i) time.Sleep(100 * time.Millisecond) }(i) } // 读取数据 for i := 0; i < 5; i++ { wg.Add(1) go func(i int) { defer wg.Done() key := fmt.Sprintf("key%d", i) value, ok := safeMap.Load(key) if ok { fmt.Printf("Retrieved %s -> %d\n", key, value) } else { fmt.Printf("Key %s not found\n", key) } time.Sleep(100 * time.Millisecond) }(i) } // 等待所有goroutine完成 fmt.Println("Waiting for all goroutines to complete...") wg.Wait()
}
七、Pool(对象池)
功能特点
- 对象复用:减少对象的创建和销毁,提升性能。
- 资源管理:适用于需要频繁获取和释放资源的场景。
常见用法
- 数据库连接池:管理多个连接,提高数据库访问的效率。
- 内存分配优化:减少垃圾回收的开销。
示例代码
package main import ( "fmt" "sync" "time"
) type ReusableObject struct { id int
} var pool *sync.Pool func init() { pool = &sync.Pool{ New: func() interface{} { return &ReusableObject{ id: 0, } }, }
} func worker(id int) { obj := pool.Get().(*ReusableObject) obj.id = id fmt.Printf("Worker %d is using object with id %d\n", id, obj.id) time.Sleep(500 * time.Millisecond) pool.Put(obj)
} func main() { var wg sync.WaitGroup for i := 1; i <= 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() worker(id) }(i) } fmt.Println("Waiting for all workers to complete...") wg.Wait()
}
八、总结
Go语言通过sync
包提供了丰富的同步原语和锁,帮助开发者在并发编程中确保数据的安全和一致性。通过理解和掌握这些工具,可以有效避免数据竞态和死锁等问题,编写出高效、可靠的并发程序。
选择合适的同步机制:
- Mutex:适用于多个goroutine竞争独占资源的场景。
- RWMutex:适用于读多写少的场景,提升并发性能。
- WaitGroup:用于等待多个goroutine完成,协调程序流程。
- Cond:用于在线程之间协调特定事件的发生顺序。
- Map:适用于频繁的并发读写场景,高效且安全。
- Pool:用于对象的复用,优化内存分配和管理。
综合应用示例:在线商店系统
以下是一个综合应用示例,模拟一个简单的在线商店系统,展示了Go语言中sync
包中各种同步机制的实际应用。这个系统包括产品目录管理、订单处理、用户会话管理等功能,涵盖了Mutex、RWMutex、Cond、Pool和sync.Map的使用。
功能概述
- 产品目录:维护一系列产品信息,支持读取和更新操作。
- 订单队列:管理客户的订单,确保订单的安全处理。
- 数据库连接池:优化数据库连接的使用,提升效率。
- 用户会话管理:维护用户的登录状态,确保会话的安全性。
代码实现
package main import ( "fmt" "sync" "time"
) // Product 代表产品信息
type Product struct { ID string Name string Price float64 Stock int
} // ProductCatalog 产品目录,使用RWMutex进行同步控制
type ProductCatalog struct { products map[string]Product mu sync.RWMutex
} // NewProductCatalog 创建新的产品目录
func NewProductCatalog() *ProductCatalog { return &ProductCatalog{ products: make(map[string]Product), mu: sync.RWMutex{}, }
} // AddProduct 添加产品到目录中
func (c *ProductCatalog) AddProduct(p Product) { c.mu.Lock() defer c.mu.Unlock() c.products[p.ID] = p fmt.Printf("Product %s added successfully\n", p.Name)
} // GetProduct 通过ID获取产品信息
func (c *ProductCatalog) GetProduct(id string) (Product, error) { c.mu.RLock() defer c.mu.RUnlock() p, ok := c.products[id] if !ok { return Product{}, fmt.Errorf("product %s not found", id) } return p, nil
} // UpdateProduct 更新产品信息
func (c *ProductCatalog) UpdateProduct(id string, name string, price float64) { c.mu.Lock() defer c.mu.Unlock() if _, ok := c.products[id]; ok { c.products[id] = Product{ID: id, Name: name, Price: price, Stock: c.products[id].Stock} fmt.Printf("Product %s updated successfully\n", id) } else { fmt.Printf("Product %s not found\n", id) }
} // Order 代表客户订单
type Order struct { ID string ProductID string Quantity int Status string
} // OrderQueue 订单队列,使用Mutex和Cond进行同步控制
type OrderQueue struct { orders []Order mu sync.Mutex cond *sync.Cond
} // NewOrderQueue 创建新的订单队列
func NewOrderQueue() *OrderQueue { return &OrderQueue{ orders: make([]Order, 0), mu: sync.Mutex{}, cond: sync.NewCond(&sync.Mutex{}), }
} // AddOrder 添加订单到队列
func (q *OrderQueue) AddOrder(order Order) { q.mu.Lock() defer q.mu.Unlock() q.orders = append(q.orders, order) q.cond.Broadcast() fmt.Printf("Order %s added to queue\n", order.ID)
} // ProcessOrder 处理订单队列中的订单
func (q *OrderQueue) ProcessOrder() { for { q.cond.L.Lock() for len(q.orders) == 0 { fmt.Println("No orders to process. Waiting...") q.cond.L.Wait() } q.cond.L.Unlock() q.mu.Lock() order := q.orders[0] q.orders = q.orders[1:] q.mu.Unlock() fmt.Printf("Processing order %s\n", order.ID) time.Sleep(2 * time.Second) order.Status = "completed" fmt.Printf("Order %s processed successfully\n", order.ID) }
} // DatabaseConnection 数据库连接对象
type DatabaseConnection struct { ID int Free bool
} // DatabasePool 数据库连接池,使用Pool进行管理
type DatabasePool struct { pool *sync.Pool
} // NewDatabasePool 创建新的数据库连接池
func NewDatabasePool(initialCount int) *DatabasePool { return &DatabasePool{ pool: sync.Pool{ New: func() interface{} { return &DatabaseConnection{ID: 0, Free: true} }, }, }
} // GetConnection 从池中获取一个可用的数据库连接
func (d *DatabasePool) GetConnection() *DatabaseConnection { conn := d.pool.Get().(*DatabaseConnection) conn.Free = false fmt.Printf("Get connection %d\n", conn.ID) return conn
} // ReturnConnection 将连接返回到池中
func (d *DatabasePool) ReturnConnection(conn *DatabaseConnection) { conn.Free = true d.pool.Put(conn) fmt.Printf("Return connection %d\n", conn.ID)
} // UserSession 用户会话信息,使用sync.Map进行管理
type UserSession struct { userID string data map[string]interface{}
} // NewUserSession 创建新的用户会话
func NewUserSession(userID string) *UserSession { return &UserSession{ userID: userID, data: make(map[string]interface{}), }
} // SessionManager 用户会话管理器,使用sync.Map进行管理
type SessionManager struct { sessions sync.Map
} // NewSessionManager 创建新的会话管理器
func NewSessionManager() *SessionManager { return &SessionManager{ sessions: sync.Map{}, }
} // AddSession 添加用户会话
func (s *SessionManager) AddSession(session *UserSession) { s.sessions.Store(session.userID, session) fmt.Printf("Session added for user %s\n", session.userID)
} // GetSession 获取用户会话
func (s *SessionManager) GetSession(userID string) (*UserSession, bool) { value, ok := s.sessions.Load(userID) if ok { return value.(*UserSession), true } return nil, false
} func main() { // 初始化组件 catalog := NewProductCatalog() orderQueue := NewOrderQueue() dbPool := NewDatabasePool(5) sessionMgr := NewSessionManager() // 添加测试产品到目录 testProduct := Product{ID: "P001", Name: "Test Product", Price: 19.99, Stock: 100} catalog.AddProduct(testProduct) // 测试获取产品信息 p, err := catalog.GetProduct("P001") if err == nil { fmt.Printf("Product info: %+v\n", p) } else { fmt.Println("Product not found") } // 模拟用户下单,添加订单 testOrder := Order{ID: "O001", ProductID: "P001", Quantity: 2, Status: "pending"} orderQueue.AddOrder(testOrder) // 启动订单处理goroutine go orderQueue.ProcessOrder() // 模拟获取数据库连接 conn := dbPool.GetConnection() // 使用连接进行数据库操作 time.Sleep(1 * time.Second) dbPool.ReturnConnection(conn) // 模拟用户登录,创建并管理会话 session := NewUserSession("user123") sessionMgr.AddSession(session) retrievedSession, found := sessionMgr.GetSession("user123") if found { fmt.Printf("Session retrieved for user %s\n", retrievedSession.userID) } else { fmt.Printf("Session not found for user %s\n", "user123") } // 等待所有goroutine完成 fmt.Println("Main function completed")
}
代码解释
- ProductCatalog:使用RWMutex保护产品目录,确保读写操作的安全性。
- OrderQueue:使用Mutex和Cond协调订单的生成和处理,确保订单处理的正确性。
- DatabasePool:使用Pool管理数据库连接,优化资源的使用和释放。
- SessionManager:使用sync.Map管理用户会话,确保高并发下的安全访问。
运行结果示例
Product P001 added successfully
Product info: {ID:P001 Name:Test Product Price:19.99 Stock:100}
Order O001 added to queue
Get connection 0
Processing order O001
Order O001 processed successfully
Session added for user user123
Session retrieved for user user123
Main function completed