Chapter 8 Goroutines and Channels
Exercise 8.1: Modify clock2 to accept a port number, and write a program, clockwall, that acts as a client of several clock servers at once, reading the times from each one and displaying the results in a table, akin to the wall of clocks seen in some business offices. If you have access to geographically distributed computers, run instances remotely; otherwise run local instances on different ports with fake time zones.
$ TZ=US/Eastern ./clock2 -port 8010 &
$ TZ=Europe/London ./clock2 -port 8030 &
$ TZ=Asia/Tokyo ./clock2 -port 8020 &
$ clockwall NewYork=localhost:8010 London=localhost:8030 Tokyo=localhost:8020
// clock
...
var port = flag.String("port", "8000", "port number")
...
// main listens for TCP connections on localhost at the port chosen
// by the -port flag and serves each client's clock stream in its
// own goroutine.
func main() {
	flag.Parse()
	addr := "localhost:" + *port
	listener, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatal(err)
	}
	//!+
	for {
		// Accept blocks until a client connects; a failed accept
		// (e.g., connection aborted) is logged and the server keeps
		// running.
		conn, err := listener.Accept()
		if err != nil {
			log.Print(err)
			continue
		}
		go handleConn(conn) // handle connections concurrently
	}
	//!-
}
// clockwall
// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
package main
import (
"net"
"log"
"io"
"os"
"strings"
)
func mustCopy(w io.Writer, r io.Reader) {
if _, err := io.Copy(w, r); err != nil {
log.Fatal(err)
}
}
// getTime connects to the clock server at addr and copies its output
// to standard output until the server closes the connection or an
// error occurs.
func getTime(addr string) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		log.Fatal(err)
	}
	// Fix: the original never closed the connection, leaking it for
	// the life of the goroutine.
	defer conn.Close()
	mustCopy(os.Stdout, conn)
}
// parameter: NewYork=localhost:8010 London=localhost:8020 Tokyo=localhost:8030
// main reads NAME=HOST:PORT arguments, streams the extra clock
// servers in background goroutines, and streams the first one in the
// main goroutine so the process stays alive.
func main() {
	// Fix: the original indexed os.Args[1] and s[1] unconditionally,
	// panicking when run with no arguments or with an argument that
	// lacks an '=' separator.
	if len(os.Args) < 2 {
		log.Fatal("usage: clockwall NAME=HOST:PORT ...")
	}
	addrs := make([]string, 0, len(os.Args)-1)
	for _, arg := range os.Args[1:] {
		parts := strings.SplitN(arg, "=", 2)
		if len(parts) != 2 || parts[1] == "" {
			log.Fatalf("bad argument %q: want NAME=HOST:PORT", arg)
		}
		addrs = append(addrs, parts[1])
	}
	// Extra servers run concurrently...
	for _, addr := range addrs[1:] {
		go getTime(addr)
	}
	// ...and the first keeps the main goroutine (and the process) alive.
	getTime(addrs[0])
}
Exercise 8.2: Implement a concurrent File Transfer Protocol (FTP) server. The server should interpret commands from each client such as cd to change directory, ls to list a directory, get to send the contents of a file, and close to close the connection. You can use the standard ftp command as the client, or write your own.
// simple concurrent ftp server
// Copyright © 2017 [email protected]
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// simple FTP server
package main
import (
"net"
"log"
"bufio"
"io"
"os/exec"
"strings"
"os"
)
func mustCopy(w io.Writer, r io.Reader) {
if _, err := io.Copy(w, r); err != nil {
log.Fatal(err)
}
}
func exeCmd(w io.Writer, e string, args ...string) {
cmd := exec.Command(e, args...)
cmd.Stdout = w
if err := cmd.Run(); err != nil {
log.Print(err)
}
}
// handleConn serves one FTP-style client session, reading one
// command per line: ls, cd, get, close. Anything else prints help.
//
// NOTE(review): "cd" calls os.Chdir, which changes the working
// directory of the whole process — every concurrent client shares
// it. Per-connection directories would need to be tracked
// explicitly; confirm whether that is acceptable here.
func handleConn(c net.Conn) {
	defer c.Close()
	input := bufio.NewScanner(c)
	for input.Scan() {
		cmds := strings.Split(input.Text(), " ")
		switch cmds[0] {
		case "ls":
			exeCmd(c, cmds[0], cmds[1:]...)
		case "cd":
			// Fix: guard the operand; the original panicked with
			// "index out of range" on a bare "cd".
			if len(cmds) < 2 {
				io.WriteString(c, "cd: missing directory\n")
				continue
			}
			if err := os.Chdir(cmds[1]); err != nil {
				log.Print(err)
			}
		case "get":
			// Fix: same missing-operand guard for "get".
			if len(cmds) < 2 {
				io.WriteString(c, "get: missing file name\n")
				continue
			}
			file, err := os.Open(cmds[1])
			if err != nil {
				log.Printf("file %s: %v", cmds[1], err)
				continue
			}
			// Fix: use io.Copy + log.Print instead of mustCopy, whose
			// log.Fatal would kill the whole server when one client's
			// connection fails mid-transfer.
			if _, err := io.Copy(c, file); err != nil {
				log.Print(err)
			}
			// Fix: the original never closed the file, leaking one
			// descriptor per "get" request.
			file.Close()
		case "close":
			return
		default:
			help := "ls: list content\ncd: change directory\nget: get file content\n" +
				"close: close connection\n"
			if _, err := io.Copy(c, strings.NewReader(help)); err != nil {
				log.Print(err)
			}
		}
	}
}
// main listens on localhost:8000 and serves each FTP client in a
// separate goroutine.
func main() {
	listener, err := net.Listen("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	for {
		c, err := listener.Accept()
		if err != nil {
			// e.g., connection aborted — keep serving other clients.
			log.Print(err)
			continue
		}
		go handleConn(c)
	}
}
// simple client
package main
import (
"net"
"log"
"io"
"os"
)
func mustCopy(w io.Writer, r io.Reader) {
if _, err := io.Copy(w, r); err != nil {
log.Fatal(err)
}
}
// main is a minimal netcat-style client for the FTP server above:
// it forwards stdin to the server and prints server output.
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
// Server output is copied to stdout in the background...
go mustCopy(os.Stdout, conn)
// ...while the main goroutine forwards stdin to the server.
// NOTE(review): when stdin reaches EOF, main returns and the process
// exits, possibly cutting off server output still in flight — the
// issue Exercise 8.3's netcat3 addresses.
mustCopy(conn, os.Stdin)
}
Exercise 8.4: Modify the reverb2 server to use a sync.WaitGroup per connection to count the number of active echo goroutines. When it falls to zero, close the write half of the TCP connection as described in Exercise 8.3. Verify that your modified netcat3 client from that exercise waits for the final echoes of multiple concurrent shouts, even after the standard input has been closed.
// handleConn echoes each shouted line concurrently, then — once all
// echoes finish — closes only the write half of the TCP connection
// so the client can drain pending output (Exercises 8.3/8.4).
func handleConn(c net.Conn) {
	input := bufio.NewScanner(c)
	var wg sync.WaitGroup
	for input.Scan() {
		// Fix: capture the line before launching the goroutine. The
		// original called input.Text() inside the closure, racing
		// with the next input.Scan() in this loop, which can make a
		// goroutine echo the wrong (later) line.
		text := input.Text()
		wg.Add(1)
		go func() {
			defer wg.Done()
			echo(c, text, 1*time.Second)
		}()
	}
	// NOTE: ignoring potential errors from input.Err()
	wg.Wait()
	if con, ok := c.(*net.TCPConn); ok {
		con.CloseWrite()
	} else {
		c.Close()
	}
}
Exercise 8.5: Take an existing CPU-bound sequential program, such as the Mandelbrot program of Section 3.3 or the 3-D surface computation of Section 3.2, and execute its main loop in parallel using channels for communication. How much faster does it run on a multiprocessor machine? What is the optimal number of goroutines to use?
// 400ms / 80ms = 5 times faster, about 128 goroutines can achieve 80ms
// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 61.
//!+
// Mandelbrot emits a PNG image of the Mandelbrot fractal.
package main
import (
"image"
"image/color"
"math/cmplx"
"sync"
"time"
"fmt"
)
// N represent the number of goroutines
const N = 128
// main renders the Mandelbrot set in parallel using N goroutines,
// one per horizontal band of rows, and reports the elapsed time.
func main() {
	const (
		xmin, ymin, xmax, ymax = -2, -2, +2, +2
		width, height          = 1024, 1024
	)
	img := image.NewRGBA(image.Rect(0, 0, width, height))
	start := time.Now()
	// Partition the rows into N bands, one goroutine each. Each
	// goroutine writes to a disjoint set of pixels, so no further
	// synchronization of img is needed.
	size := height / N
	var wg sync.WaitGroup
	for i := 0; i < N; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			lo, hi := size*i, size*(i+1)
			// Fix: the last band absorbs the remainder rows when
			// height is not a multiple of N; previously those rows
			// were silently left unrendered.
			if i == N-1 {
				hi = height
			}
			for py := lo; py < hi; py++ {
				y := float64(py)/height*(ymax-ymin) + ymin
				for px := 0; px < width; px++ {
					x := float64(px)/width*(xmax-xmin) + xmin
					z := complex(x, y)
					// Image point (px, py) represents complex value z.
					img.Set(px, py, mandelbrot(z))
				}
			}
		}(i)
	}
	// Fix: wg.Wait() alone suffices; the original's extra done
	// channel, closer goroutine, and drain loop duplicated what the
	// WaitGroup already provides.
	wg.Wait()
	// skip output
	//png.Encode(os.Stdout, img) // NOTE: ignoring errors
	fmt.Printf("elapsed %fms\n", time.Since(start).Seconds()*1000)
}
func mandelbrot(z complex128) color.Color {
const iterations = 200
const contrast = 15
var v complex128
for n := uint8(0); n < iterations; n++ {
v = v*v + z
if cmplx.Abs(v) > 2 {
return color.Gray{255 - contrast*n}
}
}
return color.Black
}
Exercise 8.6: Add depth-limiting to the concurrent crawler. That is, if the user sets -depth=3, then only URLs reachable by at most three links will be fetched.
// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 243.
// Crawl3 crawls web links starting with the command-line arguments.
//
// This version uses bounded parallelism.
// For simplicity, it does not address the termination problem.
//
package main
import (
"flag"
"fmt"
"log"
"os"
"gopl.io/ch5/links"
"strings"
)
// depth limits the link distance from the seed URLs; pages more than
// *depth links away are not fetched (Exercise 8.6).
var depth = flag.Int("depth", 1,
"Only URLs reachable by at most `depth` links will be fetched")
// crawl prints url, fetches it, and returns the links found on the
// page. Fetch/parse errors are logged and an empty list is returned.
func crawl(url string) []string {
fmt.Println(url)
// links.Extract (gopl.io/ch5/links) performs the HTTP GET and HTML parsing.
list, err := links.Extract(url)
if err != nil {
log.Print(err)
}
return list
}
//!+
func main() {
flag.Parse()
worklist := make(chan []string) // lists of URLs, may have duplicates
unseenLinks := make(chan string) // de-duplicated URLs
var n, d int // number of pending sends to worklist
n++
// record items per depth
counter := make([]int, *depth+2)
counter[d] = n
// Add command-line arguments to worklist.
// skip depth parameter
go func() {
worklist <- flag.Args()
}()
// Create 20 crawler goroutines to fetch each unseen link.
for i := 0; i < 20; i++ {
go func() {
for link := range unseenLinks {
foundLinks := crawl(link)
go func() { worklist <- foundLinks }()
}
}()
}
// The main goroutine de-duplicates worklist items
// and sends the unseen ones to the crawlers.
seen := make(map[string]bool)
for ; n > 0; n-- {
list := <-worklist
// drain worklist then close unseenLinks
if d > *depth {
continue
}
for _, link := range list {
if !seen[link] {
n++ // counter++
counter[d+1]++
seen[link] = true
unseenLinks <- link
}
}
if counter[d]--; counter[d] == 0 {
d++
}
}
close(unseenLinks)
}
//!-
Exercise 8.7: Write a concurrent program that creates a local mirror of a web site, fetching each reachable page and writing it to a directory on the local disk. Only pages within the original domain (for instance, golang.org) should be fetched. URLs within mirrored pages should be altered as needed so that they refer to the mirrored page, not the original.
// skip for now
Exercise 8.8: Using a select statement, add a timeout to the echo server from Section 8.3 so that it disconnects any client that shouts nothing within 10 seconds.
// handleConn echoes each line shouted by the client and uses a
// 10-second timer (Exercise 8.8) to disconnect clients that stay
// silent.
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
timeout := time.NewTimer(10 * time.Second)
text := make(chan string)
var wg sync.WaitGroup
// Reader goroutine: forward each scanned line on text; closing text
// signals EOF or a read error to the select loop below.
go func() {
for input.Scan() {
text <- input.Text()
}
close(text)
}()
for {
select {
case t, ok := <-text:
if ok {
wg.Add(1)
// Any traffic resets the idle clock.
timeout.Reset(10 * time.Second)
go func() {
defer wg.Done()
echo(c, t, 1*time.Second)
}()
} else {
// Client disconnected: wait for in-flight echoes, then close.
wg.Wait()
c.Close()
return
}
case <-timeout.C:
// Idle too long. Closing c makes input.Scan fail, which ends
// the reader goroutine.
// NOTE(review): in-flight echo goroutines are not awaited on
// this path; their writes will simply fail on the closed conn.
timeout.Stop()
c.Close()
fmt.Println("disconnect silent client")
return
}
}
}
Exercise 8.9: Write a version of du that computes and periodically displays separate totals for each of the root directories.
// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 250.
// The du3 command computes the disk usage of the files in a directory.
package main
// The du3 variant traverses all directories in parallel.
// It uses a concurrency-limiting counting semaphore
// to avoid opening too many files at once.
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
)
var vFlag = flag.Bool("v", false, "show verbose progress messages")
// ref: https://stackoverflow.com/questions/19992334/how-to-listen-to-n-channels-dynamic-select-statement
// aggregate chan array to one share chan
// aggregated message
type Message struct {
filesize int64
id int
}
//!+
func main() {
// ...determine roots...
//!-
flag.Parse()
// Determine the initial directories.
roots := flag.Args()
size := len(roots)
if size == 0 {
roots = []string{"."}
size = 1
}
//!+
var message = make(chan Message)
var wg sync.WaitGroup
// Traverse each root of the file tree in parallel.
var fileSizes = make([]chan int64, size)
var n = make([]*sync.WaitGroup, size)
// separate statistic
for i := 0; i < size; i++ {
fileSizes[i] = make(chan int64)
n[i] = new(sync.WaitGroup)
}
for i, root := range roots {
n[i].Add(1)
go walkDir(root, n[i], fileSizes[i])
go func(i int) {
n[i].Wait()
close(fileSizes[i])
}(i)
// gather all filesize message, with extra id info
wg.Add(1)
go func(i int) {
defer wg.Done()
var msg Message
for fs := range fileSizes[i] {
msg.filesize = fs
msg.id = i
message <- msg
}
}(i)
}
// message closer
go func() {
wg.Wait()
close(message)
}()
//!-
// Print the results periodically.
var tick <-chan time.Time
if *vFlag {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes []int64
nfiles = make([]int64, size)
nbytes = make([]int64, size)
// totals
var tfiles, tbytes int64
loop:
for {
// separate
select {
case msg, ok := <-message:
if !ok {
break loop
}
nfiles[msg.id]++
nbytes[msg.id] += msg.filesize
tfiles++
tbytes += msg.filesize
case <-tick:
printAllDiskUsage(roots, nfiles, nbytes)
}
}
printDiskUsage(tfiles, tbytes) // final totals
//!+
// ...select loop...
}
//!-
// printAllDiskUsage prints one "root: N files X GB;" cell per root,
// all on a single line.
func printAllDiskUsage(file []string, nfiles, nbytes []int64) {
	for i := range nfiles {
		gb := float64(nbytes[i]) / 1e9
		fmt.Printf("%s: %d files %.1f GB; ", file[i], nfiles[i], gb)
	}
	fmt.Println()
}
// printDiskUsage prints a one-line summary of the grand totals.
func printDiskUsage(nfiles, nbytes int64) {
	gb := float64(nbytes) / 1e9
	fmt.Printf("%d files %.1f GB\n", nfiles, gb)
}
// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
//!+walkDir
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
	defer n.Done()
	for _, e := range dirents(dir) {
		if !e.IsDir() {
			// Plain file: report its size.
			fileSizes <- e.Size()
			continue
		}
		// Subdirectory: walk it concurrently; the shared WaitGroup
		// tracks the new goroutine.
		n.Add(1)
		go walkDir(filepath.Join(dir, e.Name()), n, fileSizes)
	}
}
//!-walkDir
//!+sema
// sema is a counting semaphore for limiting concurrency in dirents.
var sema = make(chan struct{}, 20)

