Chapter 9 Concurrency with Shared Variables

Exercise 9.1: Add a function Withdraw(amount int) bool to the gopl.io/ch9/bank1 program. The result should indicate whether the transaction succeeded or failed due to insufficient funds. The message sent to the monitor goroutine must contain both the amount to withdraw and a new channel over which the monitor goroutine can send the boolean result back to Withdraw.

// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

// See page 261.
//!+

// Package bank provides a concurrency-safe bank with one account.
package bank

// withdrawal is the message sent to the teller goroutine for one
// withdrawal: the amount requested and a channel, private to that
// request, on which the teller reports the outcome.
type withdrawal struct {
    amount int
    result chan<- bool // receives true on success, false otherwise
}

var (
    deposits    = make(chan int)        // send amount to deposit
    balances    = make(chan int)        // receive balance
    withdrawals = make(chan withdrawal) // send withdrawal request
)

// Deposit adds amount to the account balance.
func Deposit(amount int) { deposits <- amount }

// Balance returns the current account balance.
func Balance() int { return <-balances }

// Withdraw tries to remove amount from the account. It reports true if
// the balance covered the amount and the withdrawal was applied, and false
// if funds were insufficient or amount was negative; in the false case the
// balance is left unchanged.
func Withdraw(amount int) bool {
    // A fresh reply channel per call ties each answer to its own request.
    result := make(chan bool)
    withdrawals <- withdrawal{amount: amount, result: result}
    return <-result
}

// WithDraw is the original spelling of Withdraw, kept so that existing
// callers continue to compile.
//
// Deprecated: use Withdraw.
func WithDraw(amount int) bool { return Withdraw(amount) }

// teller is the monitor goroutine: it alone reads and writes balance and
// serves deposits, balance queries and withdrawals one at a time.
func teller() {
    var balance int // balance is confined to teller goroutine
    for {
        select {
        case amount := <-deposits:
            balance += amount
        case balances <- balance:
        case w := <-withdrawals:
            // Reject negative amounts: subtracting one would credit the
            // account instead of debiting it.
            ok := w.amount >= 0 && w.amount <= balance
            if ok {
                balance -= w.amount
            }
            w.result <- ok
        }
    }
}

func init() {
    go teller() // start the monitor goroutine
}

//!-

Exercise 9.2: Rewrite the PopCount example from Section 2.6.2 so that it initializes the lookup table using sync.Once the first time it is needed. (Realistically, the cost of synchronization would be prohibitive for a small and highly optimized function like PopCount.)

// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

// See page 45.

// (Package doc comment intentionally malformed to demonstrate golint.)
//!+
package popcount

import "sync"

// table[i] is the population count of i. It starts out zeroed and is
// filled in by initTable, so read it only after load.Do(initTable).
var table [256]byte

// load guards the one-time initialization of table.
var load sync.Once

// initTable fills table. Each entry reuses the already-computed entry for
// i/2 (i with its lowest bit shifted out) and adds that lowest bit back.
func initTable() {
    for i := range table {
        table[i] = table[i/2] + byte(i&1)
    }
}

// pc returns the population count of the single byte i, initializing the
// lookup table on first use.
func pc(i byte) byte {
    load.Do(initTable)
    return table[i]
}

// PopCount returns the population count (number of set bits) of x.
func PopCount(x uint64) int {
    // Synchronize once per call rather than once per byte, then index the
    // table directly.
    load.Do(initTable)

    n := 0
    for ; x != 0; x >>= 8 {
        n += int(table[byte(x)])
    }
    return n
}

//!-

Exercise 9.3: Extend the Func type and the (*Memo).Get method so that callers may provide an optional done channel through which they can cancel the operation (§8.9). The results of a cancelled Func call should not be cached.

// shared variable and lock

// Func is the type of the function to memoize.
// (*Memo).Get calls the memoized function with the requested key as the
// first argument and the caller's cancel channel as the second.
// NOTE(review): presumably an implementation should return early once that
// channel is closed — confirm against the concrete function in use.
type Func func(string, <-chan struct{}) (interface{}, error)

