Chapter 9 Concurrency with Shared Variables
Exercise 9.1: Add a functionWithdraw(amount int) bool
to thegopl.io/ch9/bank1
program. The result should indicate whether the transaction succeeded or failed due to insufficient funds. The message sent to the monitor goroutine must contain both the amount to withdraw and a new channel over which the monitor goroutine can send the boolean result back toWithdraw
.
// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 261.
//!+
// Package bank provides a concurrency-safe bank with one account.
package bank
var deposits = make(chan int) // send amount to deposit
var balances = make(chan int) // receive balance
var withdraw = make(chan int) // send amount to withdraw
var result = make(chan bool) // withdraw result
func Deposit(amount int) { deposits <- amount }
func Balance() int { return <-balances }
func WithDraw(amount int) bool {
withdraw <- amount
return <-result
}
func teller() {
var balance int // balance is confined to teller goroutine
for {
select {
case amount := <-deposits:
balance += amount
case balances <- balance:
case amount := <-withdraw:
if amount <= balance {
balance -= amount
result <- true
} else {
result <- false
}
}
}
}
func init() {
go teller() // start the monitor goroutine
}
//!-
Exercise 9.2: Rewrite thePopCount
example from Section 2.6.2 so that it initializes the lookup table usingsync.Once
the first time it is needed. (Realistically, the cost of synchronization would be prohibitive for a small and highly optimized function likePopCount.)
// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 45.
// (Package doc comment intentionally malformed to demonstrate golint.)
//!+
package popcount
import "sync"
// pc[i] is the population count of i.
var table [256]byte
func initTable() {
for i := range table {
table[i] = table[i/2] + byte(i&1)
}
}
var load sync.Once
func pc(i byte) byte {
load.Do(initTable)
return table[i]
}
// PopCount returns the population count (number of set bits) of x.
func PopCount(x uint64) int {
return int(pc(byte(x>>(0*8))) +
pc(byte(x>>(1*8))) +
pc(byte(x>>(2*8))) +
pc(byte(x>>(3*8))) +
pc(byte(x>>(4*8))) +
pc(byte(x>>(5*8))) +
pc(byte(x>>(6*8))) +
pc(byte(x>>(7*8))))
}
//!-
Exercise 9.3: Extend theFunc
type and the(*Memo).Get
method so that callers may provide an optionaldone
channel through which they can cancel the operation (§8.9). The results of a cancelledFunc
call should not be cached.
// shared variable and lock
// Func is the type of the function to memoize.
type Func func(string, <-chan struct{}) (interface{}, error)
func (memo *Memo) Get(key string, cancel <-chan struct{}) (value interface{}, err error) {
memo.mu.Lock()
e := memo.cache[key]
if e == nil {
// This is the first request for this key.
// This goroutine becomes responsible for computing
// the value and broadcasting the ready condition.
e = &entry{ready: make(chan struct{})}
memo.cache[key] = e
memo.mu.Unlock()
e.res.value, e.res.err = memo.f(key, cancel)
close(e.ready) // broadcast ready condition
} else {
// This is a repeat request for this key.
memo.mu.Unlock()
<-e.ready // wait for ready condition
}
// cancel detection
select {
case <-cancel:
memo.mu.Lock()
delete(memo.cache, key)
memo.mu.Unlock() // delete canceled entry
return nil, fmt.Errorf("%s is canceled", key)
default:
}
return e.res.value, e.res.err
}
// test
func TestConcurrentCancel(t *testing.T) {
m := memo.New(httpGetBody)
memotest.ConcurrentCancel(t, m)
}
func ConcurrentCancel(t *testing.T, m M) {
//!+conc
var n sync.WaitGroup
cancel := make(chan struct{})
mom := time.NewTimer(1600 * time.Millisecond) // 1.6s
go func() {
<-mom.C
close(cancel)
mom.Stop()
}()
for url := range incomingURLs() {
n.Add(1)
go func(url string) {
defer n.Done()
start := time.Now()
value, err := m.Get(url, cancel)
if err != nil {
log.Print(err)
return
}
fmt.Printf("%s, %s, %d bytes\n",
url, time.Since(start), len(value.([]byte)))
}(url)
}
n.Wait()
//!-conc
}
// channel and goroutines
// A request is a message requesting that the Func be applied to key.
type request struct {
key string
cancel <-chan struct{}
response chan<- result // the client wants a single result
}
type Memo struct {
requests chan request
delete chan string // buffered
}
func (memo *Memo) Delete(key string) {
memo.delete <- key
}
func (memo *Memo) Close() {
close(memo.delete)
close(memo.requests)
}
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
loop:
for {
select {
case key, ok := <-memo.delete:
if !ok {
break loop
}
delete(cache, key)
case req, ok := <-memo.requests:
if !ok {
break loop
}
e := cache[req.key]
if e == nil {
// This is the first request for this key.
e = &entry{ready: make(chan struct{})}
cache[req.key] = e
go e.call(f, req.key, req.cancel) // call f(key)
}
go e.deliver(req.response, req.cancel, memo.delete, req.key)
}
}
}
func (e *entry) deliver(response chan<- result, cancel <-chan struct{},
delete chan<- string, key string) {
// Wait for the ready condition.
<-e.ready
// if this request is canceled which must be happened before e.ready
// then delete current entry (the results of a canceled Func call
// should not be cached)
select {
case <-cancel:
delete <- key
response <- result{nil, fmt.Errorf("%s is canceled", key)}
default:
// Send the result to the client.
response <- e.res
}
}
Exercise 9.4: Construct a pipeline that connects an arbitrary number of goroutines with channels. What is the maximum number of pipeline stages you can create without running out of memory? How long does a value take to transit the entire pipeline?
// millions of
// on my computer: 1 millions goroutines, it takes 0.464701s
// Copyright © 2017 [email protected]
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
package main
import (
"time"
"fmt"
)
const N = 1000000
func main() {
hello := "hello"
var mid [N]chan string
for i := 0; i < N; i++ {
mid[i] = make(chan string)
}
head, tail := mid[0], mid[N-1]
for i := 0; i < N-1; i++ {
go func(i int) {
mid[i+1] <- <-mid[i]
}(i)
}
s := time.Now()
head <- hello
<-tail
fmt.Printf("%d goroutines, %fs\n", N, time.Since(s).Seconds())
}
Exercise 9.5: Write a program with two goroutines that send messages back and forth over two unbuffered channels in ping-pong fashion. How many communications per second can the program sustain?
// output: 73,925,957
// Copyright © 2017 [email protected]
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
package main
import (
"time"
"fmt"
)
func main() {
ping, pong := make(chan int), make(chan int)
var counter int64
t := time.NewTimer(1 * time.Minute)
done := make(chan struct{})
shutdown := make(chan struct{})
// ping->pong
go func() {
loop:
for {
select {
case <-shutdown:
break loop
case v := <-ping:
counter++
pong <- v
}
}
done <- struct{}{}
}()
go func() {
loop:
for {
select {
case <-shutdown:
break loop
case v := <-pong:
ping <- v
}
}
done <- struct{}{}
}()
// kick it start
ping <- 1
// 1 minutes
<-t.C
close(shutdown)
// drain the one which is blocked
select {
case <-ping:
case <-pong:
}
<-done
<-done
t.Stop()
fmt.Println(counter)
}
Exercise 9.6: Measure how the performance of a compute-bound parallel program (see Exercise 8.5) varies withGOMAXPROCS
. What is the optimal value on your computer? How many CPUs does your computer have?
// around 8, 4 cores each with 2 hardware threads
// default GOMAXPROCS is 8.
Notes:
- Data Race: A data race occurs whenever two goroutines access the same variable concurrently and at least one of the accesses is a write.