Introduction to the CSP model
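CSP (Communicating Sequential Processes) is the concurrency model Go adopts. Traditional multithreaded code shares memory and uses locks, condition variables, and similar primitives to impose an execution order. Go's CSP instead puts the channel at the center: it does not care who the sender or receiver on a channel is, and the two sides simply exchange messages through the channel.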
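**PS:** The counterpart to CSP is the actor model, which emphasizes the roles in the communication (the actors), i.e. the sender and the receiver, rather than the channel. An actor exposes no interface for outside access; actors only agree to exchange messages asynchronously, and the sender must know who the receiver is, much like one person passing a message to another by writing to their mailbox or sending them a text message.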
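The actor idea can be approximated in Go by giving each actor a private goroutine plus a mailbox channel that senders must address explicitly. This is only an illustrative sketch, not how Go itself is structured (Go follows CSP); the names `msg`, `counterActor`, and `query` are hypothetical.

```go
package main

import "fmt"

// msg is a hypothetical message type: an amount to add, plus an optional
// reply channel used to read back the current total.
type msg struct {
	add   int
	reply chan int
}

// counterActor owns its state privately; the only way to interact with it
// is to send messages to its mailbox, which it processes one at a time.
func counterActor(mailbox <-chan msg) {
	total := 0
	for m := range mailbox {
		total += m.add
		if m.reply != nil {
			m.reply <- total
		}
	}
}

// query sends a read request to the actor and waits for its answer.
func query(mailbox chan<- msg) int {
	reply := make(chan int)
	mailbox <- msg{reply: reply}
	return <-reply
}

func main() {
	mailbox := make(chan msg, 8) // the actor's "address"
	go counterActor(mailbox)
	for i := 1; i <= 4; i++ {
		mailbox <- msg{add: i}
	}
	fmt.Println(query(mailbox)) // 10
	close(mailbox)
}
```

The mailbox is processed in FIFO order by a single goroutine, so the query sees every earlier message; the sender has to hold the actor's mailbox, which mirrors "the sender must know the receiver".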
channel
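Go's channel provides send and receive operations that are safe for concurrent use from multiple threads, which makes it easy to build a producer-consumer model with many goroutines.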
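A minimal producer-consumer sketch: one producer feeds a buffered channel and several consumers drain it, with no explicit locks. The function names and the choice of summing integers are illustrative assumptions.

```go
package main

import "fmt"

// produce sends 1..n on out and closes it when done, so the consumers'
// range loops terminate.
func produce(n int, out chan<- int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out)
}

// consumeAll starts `workers` consumer goroutines that drain in, and
// returns the sum of everything they received.
func consumeAll(in <-chan int, workers int) int {
	results := make(chan int, workers)
	for w := 0; w < workers; w++ {
		go func() {
			local := 0
			for v := range in {
				local += v
			}
			results <- local // each worker reports its partial sum
		}()
	}
	total := 0
	for w := 0; w < workers; w++ {
		total += <-results
	}
	return total
}

func main() {
	ch := make(chan int, 4)
	go produce(100, ch)
	fmt.Println(consumeAll(ch, 3)) // 5050
}
```

Each value is delivered to exactly one consumer, and the partial sums travel back over a second channel instead of through a shared variable.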
Source code analysis (go1.19.3: src/runtime/chan.go)
Data structure definition
```go
// From src/runtime/chan.go
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters
	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}
```
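A buffered channel stores its data in a fixed-size circular (ring) buffer. sendx and recvx mark the write and read positions (the tail and the head), and both wrap back to 0 when they reach dataqsiz, so sends and receives keep reusing the same block of memory.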
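- qcount: number of elements currently in the buffer
- dataqsiz: capacity of the buffer
- buf: the buffer (the element array allocated when the channel is created)
- elemsize: size of one element in bytes
- closed: whether the channel has been closed (non-zero once closed)
- elemtype: element type
- sendx: index of the ring-buffer slot the next send writes to
- recvx: index of the ring-buffer slot the next receive reads from
- recvq: queue of goroutines blocked receiving (non-empty only when the channel has no data to deliver)
- sendq: queue of goroutines blocked sending (non-empty only when the buffer is full, or, for an unbuffered channel, when no receiver is waiting)
- lock: one big mutex protecting the fields above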
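To make the index arithmetic concrete, here is a single-goroutine sketch of just the buffer fields. The type `ring` and its methods are hypothetical: there is no lock, no recvq/sendq, and `int` stands in for an arbitrary element type. The bodies of `push` and `pop` follow the buffer branches of chansend and chanrecv.

```go
package main

import "fmt"

// ring models only hchan's buffer bookkeeping; it is NOT safe for
// concurrent use and never blocks.
type ring struct {
	qcount   int   // elements currently stored
	dataqsiz int   // capacity
	buf      []int // backing array, reused forever
	sendx    int   // next slot to write
	recvx    int   // next slot to read
}

func newRing(size int) *ring {
	return &ring{dataqsiz: size, buf: make([]int, size)}
}

// push mirrors the qcount < dataqsiz branch of chansend.
func (r *ring) push(v int) bool {
	if r.qcount == r.dataqsiz {
		return false // full: the real chansend would block or fail here
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0 // wrap around
	}
	r.qcount++
	return true
}

// pop mirrors the qcount > 0 branch of chanrecv.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false // empty
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // like typedmemclr: drop the stale value
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0 // wrap around
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(3)
	r.push(1)
	r.push(2)
	r.push(3)
	fmt.Println(r.push(4)) // false: buffer full
	v, _ := r.pop()
	fmt.Println(v) // 1
	r.push(4)      // reuses slot 0
	fmt.Println(r.sendx, r.recvx) // 1 1
}
```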
Channel initialization
```go
func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}
	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)
	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}
```
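1.Validate the element type: its size must be less than 64 KB (1<<16 bytes) and its alignment must not exceed maxAlign (8 bytes); hchanSize must also be a multiple of maxAlign;
2.Validate the requested buffer length: elem.size*size must not overflow or exceed maxAlloc-hchanSize, and size must not be negative, otherwise makechan panics with "makechan: size out of range";
3.Allocate memory according to the channel's shape: if mem == 0 (an unbuffered channel, or an element type of size zero such as struct{}), only the hchan header is allocated and buf just points at a placeholder address used by the race detector; if the elements contain no pointers (elem.ptrdata == 0), hchan and the buffer are allocated together in one block of hchanSize+mem bytes; if the elements contain pointers, hchan is allocated with new and the buffer separately with mallocgc(mem, elem, true) so the GC can scan it; finally elemsize, elemtype, dataqsiz and the lock are initialized;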
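The runtime's allocation paths are not observable directly, but their user-visible effects are. The sketch below (with a hypothetical helper `makeIntChan`) shows that `cap` reports dataqsiz, that a `struct{}` element has size 0 (the mem == 0 case), and that a negative size reaching makechan at run time panics instead of returning.

```go
package main

import (
	"fmt"
	"unsafe"
)

// makeIntChan wraps make so that a runtime panic raised by makechan
// (such as "makechan: size out of range" for a negative size) comes
// back as an error value instead of crashing the program.
func makeIntChan(size int) (ch chan int, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%v", r)
		}
	}()
	return make(chan int, size), nil
}

func main() {
	ch, _ := makeIntChan(3)
	fmt.Println(len(ch), cap(ch)) // 0 3: qcount is 0, dataqsiz is 3

	// struct{} has size 0, so mem == 0 and makechan allocates only the
	// hchan header, however large the capacity.
	signals := make(chan struct{}, 1024)
	fmt.Println(unsafe.Sizeof(struct{}{}), cap(signals)) // 0 1024

	_, err := makeIntChan(-1)
	fmt.Println(err != nil) // true
}
```

A negative constant size would be rejected by the compiler, which is why the size is passed through a variable here.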
Channel send
```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	if debugChan {
		print("chansend: chan=", c, "\n")
	}
	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}
	if !block && c.closed == 0 && full(c) {
		return false
	}
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
	if !block {
		unlock(&c.lock)
		return false
	}
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	KeepAlive(ep)
	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
```
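1.If c is nil, a non-blocking send returns false and a blocking send parks the goroutine forever; otherwise, for a non-blocking send (block == false) on an open channel that is full, the value is dropped and false is returned immediately, without taking the lock;
2.Lock the channel;
3.If the channel is closed, this is a send on a closed channel: unlock and panic;
4.If the receive queue is non-empty, the channel has no data and receivers are idle waiting for it: dequeue one receiver and hand it the value directly, bypassing the buffer (send copies the data to the target goroutine and puts that goroutine in the local run queue of the P bound to the current M to wait for scheduling), unlock the channel, and return true;
5.If the buffer still has room (qcount < dataqsiz), copy the value into the slot at sendx, advance sendx (wrapping to 0), increment qcount, unlock the channel, and return true;
6.If the send is non-blocking, reaching this point means it cannot proceed, so the value is dropped: unlock the channel and return false;
7.Otherwise this is a blocking send that cannot complete (the buffer is full, or the channel is unbuffered with no waiting receiver): wrap the current goroutine and a pointer to its value in a sudog, enqueue it on sendq, and park the goroutine until a receiver wakes it;
8.gopark releases the thread M so it can run other goroutines. After waking up, if mysg.success is false the wakeup came from closechan and the send panics with "send on closed channel"; otherwise it returns true;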
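The send branches above can be observed from user code. In this sketch (helper names `trySend` and `sendPanics` are hypothetical), a select with a default makes the send non-blocking, a full buffer or an unbuffered channel with no waiting receiver makes it fail immediately, and sending on a closed channel panics.

```go
package main

import "fmt"

// trySend performs a non-blocking send: a select with one send case and a
// default is compiled to selectnbsend, which calls chansend with block == false.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// sendPanics reports whether a blocking send on ch panics. Only call it on
// a channel where the send will not block (closed, or with free space).
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- 1
	return false
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: a free slot in the buffer
	fmt.Println(trySend(ch, 2)) // false: buffer full, non-blocking fast path

	unbuf := make(chan int)
	fmt.Println(trySend(unbuf, 3)) // false: no receiver waiting in recvq

	close(ch)
	fmt.Println(sendPanics(ch)) // true: send on closed channel
}
```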
Channel receive
```go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.
	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		if atomic.Load(&c.closed) == 0 {
			return
		}
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	lock(&c.lock)
	if c.closed != 0 {
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// The channel has been closed, but the channel's buffer have data.
	} else {
		// Just found waiting sender with not closed.
		if sg := c.sendq.dequeue(); sg != nil {
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
	if !block {
		unlock(&c.lock)
		return false, false
	}
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}
```
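1.If c is nil, a non-blocking receive returns (false, false) and a blocking one parks forever. For a non-blocking receive on an empty channel, a fast path runs without the lock: if the channel is still open it returns (false, false); if it is closed and empty, it zeroes *ep and returns (true, false);
2.Lock the channel;
3.If the channel is closed and its buffer is empty, unlock, zero *ep, and return (true, false): the receive completes with the zero value and ok == false. If it is closed but the buffer still holds data, it falls through and keeps draining the buffer;
4.If the channel is open and the send queue is non-empty, a sender is blocked: recv takes over (for an unbuffered channel it copies the value straight from the sender; for a full buffered channel it takes the element at the head of the buffer and moves the sender's value into the freed slot), then wakes the sender the same way send wakes a receiver, unlocks, and returns (true, true);
5.Otherwise, if the buffer has data, copy the element at recvx to the receiver, clear the slot, advance recvx (wrapping to 0), decrement qcount, unlock the channel, and return (true, true);
6.If the receive is non-blocking, unlock and return (false, false); otherwise wrap the current goroutine in a sudog, enqueue it on recvq, and park it until a sender (or closechan) wakes it;
7.gopark releases the thread M to run other goroutines; after waking up, the function returns (true, success), where success is false if the wakeup came from closechan;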
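The closed-channel handling in chanrecv is easy to check from user code: buffered values are still delivered after close, and only once the buffer is empty does a receive return the zero value with ok == false. The helper `recvN` is a hypothetical name for this sketch.

```go
package main

import "fmt"

// recvN performs n receives on ch and records each value and its ok flag.
func recvN(ch <-chan int, n int) ([]int, []bool) {
	vals := make([]int, 0, n)
	oks := make([]bool, 0, n)
	for i := 0; i < n; i++ {
		v, ok := <-ch
		vals = append(vals, v)
		oks = append(oks, ok)
	}
	return vals, oks
}

func main() {
	ch := make(chan int, 2)
	ch <- 10
	ch <- 20
	close(ch)
	// The first two receives drain the buffer even though the channel is
	// closed (qcount > 0); the third finds it closed and empty, so it gets
	// the zero value and ok == false instead of blocking.
	vals, oks := recvN(ch, 3)
	fmt.Println(vals, oks) // [10 20 0] [true true false]
}
```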
Channel close
```go
func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}
	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}
	c.closed = 1
	var glist gList
	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)
	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}
```
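1.Panic if c is nil (close of nil channel); lock the channel; if it is already closed, unlock and panic (close of closed channel); otherwise set closed = 1;
2.Dequeue every goroutine in the receive queue (there are waiters only when the channel has no data, i.e. they are idle): clear each one's destination to the element type's zero value (typedmemclr, rather than literally sending nil), mark success = false, and push the goroutine onto the temporary list glist;
3.Dequeue every goroutine in the send queue (there are waiters only when the channel is full or has no receiver): set its elem to nil, mark success = false, and push it onto glist; these senders panic with "send on closed channel" once they run again;
4.Unlock the channel;
5.While glist is non-empty, pop each goroutine and wake it with goready; this is done only after the channel lock has been released;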
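Because closechan wakes every parked receiver, closing a channel works as a broadcast. This sketch, with hypothetical helpers `broadcastViaClose` and `closeTwicePanics`, blocks several receivers on an unbuffered channel and closes it; receivers already parked are released by closechan, and any that arrive later simply see a closed, empty channel. It also shows that a second close panics.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcastViaClose starts n receivers on an unbuffered channel, closes it,
// and counts how many of them observed ok == false.
func broadcastViaClose(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := <-done; !ok { // blocks until close
				mu.Lock()
				woken++
				mu.Unlock()
			}
		}()
	}
	close(done)
	wg.Wait()
	return woken
}

// closeTwicePanics reports whether closing an already-closed channel panics.
func closeTwicePanics() (panicked bool) {
	ch := make(chan int)
	close(ch)
	defer func() { panicked = recover() != nil }()
	close(ch)
	return false
}

func main() {
	fmt.Println(broadcastViaClose(5)) // 5
	fmt.Println(closeTwicePanics())   // true
}
```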
Summary
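That covers the main channel functions. The logic is fairly simple: it mainly relies on a circular buffer, a single lock (with lock-free fast paths that let failed non-blocking operations return without taking it), and the goroutine park/wakeup mechanism.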
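One more thing to note is a send or receive wrapped in a select { case ...: default: } block: with a single case plus default, the compiler calls selectnbsend/selectnbrecv, which invoke chansend/chanrecv with block == false. If the operation cannot proceed it returns immediately instead of parking the current goroutine.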
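A non-blocking receive written this way can distinguish "would block" from "closed and empty", mirroring chanrecv's (selected, received) results. The helper name `tryRecv` and its three-value return are assumptions made for this sketch.

```go
package main

import "fmt"

// tryRecv is a non-blocking receive. ready == false means the receive
// would have blocked (the default branch ran). When ready is true, ok has
// its usual meaning: false means the channel is closed and empty.
func tryRecv(ch <-chan int) (v int, ok, ready bool) {
	select {
	case v, ok = <-ch:
		return v, ok, true
	default:
		return 0, false, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(tryRecv(ch)) // 0 false false: empty, would block
	ch <- 7
	fmt.Println(tryRecv(ch)) // 7 true true
	close(ch)
	fmt.Println(tryRecv(ch)) // 0 false true: closed and empty
}
```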