在构建分布式系统和数据库(如 Redis)时,并发问题可能会出现。本文将通过一个股票交易的例子,展示如何使用 Redis 和 Golang 来解决这些问题。
问题定义
场景: 构建一个股票交易应用,多个用户可以同时购买不同公司的股票。每个公司都有一个剩余的股票数量,用户只能购买剩余的股票。
代码:
type Repository struct {
client goRedis.Client
}
func NewRepository(address string) Repository {
return Repository{
client: *goRedis.NewClient(&goRedis.Options{
Addr: address,
}),
}
}
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
// 获取当前剩余股票数量
currentShares, err := r.client.Get(ctx, companySharesKey).Int()
if err != nil {
fmt.Print(err.Error())
return err
}
// 验证剩余股票数量是否足够
if currentShares < numShares {
fmt.Print("error: company does not have enough shares \n")
return errors.New("error: company does not have enough shares")
}
currentShares -= numShares
// 更新公司剩余股票数量
r.client.Set(ctx, BuildCompanySharesKey(companyId), currentShares, 0)
return nil
}
func main() {
repository := redis.NewRepository(fmt.Sprintf("XXXXXX:XXXXX", config.Redis.Host, config.Redis.Port), config.Redis.Pass)
// 运行并发客户端
companyId := "TestCompanySL"
var wg sync.WaitGroup
wg.Add(total_clients)
for idx := 1; idx <= total_clients; idx++ {
userId := fmt.Sprintf("user%d", idx)
go repository.BuyShares(context.Background(), userId, companyId, 100, &wg)
}
wg.Wait()
// 获取公司剩余股票数量
shares, err := repository.GetCompanyShares(context.Background(), companyId)
if err != nil {
panic(err)
}
fmt.Printf("the number of free shares the company %s has is: %d", companyId, shares)
}
问题: 当多个用户同时购买股票时,由于多个 goroutine 同时读取和更新 currentShares
,导致最终的剩余股票数量不正确。
解决方法
1. 原子操作
思路: 使用 Redis 原子操作 IncrBy
来更新 currentShares
,避免多个 goroutine 同时修改。
代码:
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
// 获取当前剩余股票数量
currentShares, err := tx.Get(ctx, companySharesKey).Int()
if err != nil {
fmt.Print(err.Error())
return err
}
// 验证剩余股票数量是否足够
if currentShares < numShares {
fmt.Print("error: company does not have enough shares \n")
return errors.New("error: company does not have enough shares")
}
// 使用 IncrBy 原子操作更新剩余股票数量
r.client.IncrBy(ctx, BuildCompanySharesKey(companyId), -1*int64(numShares))
return nil
}
问题: 该方法仍然存在问题。因为在执行 IncrBy
操作之前,多个 goroutine 已经读取了 currentShares
,导致验证逻辑失效。
2. 事务
思路: 使用 Redis 事务来确保数据的一致性。事务可以将多个操作打包在一起,要么全部执行,要么全部不执行。
代码:
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
companySharesKey := BuildCompanySharesKey(companyId)
err := r.client.Watch(ctx, func(tx *goredislib.Tx) error {
// 获取当前剩余股票数量
currentShares, err := tx.Get(ctx, companySharesKey).Int()
if err != nil {
fmt.Print(fmt.Errorf("error getting value %v", err.Error()))
return err
}
// 验证剩余股票数量是否足够
if currentShares < numShares {
fmt.Print("error: company does not have enough shares \n")
return errors.New("error: company does not have enough shares")
}
currentShares -= numShares
// 更新公司剩余股票数量
_, err = tx.TxPipelined(ctx, func(pipe goredislib.Pipeliner) error {
// pipe handles the error case
pipe.Pipeline().Set(ctx, companySharesKey, currentShares, 0)
return nil
})
if err != nil {
fmt.Println(fmt.Errorf("error in pipeline %v", err.Error()))
return err
}
return nil
}, companySharesKey)
return err
}
问题: 该方法仍然存在问题。因为多个 goroutine 同时进入事务,导致验证逻辑失效。
3. LUA 脚本
思路: 使用 Redis LUA 脚本,将读取、验证和更新操作封装在一个脚本中,确保原子性。
代码:
var BuyShares = goRedis.NewScript(`
local sharesKey = KEYS[1]
local requestedShares = ARGV[1]
local currentShares = redis.call("GET", sharesKey)
if currentShares < requestedShares then
return {err = "error: company does not have enough shares"}
end
currentShares = currentShares - requestedShares
redis.call("SET", sharesKey, currentShares)
`)
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
keys := []string{BuildCompanySharesKey(companyId)}
err := BuyShares.Run(ctx, r.client, keys, numShares).Err()
if err != nil {
fmt.Println(err.Error())
}
return err
}
优点:
确保原子性,解决并发问题。 提高性能,因为脚本在 Redis 服务器上执行。
缺点:
需要学习 LUA 语言。 脚本执行期间会阻塞其他 Redis 操作。
4. Redis 锁
思路: 使用 Redis 锁来控制对 currentShares
的访问,确保只有一个 goroutine 可以访问。
代码:
func NewRepository(address, password string) Repository {
client := goredislib.NewClient(&goredislib.Options{
Addr: address,
Password: password,
})
pool := goredis.NewPool(client)
rs := redsync.New(pool)
mutexname := "my-global-mutex"
mutex := rs.NewMutex(mutexname)
return Repository{
client: client,
mutex: mutex,
}
}
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
// 获取锁
if err := r.mutex.Lock(); err != nil {
fmt.Printf("error during lock: %v \n", err)
return err
}
defer func() {
if ok, err := r.mutex.Unlock(); !ok || err != nil {
fmt.Printf("error during unlock: %v \n", err)
}
}()
// 获取当前剩余股票数量
currentShares, err := r.client.Get(ctx, BuildCompanySharesKey(companyId)).Int()
if err != nil {
fmt.Print(err.Error())
return err
}
// 验证剩余股票数量是否足够
if currentShares < numShares {
fmt.Print("error: company does not have enough shares \n")
return errors.New("error: company does not have enough shares")
}
currentShares -= numShares
// 更新公司剩余股票数量
r.client.Set(ctx, BuildCompanySharesKey(companyId), currentShares, 0)
return nil
}
优点:
简单易用。 适用于各种场景。
缺点:
性能可能不如 LUA 脚本。
总结
Redis 提供了多种方法来解决并发问题,包括原子操作、事务、LUA 脚本和 Redis 锁。选择哪种方法取决于具体的场景和需求。
扩展
可以使用 Redis 发布/订阅功能来实现实时股票价格更新。 可以使用 Redis 流来记录股票交易历史。 可以使用 Redis 集群来提高性能和可用性。
希望本文能够帮助您了解如何使用 Redis 和 Golang 解决并发问题。
还没有评论,来说两句吧...