在并发编程中,内存屏障和原子操作是保证程序正确性的底层机制。它们直接操作硬件级别的内存访问,确保多核处理器环境下的数据一致性。
import "sync/atomic"
// 原子操作基础
func atomicBasics() {
var counter int64
// 原子递增
atomic.AddInt64(&counter, 1)
// 原子读取
value := atomic.LoadInt64(&counter)
fmt.Printf("计数器值: %d
", value)
// 原子存储
atomic.StoreInt64(&counter, 100)
// 原子比较并交换
swapped := atomic.CompareAndSwapInt64(&counter, 100, 200)
fmt.Printf("交换成功: %t
", swapped)
}
原子操作类型
// 原子操作类型
func atomicTypes() {
// 整数类型
var int32Val int32
var int64Val int64
var uint32Val uint32
var uint64Val uint64
var uintptrVal uintptr
// 指针类型
var ptrVal unsafe.Pointer
// 原子操作
atomic.AddInt32(&int32Val, 1)
atomic.AddInt64(&int64Val, 1)
atomic.AddUint32(&uint32Val, 1)
atomic.AddUint64(&uint64Val, 1)
atomic.AddUintptr(&uintptrVal, 1)
// 指针操作
var x int = 42
atomic.StorePointer(&ptrVal, unsafe.Pointer(&x))
fmt.Printf("int32: %d
", atomic.LoadInt32(&int32Val))
fmt.Printf("int64: %d
", atomic.LoadInt64(&int64Val))
fmt.Printf("uint32: %d
", atomic.LoadUint32(&uint32Val))
fmt.Printf("uint64: %d
", atomic.LoadUint64(&uint64Val))
fmt.Printf("uintptr: %d
", atomic.LoadUintptr(&uintptrVal))
fmt.Printf("pointer: %v
", atomic.LoadPointer(&ptrVal))
}
原子操作函数
基本操作
// 基本原子操作
func basicAtomicOperations() {
var counter int64
// Load - 原子读取
value := atomic.LoadInt64(&counter)
fmt.Printf("读取值: %d
", value)
// Store - 原子存储
atomic.StoreInt64(&counter, 100)
fmt.Printf("存储后: %d
", atomic.LoadInt64(&counter))
// Add - 原子加法
atomic.AddInt64(&counter, 50)
fmt.Printf("加法后: %d
", atomic.LoadInt64(&counter))
// Swap - 原子交换
oldValue := atomic.SwapInt64(&counter, 200)
fmt.Printf("交换前: %d, 交换后: %d
", oldValue, atomic.LoadInt64(&counter))
}
比较并交换
// 比较并交换
func compareAndSwap() {
var counter int64 = 100
// CompareAndSwap - 比较并交换
swapped := atomic.CompareAndSwapInt64(&counter, 100, 200)
fmt.Printf("第一次交换: %t, 值: %d
", swapped, atomic.LoadInt64(&counter))
swapped = atomic.CompareAndSwapInt64(&counter, 100, 300)
fmt.Printf("第二次交换: %t, 值: %d
", swapped, atomic.LoadInt64(&counter))
// 使用CAS实现原子递增
for {
current := atomic.LoadInt64(&counter)
if atomic.CompareAndSwapInt64(&counter, current, current+1) {
break
}
}
fmt.Printf("CAS递增后: %d
", atomic.LoadInt64(&counter))
}
内存屏障
内存屏障概念
// 内存屏障概念
func memoryBarrier() {
var x, y int64
var ready int32
// Goroutine 1 - 写入数据
go func() {
x = 1
y = 2
atomic.StoreInt32(&ready, 1) // 内存屏障
}()
// Goroutine 2 - 读取数据
go func() {
for atomic.LoadInt32(&ready) == 0 {
// 等待
}
// 内存屏障确保x和y的值可见
fmt.Printf("x: %d, y: %d
", x, y)
}()
time.Sleep(100 * time.Millisecond)
}
内存屏障类型
// 内存屏障类型
func memoryBarrierTypes() {
var x, y int64
var flag int32
// 1. 获取屏障 (Acquire Barrier)
// 确保后续的读取操作不会重排序到屏障之前
go func() {
x = 1
y = 2
atomic.StoreInt32(&flag, 1) // 释放屏障
}()
// 2. 释放屏障 (Release Barrier)
// 确保之前的写入操作不会重排序到屏障之后
go func() {
for atomic.LoadInt32(&flag) == 0 {
// 获取屏障
}
fmt.Printf("x: %d, y: %d
", x, y)
}()
time.Sleep(100 * time.Millisecond)
}
无锁数据结构
无锁栈
// 无锁栈
type LockFreeStack struct {
head unsafe.Pointer
}
type node struct {
value interface{}
next unsafe.Pointer
}
func NewLockFreeStack() *LockFreeStack {
return &LockFreeStack{}
}
func (s *LockFreeStack) Push(value interface{}) {
n := &node{value: value}
for {
head := atomic.LoadPointer(&s.head)
n.next = head
if atomic.CompareAndSwapPointer(&s.head, head, unsafe.Pointer(n)) {
break
}
}
}
func (s *LockFreeStack) Pop() interface{} {
for {
head := atomic.LoadPointer(&s.head)
if head == nil {
return nil
}
n := (*node)(head)
next := atomic.LoadPointer(&n.next)
if atomic.CompareAndSwapPointer(&s.head, head, next) {
return n.value
}
}
}
func lockFreeStackDemo() {
stack := NewLockFreeStack()
var wg sync.WaitGroup
// 并发推入
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
stack.Push(id)
}(i)
}
wg.Wait()
// 并发弹出
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
value := stack.Pop()
if value != nil {
fmt.Printf("弹出: %v
", value)
}
}()
}
wg.Wait()
}
无锁队列
// 无锁队列
type LockFreeQueue struct {
head unsafe.Pointer
tail unsafe.Pointer
}
type queueNode struct {
value interface{}
next unsafe.Pointer
}
func NewLockFreeQueue() *LockFreeQueue {
q := &LockFreeQueue{}
n := &queueNode{}
q.head = unsafe.Pointer(n)
q.tail = unsafe.Pointer(n)
return q
}
func (q *LockFreeQueue) Enqueue(value interface{}) {
n := &queueNode{value: value}
for {
tail := atomic.LoadPointer(&q.tail)
next := atomic.LoadPointer(&(*queueNode)(tail).next)
if next == nil {
if atomic.CompareAndSwapPointer(&(*queueNode)(tail).next, nil, unsafe.Pointer(n)) {
break
}
} else {
atomic.CompareAndSwapPointer(&q.tail, tail, next)
}
}
atomic.CompareAndSwapPointer(&q.tail, atomic.LoadPointer(&q.tail), unsafe.Pointer(n))
}
func (q *LockFreeQueue) Dequeue() interface{} {
for {
head := atomic.LoadPointer(&q.head)
tail := atomic.LoadPointer(&q.tail)
next := atomic.LoadPointer(&(*queueNode)(head).next)
if head == tail {
if next == nil {
return nil
}
atomic.CompareAndSwapPointer(&q.tail, tail, next)
} else {
if next == nil {
continue
}
value := (*queueNode)(next).value
if atomic.CompareAndSwapPointer(&q.head, head, next) {
return value
}
}
}
}
func lockFreeQueueDemo() {
queue := NewLockFreeQueue()
var wg sync.WaitGroup
// 并发入队
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
queue.Enqueue(id)
}(i)
}
wg.Wait()
// 并发出队
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
value := queue.Dequeue()
if value != nil {
fmt.Printf("出队: %v
", value)
}
}()
}
wg.Wait()
}
原子计数器
基本计数器
// 基本原子计数器
type AtomicCounter struct {
value int64
}
func NewAtomicCounter() *AtomicCounter {
return &AtomicCounter{}
}
func (c *AtomicCounter) Increment() int64 {
return atomic.AddInt64(&c.value, 1)
}
func (c *AtomicCounter) Decrement() int64 {
return atomic.AddInt64(&c.value, -1)
}
func (c *AtomicCounter) Add(delta int64) int64 {
return atomic.AddInt64(&c.value, delta)
}
func (c *AtomicCounter) Value() int64 {
return atomic.LoadInt64(&c.value)
}
func (c *AtomicCounter) Set(value int64) {
atomic.StoreInt64(&c.value, value)
}
func atomicCounterDemo() {
counter := NewAtomicCounter()
var wg sync.WaitGroup
// 并发递增
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Printf("最终计数: %d
", counter.Value())
}
高级计数器
// 高级原子计数器
type AdvancedAtomicCounter struct {
value int64
}
func NewAdvancedAtomicCounter() *AdvancedAtomicCounter {
return &AdvancedAtomicCounter{}
}
func (c *AdvancedAtomicCounter) CompareAndIncrement(expected int64) bool {
return atomic.CompareAndSwapInt64(&c.value, expected, expected+1)
}
func (c *AdvancedAtomicCounter) CompareAndDecrement(expected int64) bool {
return atomic.CompareAndSwapInt64(&c.value, expected, expected-1)
}
func (c *AdvancedAtomicCounter) IncrementIfPositive() bool {
for {
current := atomic.LoadInt64(&c.value)
if current <= 0 {
return false
}
if atomic.CompareAndSwapInt64(&c.value, current, current+1) {
return true
}
}
}
func (c *AdvancedAtomicCounter) DecrementIfPositive() bool {
for {
current := atomic.LoadInt64(&c.value)
if current <= 0 {
return false
}
if atomic.CompareAndSwapInt64(&c.value, current, current-1) {
return true
}
}
}
func advancedAtomicCounterDemo() {
counter := NewAdvancedAtomicCounter()
counter.Set(10)
var wg sync.WaitGroup
// 并发条件递增
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if counter.IncrementIfPositive() {
fmt.Println("递增成功")
} else {
fmt.Println("递增失败")
}
}()
}
wg.Wait()
fmt.Printf("最终计数: %d
", counter.Value())
}
原子标志
基本标志
// 基本原子标志
type AtomicFlag struct {
value int32
}
func NewAtomicFlag() *AtomicFlag {
return &AtomicFlag{}
}
func (f *AtomicFlag) Set() {
atomic.StoreInt32(&f.value, 1)
}
func (f *AtomicFlag) Clear() {
atomic.StoreInt32(&f.value, 0)
}
func (f *AtomicFlag) IsSet() bool {
return atomic.LoadInt32(&f.value) == 1
}
func (f *AtomicFlag) TestAndSet() bool {
return atomic.CompareAndSwapInt32(&f.value, 0, 1)
}
func atomicFlagDemo() {
flag := NewAtomicFlag()
var wg sync.WaitGroup
// 并发测试和设置
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
if flag.TestAndSet() {
fmt.Printf("Goroutine %d 获得标志
", id)
} else {
fmt.Printf("Goroutine %d 未获得标志
", id)
}
}(i)
}
wg.Wait()
}
性能优化
原子操作性能
// 原子操作性能测试
func atomicPerformance() {
var counter int64
var wg sync.WaitGroup
// 测试原子操作性能
start := time.Now()
for i := 0; i < 1000000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
atomicTime := time.Since(start)
fmt.Printf("原子操作耗时: %v
", atomicTime)
fmt.Printf("最终计数: %d
", atomic.LoadInt64(&counter))
}
内存对齐
// 内存对齐
func memoryAlignment() {
// 确保原子变量内存对齐
type AlignedStruct struct {
_ [7]byte // 填充字节
counter int64 // 8字节对齐
}
var s AlignedStruct
atomic.AddInt64(&s.counter, 1)
fmt.Printf("对齐后的计数: %d
", atomic.LoadInt64(&s.counter))
}
实战应用
无锁缓存
// 无锁缓存
type LockFreeCache struct {
data map[string]interface{}
mu sync.RWMutex
}
func NewLockFreeCache() *LockFreeCache {
return &LockFreeCache{
data: make(map[string]interface{}),
}
}
func (c *LockFreeCache) Get(key string) (interface{}, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
value, ok := c.data[key]
return value, ok
}
func (c *LockFreeCache) Set(key string, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
func lockFreeCacheDemo() {
cache := NewLockFreeCache()
var wg sync.WaitGroup
// 并发读写
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("key%d", id)
value := id * 10
cache.Set(key, value)
if v, ok := cache.Get(key); ok {
_ = v
}
}(i)
}
wg.Wait()
}
无锁环形缓冲区
// 无锁环形缓冲区
type LockFreeRingBuffer struct {
buffer []interface{}
size int64
head int64
tail int64
}
func NewLockFreeRingBuffer(size int) *LockFreeRingBuffer {
return &LockFreeRingBuffer{
buffer: make([]interface{}, size),
size: int64(size),
}
}
func (rb *LockFreeRingBuffer) Enqueue(value interface{}) bool {
for {
tail := atomic.LoadInt64(&rb.tail)
head := atomic.LoadInt64(&rb.head)
nextTail := (tail + 1) % rb.size
if nextTail == head {
return false // 缓冲区已满
}
if atomic.CompareAndSwapInt64(&rb.tail, tail, nextTail) {
rb.buffer[tail] = value
return true
}
}
}
func (rb *LockFreeRingBuffer) Dequeue() (interface{}, bool) {
for {
head := atomic.LoadInt64(&rb.head)
tail := atomic.LoadInt64(&rb.tail)
if head == tail {
return nil, false // 缓冲区为空
}
value := rb.buffer[head]
nextHead := (head + 1) % rb.size
if atomic.CompareAndSwapInt64(&rb.head, head, nextHead) {
return value, true
}
}
}
func lockFreeRingBufferDemo() {
buffer := NewLockFreeRingBuffer(10)
var wg sync.WaitGroup
// 并发入队
for i := 0; i < 20; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
if buffer.Enqueue(id) {
fmt.Printf("入队成功: %d
", id)
} else {
fmt.Printf("入队失败: %d
", id)
}
}(i)
}
wg.Wait()
// 并发出队
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if value, ok := buffer.Dequeue(); ok {
fmt.Printf("出队: %v
", value)
}
}()
}
wg.Wait()
}
写在最后
内存屏障与原子操作是Go语言并发编程的底层机制,它们直接操作硬件级别的内存访问,确保多核处理器环境下的数据一致性。从简单的计数器到复杂的无锁数据结构,原子操作都展现出了其独特的价值。
作为Go开发者,掌握内存屏障与原子操作不仅能够提高程序的并发性能,还能让我们更好地理解并发编程的底层原理。通过合理使用原子操作,我们可以构建出更加高效和可靠的并发程序。
记住,原子操作不仅仅是工具,更是并发编程的基石。通过原子操作,我们可以实现无锁的并发数据结构,构建出更加高性能的并发程序。
© 版权声明
文章版权归作者所有,未经允许请勿转载。如内容涉嫌侵权,请在本页底部进入<联系我们>进行举报投诉!
THE END




















暂无评论内容