Skip to content
公众号 - 佳佳的博客

Go 语言实战 第 7 章 并发模式

本章学习 3 个可以在实际工程里使用的包,这 3 个包分别实现了不同的并发模式。

7.1 runner

go
// Example is provided with help by Gabriel Aszalos.
// Package runner manages the running and lifetime of a process.
package runner

import (
    "errors"
    "os"
    "os/signal"
    "time"
)

// Runner runs a set of tasks within a given timeout and can be
// shut down on an operating system interrupt.
type Runner struct {
    // interrupt channel reports a signal from the
    // operating system.
    interrupt chan os.Signal

    // complete channel reports that processing is done.
    complete chan error

    // timeout reports that time has run out.
    timeout <-chan time.Time

    // tasks holds a set of functions that are executed
    // synchronously in index order.
    tasks []func(int)
}

// ErrTimeout is returned when a value is received on the timeout channel.
var ErrTimeout = errors.New("received timeout")

// ErrInterrupt is returned when an event from the OS is received.
var ErrInterrupt = errors.New("received interrupt")

// New returns a new ready-to-use Runner.
func New(d time.Duration) *Runner {
    return &Runner{
        interrupt: make(chan os.Signal, 1),
        complete:  make(chan error),
        timeout:   time.After(d),
    }
}

// Add attaches tasks to the Runner. A task is a function that
// takes an int ID.
func (r *Runner) Add(tasks ...func(int)) {
    r.tasks = append(r.tasks, tasks...)
}

// Start runs all tasks and monitors channel events.
func (r *Runner) Start() error {
    // We want to receive all interrupt based signals.
    signal.Notify(r.interrupt, os.Interrupt)

    // Run the different tasks on a different goroutine.
    go func() {
        r.complete <- r.run()
    }()

    select {
    // Signaled when processing is done.
    case err := <-r.complete:
        return err

    // Signaled when we run out of time.
    case <-r.timeout:
        return ErrTimeout
    }
}

// run executes each registered task.
func (r *Runner) run() error {
    for id, task := range r.tasks {
        // Check for an interrupt signal from the OS.
        if r.gotInterrupt() {
            return ErrInterrupt
        }

        // Execute the registered task.
        task(id)
    }

    return nil
}

// gotInterrupt verifies if the interrupt signal has been issued.
func (r *Runner) gotInterrupt() bool {
    select {
    // Signaled when an interrupt event is sent.
    case <-r.interrupt:
        // Stop receiving any further signals.
        signal.Stop(r.interrupt)
        return true

    // Continue running as normal.
    default:
        return false
    }
}

在设计上,支持以下终止点:

  • 程序可以在分配的时间内完成工作,正常终止;
  • 程序没有及时完成工作,“自杀”;
  • 接收到操作系统发送的中断事件,程序立即试图清理状态并停止工作。
go
// This sample program demonstrates how to use a channel to
// monitor the amount of time the program is running and terminate
// the program if it runs too long.
package main

import (
    "log"
    "os"
    "time"
    "github.com/goinaction/code/chapter7/patterns/runner"
)

// timeout is the number of second the program has to finish.
const timeout = 3 * time.Second

// main is the entry point for the program.
func main() {
    log.Println("Starting work.")

    // Create a new timer value for this run.
    r := runner.New(timeout)

    // Add the tasks to be run.
    r.Add(createTask(), createTask(), createTask())

    // Run the tasks and handle the result.
    if err := r.Start(); err != nil {
        switch err {
        case runner.ErrTimeout:
            log.Println("Terminating due to timeout.")
            os.Exit(1)
        case runner.ErrInterrupt:
            log.Println("Terminating due to interrupt.")
            os.Exit(2)
        }
    }

    log.Println("Process ended.")
}

// createTask returns an example task that sleeps for the specified
// number of seconds based on the id.
func createTask() func(int) {
    return func(id int) {
        log.Printf("Processor - Task #%d.", id)
        time.Sleep(time.Duration(id) * time.Second)
    }
}

7.2 pool

