golang编程核心-内存屏障与原子操作

在并发编程中,内存屏障和原子操作是保证程序正确性的底层机制。它们直接操作硬件级别的内存访问,确保多核处理器环境下的数据一致性。

import "sync/atomic"

// 原子操作基础
func atomicBasics() {
    var counter int64
    
    // 原子递增
    atomic.AddInt64(&counter, 1)
    
    // 原子读取
    value := atomic.LoadInt64(&counter)
    fmt.Printf("计数器值: %d
", value)
    
    // 原子存储
    atomic.StoreInt64(&counter, 100)
    
    // 原子比较并交换
    swapped := atomic.CompareAndSwapInt64(&counter, 100, 200)
    fmt.Printf("交换成功: %t
", swapped)
}

原子操作类型

// 原子操作类型
func atomicTypes() {
    // 整数类型
    var int32Val int32
    var int64Val int64
    var uint32Val uint32
    var uint64Val uint64
    var uintptrVal uintptr
    
    // 指针类型
    var ptrVal unsafe.Pointer
    
    // 原子操作
    atomic.AddInt32(&int32Val, 1)
    atomic.AddInt64(&int64Val, 1)
    atomic.AddUint32(&uint32Val, 1)
    atomic.AddUint64(&uint64Val, 1)
    atomic.AddUintptr(&uintptrVal, 1)
    
    // 指针操作
    var x int = 42
    atomic.StorePointer(&ptrVal, unsafe.Pointer(&x))
    
    fmt.Printf("int32: %d
", atomic.LoadInt32(&int32Val))
    fmt.Printf("int64: %d
", atomic.LoadInt64(&int64Val))
    fmt.Printf("uint32: %d
", atomic.LoadUint32(&uint32Val))
    fmt.Printf("uint64: %d
", atomic.LoadUint64(&uint64Val))
    fmt.Printf("uintptr: %d
", atomic.LoadUintptr(&uintptrVal))
    fmt.Printf("pointer: %v
", atomic.LoadPointer(&ptrVal))
}

原子操作函数

基本操作

// 基本原子操作
func basicAtomicOperations() {
    var counter int64
    
    // Load - 原子读取
    value := atomic.LoadInt64(&counter)
    fmt.Printf("读取值: %d
", value)
    
    // Store - 原子存储
    atomic.StoreInt64(&counter, 100)
    fmt.Printf("存储后: %d
", atomic.LoadInt64(&counter))
    
    // Add - 原子加法
    atomic.AddInt64(&counter, 50)
    fmt.Printf("加法后: %d
", atomic.LoadInt64(&counter))
    
    // Swap - 原子交换
    oldValue := atomic.SwapInt64(&counter, 200)
    fmt.Printf("交换前: %d, 交换后: %d
", oldValue, atomic.LoadInt64(&counter))
}

比较并交换

// 比较并交换
func compareAndSwap() {
    var counter int64 = 100
    
    // CompareAndSwap - 比较并交换
    swapped := atomic.CompareAndSwapInt64(&counter, 100, 200)
    fmt.Printf("第一次交换: %t, 值: %d
", swapped, atomic.LoadInt64(&counter))
    
    swapped = atomic.CompareAndSwapInt64(&counter, 100, 300)
    fmt.Printf("第二次交换: %t, 值: %d
", swapped, atomic.LoadInt64(&counter))
    
    // 使用CAS实现原子递增
    for {
        current := atomic.LoadInt64(&counter)
        if atomic.CompareAndSwapInt64(&counter, current, current+1) {
            break
        }
    }
    fmt.Printf("CAS递增后: %d
", atomic.LoadInt64(&counter))
}

内存屏障

内存屏障概念

// 内存屏障概念
func memoryBarrier() {
    var x, y int64
    var ready int32
    
    // Goroutine 1 - 写入数据
    go func() {
        x = 1
        y = 2
        atomic.StoreInt32(&ready, 1) // 内存屏障
    }()
    
    // Goroutine 2 - 读取数据
    go func() {
        for atomic.LoadInt32(&ready) == 0 {
            // 等待
        }
        // 内存屏障确保x和y的值可见
        fmt.Printf("x: %d, y: %d
", x, y)
    }()
    
    time.Sleep(100 * time.Millisecond)
}

内存屏障类型

// 内存屏障类型
func memoryBarrierTypes() {
    var x, y int64
    var flag int32
    
    // 1. 获取屏障 (Acquire Barrier)
    // 确保后续的读取操作不会重排序到屏障之前
    go func() {
        x = 1
        y = 2
        atomic.StoreInt32(&flag, 1) // 释放屏障
    }()
    
    // 2. 释放屏障 (Release Barrier)
    // 确保之前的写入操作不会重排序到屏障之后
    go func() {
        for atomic.LoadInt32(&flag) == 0 {
            // 获取屏障
        }
        fmt.Printf("x: %d, y: %d
", x, y)
    }()
    
    time.Sleep(100 * time.Millisecond)
}

无锁数据结构

无锁栈

// 无锁栈
type LockFreeStack struct {
    head unsafe.Pointer
}

type node struct {
    value interface{}
    next  unsafe.Pointer
}

func NewLockFreeStack() *LockFreeStack {
    return &LockFreeStack{}
}

func (s *LockFreeStack) Push(value interface{}) {
    n := &node{value: value}
    
    for {
        head := atomic.LoadPointer(&s.head)
        n.next = head
        
        if atomic.CompareAndSwapPointer(&s.head, head, unsafe.Pointer(n)) {
            break
        }
    }
}

func (s *LockFreeStack) Pop() interface{} {
    for {
        head := atomic.LoadPointer(&s.head)
        if head == nil {
            return nil
        }
        
        n := (*node)(head)
        next := atomic.LoadPointer(&n.next)
        
        if atomic.CompareAndSwapPointer(&s.head, head, next) {
            return n.value
        }
    }
}

func lockFreeStackDemo() {
    stack := NewLockFreeStack()
    var wg sync.WaitGroup
    
    // 并发推入
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            stack.Push(id)
        }(i)
    }
    
    wg.Wait()
    
    // 并发弹出
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            value := stack.Pop()
            if value != nil {
                fmt.Printf("弹出: %v
", value)
            }
        }()
    }
    
    wg.Wait()
}

无锁队列

// 无锁队列
type LockFreeQueue struct {
    head unsafe.Pointer
    tail unsafe.Pointer
}

type queueNode struct {
    value interface{}
    next  unsafe.Pointer
}

func NewLockFreeQueue() *LockFreeQueue {
    q := &LockFreeQueue{}
    n := &queueNode{}
    q.head = unsafe.Pointer(n)
    q.tail = unsafe.Pointer(n)
    return q
}

func (q *LockFreeQueue) Enqueue(value interface{}) {
    n := &queueNode{value: value}
    
    for {
        tail := atomic.LoadPointer(&q.tail)
        next := atomic.LoadPointer(&(*queueNode)(tail).next)
        
        if next == nil {
            if atomic.CompareAndSwapPointer(&(*queueNode)(tail).next, nil, unsafe.Pointer(n)) {
                break
            }
        } else {
            atomic.CompareAndSwapPointer(&q.tail, tail, next)
        }
    }
    
    atomic.CompareAndSwapPointer(&q.tail, atomic.LoadPointer(&q.tail), unsafe.Pointer(n))
}

func (q *LockFreeQueue) Dequeue() interface{} {
    for {
        head := atomic.LoadPointer(&q.head)
        tail := atomic.LoadPointer(&q.tail)
        next := atomic.LoadPointer(&(*queueNode)(head).next)
        
        if head == tail {
            if next == nil {
                return nil
            }
            atomic.CompareAndSwapPointer(&q.tail, tail, next)
        } else {
            if next == nil {
                continue
            }
            
            value := (*queueNode)(next).value
            if atomic.CompareAndSwapPointer(&q.head, head, next) {
                return value
            }
        }
    }
}

func lockFreeQueueDemo() {
    queue := NewLockFreeQueue()
    var wg sync.WaitGroup
    
    // 并发入队
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            queue.Enqueue(id)
        }(i)
    }
    
    wg.Wait()
    
    // 并发出队
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            value := queue.Dequeue()
            if value != nil {
                fmt.Printf("出队: %v
", value)
            }
        }()
    }
    
    wg.Wait()
}

原子计数器

基本计数器

// 基本原子计数器
type AtomicCounter struct {
    value int64
}

func NewAtomicCounter() *AtomicCounter {
    return &AtomicCounter{}
}

func (c *AtomicCounter) Increment() int64 {
    return atomic.AddInt64(&c.value, 1)
}

func (c *AtomicCounter) Decrement() int64 {
    return atomic.AddInt64(&c.value, -1)
}

func (c *AtomicCounter) Add(delta int64) int64 {
    return atomic.AddInt64(&c.value, delta)
}

func (c *AtomicCounter) Value() int64 {
    return atomic.LoadInt64(&c.value)
}

func (c *AtomicCounter) Set(value int64) {
    atomic.StoreInt64(&c.value, value)
}

func atomicCounterDemo() {
    counter := NewAtomicCounter()
    var wg sync.WaitGroup
    
    // 并发递增
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }
    
    wg.Wait()
    fmt.Printf("最终计数: %d
", counter.Value())
}

高级计数器

// 高级原子计数器
type AdvancedAtomicCounter struct {
    value int64
}

func NewAdvancedAtomicCounter() *AdvancedAtomicCounter {
    return &AdvancedAtomicCounter{}
}

func (c *AdvancedAtomicCounter) CompareAndIncrement(expected int64) bool {
    return atomic.CompareAndSwapInt64(&c.value, expected, expected+1)
}

func (c *AdvancedAtomicCounter) CompareAndDecrement(expected int64) bool {
    return atomic.CompareAndSwapInt64(&c.value, expected, expected-1)
}

func (c *AdvancedAtomicCounter) IncrementIfPositive() bool {
    for {
        current := atomic.LoadInt64(&c.value)
        if current <= 0 {
            return false
        }
        if atomic.CompareAndSwapInt64(&c.value, current, current+1) {
            return true
        }
    }
}

func (c *AdvancedAtomicCounter) DecrementIfPositive() bool {
    for {
        current := atomic.LoadInt64(&c.value)
        if current <= 0 {
            return false
        }
        if atomic.CompareAndSwapInt64(&c.value, current, current-1) {
            return true
        }
    }
}

func advancedAtomicCounterDemo() {
    counter := NewAdvancedAtomicCounter()
    counter.Set(10)
    
    var wg sync.WaitGroup
    
    // 并发条件递增
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if counter.IncrementIfPositive() {
                fmt.Println("递增成功")
            } else {
                fmt.Println("递增失败")
            }
        }()
    }
    
    wg.Wait()
    fmt.Printf("最终计数: %d
", counter.Value())
}

原子标志

基本标志

// 基本原子标志
type AtomicFlag struct {
    value int32
}

func NewAtomicFlag() *AtomicFlag {
    return &AtomicFlag{}
}

func (f *AtomicFlag) Set() {
    atomic.StoreInt32(&f.value, 1)
}

func (f *AtomicFlag) Clear() {
    atomic.StoreInt32(&f.value, 0)
}

func (f *AtomicFlag) IsSet() bool {
    return atomic.LoadInt32(&f.value) == 1
}

func (f *AtomicFlag) TestAndSet() bool {
    return atomic.CompareAndSwapInt32(&f.value, 0, 1)
}

func atomicFlagDemo() {
    flag := NewAtomicFlag()
    var wg sync.WaitGroup
    
    // 并发测试和设置
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            if flag.TestAndSet() {
                fmt.Printf("Goroutine %d 获得标志
", id)
            } else {
                fmt.Printf("Goroutine %d 未获得标志
", id)
            }
        }(i)
    }
    
    wg.Wait()
}

性能优化

原子操作性能

// 原子操作性能测试
func atomicPerformance() {
    var counter int64
    var wg sync.WaitGroup
    
    // 测试原子操作性能
    start := time.Now()
    for i := 0; i < 1000000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1)
        }()
    }
    wg.Wait()
    atomicTime := time.Since(start)
    
    fmt.Printf("原子操作耗时: %v
", atomicTime)
    fmt.Printf("最终计数: %d
", atomic.LoadInt64(&counter))
}

内存对齐

// 内存对齐
func memoryAlignment() {
    // 确保原子变量内存对齐
    type AlignedStruct struct {
        _    [7]byte // 填充字节
        counter int64 // 8字节对齐
    }
    
    var s AlignedStruct
    atomic.AddInt64(&s.counter, 1)
    fmt.Printf("对齐后的计数: %d
", atomic.LoadInt64(&s.counter))
}

实战应用

无锁缓存

// 无锁缓存
type LockFreeCache struct {
    data map[string]interface{}
    mu   sync.RWMutex
}

func NewLockFreeCache() *LockFreeCache {
    return &LockFreeCache{
        data: make(map[string]interface{}),
    }
}

func (c *LockFreeCache) Get(key string) (interface{}, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    value, ok := c.data[key]
    return value, ok
}

func (c *LockFreeCache) Set(key string, value interface{}) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}

func lockFreeCacheDemo() {
    cache := NewLockFreeCache()
    var wg sync.WaitGroup
    
    // 并发读写
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            key := fmt.Sprintf("key%d", id)
            value := id * 10
            cache.Set(key, value)
            
            if v, ok := cache.Get(key); ok {
                _ = v
            }
        }(i)
    }
    
    wg.Wait()
}

无锁环形缓冲区

// 无锁环形缓冲区
type LockFreeRingBuffer struct {
    buffer []interface{}
    size   int64
    head   int64
    tail   int64
}

func NewLockFreeRingBuffer(size int) *LockFreeRingBuffer {
    return &LockFreeRingBuffer{
        buffer: make([]interface{}, size),
        size:   int64(size),
    }
}

func (rb *LockFreeRingBuffer) Enqueue(value interface{}) bool {
    for {
        tail := atomic.LoadInt64(&rb.tail)
        head := atomic.LoadInt64(&rb.head)
        
        nextTail := (tail + 1) % rb.size
        if nextTail == head {
            return false // 缓冲区已满
        }
        
        if atomic.CompareAndSwapInt64(&rb.tail, tail, nextTail) {
            rb.buffer[tail] = value
            return true
        }
    }
}

func (rb *LockFreeRingBuffer) Dequeue() (interface{}, bool) {
    for {
        head := atomic.LoadInt64(&rb.head)
        tail := atomic.LoadInt64(&rb.tail)
        
        if head == tail {
            return nil, false // 缓冲区为空
        }
        
        value := rb.buffer[head]
        nextHead := (head + 1) % rb.size
        
        if atomic.CompareAndSwapInt64(&rb.head, head, nextHead) {
            return value, true
        }
    }
}

func lockFreeRingBufferDemo() {
    buffer := NewLockFreeRingBuffer(10)
    var wg sync.WaitGroup
    
    // 并发入队
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            if buffer.Enqueue(id) {
                fmt.Printf("入队成功: %d
", id)
            } else {
                fmt.Printf("入队失败: %d
", id)
            }
        }(i)
    }
    
    wg.Wait()
    
    // 并发出队
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if value, ok := buffer.Dequeue(); ok {
                fmt.Printf("出队: %v
", value)
            }
        }()
    }
    
    wg.Wait()
}

写在最后

内存屏障与原子操作是Go语言并发编程的底层机制,它们直接操作硬件级别的内存访问,确保多核处理器环境下的数据一致性。从简单的计数器到复杂的无锁数据结构,原子操作都展现出了其独特的价值。

作为Go开发者,掌握内存屏障与原子操作不仅能够提高程序的并发性能,还能让我们更好地理解并发编程的底层原理。通过合理使用原子操作,我们可以构建出更加高效和可靠的并发程序。

记住,原子操作不仅仅是工具,更是并发编程的基石。通过原子操作,我们可以实现无锁的并发数据结构,构建出更加高性能的并发程序。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容