在我們前面的一些介紹 sync包相關(guān)的文章中,我們應(yīng)該也發(fā)現(xiàn)了,其中有不少地方使用了原子操作。比如 sync.WaitGroup、sync.Map再到 sync.Pool,這些結(jié)構(gòu)體的實現(xiàn)中都有原子操作的身影。原子操作在并發(fā)編程中是一種非常重要的操作,它可以保證并發(fā)安全,而且效率也很高。本文將會深入探討一下 go 中原子操作的原理、使用場景、用法等內(nèi)容。
如果讓我用一句話來說明什么是原子操作,那就是:原子操作是變量級別的互斥鎖。簡單來說,就是同一時刻,只能有一個 CPU 對變量進行讀或?qū)憽.?dāng)我們想要對某個變量做并發(fā)安全的修改,除了使用官方提供的 Mutex,還可以使用 sync/atomic包的原子操作,它能夠保證對變量的讀取或修改期間不被其他的協(xié)程所影響。
(資料圖片)
我們可以用下圖來表示:
說明:在上圖中,我們有三個 CPU 邏輯核,其中 CPU 1 正在對變量 v做原子操作,這個時候 CPU 2 和 CPU 3 不能對 v做任何操作,在 CPU 1 操作完成后,CPU 2 和 CPU 3 可以獲取到 v的最新值。
從這個角度看,我們可以把 sync/atomic包中的原子操作看成是變量級別的互斥鎖。就是說,在 go 中,當(dāng)一個協(xié)程對變量做原子操作時,其他協(xié)程不能對這個變量做任何操作,直到這個協(xié)程操作完成。
拿一個簡單的例子來說明一下原子操作的使用場景:
func TestAtomic(t *testing.T) {var sum = 0var wg sync.WaitGroupwg.Add(1000)// 啟動 1000 個協(xié)程,每個協(xié)程對 sum 做加法操作for i := 0; i < 1000; i++ {go func() {defer wg.Done()sum++}()}// 等待所有的協(xié)程都執(zhí)行完畢wg.Wait()fmt.Println(sum) // 這里輸出多少呢?}我們可以在自己的電腦上運行一下這段代碼,看看輸出的結(jié)果是多少。不出意外的話,應(yīng)該每次可能都不一樣,而且應(yīng)該也不是 1000,這是為什么呢?
這是因為,CPU 在對 sum做加法的時候,需要先將 sum目前的值讀取到 CPU 的寄存器中,然后再進行加法操作,最后再寫回到內(nèi)存中。如果有兩個 CPU 同時取了 sum的值,然后都進行了加法操作,然后都再寫回到內(nèi)存中,那么就會導(dǎo)致 sum的值被覆蓋,從而導(dǎo)致結(jié)果不正確。
舉個例子,目前內(nèi)存中的 sum為 1,然后兩個 CPU 同時取了這個 1 來做加法,然后都得到了結(jié)果 2,然后這兩個 CPU 將各自的計算結(jié)果寫回到內(nèi)存中,那么內(nèi)存中的 sum就變成了 2,而不是 3。
在這種場景下,我們可以使用原子操作來實現(xiàn)并發(fā)安全的加法操作:
func TestAtomic1(t *testing.T) {// 將 sum 的類型改成 int32,因為原子操作只能針對 int32、int64、uint32、uint64、uintptr 這幾種類型var sum int32 = 0var wg sync.WaitGroupwg.Add(1000) // 啟動 1000 個協(xié)程,每個協(xié)程對 sum 做加法操作for i := 0; i < 1000; i++ {go func() {defer wg.Done()// 將 sum++ 改成下面這樣atomic.AddInt32(&sum, 1)}()}wg.Wait()fmt.Println(sum) // 輸出 1000}在上面這個例子中,我們每次執(zhí)行都能得到 1000 這個結(jié)果。
因為使用原子操作的時候,同一時刻只能有一個 CPU 對變量進行讀或?qū)懀跃筒粫霈F(xiàn)上面的問題了。
所以很多需要對變量做并發(fā)讀寫的地方,我們都可以考慮一下,是否可以使用原子操作來實現(xiàn)并發(fā)安全的操作(而不是使用互斥鎖,互斥鎖效率相比原子操作要低一些)。
原子操作的使用場景也是和互斥鎖類似的,但是不一樣的是,我們的鎖粒度只是一個變量而已。也就是說,當(dāng)我們不允許多個 CPU 同時對變量進行讀寫的時候(保證變量同一時刻只能一個 CPU 操作),就可以使用原子操作。
看完上面原子操作的介紹,有沒有覺得原子操作很神奇,居然有這么好用的東西。那它到底是怎么實現(xiàn)的呢?
一般情況下,原子操作的實現(xiàn)需要特殊的 CPU 指令或者系統(tǒng)調(diào)用。這些指令或者系統(tǒng)調(diào)用可以保證在執(zhí)行期間不會被其他操作或事件中斷,從而保證操作的原子性。
例如,在 x86 架構(gòu)的 CPU 中,可以使用 LOCK前綴來實現(xiàn)原子操作。LOCK前綴可以與其他指令一起使用,用于鎖定內(nèi)存總線,防止其他 CPU 訪問同一內(nèi)存地址,從而實現(xiàn)原子操作。在使用 LOCK前綴的指令執(zhí)行期間,CPU 會將當(dāng)前處理器緩存中的數(shù)據(jù)寫回到內(nèi)存中,并鎖定該內(nèi)存地址,防止其他 CPU 修改該地址的數(shù)據(jù)(所以原子操作總是可以讀取到最新的數(shù)據(jù))。一旦當(dāng)前 CPU 對該地址的操作完成,CPU 會釋放該內(nèi)存地址的鎖定,其他 CPU 才能繼續(xù)對該地址進行訪問。
我們再來捋一下上面的內(nèi)容,看看 LOCK前綴是如何實現(xiàn)原子操作的:
其他架構(gòu)的 CPU 可能會略有不同,但是原理是一樣的。
在 go 中,主要有以下幾種原子操作:Add、CompareAndSwap、Load、Store、Swap。
Add為前綴,后綴針對特定類型的名稱。原子增被操作的類型只能是數(shù)值類型,即 int32、int64、uint32、uint64、uintptr原子增減函數(shù)的第一個參數(shù)為原值,第二個參數(shù)是要增減多少。方法:func AddInt32(addr *int32, delta int32) (new int32)func AddInt64(addr *int64, delta int64) (new int64)func AddUint32(addr *uint32, delta uint32) (new uint32)func AddUint64(addr *uint64, delta uint64) (new uint64)func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
int32和 int64的第二個參數(shù)可以是負數(shù),這樣就可以做原子減法了。
也就是我們常見的 CAS,在 CAS操作中,會需要拿舊的值跟 old比較,如果相等,就將 new賦值給 addr。如果不相等,則不做任何操作。最后返回一個 bool值,表示是否成功 swap。
也就是說,這個操作可能是不成功的。這很正常,在并發(fā)環(huán)境下,多個協(xié)程對同一個變量進行操作,肯定會存在競爭的情況。在這種情況下,偶爾的失敗是正常的,我們只需要在失敗的時候,重新嘗試即可。因為原子操作需要的時間往往是比較短的,因此在失敗的時候,我們可以通過自旋的方式來再次進行嘗試。
在這種情況下,如果不自旋,那就需要將這個協(xié)程掛起,等待其他協(xié)程完成操作,然后再次嘗試。這個過程相比自旋可能會更加耗時。因為很有可能這次原子操作不成功,下一次就成功了。如果我們每次都將協(xié)程掛起,那么效率就會大大降低。
for+ 原子操作的方式,在 go 的 sync包中很多地方都有使用,比如 sync.Map,sync.Pool等。這也是使用原子操作時一個非常常見的使用模式。
CompareAndSwap的功能:
CompareAndSwap為前綴,后綴針對特定類型的名稱。原子比較并交換被操作的類型可以是數(shù)值類型或指針類型,即 int32、int64、uint32、uint64、uintptr、unsafe.Pointer原子比較并交換函數(shù)的第一個參數(shù)為原值指針,第二個參數(shù)是要比較的值,第三個參數(shù)是要交換的值。方法:func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
原子性的讀取操作接受一個對應(yīng)類型的指針值,返回該指針指向的值。原子性讀取意味著讀取值的同時,當(dāng)前計算機的任何 CPU 都不會進行針對值的讀寫操作。
如果不使用原子 Load,當(dāng)使用 v := value這種賦值方式為變量 v賦值時,讀取到的 value可能不是最新的,因為在讀取操作時其他協(xié)程對它的讀寫操作可能會同時發(fā)生。
Load 操作有下面這些:
func LoadInt32(addr *int32) (val int32)func LoadInt64(addr *int64) (val int64)func LoadUint32(addr *uint32) (val uint32)func LoadUint64(addr *uint64) (val uint64)func LoadUintptr(addr *uintptr) (val uintptr)func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
Store可以將 val值保存到 *addr中,Store操作是原子性的,因此在執(zhí)行 Store操作時,當(dāng)前計算機的任何 CPU 都不會進行針對 *addr的讀寫操作。
val值保存到 *addr中。與讀操作對應(yīng)的寫入操作,sync/atomic提供了與原子值載入 Load函數(shù)相對應(yīng)的原子值存儲 Store函數(shù),原子性存儲函數(shù)均以 Store為前綴。Store操作有下面這些:
func StoreInt32(addr *int32, val int32)func StoreInt64(addr *int64, val int64)func StoreUint32(addr *uint32, val uint32)func StoreUint64(addr *uint64, val uint64)func StoreUintptr(addr *uintpre, val uintptr)func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
Swap跟 Store有點類似,但是它會返回 *addr的舊值。
func SwapInt32(addr *int32, new int32) (old int32)func SwapInt64(addr *int64, new int64) (old int64)func SwapUint32(addr *uint32, new uint32) (old uint32)func SwapUint64(addr *uint64, new uint64) (old uint64)func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
從上一節(jié)中,我們知道了在 go 中原子操作可以操作 int32、int64、uint32、uint64、uintptr、unsafe.Pointer這些類型的值。但是在實際開發(fā)中,我們的類型還有很多,比如 string、struct等等,那這些類型的值如何進行原子操作呢?答案是使用 atomic.Value。
atomic.Value是一個結(jié)構(gòu)體,它的內(nèi)部有一個 any類型的字段,存儲了我們要原子操作的值,也就是一個任意類型的值。
atomic.Value支持以下操作:
Load:原子性的讀取 Value中的值。Store:原子性的存儲一個值到 Value中。Swap:原子性的交換 Value中的值,返回舊值。CompareAndSwap:原子性的比較并交換 Value中的值,如果舊值和 old相等,則將 new存入 Value中,返回 true,否則返回 false。atomic.Value的這些操作跟上面講到的那些操作其實差不多,只不過 atomic.Value可以操作任意類型的值。那 atomic.Value是如何實現(xiàn)的呢?
atomic.Value是一個結(jié)構(gòu)體,這個結(jié)構(gòu)體只有一個字段:
// Value 提供一致類型值的原子加載和存儲。type Value struct {v any}Load返回由最近的 Store設(shè)置的值。如果還沒有 Store過任何值,則返回 nil。
// Load 返回由最近的 Store 設(shè)置的值。func (v *Value) Load() (val any) {// atomic.Value 轉(zhuǎn)換為 efaceWordsvp := (*efaceWords)(unsafe.Pointer(v))// 判斷 atomic.Value 的類型typ := LoadPointer(&vp.typ)// 第一次 Store 還沒有完成,直接返回 nilif typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) {// firstStoreInProgress 是一個特殊的變量,存儲到 typ 中用來表示第一次 Store 還沒有完成return nil}// 獲取 atomic.Value 的值data := LoadPointer(&vp.data)// 將 val 轉(zhuǎn)換為 efaceWords 類型vlp := (*efaceWords)(unsafe.Pointer(&val))// 分別賦值給 val 的 typ 和 datavlp.typ = typvlp.data = datareturn}在 atomic.Value的源碼中,我們都可以看到 efaceWords的身影,它實際上代表的是 interface{}/any類型:
// 表示一個 interface{}/any 類型type efaceWords struct {typ unsafe.Pointerdata unsafe.Pointer}看到這里我們會不會覺得很困惑,直接返回 val不就可以了嗎?為什么要將 val轉(zhuǎn)換為 efaceWords類型呢?
這是因為 go 中的原子操作只能操作 int32、int64、uint32、uint64、uintptr、unsafe.Pointer這些類型的值,不支持 interface{}類型,但是如果了解 interface{}底層結(jié)構(gòu)的話,我們就知道 interface{}底層其實就是一個結(jié)構(gòu)體,它有兩個字段,一個是 type,一個是 data,type用來存儲 interface{}的類型,data用來存儲 interface{}的值。而且這兩個字段都是 unsafe.Pointer類型的,所以其實我們可以對 interface{}的 type和 data分別進行原子操作,這樣最終其實也可以達到了原子操作 interface{}的目的了,是不是非常地巧妙呢?
Store將 Value的值設(shè)置為 val。對給定值的所有存儲調(diào)用必須使用相同具體類型的值。不一致類型的存儲會發(fā)生恐慌,Store(nil)也會 panic。
// Store 將 Value 的值設(shè)置為 val。func (v *Value) Store(val any) {// 不能存儲 nil 值if val == nil {panic("sync/atomic: store of nil value into Value")}// atomic.Value 轉(zhuǎn)換為 efaceWordsvp := (*efaceWords)(unsafe.Pointer(v))// val 轉(zhuǎn)換為 efaceWordsvlp := (*efaceWords)(unsafe.Pointer(&val))// 自旋進行原子操作,這個過程不會很久,開銷相比互斥鎖小for {// LoadPointer 可以保證獲取到的是最新的typ := LoadPointer(&vp.typ)// 第一次 store 的時候 typ 還是 nil,說明是第一次 storeif typ == nil {// 嘗試開始第一次 Store。// 禁用搶占,以便其他 goroutines 可以自旋等待完成。// (如果允許搶占,那么其他 goroutine 自旋等待的時間可能會比較長,因為可能會需要進行協(xié)程調(diào)度。)runtime_procPin()// 搶占失敗,意味著有其他 goroutine 成功 store 了,允許搶占,再次嘗試 Store// 這也是一個原子操作。if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {runtime_procUnpin()continue}// 完成第一次 store// 因為有 firstStoreInProgress 標識的保護,所以下面的兩個原子操作是安全的。StorePointer(&vp.data, vlp.data) // 存儲值(原子操作)StorePointer(&vp.typ, vlp.typ) // 存儲類型(原子操作)runtime_procUnpin() // 允許搶占return}// 另外一個 goroutine 正在進行第一次 Store。自旋等待。if typ == unsafe.Pointer(&firstStoreInProgress) {continue}// 第一次 Store 已經(jīng)完成了,下面不是第一次 Store 了。// 需要檢查當(dāng)前 Store 的類型跟第一次 Store 的類型是否一致,不一致就 panic。if typ != vlp.typ {panic("sync/atomic: store of inconsistently typed value into Value")}// 后續(xù)的 Store 只需要 Store 值部分就可以了。// 因為 atomic.Value 只能保存一種類型的值。StorePointer(&vp.data, vlp.data)return}}在 Store中,有以下幾個注意的點:
firstStoreInProgress來確保第一次 Store的時候,只有一個 goroutine可以進行 Store操作,其他的 goroutine需要自旋等待。如果沒有這個保護,那么存儲 typ和 data的時候就會出現(xiàn)競爭(因為需要兩個原子操作),導(dǎo)致數(shù)據(jù)不一致。在這里其實可以將 firstStoreInProgress看作是一個互斥鎖。在進行第一次 Store的時候,會將當(dāng)前的 goroutine 和 P綁定,這樣拿到 firstStoreInProgress鎖的協(xié)程就可以盡快地完成第一次 Store操作,這樣一來,其他的協(xié)程也不用等待太久。在第一次 Store的時候,會有兩個原子操作,分別存儲類型和值,但是因為有 firstStoreInProgress的保護,所以這兩個原子操作本質(zhì)上是對 interface{}的一個原子存儲操作。其他協(xié)程在看到有 firstStoreInProgress標識的時候,就會自旋等待,直到第一次 Store完成。在后續(xù)的 Store操作中,只需要存儲值就可以了,因為 atomic.Value只能保存一種類型的值。Swap將 Value的值設(shè)置為 new并返回舊值。對給定值的所有交換調(diào)用必須使用相同具體類型的值。同時,不一致類型的交換會發(fā)生恐慌,Swap(nil)也會 panic。
// Swap 將 Value 的值設(shè)置為 new 并返回舊值。func (v *Value) Swap(new any) (old any) {// 不能存儲 nil 值if new == nil {panic("sync/atomic: swap of nil value into Value")}// atomic.Value 轉(zhuǎn)換為 efaceWordsvp := (*efaceWords)(unsafe.Pointer(v))// new 轉(zhuǎn)換為 efaceWordsnp := (*efaceWords)(unsafe.Pointer(&new))// 自旋進行原子操作,這個過程不會很久,開銷相比互斥鎖小for {// 下面這部分代碼跟 Store 一樣,不細說了。// 這部分代碼是進行第一次存儲的代碼。typ := LoadPointer(&vp.typ)if typ == nil {runtime_procPin()if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {runtime_procUnpin()continue}StorePointer(&vp.data, np.data)StorePointer(&vp.typ, np.typ)runtime_procUnpin()return nil}if typ == unsafe.Pointer(&firstStoreInProgress) {continue}if typ != np.typ {panic("sync/atomic: swap of inconsistently typed value into Value")}// ---- 下面是 Swap 的特有邏輯 ----// op 是返回值op := (*efaceWords)(unsafe.Pointer(&old))// 返回舊的值op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data)return old}}CompareAndSwap將 Value的值與 old比較,如果相等則設(shè)置為 new并返回 true,否則返回 false。對給定值的所有比較和交換調(diào)用必須使用相同具體類型的值。同時,不一致類型的比較和交換會發(fā)生恐慌,CompareAndSwap(nil, nil)也會 panic。
// CompareAndSwap 比較并交換。func (v *Value) CompareAndSwap(old, new any) (swapped bool) {// 注意:old 是可以為 nil 的,new 不能為 nil。// old 是 nil 表示是第一次進行 Store 操作。if new == nil {panic("sync/atomic: compare and swap of nil value into Value")}// atomic.Value 轉(zhuǎn)換為 efaceWordsvp := (*efaceWords)(unsafe.Pointer(v))// new 轉(zhuǎn)換為 efaceWordsnp := (*efaceWords)(unsafe.Pointer(&new))// old 轉(zhuǎn)換為 efaceWordsop := (*efaceWords)(unsafe.Pointer(&old))// old 和 new 類型必須一致,且不能為 nilif op.typ != nil && np.typ != op.typ {panic("sync/atomic: compare and swap of inconsistently typed values")}// 自旋進行原子操作,這個過程不會很久,開銷相比互斥鎖小for {// LoadPointer 可以保證獲取到的 typ 是最新的typ := LoadPointer(&vp.typ)if typ == nil { // atomic.Value 是 nil,還沒 Store 過// 準備進行第一次 Store,但是傳遞進來的 old 不是 nil,compare 這一步就失敗了。直接返回 falseif old != nil {return false}// 下面這部分代碼跟 Store 一樣,不細說了。 // 這部分代碼是進行第一次存儲的代碼。runtime_procPin()if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {runtime_procUnpin()continue}StorePointer(&vp.data, np.data)StorePointer(&vp.typ, np.typ)runtime_procUnpin()return true}if typ == unsafe.Pointer(&firstStoreInProgress) {continue}if typ != np.typ {panic("sync/atomic: compare and swap of inconsistently typed value into Value")}// 通過運行時相等性檢查比較舊版本和當(dāng)前版本。// 這允許對值類型進行比較,這是包函數(shù)所沒有的。// 下面的 CompareAndSwapPointer 僅確保 vp.data 自 LoadPointer 以來沒有更改。data := LoadPointer(&vp.data)var i any(*efaceWords)(unsafe.Pointer(&i)).typ = typ(*efaceWords)(unsafe.Pointer(&i)).data = dataif i != old { // atomic.Value 跟 old 不相等return false}// 只做 val 部分的 cas 操作return CompareAndSwapPointer(&vp.data, data, np.data)}}這里需要特別說明的只有最后那個比較相等的判斷,也就是 data := LoadPointer(&vp.data)以及往后的幾行代碼。在開發(fā) atomic.Value第一版的時候,那個開發(fā)者其實是將這幾行寫成 CompareAndSwapPointer(&vp.data, old.data, np.data)這種形式的。但是在舊的寫法中,會存在一個問題,如果我們做 CAS操作的時候,如果傳遞的參數(shù) old是一個結(jié)構(gòu)體的值這種類型,那么這個結(jié)構(gòu)體的值是會被拷貝一份的,同時再會被轉(zhuǎn)換為 interface{}/any類型,這個過程中,其實參數(shù)的 old的 data部分指針指向的內(nèi)存跟 vp.data指向的內(nèi)存是不一樣的。這樣的話,CAS操作就會失敗,這個時候就會返回 false,但是我們本意是要比較它的值,出現(xiàn)這種結(jié)果顯然不是我們想要的。
將值作為 interface{}參數(shù)使用的時候,會存在一個將值轉(zhuǎn)換為 interface{}的過程。具體我們可以看看 interface{}的實現(xiàn)原理。
所以,在上面的實現(xiàn)中,會將舊值的 typ和 data賦值給一個 any類型的變量,然后使用 i != old這種方式進行判斷,這樣就可以實現(xiàn)在比較的時候,比較的是值,而不是由值轉(zhuǎn)換為 interface{}后的指針。
我們現(xiàn)在知道了,atomic.Value可以對任意類型做原子操作。而對于其他的原子類型,比如 int32、int64、uint32、uint64、uintptr、unsafe.Pointer等,其實在 go 中也提供了包裝的類型,讓我們可以以對象的方式來操作這些類型。
對應(yīng)的類型如下:
atomic.Bool:這個比較特別,但底層實際上是一個 uint32類型的值。我們對 atomic.Bool做原子操作的時候,實際上是對 uint32做原子操作。atomic.Int32:int32類型的包裝類型atomic.Int64:int64類型的包裝類型atomic.Uint32:uint32類型的包裝類型atomic.Uint64:uint64類型的包裝類型atomic.Uintptr:uintptr類型的包裝類型atomic.Pointer:unsafe.Pointer類型的包裝類型這幾種類型的實現(xiàn)的代碼基本一樣,除了類型不一樣,我們可以看看 atomic.Int32的實現(xiàn):
// An Int32 is an atomic int32. The zero value is zero.type Int32 struct {_ noCopyv int32}// Load atomically loads and returns the value stored in x.func (x *Int32) Load() int32 { return LoadInt32(&x.v) }// Store atomically stores val into x.func (x *Int32) Store(val int32) { StoreInt32(&x.v, val) }// Swap atomically stores new into x and returns the previous value.func (x *Int32) Swap(new int32) (old int32) { return SwapInt32(&x.v, new) }// CompareAndSwap executes the compare-and-swap operation for x.func (x *Int32) CompareAndSwap(old, new int32) (swapped bool) {return CompareAndSwapInt32(&x.v, old, new)}可以看到,atomic.Int32的實現(xiàn)都是基于 atomic包中 int32類型相關(guān)的原子操作函數(shù)來實現(xiàn)的。
那我們有了互斥鎖,為什么還要有原子操作呢?我們進行比較一下就知道了:
| 原子操作 | 互斥鎖 | |
|---|---|---|
| 保護的范圍 | 變量 | 代碼塊 |
| 保護的粒度 | 小 | 大 |
| 性能 | 高 | 低 |
| 如何實現(xiàn)的 | 硬件指令 | 軟件層面實現(xiàn),邏輯較多 |
如果我們只需要對某一個變量做并發(fā)讀寫,那么使用原子操作就可以了,因為原子操作的性能比互斥鎖高很多。但是如果我們需要對多個變量做并發(fā)讀寫,那么就需要用到互斥鎖了,這種場景往往是在一段代碼中對不同變量做讀寫。
我們前面這個表格提到了原子操作與互斥鎖性能上有差異,我們寫幾行代碼來進行比較一下:
// 系統(tǒng)信息 cpu: Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz// 10.13 ns/opfunc BenchmarkMutex(b *testing.B) { var mu sync.Mutex for i := 0; i < b.N; i++ { mu.Lock() mu.Unlock() }}// 5.849 ns/opfunc BenchmarkAtomic(b *testing.B) { var sum atomic.Uint64 for i := 0; i < b.N; i++ { sum.Add(uint64(1)) }}在對 Mutex的性能測試中,我只是寫了簡單的 Lock()和 UnLock()操作,因為這種比較才算是對 Mutex本身的測試,而在 Atomic的性能測試中,對 sum做原子累加的操作。最終結(jié)果是,使用 Atomic的操作耗時大概比 Mutex少了 40%以上。
在實際開發(fā)中,Mutex保護的臨界區(qū)內(nèi)往往有更多操作,也就意味著 Mutex鎖需要耗費更長的時間才能釋放,也就是會需要耗費比上面這個 40%還要多的時間另外一個協(xié)程才能獲取到 Mutex鎖。
在文章的開頭,我們就說了,在 go 的 sync.Map和 sync.Pool中都有用到了原子操作,本節(jié)就來看一看這些操作。
在 sync.Map中使用到了一個 entry結(jié)構(gòu)體,這個結(jié)構(gòu)體中大部分操作都是原子操作,我們可以看看它下面這兩個方法的定義:
// 刪除 entryfunc (e *entry) delete() (value any, ok bool) {for {p := e.p.Load()// 已經(jīng)被刪除了,不需要再刪除if p == nil || p == expunged {return nil, false}// 刪除成功if e.p.CompareAndSwap(p, nil) {return *p, true}}}// 如果條目尚未刪除,trySwap 將交換一個值。func (e *entry) trySwap(i *any) (*any, bool) {for {p := e.p.Load()// 已經(jīng)被刪除了if p == expunged {return nil, false}// swap 成功if e.p.CompareAndSwap(p, i) {return p, true}}}我們可以看到一個非常典型的特征就是 for+ CompareAndSwap的組合,這個組合在 entry中出現(xiàn)了很多次。
如果我們也需要對變量做并發(fā)讀寫,也可以嘗試一下這種 for + CompareAndSwap 的組合。
在 sync.WaitGroup中有一個類型為 atomic.Uint64的 state字段,這個變量是用來記錄 WaitGroup的狀態(tài)的。在實際使用中,它的高 32 位用來記錄 WaitGroup的計數(shù)器,低 32 位用來記錄 WaitGroup的 Waiter的數(shù)量,也就是等待條件變量滿足的協(xié)程數(shù)量。
如果不使用一個變量來記錄這兩個值,那么我們就需要使用兩個變量來記錄,這樣就會導(dǎo)致我們需要對兩個變量做并發(fā)讀寫,在這種情況下,我們就需要使用互斥鎖來保護這兩個變量,這樣就會導(dǎo)致性能的下降。
而使用一個變量來記錄這兩個值,我們就可以使用原子操作來保護這個變量,這樣就可以保證并發(fā)讀寫的安全性,同時也能得到更好的性能:
// WaitGroup 的 Add 函數(shù):高 32 位加上 deltastate := wg.state.Add(uint64(delta) << 32)// WaitGroup 的 Wait 函數(shù):低 32 位加 1// 等待者的數(shù)量加 1wg.state.CompareAndSwap(state, state+1)
當(dāng)然這里是指指向同一行 CAS代碼的時候(也就是有競爭的時候),如果是指向不同行 CAS代碼的時候,那么就不一定了。比如下面這個例子,我們把前面計算 sum的例子改一改,改成用 CAS操作來完成:
func TestCas(t *testing.T) {var sum int32 = 0var wg sync.WaitGroupwg.Add(1000)for i := 0; i < 1000; i++ {go func() {defer wg.Done()// 這一行是有可能會失敗的atomic.CompareAndSwapInt32(&sum, sum, sum+1)}()}wg.Wait()fmt.Println(sum) // 不是 1000}在這個例子中,我們把 atomic.AddInt32(&sum, 1)改成了 atomic.CompareAndSwapInt32(&sum, sum, sum+1),這樣就會導(dǎo)致有可能會有多個 goroutine 同時執(zhí)行到 atomic.CompareAndSwapInt32(&sum, sum, sum+1)這一行代碼,這樣肯定會有不同的 goroutine 同時拿到一個相同的 sum的舊值,那么在這種情況下,就會導(dǎo)致 CAS操作失敗。也就是說,將 sum替換為 sum + 1的操作可能會失敗。
失敗意味著什么呢?意味著另外一個協(xié)程序先把 sum的值加 1 了,這個時候其實我們不應(yīng)該在舊的 sum上加 1 了,而是應(yīng)該在最新的 sum上加上 1,那我們應(yīng)該怎么做呢?我們可以在 CAS操作失敗的時候,重新獲取 sum的值,然后再次嘗試 CAS操作,直到成功為止:
func TestCas(t *testing.T) {var sum int32 = 0var wg sync.WaitGroupwg.Add(1000)for i := 0; i < 1000; i++ {go func() {defer wg.Done()// cas 失敗的時候,重新獲取 sum 的值進行計算。// cas 成功則返回。for {if atomic.CompareAndSwapInt32(&sum, sum, sum+1) {return}}}()}wg.Wait()fmt.Println(sum)}原子操作是并發(fā)編程中非常重要的一個概念,它可以保證并發(fā)讀寫的安全性,同時也能得到更好的性能。
最后,總結(jié)一下本文講到的內(nèi)容:
原子操作是更加底層的操作,它保護的是單個變量,而互斥鎖可以保護一個代碼片段,它們的使用場景是不一樣的。原子操作需要通過 CPU 指令來實現(xiàn),而互斥鎖是在軟件層面實現(xiàn)的。go 里面的原子操作有以下這些:Add:原子增減CompareAndSwap:原子比較并交換Load:原子讀取Store:原子寫入Swap:原子交換go 里面所有類型都能使用原子操作,只是不同類型的原子操作使用的函數(shù)不太一樣。atomic.Value可以用來原子操作任意類型的變量。go 里面有些底層實現(xiàn)也使用了原子操作,比如:sync.WaitGroup:使用原子操作來保證計數(shù)器和等待者數(shù)量的并發(fā)讀寫安全性。sync.Map:entry結(jié)構(gòu)體中基本所有操作都有原子操作的身影。原子操作有失敗必然有成功(說的是同一行 CAS操作),如果 CAS操作失敗了,那么我們可以重新獲取舊值,然后再次嘗試 CAS操作,直到成功為止。總的來說,原子操作本身其實沒有太復(fù)雜的邏輯,我們理解了它的原理之后,就可以很容易的使用它了。
推薦學(xué)習(xí):Golang教程
以上就是什么是原子操作?深入淺析go中的原子操作的詳細內(nèi)容,更多請關(guān)注php中文網(wǎng)其它相關(guān)文章!
關(guān)鍵詞: