Go:并发

时间:Feb. 14, 2020 分类:

目录:

Go并发介绍

Go在语言层面天然支持并发,这是Go流行的一个主要原因

Go的并发通过goruntine实现,通过用户态线程启动协程的方式完成并发,所以迅速创建goruntine的并发。goruntine的运行时是由runtime完成,线程是由操作系统调度

Go使用channel在多个goruntine进行通信

goruntine

goruntine介绍

在其他编程语言中,需要自己维护线程池,调度执行任务和上下文切换。

Go中goruntine能提供以上的机制,由runtime统一进行管理和调度,将任务合理的分配给每个CPU。

不需要自己去写进程,线程和协程

goruntine使用

示例未使用goruntine

package main

import "fmt"

func hello() {
    fmt.Println("Hello Goroutine!")
}

func main() {
    hello()
    fmt.Println("main goroutine done!")
}

打印结果

Hello Goroutine!
main goroutine done!

示例使用goruntine,关键字为go

package main

import "fmt"

func hello() {
    fmt.Println("Hello Goroutine!")
}

func main() {
    go hello()
    fmt.Println("main goroutine done!")
}

打印结果

main goroutine done!

因为时间过短导致协程并没有执行,主函数执行完成退出,主函数启动的goroutine也就结束了

可以通过sync.WaitGroup来实现等待协程完成再退出主函数

package main

import "fmt"
import "sync"

var wg sync.WaitGroup

func hello() {
    fmt.Println("Hello Goroutine!")
    wg.Done()
}

func main() {
    wg.Add(1)
    go hello()
    fmt.Println("main goroutine done!")
    wg.Wait()
}

打印结果

main goroutine done!
Hello Goroutine!

goruntine启动协程设计到的闭包问题

package main

import "fmt"
import "time"

func hello(i int) {
    fmt.Println(i)
}

func main() {
    for i := 0; i < 100; i++ {
        go hello(i)
    }
    fmt.Println("main goroutine done!")
    time.Sleep(time.Second * 2)
}

这样是可以正常打印0~99的,如果使用匿名函数

package main

import "fmt"
import "sync"
import "time"

var wg sync.WaitGroup

func hello(i int) {
    fmt.Println(i)
}

func main() {
    for i := 0; i < 100; i++ {
        go func() {
        fmt.Println(i)
        }()
    }
    fmt.Println("main goroutine done!")
    time.Sleep(time.Second * 2)
}

因为演示主机为单核所以都是100

main goroutine done!
100
100
100
100

多个goruntine

启动多个goruntine更需要sync.Waitgroup来实现了

goruntine和线程

可增长的栈

操作系统一般有默认的栈内存,为2MB,一个goruntine的栈在其生命周期开始的时候只有很小的2KB,会根据实际的情况根据其增大和缩小,goruntine的栈最大可以达到1GB,但是基本不会用到这么大

goruntine调度

GPM是Go语言运行时running层面的实现的调度系统

  • G为goruntine,存放goruntine信息和与P的绑定信息
  • P管理一组goruntine队列,会存储当前goruntine运行的上下文环境(函数指针,堆栈地址和地址边界),当前队列完成消费之后会去全局队列中取,如果全局队列也完成了就去其他的P中获取任务
  • M是Go运行时runtime时对操作系统线程的虚拟,M与内核线程是一一对应

P和M一般也是一一对应的,P管理一组G在M上运行,当一个G长久堵塞在一个M上时,会新建一个M,将其他G挂载到新的M上,当阻塞的G完成或者死掉回收旧的M

P的个数通过runtime.GOMAXPROCS设定,默认为CPU线程数,在并发量大的时候会增加一些P和M,但是不会增加太多,频繁的切换得不偿失

调度器采用了一个m:n的调度技术,调度m个goruntine到n个线程

  • goruntine是在用户态完成的,不涉及内核态和用户态的切换,在用户态维护着一块大的内存,不用直接调用malloc函数,成本比调度线程小
  • 充分利用多核的硬件资源

GOMAXPROCS

runtime.GOMAXPROCS()设置程序并发的CPU逻辑核心数

如果是runtime.GOMAXPROCS(1),是串行执行的

func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(1)
    go a()
    go b()
    time.Sleep(time.Second)
}

如果是runtime.GOMAXPROCS(2),是并行执行的

func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(2)
    go a()
    go b()
    time.Sleep(time.Second)
}

channel

创建管道

package main

import "fmt"

var a chan int