pool 包用于展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的 goroutine 之间共享及独立使用的资源。
这种模式在需要共享一组静态资源的情况下非常有用。
如果 goroutine 需要从池里得到这些资源中的一个,它可以从池里申请,使用完后归还到资源池里。

go
// Example provided with help from Fatih Arslan and Gabriel Aszalos.
// Package pool manages a user defined set of resources.
package pool

import (
    "errors"
    "io"
    "log"
    "sync"
)

// Pool manages a set of resources that can be shared safely by
// multiple goroutines. The resource being managed must implement
// the io.Closer interface.
type Pool struct {
    m         sync.Mutex
    resources chan io.Closer
    factory   func() (io.Closer, error)
    closed    bool
}

// ErrPoolClosed is returned when an Acquire returns on a
// closed pool.
var ErrPoolClosed = errors.New("Pool has been closed.")

// New creates a pool that manages resources. A pool requires a
// function that can allocate a new resource and the size of
// the pool.
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("Size value too small.")
    }

    return &Pool{
        factory:   fn,
        resources: make(chan io.Closer, size),
    }, nil
}

// Acquire retrieves a resource    from the pool.
func (p *Pool) Acquire() (io.Closer, error) {
    select {
    // Check for a free resource.
    case r, ok := <-p.resources:
        log.Println("Acquire:", "Shared Resource")
        if !ok {
            return nil, ErrPoolClosed
        }
        return r, nil

    // Provide a new resource since there are none available.
    default:
        log.Println("Acquire:", "New Resource")
        return p.factory()
    }
}

// Release places a new resource onto the pool.
func (p *Pool) Release(r io.Closer) {
    // Secure this operation with the Close operation.
    p.m.Lock()
    defer p.m.Unlock()

    // If the pool is closed, discard the resource.
    if p.closed {
        r.Close()
        return
    }

    select {
    // Attempt to place the new resource on the queue.
    case p.resources <- r:
        log.Println("Release:", "In Queue")

    // If the queue is already at cap we close the resource.
    default:
        log.Println("Release:", "Closing")
        r.Close()
    }
}

// Close will shutdown the pool and close all existing resources.
func (p *Pool) Close() {
    // Secure this operation with the Release operation.
    p.m.Lock()
    defer p.m.Unlock()

    // If the pool is already close, don't do anything.
    if p.closed {
        return
    }

    // Set the pool as closed.
    p.closed = true

    // Close the channel before we drain the channel of its
    // resources. If we don't do this, we will have a deadlock.
    close(p.resources)

    // Close the resources
    for r := range p.resources {
        r.Close()
    }
}

使用示例:

go
// This sample program demonstrates how to use the pool package
// to share a simulated set of database connections.
package main

import (
    "io"
    "log"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"

    "github.com/goinaction/code/chapter7/patterns/pool"
)

const (
    maxGoroutines   = 25 // the number of routines to use.
    pooledResources = 2  // number of resources in the pool
)

// dbConnection simulates a resource to share.
type dbConnection struct {
    ID int32
}

// Close implements the io.Closer interface so dbConnection
// can be managed by the pool. Close performs any resource
// release management.
func (dbConn *dbConnection) Close() error {
    log.Println("Close: Connection", dbConn.ID)
    return nil
}

// idCounter provides support for giving each connection a unique id.
var idCounter int32

// createConnection is a factory method that will be called by
// the pool when a new connection is needed.
func createConnection() (io.Closer, error) {
    id := atomic.AddInt32(&idCounter, 1)
    log.Println("Create: New Connection", id)

    return &dbConnection{id}, nil
}

// main is the entry point for all Go programs.
func main() {
    var wg sync.WaitGroup
    wg.Add(maxGoroutines)

    // Create the pool to manage our connections.
    p, err := pool.New(createConnection, pooledResources)
    if err != nil {
        log.Println(err)
    }

    // Perform queries using connections from the pool.
    for query := 0; query < maxGoroutines; query++ {
        // Each goroutine needs its own copy of the query
        // value else they will all be sharing the same query
        // variable.
        go func(q int) {
            performQueries(q, p)
            wg.Done()
        }(query)
    }

    // Wait for the goroutines to finish.
    wg.Wait()

    // Close the pool.
    log.Println("Shutdown Program.")
    p.Close()
}

