Goroutine并发编程

Goroutine基础

Goroutine是Go语言中的轻量级线程,由Go运行时管理。本节将介绍Goroutine的基本概念和使用方法。

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    // 启动一个新的goroutine
    go say("world")
    
    // 主goroutine继续执行
    say("hello")
}

Goroutine调度

Go运行时使用M:N调度模型,将M个goroutine调度到N个操作系统线程上执行。

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    // 设置最大CPU核心数
    runtime.GOMAXPROCS(4)
    
    // 启动多个goroutine
    for i := 0; i < 10; i++ {
        go func(id int) {
            for {
                fmt.Printf("Goroutine %d running\n", id)
                time.Sleep(time.Second)
            }
        }(i)
    }
    
    // 主goroutine等待
    time.Sleep(5 * time.Second)
}

Goroutine同步

使用sync包提供的同步原语来协调多个goroutine的执行。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    
    // 启动多个goroutine
    for i := 0; i < 5; i++ {
        wg.Add(1) // 增加等待计数
        go func(id int) {
            defer wg.Done() // 完成时减少等待计数
            fmt.Printf("Worker %d starting\n", id)
            time.Sleep(time.Second)
            fmt.Printf("Worker %d done\n", id)
        }(i)
    }
    
    // 等待所有goroutine完成
    wg.Wait()
    fmt.Println("All workers completed")
}

Goroutine通信

使用channel在goroutine之间进行通信和同步。

package main

import (
    "fmt"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("worker %d processing job %d\n", id, j)
        time.Sleep(time.Second)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    
    // 启动3个worker
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }
    
    // 发送9个任务
    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 收集结果
    for a := 1; a <= 9; a++ {
        <-results
    }
}

Goroutine最佳实践

编写高效、可靠的并发程序需要注意的一些关键点。

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d: Shutting down\n", id)
            return
        default:
            fmt.Printf("Worker %d: Working...\n", id)
            time.Sleep(time.Second)
        }
    }
}

func main() {
    // 创建带超时的context
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    // 启动worker
    go worker(ctx, 1)
    go worker(ctx, 2)
    
    // 等待context超时
    <-ctx.Done()
    fmt.Println("Main: Shutting down")
}

Goroutine错误处理

在并发程序中正确处理错误和异常情况。

package main

import (
    "fmt"
    "sync"
)

func process(id int, wg *sync.WaitGroup, errChan chan<- error) {
    defer wg.Done()
    
    // 模拟可能发生的错误
    if id == 2 {
        errChan <- fmt.Errorf("error in worker %d", id)
        return
    }
    
    fmt.Printf("Worker %d completed successfully\n", id)
}

func main() {
    var wg sync.WaitGroup
    errChan := make(chan error, 5)
    
    // 启动多个worker
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go process(i, &wg, errChan)
    }
    
    // 等待所有worker完成
    go func() {
        wg.Wait()
        close(errChan)
    }()
    
    // 处理错误
    for err := range errChan {
        fmt.Printf("Error: %v\n", err)
    }
}

Goroutine性能优化

优化goroutine的使用以提高程序性能。

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func main() {
    // 设置CPU核心数
    numCPU := runtime.NumCPU()
    runtime.GOMAXPROCS(numCPU)
    
    // 使用对象池
    var pool sync.Pool
    pool.New = func() interface{} {
        return make([]byte, 1024)
    }
    
    // 并发处理任务
    start := time.Now()
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            
            // 从对象池获取对象
            buf := pool.Get().([]byte)
            defer pool.Put(buf)
            
            // 使用对象处理任务
            time.Sleep(time.Millisecond)
        }()
    }
    
    wg.Wait()
    fmt.Printf("Processed 1000 tasks in %v\n", time.Since(start))
}