func main() {
    fmt.Println(a)
    a = make(chan int) // 初始化管道
    a <- 10
}

打印结果,未初始化为空值,因为没有缓冲区,写入数据没有消费者造成死锁

<nil>
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
    /root/go_code/src/demo/fudianshu.go:10 +0xa5
exit status 2

先在管道另一端添加一个消费者就好了

package main

import "fmt"
import "sync"

var a chan int
var wg sync.WaitGroup

func main() {
    a = make(chan int)
    wg.Add(1)
    go func() {
        defer wg.Done()
        x := <- a
        fmt.Println("后台从管道接收到了", x)
    }()
    a <- 10
    wg.Wait()
}

打印结果

后台从管道接收到了 10

建立一个有缓冲区的管道就不会有这个问题

package main

import "fmt"
import "sync"

var a chan int
var wg sync.WaitGroup

func main() {
    fmt.Println(a)
    a = make(chan int, 100)
    a <- 10
}

管道中的数据如果大,最好传递内存地址指针

管道的操作

只有<-,符号左边是管道代表写入,符号右边是管道代表读取

发送

ch <- 10 // 把10发送到ch中

接收

x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果

关闭

close(ch)

关闭通道不是必须的,关闭之后会有以下现象

  1. 对关闭通道发送数据会导致panic
  2. 对关闭通道进行接收会一直获取直到取到空值
  3. 对关闭通道关闭也会导致panic

循环通道

// channel 练习
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    // 开启goroutine将0~100的数发送到ch1中
    go func() {
        for i := 0; i < 100; i++ {
            ch1 <- i
        }
        close(ch1)
    }()
    // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
    go func() {
        for {
            i, ok := <-ch1 // 通道关闭后再取值ok=false
            if !ok {
                break
            }
            ch2 <- i * i
        }
        close(ch2)
    }()
    // 在主goroutine中从ch2中接收值打印
    for i := range ch2 { // 通道关闭后会退出for range循环
        fmt.Println(i)
    }

单向通道

单向通道多用于函数的参数

func counter(out chan<- int) {
    for i := 0; i < 100; i++ {
        out <- i
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for i := range in {
        out <- i * i
    }
    close(out)
}
func printer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}

通道异常情况

channel nil 非空 空的 满了 没满
接收 堵塞 接收值 阻塞 接收值 接收值
发送 阻塞 发送值 发送值 阻塞 发送值
关闭 panic 关闭成功,读取数据直到返回零值 关闭成功,返回零值 关闭成功,读取数据直到返回零值 关闭成功,读取数据直到返回零值

select多路复用

select关键字就可以实现,例如需要在多个通道接收数据

for{
    // 尝试从ch1接收值
    data, ok := <-ch1
    // 尝试从ch2接收值
    data, ok := <-ch2
    …
}

select是使用

select{
    case <-ch1:
        ...
    case data := <-ch2:
        ...
    case ch3<-data:
        ...
    default:
        默认操作
}

示例

func main() {
    ch := make(chan int, 1)
    for i := 0; i < 10; i++ {
        select {
        case x := <-ch:
            fmt.Println(x)
        case ch <- i:
        }
    }
}

select的优点

  • 可处理一个或多个channel的发送/接收操作。
  • 如果多个case同时满足,select会随机选择一个。
  • 对于没有case的select{}会一直等待,可用于阻塞main函数。

互斥锁

对于无锁情况

package main

import "fmt"
import "sync"

var x = 0 
var wg sync.WaitGroup

func add() {
    for i := 0; i < 50000; i++ {
        x = x + 1
    }
    wg.Done()
}

func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

打印的输出是一个小于100000的数,因为协程的重复操作

package main

import "fmt"
import "sync"

var x = 0 
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
    for i := 0; i < 50000; i++ {
        lock.Lock()
        x = x + 1
        lock.Unlock()
    }
    wg.Done()
}

func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

这样打印就是100000了

互斥锁保证只有一个goroutine进入临界区,而其他的goroutine等待锁,当锁释放一个等待的goroutine才能进入临界区,多个goroutine同时等待一个锁的时候,唤醒策略是随机的

读写锁

对于读多写少的情况,多个连续的读请求就不需要锁,所以读写锁是一个更好的选择

当一个goroutine获取读锁的时候,其他的goroutine如果是读也可以获取读锁,如果是写锁就等待。当一个goroutine获取写锁,其他的goroutine不论是写锁还是读锁都是等待

package main

import "fmt"
import "sync"
import "time"

