Chapter 9 Concurrency with Shared Variables
Exercise 9.1: Add a functionWithdraw(amount int) boolto thegopl.io/ch9/bank1program. The result should indicate whether the transaction succeeded or failed due to insufficient funds. The message sent to the monitor goroutine must contain both the amount to withdraw and a new channel over which the monitor goroutine can send the boolean result back toWithdraw.
// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 261.
//!+
// Package bank provides a concurrency-safe bank with one account.
package bank
var deposits = make(chan int) // send amount to deposit
var balances = make(chan int) // receive balance
var withdraw = make(chan int) // send amount to withdraw
var result = make(chan bool) // withdraw result
func Deposit(amount int) { deposits <- amount }
func Balance() int { return <-balances }
func WithDraw(amount int) bool {
withdraw <- amount
return <-result
}
func teller() {
var balance int // balance is confined to teller goroutine
for {
select {
case amount := <-deposits:
balance += amount
case balances <- balance:
case amount := <-withdraw:
if amount <= balance {
balance -= amount
result <- true
} else {
result <- false
}
}
}
}
func init() {
go teller() // start the monitor goroutine
}
//!-
Exercise 9.2: Rewrite thePopCountexample from Section 2.6.2 so that it initializes the lookup table usingsync.Oncethe first time it is needed. (Realistically, the cost of synchronization would be prohibitive for a small and highly optimized function likePopCount.)
// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 45.
// (Package doc comment intentionally malformed to demonstrate golint.)
//!+
package popcount
import "sync"
// pc[i] is the population count of i.
var table [256]byte
func initTable() {
for i := range table {
table[i] = table[i/2] + byte(i&1)
}
}
var load sync.Once
func pc(i byte) byte {
load.Do(initTable)
return table[i]
}
// PopCount returns the population count (number of set bits) of x.
func PopCount(x uint64) int {
return int(pc(byte(x>>(0*8))) +
pc(byte(x>>(1*8))) +
pc(byte(x>>(2*8))) +
pc(byte(x>>(3*8))) +
pc(byte(x>>(4*8))) +
pc(byte(x>>(5*8))) +
pc(byte(x>>(6*8))) +
pc(byte(x>>(7*8))))
}
//!-
Exercise 9.3: Extend theFunctype and the(*Memo).Getmethod so that callers may provide an optionaldonechannel through which they can cancel the operation (§8.9). The results of a cancelledFunccall should not be cached.
// shared variable and lock
// Func is the type of the function to memoize.
type Func func(string, <-chan struct{}) (interface{}, error)
func (memo *Memo) Get(key string, cancel <-chan struct{}) (value interface{}, err error) {
memo.mu.Lock()
e := memo.cache[key]
if e == nil {
// This is the first request for this key.
// This goroutine becomes responsible for computing
// the value and broadcasting the ready condition.
e = &entry{ready: make(chan struct{})}
memo.cache[key] = e
memo.mu.Unlock()
e.res.value, e.res.err = memo.f(key, cancel)
close(e.ready) // broadcast ready condition
} else {
// This is a repeat request for this key.
memo.mu.Unlock()
<-e.ready // wait for ready condition
}
// cancel detection
select {
case <-cancel:
memo.mu.Lock()
delete(memo.cache, key)
memo.mu.Unlock() // delete canceled entry
return nil, fmt.Errorf("%s is canceled", key)
default:
}
return e.res.value, e.res.err
}
// test
func TestConcurrentCancel(t *testing.T) {
m := memo.New(httpGetBody)
memotest.ConcurrentCancel(t, m)
}
func ConcurrentCancel(t *testing.T, m M) {
//!+conc
var n sync.WaitGroup
cancel := make(chan struct{})
mom := time.NewTimer(1600 * time.Millisecond) // 1.6s
go func() {
<-mom.C
close(cancel)
mom.Stop()
}()
for url := range incomingURLs() {
n.Add(1)
go func(url string) {
defer n.Done()
start := time.Now()
value, err := m.Get(url, cancel)
if err != nil {
log.Print(err)
return
}
fmt.Printf("%s, %s, %d bytes\n",
url, time.Since(start), len(value.([]byte)))
}(url)
}
n.Wait()
//!-conc
}
// channel and goroutines
// A request is a message requesting that the Func be applied to key.
type request struct {
key string
cancel <-chan struct{}
response chan<- result // the client wants a single result
}
type Memo struct {
requests chan request
delete chan string // buffered
}
func (memo *Memo) Delete(key string) {
memo.delete <- key
}
func (memo *Memo) Close() {
close(memo.delete)
close(memo.requests)
}
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
loop:
for {
select {
case key, ok := <-memo.delete:
if !ok {
break loop
}
delete(cache, key)
case req, ok := <-memo.requests:
if !ok {
break loop
}
e := cache[req.key]
if e == nil {
// This is the first request for this key.
e = &entry{ready: make(chan struct{})}
cache[req.key] = e
go e.call(f, req.key, req.cancel) // call f(key)
}
go e.deliver(req.response, req.cancel, memo.delete, req.key)
}
}
}
func (e *entry) deliver(response chan<- result, cancel <-chan struct{},
delete chan<- string, key string) {
// Wait for the ready condition.
<-e.ready
// if this request is canceled which must be happened before e.ready
// then delete current entry (the results of a canceled Func call
// should not be cached)
select {
case <-cancel:
delete <- key
response <- result{nil, fmt.Errorf("%s is canceled", key)}
default:
// Send the result to the client.
response <- e.res
}
}
Exercise 9.4: Construct a pipeline that connects an arbitrary number of goroutines with channels. What is the maximum number of pipeline stages you can create without running out of memory? How long does a value take to transit the entire pipeline?
// millions of
// on my computer: 1 millions goroutines, it takes 0.464701s
// Copyright © 2017 [email protected]
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
package main
import (
"time"
"fmt"
)
const N = 1000000
func main() {
hello := "hello"
var mid [N]chan string
for i := 0; i < N; i++ {
mid[i] = make(chan string)
}
head, tail := mid[0], mid[N-1]
for i := 0; i < N-1; i++ {
go func(i int) {
mid[i+1] <- <-mid[i]
}(i)
}
s := time.Now()
head <- hello
<-tail
fmt.Printf("%d goroutines, %fs\n", N, time.Since(s).Seconds())
}
Exercise 9.5: Write a program with two goroutines that send messages back and forth over two unbuffered channels in ping-pong fashion. How many communications per second can the program sustain?
// output: 73,925,957
// Copyright © 2017 [email protected]
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
package main
import (
"time"
"fmt"
)
func main() {
ping, pong := make(chan int), make(chan int)
var counter int64
t := time.NewTimer(1 * time.Minute)
done := make(chan struct{})
shutdown := make(chan struct{})
// ping->pong
go func() {
loop:
for {
select {
case <-shutdown:
break loop
case v := <-ping:
counter++
pong <- v
}
}
done <- struct{}{}
}()
go func() {
loop:
for {
select {
case <-shutdown:
break loop
case v := <-pong:
ping <- v
}
}
done <- struct{}{}
}()
// kick it start
ping <- 1
// 1 minutes
<-t.C
close(shutdown)
// drain the one which is blocked
select {
case <-ping:
case <-pong:
}
<-done
<-done
t.Stop()
fmt.Println(counter)
}
Exercise 9.6: Measure how the performance of a compute-bound parallel program (see Exercise 8.5) varies withGOMAXPROCS. What is the optimal value on your computer? How many CPUs does your computer have?
// around 8, 4 cores each with 2 hardware threads
// default GOMAXPROCS is 8.
Notes:
- Data Race: A data race occurs whenever two goroutines access the same variable concurrently and at least one of the accesses is a write.