// dirents returns the entries of directory dir, or nil after logging
// the error to stderr. At most cap(sema) ReadDir calls run at once.
func dirents(dir string) []os.FileInfo {
	sema <- struct{}{}        // acquire token
	defer func() { <-sema }() // release token

	list, err := ioutil.ReadDir(dir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "du: %v\n", err)
		return nil
	}
	return list
}
Exercise 8.10: HTTP requests may be cancelled by closing the optional Cancel channel in the http.Request struct. Modify the web crawler of Section 8.6 to support cancellation.
Hint: the http.Get convenience function does not give you an opportunity to customize a Request. Instead, create the request using http.NewRequest, set its Cancel field, then perform the request by calling http.DefaultClient.Do(req).
// Note: http.Request Cancel channel is Deprecated in 1.9.1
// Extract makes an HTTP GET request to the specified URL, parses
// the response as HTML, and returns the links in the HTML document.
// Extract fetches url (abortable by closing cancel) and returns the
// links found in the HTML response. This is an excerpt; the parsing
// code is elided below.
func Extract(url string, cancel <-chan struct{}) ([]string, error) {
//construct
// Build the request by hand — http.Get offers no way to attach a
// Cancel channel.
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
// set Cancel channel, though it's deprecated
// (Request.WithContext is the modern replacement)
req.Cancel = cancel
resp, err := http.DefaultClient.Do(req)
// ...rest elided: parse resp as HTML and collect the links...
....
}
// main.go
//!+
// main wires up cancellation for the crawler (excerpt): closing
// cancel broadcasts to every in-flight Extract request.
func main() {
worklist := make(chan []string)
// cancel is only ever closed, never sent on — close is the broadcast.
cancel := make(chan struct{})
// Any byte on standard input triggers cancellation.
go func() {
os.Stdin.Read(make([]byte, 1))
close(cancel)
}()
...
}
Exercise 8.11: Following the approach of mirroredQuery in Section 8.4.4, implement a variant of fetch that requests several URLs concurrently. As soon as the first response arrives, cancel the other requests.
// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 17.
//!+
// Fetchall fetches URLs in parallel and reports their times and sizes.
package main
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"time"
)
// main races all URLs given on the command line, prints the first
// result, and cancels the rest (Exercise 8.11).
func main() {
start := time.Now()
ch := make(chan string)
cancel := make(chan struct{})
for i, url := range os.Args[1:] {
// NOTE(review): printing the bare index looks like leftover debug
// output — confirm it is intended.
fmt.Println(i)
go fetch(url, cancel, ch) // start a goroutine
}
// Wait for the first response only...
fmt.Println(<-ch) // receive from channel ch
// ...then abort the remaining in-flight requests.
close(cancel)
// NOTE(review): losing goroutines may still be blocked sending on
// ch; the process exits right after this print, so nothing lingers.
fmt.Printf("%.2fs elapsed\n", time.Since(start).Seconds())
}
// fetch requests url and reports its timing and size (or the error)
// on ch. Closing cancel aborts an in-flight request.
func fetch(url string, cancel <-chan struct{}, ch chan<- string) {
	start := time.Now()
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		// Fix: the original returned silently here, so a malformed
		// URL produced no message and could leave main blocked
		// forever on <-ch.
		ch <- fmt.Sprint(err)
		return
	}
	// Cancel is deprecated (Request.WithContext is the replacement),
	// but this follows the exercise's hint.
	req.Cancel = cancel
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		ch <- fmt.Sprint(err) // send to channel ch
		return
	}
	nbytes, err := io.Copy(ioutil.Discard, resp.Body)
	resp.Body.Close() // don't leak resources
	if err != nil {
		ch <- fmt.Sprintf("while reading %s: %v", url, err)
		return
	}
	secs := time.Since(start).Seconds()
	ch <- fmt.Sprintf("%.2fs %7d %s", secs, nbytes, url)
}
//!-
Exercise 8.12: Make the broadcaster announce the current set of clients to each new arrival. This requires that the clients set and the entering and leaving channels record the client name too.
// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 254.
//!+
// Chat is a server that lets clients chat with each other.
package main
import (
"bufio"
"fmt"
"log"
"net"
)
//!+broadcaster
// client identifies one connected chat participant for the
// broadcaster: its outgoing channel plus the display name required
// by Exercise 8.12's membership announcements.
type client struct {
outgoing chan<- string // an outgoing message channel
// name is the display identity used when announcing membership.
name string
}
// The broadcaster's three channels, shared by every connection
// handler: arrivals, departures, and chat traffic.
var (
entering = make(chan client)
leaving = make(chan client)
messages = make(chan string) // all incoming client messages
)
// broadcaster owns the set of connected clients and relays every
// incoming message to each of them. New arrivals are told who is
// already online (Exercise 8.12).
func broadcaster() {
	clients := make(map[client]bool) // all connected clients
	for {
		select {
		case msg := <-messages:
			// Fan the message out to every connected client.
			for cli := range clients {
				cli.outgoing <- msg
			}
		case newcomer := <-entering:
			clients[newcomer] = true
			// Announce the current membership to the new arrival,
			// excluding the newcomer itself.
			newcomer.outgoing <- "Currently online:"
			for other := range clients {
				if other != newcomer {
					newcomer.outgoing <- other.name
				}
			}
		case gone := <-leaving:
			delete(clients, gone)
			close(gone.outgoing)
		}
	}
}
//!-broadcaster
//!+handleConn
func handleConn(conn net.Conn) {
var cli client
ch := make(chan string) // outgoing client messages
go clientWriter(conn, ch)
who := conn.RemoteAddr().String()
cli.outgoing = ch
cli.name = who
ch <- "You are " + who
messages <- who + " has arrived"
entering <- cli
input := bufio.NewScanner(conn)
for input.Scan() {
messages <- who + ": " + input.Text()
}
// NOTE: ignoring potential errors from input.Err()
leaving <- cli
messages <- who + " has left"
conn.Close()
}
//...
Exercise 8.13: Make the chat server disconnect idle clients, such as those that have sent no messages in the last five minutes. Hint: calling conn.Close() in another goroutine unblocks active Read calls such as the one done by input.Scan().
// handleConn is the chat handler with idle-timeout support
// (Exercise 8.13): a client that sends nothing for five minutes is
// disconnected by closing its connection, which unblocks the
// input.Scan call below.
func handleConn(conn net.Conn) {
	var cli client
	ch := make(chan string) // outgoing client messages
	go clientWriter(conn, ch)
	who := conn.RemoteAddr().String()
	cli.outgoing = ch
	cli.name = who
	ch <- "You are " + who
	messages <- who + " has arrived"
	entering <- cli
	timeout := time.NewTimer(5 * time.Minute)
	// Fix: the original watchdog goroutine lingered for up to five
	// minutes after a normal disconnect and then called Close on an
	// already-closed connection. Closing done lets it exit as soon
	// as this handler returns.
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-timeout.C:
			conn.Close() // unblocks input.Scan below
		case <-done:
			timeout.Stop()
		}
	}()
	input := bufio.NewScanner(conn)
	for input.Scan() {
		// Any message from the client resets the idle clock.
		timeout.Reset(5 * time.Minute)
		messages <- who + ": " + input.Text()
	}
	// NOTE: ignoring potential errors from input.Err()
	leaving <- cli
	messages <- who + " has left"
	conn.Close()
}
Exercise 8.14: Change the chat server’s network protocol so that each client provides its name on entering. Use that name instead of the network address when prefixing each message with its sender’s identity.
// usage:
// -> ./netcat or telnet
// -> GET /?name=guess HTTP/1.1
// -> HOST:
// -> `enter`
// -> same as before
// ...
// main starts the broadcaster and serves the chat over HTTP so each
// client can supply its display name as a query parameter
// (Exercise 8.14).
func main() {
go broadcaster()
http.HandleFunc("/", handler) // each request calls handler
log.Fatal(http.ListenAndServe("localhost:8000", nil))
}
// handler hijacks the HTTP connection and runs a chat session over
// it, using the "name" query parameter (if present) as the client's
// display name, falling back to the remote address (Exercise 8.14).
func handler(w http.ResponseWriter, r *http.Request) {
	if err := r.ParseForm(); err != nil {
		log.Print(err)
	}
	// Fix: FormValue replaces the original's manual loop over r.Form.
	who := r.FormValue("name")
	if who == "" {
		who = r.RemoteAddr
	}
	hi, ok := w.(http.Hijacker)
	if !ok {
		// Fix: the original used log.Fatalln here and below, which
		// killed the entire server because of one bad request. Log
		// and abandon only this request instead.
		log.Print("can't hijack connection")
		return
	}
	conn, _, err := hi.Hijack()
	if err != nil {
		log.Print("hijack error: ", err)
		return
	}
	var cli client
	ch := make(chan string) // outgoing client messages
	go clientWriter(conn, ch)
	cli.outgoing = ch
	cli.name = who
	ch <- "You are " + who
	messages <- who + " has arrived"
	entering <- cli
	// Idle-timeout watchdog as in Exercise 8.13.
	timeout := time.NewTimer(5 * time.Minute)
	go func() {
		<-timeout.C
		conn.Close()
	}()
	input := bufio.NewScanner(conn)
	for input.Scan() {
		timeout.Reset(5 * time.Minute)
		messages <- who + ": " + input.Text()
	}
	// NOTE: ignoring potential errors from input.Err()
	leaving <- cli
	messages <- who + " has left"
	conn.Close()
}
// ...
Exercise 8.15: Failure of any client program to read data in a timely manner ultimately causes all clients to get stuck. Modify the broadcaster to skip a message rather than wait if a client writer is not ready to accept it. Alternatively, add buffering to each client’s outgoing message channel so that most messages are not dropped; the broadcaster should use a non-blocking send to this channel.
// buffering outgoing message channel of each clients
// ...
ch := make(chan string, 20) // outgoing client messages
// ...
Notes
- Goroutines vs Thread: The differences between threads and goroutines are essentially quantitative, not qualitative.