// performQueries tests the resource pool of connections.
func performQueries(query int, p *pool.Pool) {
    // Acquire a connection from the pool.
    conn, err := p.Acquire()
    if err != nil {
        log.Println(err)
        return
    }

    // Release the connection back to the pool.
    defer p.Release(conn)

    // Wait to simulate a query response.
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("Query: QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}

7.3 work

work 包的目的是展示如何使用无缓冲的通道来创建一个 goroutine 池,这些 goroutine 执行并控制一组工作,让其并发执行。
在这种情况下,使用无缓冲的通道要比随意指定一个缓冲区大小的有缓冲通道好,因为这个情况下既不需要一个工作队列,也不需要一组 goroutine 配合执行。
无缓冲的通道保证两个 goroutine 之间的数据交换。
这种使用无缓冲通道的方法允许使用者知道什么时候 goroutine 池正在执行工作,而且如果池里的所有 goroutine 都忙,无法接受新的工作的时候,也能及时通过通过来通知调用者。

go
// Example provided with help from Jason Waldrip.
// Package work manages a pool of goroutines to perform work.
package work

import "sync"

// Worker must be implemented by types that want to use
// the work pool.
type Worker interface {
    Task()
}

// Pool provides a pool of goroutines that can execute any Worker
// tasks that are submitted.
type Pool struct {
    work chan Worker
    wg   sync.WaitGroup
}

// New creates a new work pool.
func New(maxGoroutines int) *Pool {
    p := Pool{
        work: make(chan Worker),
    }

    p.wg.Add(maxGoroutines)
    for i := 0; i < maxGoroutines; i++ {
        go func() {
            for w := range p.work {
                w.Task()
            }
            p.wg.Done()
        }()
    }

    return &p
}

// Run submits work to the pool.
func (p *Pool) Run(w Worker) {
    p.work <- w
}

// Shutdown waits for all the goroutines to shutdown.
func (p *Pool) Shutdown() {
    close(p.work)
    p.wg.Wait()
}

使用示例:

go
// This sample program demonstrates how to use the work package
// to use a pool of goroutines to get work done.
package main

import (
    "log"
    "sync"
    "time"

    "github.com/goinaction/code/chapter7/patterns/work"
)

// names provides a set of names to display.
var names = []string{
    "steve",
    "bob",
    "mary",
    "therese",
    "jason",
}

// namePrinter provides special support for printing names.
type namePrinter struct {
    name string
}

// Task implements the Worker interface.
func (m *namePrinter) Task() {
    log.Println(m.name)
    time.Sleep(time.Second)
}

// main is the entry point for all Go programs.
func main() {
    // Create a work pool with 2 goroutines.
    p := work.New(2)

    var wg sync.WaitGroup
    wg.Add(100 * len(names))

    for i := 0; i < 100; i++ {
        // Iterate over the slice of names.
        for _, name := range names {
            // Create a namePrinter and provide the
            // specific name.
            np := namePrinter{
                name: name,
            }

            go func() {
                // Submit the task to be worked on. When RunTask
                // returns we know it is being handled.
                p.Run(&np)
                wg.Done()
            }()
        }
    }

    wg.Wait()

    // Shutdown the work pool and wait for all existing work
    // to be completed.
    p.Shutdown()
}

7.4 小结

  • 可以使用通道来控制程序的生命周期。
  • default 分支的 select 语句可以用来尝试向通道发送或者接收数据,而不会阻塞。
  • 有缓冲的通道可以用来管理一组可服用的资源。
  • 语言运行时会处理好通道的协作和同步。
  • 使用无缓冲的通道来创建完成工作的 goroutine 池。
  • 任何时间都可以用无缓冲的通道来让两个 goroutine 交换数据,在通道操作完成时一定保证对方接收到了数据。