// Get returns the result of applying the memoized function to key.
//
// The first request for a key runs the function (passing cancel through to
// it) and publishes the result to every other request for the same key;
// repeat requests wait for that result instead of recomputing it.
//
// cancel is optional: a nil channel never cancels. When cancel is found
// signalled, Get returns a nil value and a "<key> is canceled" error. A
// request that is only waiting for someone else's computation gives up as
// soon as its own cancel fires. After the result is ready, a cancelled
// request also removes the entry from the cache so it is not served again.
func (memo *Memo) Get(key string, cancel <-chan struct{}) (value interface{}, err error) {
    memo.mu.Lock()
    e := memo.cache[key]
    if e == nil {
        // This is the first request for this key.
        // This goroutine becomes responsible for computing
        // the value and broadcasting the ready condition.
        e = &entry{ready: make(chan struct{})}
        memo.cache[key] = e
        memo.mu.Unlock()

        e.res.value, e.res.err = memo.f(key, cancel)

        close(e.ready) // broadcast ready condition
    } else {
        // This is a repeat request for this key.
        memo.mu.Unlock()

        // Wait for the ready condition, but do not hold a cancelled
        // caller hostage to another goroutine's computation.
        select {
        case <-e.ready:
        case <-cancel:
            return nil, fmt.Errorf("%s is canceled", key)
        }
    }

    // cancel detection
    select {
    case <-cancel:
        memo.mu.Lock()
        // Delete only the entry this call used: another goroutine may
        // already have stored a fresh entry under the same key.
        if memo.cache[key] == e {
            delete(memo.cache, key)
        }
        memo.mu.Unlock()
        return nil, fmt.Errorf("%s is canceled", key)
    default:
    }
    return e.res.value, e.res.err
}

// test

// TestConcurrentCancel builds a memo around httpGetBody with memo.New and
// hands it to memotest.ConcurrentCancel, which drives the actual requests.
func TestConcurrentCancel(t *testing.T) {
    m := memo.New(httpGetBody)
    memotest.ConcurrentCancel(t, m)
}

// ConcurrentCancel requests every URL produced by incomingURLs from m, one
// goroutine per URL, all sharing a single cancel channel that is closed
// 1.6s after the function starts. For each request it either logs the
// error returned by m.Get or prints the URL, the elapsed time and the
// size of the body. It returns once every request has finished.
func ConcurrentCancel(t *testing.T, m M) {
    //!+conc
    var n sync.WaitGroup

    cancel := make(chan struct{})
    // Close cancel after 1.6s. AfterFunc needs no helper goroutine, and
    // stopping the timer on return means nothing is left pending when all
    // requests complete before the deadline.
    timer := time.AfterFunc(1600*time.Millisecond, func() { close(cancel) })
    defer timer.Stop()

    for url := range incomingURLs() {
        n.Add(1)
        go func(url string) {
            defer n.Done()
            start := time.Now()
            value, err := m.Get(url, cancel)
            if err != nil {
                log.Print(err)
                return
            }
            fmt.Printf("%s, %s, %d bytes\n",
                url, time.Since(start), len(value.([]byte)))
        }(url)
    }
    n.Wait()
    //!-conc
}
// channel and goroutines

// A request is a message requesting that the Func be applied to key.
// The server goroutine receives requests on Memo.requests.
type request struct {
    key      string          // cache key; also the argument handed to entry.call
    cancel   <-chan struct{} // caller's cancel channel, passed on to call and deliver
    response chan<- result // the client wants a single result
}

// Memo is the handle for the channel-based cache. The cache map itself
// lives inside the server goroutine; Memo only holds the channels used to
// reach it.
type Memo struct {
    requests chan request // lookups, received by server
    delete   chan string // buffered (per the original note; the allocation is not shown here) — keys to evict, received by server
}

// Delete asks the server goroutine to remove key from the cache by sending
// the key on memo.delete.
// NOTE(review): Close closes memo.delete, and a send on a closed channel
// panics, so Delete must not be called after Close.
func (memo *Memo) Delete(key string) {
    memo.delete <- key
}

// Close closes the delete channel and then the requests channel. The
// server loop returns as soon as it observes either channel closed.
// NOTE(review): deliver goroutines are handed memo.delete and send on it
// when their request was cancelled; one that is still pending when Close
// runs would panic on the closed channel — confirm callers only Close
// after all outstanding requests have completed.
func (memo *Memo) Close() {
    close(memo.delete)
    close(memo.requests)
}

// server is the monitor loop that owns the cache map. It handles one
// message at a time from memo.delete and memo.requests and returns when
// either channel is found closed.
//
// A delete message removes the key from the map. A request message looks
// the key up, creating the entry and starting e.call in a new goroutine if
// it is missing, and then starts a deliver goroutine that answers the
// client.
func (memo *Memo) server(f Func) {
    entries := make(map[string]*entry)
    for {
        select {
        case key, open := <-memo.delete:
            if !open {
                return
            }
            delete(entries, key)

        case req, open := <-memo.requests:
            if !open {
                return
            }
            ent := entries[req.key]
            if ent == nil {
                // First request for this key: register the entry and
                // compute f(key) without blocking the loop.
                ent = &entry{ready: make(chan struct{})}
                entries[req.key] = ent
                go ent.call(f, req.key, req.cancel)
            }
            go ent.deliver(req.response, req.cancel, memo.delete, req.key)
        }
    }
}

