Go:并发
目录:
Go并发介绍
Go在语言层面天然支持并发,这是Go流行的一个主要原因
Go的并发通过goruntine实现,通过用户态线程启动协程的方式完成并发,所以迅速创建goruntine的并发。goruntine的运行时是由runtime完成,线程是由操作系统调度
Go使用channel在多个goruntine进行通信
goruntine
goruntine介绍
在其他编程语言中,需要自己维护线程池,调度执行任务和上下文切换。
Go中goruntine能提供以上的机制,由runtime统一进行管理和调度,将任务合理的分配给每个CPU。
不需要自己去写进程,线程和协程
goruntine使用
示例未使用goruntine
package main
import "fmt"
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
hello()
fmt.Println("main goroutine done!")
}
打印结果
Hello Goroutine!
main goroutine done!
示例使用goruntine,关键字为go
package main
import "fmt"
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
go hello()
fmt.Println("main goroutine done!")
}
打印结果
main goroutine done!
因为时间过短导致协程并没有执行,主函数执行完成退出,主函数启动的goroutine也就结束了
可以通过sync.WaitGroup来实现等待协程完成再退出主函数
package main
import "fmt"
import "sync"
var wg sync.WaitGroup
func hello() {
fmt.Println("Hello Goroutine!")
wg.Done()
}
func main() {
wg.Add(1)
go hello()
fmt.Println("main goroutine done!")
wg.Wait()
}
打印结果
main goroutine done!
Hello Goroutine!
goruntine启动协程设计到的闭包问题
package main
import "fmt"
import "time"
func hello(i int) {
fmt.Println(i)
}
func main() {
for i := 0; i < 100; i++ {
go hello(i)
}
fmt.Println("main goroutine done!")
time.Sleep(time.Second * 2)
}
这样是可以正常打印0~99的,如果使用匿名函数
package main
import "fmt"
import "sync"
import "time"
var wg sync.WaitGroup
func hello(i int) {
fmt.Println(i)
}
func main() {
for i := 0; i < 100; i++ {
go func() {
fmt.Println(i)
}()
}
fmt.Println("main goroutine done!")
time.Sleep(time.Second * 2)
}
因为演示主机为单核所以都是100
main goroutine done!
100
100
100
100
多个goruntine
启动多个goruntine更需要sync.Waitgroup来实现了
goruntine和线程
可增长的栈
操作系统一般有默认的栈内存,为2MB,一个goruntine的栈在其生命周期开始的时候只有很小的2KB,会根据实际的情况根据其增大和缩小,goruntine的栈最大可以达到1GB,但是基本不会用到这么大
goruntine调度
GPM是Go语言运行时running层面的实现的调度系统
- G为goruntine,存放goruntine信息和与P的绑定信息
- P管理一组goruntine队列,会存储当前goruntine运行的上下文环境(函数指针,堆栈地址和地址边界),当前队列完成消费之后会去全局队列中取,如果全局队列也完成了就去其他的P中获取任务
- M是Go运行时runtime时对操作系统线程的虚拟,M与内核线程是一一对应
P和M一般也是一一对应的,P管理一组G在M上运行,当一个G长久堵塞在一个M上时,会新建一个M,将其他G挂载到新的M上,当阻塞的G完成或者死掉回收旧的M
P的个数通过runtime.GOMAXPROCS设定,默认为CPU线程数,在并发量大的时候会增加一些P和M,但是不会增加太多,频繁的切换得不偿失
调度器采用了一个m:n的调度技术,调度m个goruntine到n个线程
- goruntine是在用户态完成的,不涉及内核态和用户态的切换,在用户态维护着一块大的内存,不用直接调用malloc函数,成本比调度线程小
- 充分利用多核的硬件资源
GOMAXPROCS
runtime.GOMAXPROCS()设置程序并发的CPU逻辑核心数
如果是runtime.GOMAXPROCS(1),是串行执行的
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
runtime.GOMAXPROCS(1)
go a()
go b()
time.Sleep(time.Second)
}
如果是runtime.GOMAXPROCS(2),是并行执行的
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
runtime.GOMAXPROCS(2)
go a()
go b()
time.Sleep(time.Second)
}
channel
创建管道
package main
import "fmt"
var a chan int
func main() {
fmt.Println(a)
a = make(chan int) // 初始化管道
a <- 10
}
打印结果,未初始化为空值,因为没有缓冲区,写入数据没有消费者造成死锁
<nil>
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/root/go_code/src/demo/fudianshu.go:10 +0xa5
exit status 2
先在管道另一端添加一个消费者就好了
package main
import "fmt"
import "sync"
var a chan int
var wg sync.WaitGroup
func main() {
a = make(chan int)
wg.Add(1)
go func() {
defer wg.Done()
x := <- a
fmt.Println("后台从管道接收到了", x)
}()
a <- 10
wg.Wait()
}
打印结果
后台从管道接收到了 10
建立一个有缓冲区的管道就不会有这个问题
package main
import "fmt"
import "sync"
var a chan int
var wg sync.WaitGroup
func main() {
fmt.Println(a)
a = make(chan int, 100)
a <- 10
}
管道中的数据如果大,最好传递内存地址指针
管道的操作
只有<-
,符号左边是管道代表写入,符号右边是管道代表读取
发送
ch <- 10 // 把10发送到ch中
接收
x := <- ch // 从ch中接收值并赋值给变量x
<-ch // 从ch中接收值,忽略结果
关闭
close(ch)
关闭通道不是必须的,关闭之后会有以下现象
- 对关闭通道发送数据会导致panic
- 对关闭通道进行接收会一直获取直到取到空值
- 对关闭通道关闭也会导致panic
循环通道
// channel 练习
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
// 开启goroutine将0~100的数发送到ch1中
go func() {
for i := 0; i < 100; i++ {
ch1 <- i
}
close(ch1)
}()
// 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
go func() {
for {
i, ok := <-ch1 // 通道关闭后再取值ok=false
if !ok {
break
}
ch2 <- i * i
}
close(ch2)
}()
// 在主goroutine中从ch2中接收值打印
for i := range ch2 { // 通道关闭后会退出for range循环
fmt.Println(i)
}
单向通道
单向通道多用于函数的参数
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch2, ch1)
printer(ch2)
}
通道异常情况
channel | nil | 非空 | 空的 | 满了 | 没满 |
---|---|---|---|---|---|
接收 | 堵塞 | 接收值 | 阻塞 | 接收值 | 接收值 |
发送 | 阻塞 | 发送值 | 发送值 | 阻塞 | 发送值 |
关闭 | panic | 关闭成功,读取数据直到返回零值 | 关闭成功,返回零值 | 关闭成功,读取数据直到返回零值 | 关闭成功,读取数据直到返回零值 |
select多路复用
select关键字就可以实现,例如需要在多个通道接收数据
for{
// 尝试从ch1接收值
data, ok := <-ch1
// 尝试从ch2接收值
data, ok := <-ch2
…
}
select是使用
select{
case <-ch1:
...
case data := <-ch2:
...
case ch3<-data:
...
default:
默认操作
}
示例
func main() {
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}
select的优点
- 可处理一个或多个channel的发送/接收操作。
- 如果多个case同时满足,select会随机选择一个。
- 对于没有case的select{}会一直等待,可用于阻塞main函数。
锁
互斥锁
对于无锁情况
package main
import "fmt"
import "sync"
var x = 0
var wg sync.WaitGroup
func add() {
for i := 0; i < 50000; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
打印的输出是一个小于100000的数,因为协程的重复操作
package main
import "fmt"
import "sync"
var x = 0
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i := 0; i < 50000; i++ {
lock.Lock()
x = x + 1
lock.Unlock()
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
这样打印就是100000了
互斥锁保证只有一个goroutine进入临界区,而其他的goroutine等待锁,当锁释放一个等待的goroutine才能进入临界区,多个goroutine同时等待一个锁的时候,唤醒策略是随机的
读写锁
对于读多写少的情况,多个连续的读请求就不需要锁,所以读写锁是一个更好的选择
当一个goroutine获取读锁的时候,其他的goroutine如果是读也可以获取读锁,如果是写锁就等待。当一个goroutine获取写锁,其他的goroutine不论是写锁还是读锁都是等待
package main
import "fmt"
import "sync"
import "time"
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)
func write() {
// lock.Lock() // 加互斥锁
rwlock.Lock() // 加写锁
x = x + 1
time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
rwlock.Unlock() // 解写锁
// lock.Unlock() // 解互斥锁
wg.Done()
}
func read() {
// lock.Lock() // 加互斥锁
rwlock.RLock() // 加读锁
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
rwlock.RUnlock() // 解读锁
// lock.Unlock() // 解互斥锁
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
sync.Once
在高并发情况下只执行一次
sync.Once只有Do一个方法
package singleton
import (
"sync"
)
type singleton struct {}
var instance *singleton
var once sync.Once
func GetInstance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}
sync.Map
go语言的map并非并发安全
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
当并发多了,就会有fatal error: concurrent map writes
的错误
Go提供了sync.Map,有Store、Load、LoadOrStore、Delete、Range等方法
var m = sync.Map{}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m.Store(key, n)
value, _ := m.Load(key)
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
代码中频繁的加锁涉及到内核态的上下文切换比较耗时,代价较高。
原子操作
对于基本的数据类型可以使用原子操作来保证并发安全,因为原子操作是Go提供的方法在用户态就可以,比加锁性能高
atomic包
方法 | 解释 |
---|---|
func LoadInt32(addr *int32) (val int32) |
读取操作 |
func LoadInt64(addr *int64) (val int64) |
读取操作 |
func LoadUint32(addr *uint32) (val uint32) |
读取操作 |
func LoadUint64(addr *uint64) (val uint64) |
读取操作 |
func LoadUintptr(addr *uintptr) (val uintptr) |
读取操作 |
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) |
读取操作 |
func StoreInt32(addr *int32, val int32) |
写入操作 |
func StoreInt64(addr *int64, val int64) |
写入操作 |
func StoreUint32(addr *uint32, val uint32) |
写入操作 |
func StoreUint64(addr *uint64, val uint64) |
写入操作 |
func StoreUintptr(addr *uintptr, val uintptr) |
写入操作 |
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) |
写入操作 |
func AddInt32(addr *int32, delta int32) (new int32) |
修改操作 |
func AddInt64(addr *int64, delta int64) (new int64) |
修改操作 |
func AddUint32(addr *uint32, delta uint32) (new uint32) |
修改操作 |
func AddUint64(addr *uint64, delta uint64) (new uint64) |
修改操作 |
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) |
修改操作 |
func SwapInt32(addr *int32, new int32) (old int32) |
交换操作 |
func SwapInt64(addr *int64, new int64) (old int64) |
交换操作 |
func SwapUint32(addr *uint32, new uint32) (old uint32) |
交换操作 |
func SwapUint64(addr *uint64, new uint64) (old uint64) |
交换操作 |
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) |
交换操作 |
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) |
交换操作 |
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) |
比较并交换操作 |
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) |
比较并交换操作 |
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) |
比较并交换操作 |
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) |
比较并交换操作 |
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) |
比较并交换操作 |
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) |
比较并交换操作 |
示例
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Counter interface {
Inc()
Load() int64
}
// 普通版
type CommonCounter struct {
counter int64
}
func (c CommonCounter) Inc() {
c.counter++
}
func (c CommonCounter) Load() int64 {
return c.counter
}
// 互斥锁版
type MutexCounter struct {
counter int64
lock sync.Mutex
}
func (m *MutexCounter) Inc() {
m.lock.Lock()
defer m.lock.Unlock()
m.counter++
}
func (m *MutexCounter) Load() int64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.counter
}
// 原子操作版
type AtomicCounter struct {
counter int64
}
func (a *AtomicCounter) Inc() {
atomic.AddInt64(&a.counter, 1)
}
func (a *AtomicCounter) Load() int64 {
return atomic.LoadInt64(&a.counter)
}
func test(c Counter) {
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
c.Inc()
wg.Done()
}()
}
wg.Wait()
end := time.Now()
fmt.Println(c.Load(), end.Sub(start))
}
func main() {
c1 := CommonCounter{} // 非并发安全
test(c1)
c2 := MutexCounter{} // 使用互斥锁实现并发安全
test(&c2)
c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高
test(&c3)
}