var (
    x      int64
    wg     sync.WaitGroup
    lock   sync.Mutex
    rwlock sync.RWMutex
)

func write() {
    // lock.Lock()   // 加互斥锁
    rwlock.Lock() // 加写锁
    x = x + 1
    time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
    rwlock.Unlock()                   // 解写锁
    // lock.Unlock()                     // 解互斥锁
    wg.Done()
}

func read() {
    // lock.Lock()                  // 加互斥锁
    rwlock.RLock()               // 加读锁
    time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
    rwlock.RUnlock()             // 解读锁
    // lock.Unlock()                // 解互斥锁
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go write()
    }

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go read()
    }

    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

sync.Once

在高并发情况下只执行一次

sync.Once只有Do一个方法

package singleton

import (
    "sync"
)

type singleton struct {}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}

sync.Map

go语言的map并非并发安全

var m = make(map[string]int)

func get(key string) int {
    return m[key]
}

func set(key string, value int) {
    m[key] = value
}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            set(key, n)
            fmt.Printf("k=:%v,v:=%v\n", key, get(key))
            wg.Done()
        }(i)
    }
    wg.Wait()
}

当并发多了,就会有fatal error: concurrent map writes的错误

Go提供了sync.Map,有Store、Load、LoadOrStore、Delete、Range等方法

var m = sync.Map{}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            m.Store(key, n)
            value, _ := m.Load(key)
            fmt.Printf("k=:%v,v:=%v\n", key, value)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

代码中频繁的加锁涉及到内核态的上下文切换比较耗时,代价较高。

原子操作

对于基本的数据类型可以使用原子操作来保证并发安全,因为原子操作是Go提供的方法在用户态就可以,比加锁性能高

atomic包

方法 解释
func LoadInt32(addr *int32) (val int32) 读取操作
func LoadInt64(addr *int64) (val int64) 读取操作
func LoadUint32(addr *uint32) (val uint32) 读取操作
func LoadUint64(addr *uint64) (val uint64) 读取操作
func LoadUintptr(addr *uintptr) (val uintptr) 读取操作
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) 读取操作
func StoreInt32(addr *int32, val int32) 写入操作
func StoreInt64(addr *int64, val int64) 写入操作
func StoreUint32(addr *uint32, val uint32) 写入操作
func StoreUint64(addr *uint64, val uint64) 写入操作
func StoreUintptr(addr *uintptr, val uintptr) 写入操作
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) 写入操作
func AddInt32(addr *int32, delta int32) (new int32) 修改操作
func AddInt64(addr *int64, delta int64) (new int64) 修改操作
func AddUint32(addr *uint32, delta uint32) (new uint32) 修改操作
func AddUint64(addr *uint64, delta uint64) (new uint64) 修改操作
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) 修改操作
func SwapInt32(addr *int32, new int32) (old int32) 交换操作
func SwapInt64(addr *int64, new int64) (old int64) 交换操作
func SwapUint32(addr *uint32, new uint32) (old uint32) 交换操作
func SwapUint64(addr *uint64, new uint64) (old uint64) 交换操作
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) 交换操作
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) 交换操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) 比较并交换操作
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) 比较并交换操作
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) 比较并交换操作
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) 比较并交换操作
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) 比较并交换操作
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) 比较并交换操作

示例

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type Counter interface {
    Inc()
    Load() int64
}

// 普通版
type CommonCounter struct {
    counter int64
}

func (c CommonCounter) Inc() {
    c.counter++
}

func (c CommonCounter) Load() int64 {
    return c.counter
}

// 互斥锁版
type MutexCounter struct {
    counter int64
    lock    sync.Mutex
}

func (m *MutexCounter) Inc() {
    m.lock.Lock()
    defer m.lock.Unlock()
    m.counter++
}

func (m *MutexCounter) Load() int64 {
    m.lock.Lock()
    defer m.lock.Unlock()
    return m.counter
}

// 原子操作版
type AtomicCounter struct {
    counter int64
}

func (a *AtomicCounter) Inc() {
    atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
    return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
    var wg sync.WaitGroup
    start := time.Now()
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            c.Inc()
            wg.Done()
        }()
    }
    wg.Wait()
    end := time.Now()
    fmt.Println(c.Load(), end.Sub(start))
}

func main() {
    c1 := CommonCounter{} // 非并发安全
    test(c1)
    c2 := MutexCounter{} // 使用互斥锁实现并发安全
    test(&c2)
    c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高
    test(&c3)
}