// deliver answers one client request for entry e.
//
// It waits until either the entry's result is ready or the client's cancel
// channel fires, whichever comes first. If the request is cancelled (even
// when the result is also ready) it sends key on evict so that the server
// drops the entry — the results of a cancelled Func call should not be
// cached — and replies with a "<key> is canceled" error. Otherwise it
// sends the entry's result on response.
//
// cancel may be nil, in which case the request is never cancelled.
func (e *entry) deliver(response chan<- result, cancel <-chan struct{},
    evict chan<- string, key string) {
    canceled := false
    select {
    case <-e.ready:
        // The result is available, but a cancellation that has already
        // happened still takes precedence.
        select {
        case <-cancel:
            canceled = true
        default:
        }
    case <-cancel:
        // Do not make a cancelled client wait for the computation.
        canceled = true
    }

    if canceled {
        evict <- key
        response <- result{nil, fmt.Errorf("%s is canceled", key)}
        return
    }
    // Send the result to the client.
    response <- e.res
}

Exercise 9.4: Construct a pipeline that connects an arbitrary number of goroutines with channels. What is the maximum number of pipeline stages you can create without running out of memory? How long does a value take to transit the entire pipeline?

// millions of stages

// on my computer: 1 million goroutines, it takes 0.464701s

// Copyright © 2017 [email protected]
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

package main

import (
    "time"
    "fmt"
)

// N is the number of pipeline stages, i.e. forwarding goroutines, that
// main creates.
const N = 1000000

// pipeline starts `stages` goroutines chained together by unbuffered
// channels and returns the two ends of the chain. Each goroutine forwards
// exactly one value from the previous stage to the next and then exits, so
// the pipeline carries a single value. With stages <= 0 no goroutine is
// started and head and tail are the same channel.
func pipeline(stages int) (head chan<- string, tail <-chan string) {
    first := make(chan string)
    last := first
    for i := 0; i < stages; i++ {
        next := make(chan string)
        go func(in <-chan string, out chan<- string) {
            out <- <-in
        }(last, next)
        last = next
    }
    return first, last
}

// main builds an N-stage pipeline and reports how long one value takes to
// travel from its head to its tail.
func main() {
    head, tail := pipeline(N)

    // Only the transit is timed, not the construction of the pipeline.
    start := time.Now()
    head <- "hello"
    <-tail
    fmt.Printf("%d goroutines, %fs\n", N, time.Since(start).Seconds())
}

Exercise 9.5: Write a program with two goroutines that send messages back and forth over two unbuffered channels in ping-pong fashion. How many communications per second can the program sustain?

// output: 73,925,957

// Copyright © 2017 [email protected]
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

package main

import (
    "time"
    "fmt"
)

// relay receives values on in and forwards each one on out until stop is
// closed. It returns the number of values it received. Both the receive
// and the send give way to stop, so relay never stays blocked on a peer
// that has already quit.
func relay(in <-chan int, out chan<- int, stop <-chan struct{}) int64 {
    var received int64
    for {
        select {
        case <-stop:
            return received
        case v := <-in:
            received++
            select {
            case out <- v:
            case <-stop:
                return received
            }
        }
    }
}

// pingPong bounces a single value between two goroutines over the two
// unbuffered channels ping and pong for duration d. It returns how many
// times the ping->pong goroutine received the value, which is the number
// of round trips started (the initial serve included). It returns only
// after both goroutines have exited.
func pingPong(d time.Duration) int64 {
    ping, pong := make(chan int), make(chan int)
    stop := make(chan struct{})
    count := make(chan int64)   // final tally of the ping->pong goroutine
    done := make(chan struct{}) // closed when the pong->ping goroutine exits

    timer := time.NewTimer(d)

    // ping->pong
    go func() { count <- relay(ping, pong, stop) }()
    // pong->ping
    go func() {
        relay(pong, ping, stop)
        close(done)
    }()

    // Serve the first ball, then let the rally run until the timer fires.
    ping <- 1
    <-timer.C
    close(stop)

    n := <-count
    <-done
    return n
}

// main runs the ping-pong rally for one minute and prints the tally.
func main() {
    fmt.Println(pingPong(1 * time.Minute))
}

Exercise 9.6: Measure how the performance of a compute-bound parallel program (see Exercise 8.5) varies with GOMAXPROCS. What is the optimal value on your computer? How many CPUs does your computer have?

// around 8, 4 cores each with 2 hardware threads
// default GOMAXPROCS is 8.

Notes:

  • Data Race: A data race occurs whenever two goroutines access the same variable concurrently and at least one of the accesses is a write.

results matching ""

    No results matching ""