Chapter 8 Goroutines and Channels

Exercise 8.1: Modify clock2 to accept a port number, and write a program, clockwall, that acts as a client of several clock servers at once, reading the times from each one and displaying the results in a table, akin to the wall of clocks seen in some business offices. If you have access to geographically distributed computers, run instances remotely; otherwise run local instances on different ports with fake time zones.

 $ TZ=US/Eastern    ./clock2 -port 8010 &
 $ TZ=Europe/London ./clock2 -port 8030 &
 $ TZ=Asia/Tokyo    ./clock2 -port 8020 &
 $ clockwall NewYork=localhost:8010 London=localhost:8030 Tokyo=localhost:8020
// clock
...
// port is the value of the -port flag (default "8000"); main appends it to
// "localhost:" to form the listen address.
var port = flag.String("port", "8000", "port number")
...

// main parses the command-line flags, listens on the chosen localhost TCP
// port, and hands every accepted connection to handleConn in its own
// goroutine. A failed Accept is logged and the loop carries on.
func main() {
    flag.Parse()

    addr := "localhost:" + *port
    listener, err := net.Listen("tcp", addr)
    if err != nil {
        log.Fatal(err)
    }
    //!+
    for {
        conn, acceptErr := listener.Accept()
        if acceptErr == nil {
            go handleConn(conn) // handle connections concurrently
            continue
        }
        log.Print(acceptErr) // e.g., connection aborted
    }
    //!-
}

// clockwall
// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

package main

import (
    "net"
    "log"
    "io"
    "os"
    "strings"
)

// mustCopy streams everything from r into w and terminates the program
// through log.Fatal if the copy reports an error.
func mustCopy(w io.Writer, r io.Reader) {
    _, err := io.Copy(w, r)
    if err == nil {
        return
    }
    log.Fatal(err)
}

// getTime dials the clock server at addr and copies everything it sends to
// standard output until the server closes the connection. A failed dial
// terminates the program.
func getTime(addr string) {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        log.Fatal(err)
    }
    // Release the socket once the server hangs up; getTime also runs in
    // background goroutines, where an unclosed connection would leak.
    defer conn.Close()
    mustCopy(os.Stdout, conn)
}

// main expects one or more NAME=HOST:PORT arguments, for example:
//
//     NewYork=localhost:8010 London=localhost:8030 Tokyo=localhost:8020
//
// Every argument is validated before any connection is made, so a typo
// produces a usage error instead of an index-out-of-range panic. All
// servers but the first are read in background goroutines; the first is
// read by the main goroutine, which keeps the program alive.
func main() {
    if len(os.Args) < 2 {
        log.Fatalf("usage: %s NAME=HOST:PORT ...", os.Args[0])
    }

    addrs := make([]string, 0, len(os.Args)-1)
    for _, arg := range os.Args[1:] {
        parts := strings.SplitN(arg, "=", 2)
        if len(parts) != 2 || parts[1] == "" {
            log.Fatalf("bad argument %q: want NAME=HOST:PORT", arg)
        }
        addrs = append(addrs, parts[1])
    }

    for _, addr := range addrs[1:] {
        go getTime(addr)
    }

    // main goroutine
    getTime(addrs[0])
}

Exercise 8.2: Implement a concurrent File Transfer Protocol (FTP) server. The server should interpret commands from each client such as cd to change directory, ls to list a directory, get to send the contents of a file, and close to close the connection. You can use the standard ftp command as the client, or write your own.

// simple concurrent ftp server
// Copyright © 2017 [email protected]
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

// simple FTP server
package main

import (
    "net"
    "log"
    "bufio"
    "io"
    "os/exec"
    "strings"
    "os"
)

// mustCopy copies r to w in full; any copy error is fatal to the process.
func mustCopy(w io.Writer, r io.Reader) {
    _, copyErr := io.Copy(w, r)
    if copyErr != nil {
        log.Fatal(copyErr)
    }
}

// exeCmd runs the program e with args, sending its standard output to w.
// A failure to start or a non-zero exit is logged, not returned.
func exeCmd(w io.Writer, e string, args ...string) {
    proc := exec.Command(e, args...)
    proc.Stdout = w

    err := proc.Run()
    if err == nil {
        return
    }
    log.Print(err)
}

// handleConn serves one client session. Each input line is a command:
//
//     ls [args]   list a directory (runs the external ls program)
//     cd DIR      change directory
//     get FILE    send the contents of FILE
//     close       end the session
//
// Anything else, including a blank line, prints the help text. A missing
// argument is reported to the client instead of panicking, and a failed
// write to one client is logged rather than terminating the whole server.
//
// NOTE(review): os.Chdir changes the working directory of the entire
// process, so "cd" issued by one client affects every other connected
// client. A per-connection directory would be needed to isolate sessions.
func handleConn(c net.Conn) {
    defer c.Close()

    const help = "ls: list content\ncd: change directory\nget: get file content\n" +
        "close: close connection\n"

    input := bufio.NewScanner(c)
    for input.Scan() {
        // Fields tolerates repeated spaces and yields no tokens for a blank line.
        cmds := strings.Fields(input.Text())
        name := ""
        if len(cmds) > 0 {
            name = cmds[0]
        }

        switch name {
        case "ls":
            exeCmd(c, name, cmds[1:]...)
        case "cd":
            if len(cmds) < 2 {
                reply(c, "cd: missing directory\n")
                continue
            }
            if err := os.Chdir(cmds[1]); err != nil {
                log.Print(err)
            }
        case "get":
            if len(cmds) < 2 {
                reply(c, "get: missing file name\n")
                continue
            }
            sendFile(c, cmds[1])
        case "close":
            return
        default:
            reply(c, help)
        }
    }
}

// reply writes msg to the client, logging (not exiting on) a failed write.
func reply(w io.Writer, msg string) {
    if _, err := io.WriteString(w, msg); err != nil {
        log.Print(err)
    }
}

// sendFile streams the named file to w and always closes the file afterwards.
func sendFile(w io.Writer, name string) {
    file, err := os.Open(name)
    if err != nil {
        log.Printf("file %s: %v", name, err)
        return
    }
    defer file.Close()

    if _, err := io.Copy(w, file); err != nil {
        log.Printf("file %s: %v", name, err)
    }
}

// main listens on localhost:8000 and serves each accepted connection in a
// separate goroutine. Accept errors are logged and do not stop the server.
func main() {
    listener, err := net.Listen("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }

    for {
        conn, acceptErr := listener.Accept()
        if acceptErr == nil {
            go handleConn(conn)
            continue
        }
        log.Print(acceptErr) // e.g., connection aborted
    }
}


// simple client
package main

import (
    "net"
    "log"
    "io"
    "os"
)

// mustCopy drains r into w, exiting via log.Fatal when the copy fails.
func mustCopy(w io.Writer, r io.Reader) {
    if _, failure := io.Copy(w, r); failure != nil {
        log.Fatal(failure)
    }
}

// main connects to the server on localhost:8000, prints everything the
// server sends in a background goroutine, and forwards standard input to
// the server until input ends.
func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    // Close the socket when standard input is exhausted and main returns.
    defer conn.Close()

    go mustCopy(os.Stdout, conn)
    mustCopy(conn, os.Stdin)
}

Exercise 8.4: Modify the reverb2 server to use a sync.WaitGroup per connection to count the number of active echo goroutines. When it falls to zero, close the write half of the TCP connection as described in Exercise 8.3. Verify that your modified netcat3 client from that exercise waits for the final echoes of multiple concurrent shouts, even after the standard input has been closed.

// handleConn echoes every line received on c in its own goroutine and
// counts those goroutines with a WaitGroup. Once the client stops sending
// and the last echo has finished, the write half of a TCP connection is
// closed so the client sees end-of-stream, and then the connection itself
// is released.
func handleConn(c net.Conn) {
    defer c.Close()

    input := bufio.NewScanner(c)
    var wg sync.WaitGroup

    for input.Scan() {
        // Take the text now: the goroutine may run after the next Scan,
        // by which time input.Text() would return a different line.
        shout := input.Text()
        wg.Add(1)
        go func() {
            defer wg.Done()
            echo(c, shout, 1*time.Second)
        }()
    }

    // NOTE: ignoring potential errors from input.Err()
    wg.Wait()
    if tcp, ok := c.(*net.TCPConn); ok {
        // Best effort: the deferred Close follows whether or not this succeeds.
        tcp.CloseWrite()
    }
}

Exercise 8.5: Take an existing CPU-bound sequential program, such as the Mandelbrot program of Section 3.3 or the 3-D surface computation of Section 3.2, and execute its main loop in parallel using channels for communication. How much faster does it run on a multiprocessor machine? What is the optimal number of goroutines to use?

// 400ms / 80ms = 5 times faster, about 128 goroutines can achieve 80ms

// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

// See page 61.
//!+

// Mandelbrot emits a PNG image of the Mandelbrot fractal.
package main

import (
    "image"
    "image/color"
    "math/cmplx"
    "sync"
    "time"
    "fmt"
)

// N is the number of goroutines the image rows are divided among.
const N = 128

// main renders the Mandelbrot set into an in-memory image using N
// goroutines, each responsible for a contiguous band of rows, and prints
// the elapsed time. Encoding the image is deliberately skipped so that
// only the computation is timed.
func main() {
    const (
        xmin, ymin, xmax, ymax = -2, -2, +2, +2
        width, height          = 1024, 1024
    )

    img := image.NewRGBA(image.Rect(0, 0, width, height))
    start := time.Now()

    var wg sync.WaitGroup
    done := make(chan struct{})

    for i := 0; i < N; i++ {
        // Band i covers rows [first, last). Deriving both bounds from i
        // keeps every row assigned even when N does not divide height;
        // a fixed height/N band size would leave the last rows unrendered.
        first, last := i*height/N, (i+1)*height/N

        wg.Add(1)
        go func(first, last int) {
            defer wg.Done()
            for py := first; py < last; py++ {
                y := float64(py)/height*(ymax-ymin) + ymin
                for px := 0; px < width; px++ {
                    x := float64(px)/width*(xmax-xmin) + xmin
                    z := complex(x, y)

                    // Image point (px, py) represents complex value z.
                    img.Set(px, py, mandelbrot(z))
                }
            }
            done <- struct{}{}
        }(first, last)
    }

    // closer: ends the drain loop below once every band has reported.
    go func() {
        wg.Wait()
        close(done)
    }()

    // drain done channel
    for range done {
    }

    // skip output
    //png.Encode(os.Stdout, img) // NOTE: ignoring errors
    fmt.Printf("elapsed %fms\n", time.Since(start).Seconds()*1000)
}

// mandelbrot iterates v = v*v + z from zero, up to 200 times. If the
// magnitude of v exceeds 2 on step n, the point is shaded with the grey
// level 255 - 15*n (computed in uint8 arithmetic); otherwise it is black.
func mandelbrot(z complex128) color.Color {
    const (
        iterations = 200
        contrast   = 15
    )

    orbit := complex128(0)
    var n uint8
    for n < iterations {
        orbit = orbit*orbit + z
        if cmplx.Abs(orbit) > 2 {
            shade := 255 - contrast*n
            return color.Gray{Y: shade}
        }
        n++
    }
    return color.Black
}

Exercise 8.6: Add depth-limiting to the concurrent crawler. That is, if the user sets -depth=3, then only URLs reachable by at most three links will be fetched.

// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

// See page 243.

// Crawl3 crawls web links starting with the command-line arguments.
//
// This version uses bounded parallelism.
// For simplicity, it does not address the termination problem.
//
package main

import (
    "flag"
    "fmt"
    "log"
    "os"

    "gopl.io/ch5/links"
    "strings"
)

// depth is the value of the -depth flag (default 1): the crawler fetches
// only URLs reachable by at most this many links.
var depth = flag.Int("depth", 1,
    "Only URLs reachable by at most `depth` links will be fetched")

// crawl prints url, extracts the links of that page with links.Extract,
// and returns them. An extraction error is logged; whatever list Extract
// produced is returned regardless.
func crawl(url string) []string {
    fmt.Println(url)

    found, extractErr := links.Extract(url)
    if extractErr != nil {
        log.Print(extractErr)
    }
    return found
}

//!+
func main() {
    flag.Parse()
    worklist := make(chan []string)  // lists of URLs, may have duplicates
    unseenLinks := make(chan string) // de-duplicated URLs
    var n, d int                     // number of pending sends to worklist

    n++
    // record items per depth
    counter := make([]int, *depth+2)
    counter[d] = n

    // Add command-line arguments to worklist.
    // skip depth parameter
    go func() {
        worklist <- flag.Args()
    }()

    // Create 20 crawler goroutines to fetch each unseen link.
    for i := 0; i < 20; i++ {
        go func() {
            for link := range unseenLinks {
                foundLinks := crawl(link)
                go func() { worklist <- foundLinks }()
            }
        }()
    }

    // The main goroutine de-duplicates worklist items
    // and sends the unseen ones to the crawlers.
    seen := make(map[string]bool)
    for ; n > 0; n-- {
        list := <-worklist

        // drain worklist then close unseenLinks
        if d > *depth {
            continue
        }
        for _, link := range list {
            if !seen[link] {
                n++ // counter++
                counter[d+1]++

                seen[link] = true
                unseenLinks <- link
            }
        }
        if counter[d]--; counter[d] == 0 {
            d++
        }
    }
    close(unseenLinks)
}

//!-

Exercise 8.7: Write a concurrent program that creates a local mirror of a web site, fetching each reachable page and writing it to a directory on the local disk. Only pages within the original domain (for instance, golang.org) should be fetched. URLs within mirrored pages should be altered as needed so that they refer to the mirrored page, not the original.

// skip for now

Exercise 8.8: Using a select statement, add a timeout to the echo server from Section 8.3 so that it disconnects any client that shouts nothing within 10 seconds.

// handleConn echoes each line the client shouts and disconnects a client
// that sends nothing for 10 seconds.
//
// A reader goroutine feeds scanned lines to the select loop. It also
// watches the done channel, so it cannot stay blocked on a send after the
// handler has returned. Closing the connection (deferred) ends its Scan.
func handleConn(c net.Conn) {
    const idle = 10 * time.Second

    defer c.Close()

    timeout := time.NewTimer(idle)
    defer timeout.Stop()

    done := make(chan struct{})
    defer close(done)

    text := make(chan string)
    go func() {
        defer close(text)
        input := bufio.NewScanner(c)
        for input.Scan() {
            select {
            case text <- input.Text():
            case <-done:
                return
            }
        }
    }()

    var wg sync.WaitGroup
    for {
        select {
        case t, ok := <-text:
            if !ok {
                // Client stopped sending: let pending echoes finish.
                wg.Wait()
                return
            }
            // Stop and drain before Reset so an expiry that raced with
            // this line cannot fire a stale timeout on the next pass.
            if !timeout.Stop() {
                select {
                case <-timeout.C:
                default:
                }
            }
            timeout.Reset(idle)

            wg.Add(1)
            go func() {
                defer wg.Done()
                echo(c, t, 1*time.Second)
            }()
        case <-timeout.C:
            fmt.Println("disconnect silent client")
            return
        }
    }
}

Exercise 8.9: Write a version of du that computes and periodically displays separate totals for each of the root directories.

// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

// See page 250.

// The du3 command computes the disk usage of the files in a directory.
package main

// The du3 variant traverses all directories in parallel.
// It uses a concurrency-limiting counting semaphore
// to avoid opening too many files at once.

import (
    "flag"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "sync"
    "time"
)

// vFlag is the -v flag; when set, main prints the per-root totals on a
// 500ms tick while the scan is running.
var vFlag = flag.Bool("v", false, "show verbose progress messages")

// ref: https://stackoverflow.com/questions/19992334/how-to-listen-to-n-channels-dynamic-select-statement
// aggregate chan array to one share chan

// Message is one file-size report tagged with the index of the root it
// came from, so that the per-root channels can share a single stream.
type Message struct {
    filesize int64 // size of one file, as sent by walkDir
    id       int   // index into roots of the tree containing the file
}

//!+
// main computes the disk usage of each root directory named on the
// command line (default "."), keeping a separate running total per root.
// With -v the per-root totals are printed every 500ms; the grand total
// over all roots is printed once when every tree has been walked.
func main() {
    // ...determine roots...

    //!-
    flag.Parse()

    // Determine the initial directories.
    roots := flag.Args()
    size := len(roots)
    if size == 0 {
        roots = []string{"."}
        size = 1
    }

    //!+
    // message is the single merged stream of tagged size reports; wg counts
    // the forwarding goroutines that feed it.
    var message = make(chan Message)
    var wg sync.WaitGroup

    // Traverse each root of the file tree in parallel.
    // fileSizes[i] carries the sizes of the files found under roots[i].
    var fileSizes = make([]chan int64, size)

    // n[i] counts the walkDir calls still running under roots[i].
    var n = make([]*sync.WaitGroup, size)
    // One channel and one WaitGroup per root keep the statistics separate.
    for i := 0; i < size; i++ {
        fileSizes[i] = make(chan int64)
        n[i] = new(sync.WaitGroup)
    }

    for i, root := range roots {
        n[i].Add(1)
        go walkDir(root, n[i], fileSizes[i])

        // Closer for this root: once its walk is complete, close its
        // channel so the forwarder below terminates.
        go func(i int) {
            n[i].Wait()
            close(fileSizes[i])
        }(i)

        // Forwarder: tag each size from this root with the root's index
        // and pass it on to the shared message channel.
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            var msg Message
            for fs := range fileSizes[i] {
                msg.filesize = fs
                msg.id = i
                message <- msg
            }
        }(i)
    }

    // message closer: all forwarders done means no more reports will come.
    go func() {
        wg.Wait()
        close(message)
    }()

    //!-

    // Print the results periodically.
    // tick stays nil without -v, so its select case below never fires.
    var tick <-chan time.Time
    if *vFlag {
        tick = time.Tick(500 * time.Millisecond)
    }

    // Per-root counters, indexed like roots.
    var nfiles, nbytes []int64
    nfiles = make([]int64, size)
    nbytes = make([]int64, size)

    // totals
    var tfiles, tbytes int64

loop:
    for {
        // Account each report to its root and to the grand total.
        select {
        case msg, ok := <-message:
            if !ok {
                break loop // message was closed
            }
            nfiles[msg.id]++
            nbytes[msg.id] += msg.filesize

            tfiles++
            tbytes += msg.filesize
        case <-tick:
            printAllDiskUsage(roots, nfiles, nbytes)
        }
    }

    printDiskUsage(tfiles, tbytes) // final totals
    //!+
    // ...select loop...
}

//!-

// printAllDiskUsage writes one line to standard output holding, for each
// index i of nfiles, the name file[i], the file count nfiles[i] and the
// byte count nbytes[i] expressed in GB.
func printAllDiskUsage(file []string, nfiles, nbytes []int64) {
    for i, count := range nfiles {
        root := file[i]
        gigabytes := float64(nbytes[i]) / 1e9
        fmt.Printf("%s: %d files  %.1f GB; ", root, count, gigabytes)
    }
    fmt.Println()
}

// printDiskUsage writes a single summary line to standard output: the
// file count followed by the byte count expressed in GB.
func printDiskUsage(nfiles, nbytes int64) {
    gigabytes := float64(nbytes) / 1e9
    fmt.Printf("%d files  %.1f GB\n", nfiles, gigabytes)
}

// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
//!+walkDir
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
    // The caller has already counted this call with n.Add(1).
    defer n.Done()

    for _, entry := range dirents(dir) {
        if !entry.IsDir() {
            fileSizes <- entry.Size()
            continue
        }
        // Count the child before starting it, then walk it concurrently.
        n.Add(1)
        go walkDir(filepath.Join(dir, entry.Name()), n, fileSizes)
    }
}

//!-walkDir

//!+sema
// sema bounds how many dirents calls may read a directory at once: each
// call holds one of its 20 slots for the duration of the read.
var sema = make(chan struct{}, 20)

// dirents lists the entries of dir. On failure it reports the error on
// standard error and returns nil.
func dirents(dir string) []os.FileInfo {
    sema <- struct{}{} // take a slot
    defer func() {
        <-sema // give the slot back
    }()
    // ...
    //!-sema

    infos, readErr := ioutil.ReadDir(dir)
    if readErr == nil {
        return infos
    }
    fmt.Fprintf(os.Stderr, "du: %v\n", readErr)
    return nil
}

Exercise 8.10: HTTP requests may be cancelled by closing the optional Cancel channel in the http.Request struct. Modify the web crawler of Section 8.6 to support cancellation.

Hint: the http.Get convenience function does not give you an opportunity to customize a Request. Instead, create the request using http.NewRequest, set its Cancel field, then perform the request by calling http.DefaultClient.Do(req).

// Note: http.Request Cancel channel is Deprecated in 1.9.1

// Extract makes an HTTP GET request to the specified URL, parses
// the response as HTML, and returns the links in the HTML document.
func Extract(url string, cancel <-chan struct{}) ([]string, error) {
    //construct
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return nil, err
    }
    // set Cancel channel, though it's deprecated
    req.Cancel = cancel

    resp, err := http.DefaultClient.Do(req)
    ....
}

// main.go
//!+
func main() {
    worklist := make(chan []string)
    cancel := make(chan struct{})

    go func() {
        os.Stdin.Read(make([]byte, 1))
        close(cancel)
    }()

    ...
}

Exercise 8.11: Following the approach of mirroredQuery in Section 8.4.4, implement a variant of fetch that requests several URLs concurrently. As soon as the first response arrives, cancel the other requests.

// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

// See page 17.
//!+

// Fetchall fetches URLs in parallel and reports their times and sizes.
package main

import (
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
    "os"
    "time"
)

// main fetches every URL given on the command line concurrently, prints
// the first result to arrive, cancels the remaining requests, and reports
// the elapsed time.
func main() {
    urls := os.Args[1:]
    if len(urls) == 0 {
        // With nothing to fetch, the receive below would block forever.
        fmt.Fprintf(os.Stderr, "usage: %s URL...\n", os.Args[0])
        os.Exit(1)
    }

    start := time.Now()
    // Room for one result per fetcher, so the losers never block on a
    // channel that is read only once.
    ch := make(chan string, len(urls))
    cancel := make(chan struct{})

    for _, url := range urls {
        go fetch(url, cancel, ch) // start a goroutine
    }

    fmt.Println(<-ch) // first result wins
    close(cancel)     // abandon the other requests

    fmt.Printf("%.2fs elapsed\n", time.Since(start).Seconds())
}

// fetch issues a GET request for url and reports a one-line result on ch:
// either the elapsed time, body size and URL, or a description of the
// error. Closing cancel aborts the request.
//
// Every outcome is reported, including a URL that cannot be turned into a
// request, so a receiver waiting on ch is never left without an answer.
// Once cancel is closed the result is dropped instead of blocking on ch.
func fetch(url string, cancel <-chan struct{}, ch chan<- string) {
    report := func(msg string) {
        select {
        case ch <- msg: // send to channel ch
        case <-cancel: // nobody is listening any more
        }
    }

    start := time.Now()
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        report(fmt.Sprint(err))
        return
    }
    req.Cancel = cancel

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        report(fmt.Sprint(err))
        return
    }
    defer resp.Body.Close() // don't leak resources

    nbytes, err := io.Copy(ioutil.Discard, resp.Body)
    if err != nil {
        report(fmt.Sprintf("while reading %s: %v", url, err))
        return
    }
    secs := time.Since(start).Seconds()
    report(fmt.Sprintf("%.2fs  %7d  %s", secs, nbytes, url))
}

//!-

Exercise 8.12: Make the broadcaster announce the current set of clients to each new arrival. This requires that the clients set and the entering and leaving channels record the client name too.

// Copyright © 2017 [email protected]
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

// See page 254.
//!+

// Chat is a server that lets clients chat with each other.
package main

import (
    "bufio"
    "fmt"
    "log"
    "net"
)

//!+broadcaster
// client identifies one connected chat participant. It is used as a map
// key by the broadcaster.
type client struct {
    outgoing chan<- string // an outgoing message channel
    name     string        // label sent to new arrivals in the online list
}

// Channels through which the connection handlers talk to the broadcaster.
var (
    entering = make(chan client) // clients announcing their arrival
    leaving  = make(chan client) // clients announcing their departure
    messages = make(chan string) // all incoming client messages
)

// broadcaster owns the set of connected clients and is the only goroutine
// that touches it. It reacts to three events, forever:
//
//   - a client entering: record it, then send it the "Currently online:"
//     header followed by the name of every other recorded client;
//   - a client leaving: forget it and close its outgoing channel;
//   - a message: forward it to every recorded client.
func broadcaster() {
    roster := make(map[client]bool) // all connected clients
    for {
        select {
        case joined := <-entering:
            roster[joined] = true

            joined.outgoing <- "Currently online:"
            for other := range roster {
                if other == joined {
                    continue // exclude itself
                }
                joined.outgoing <- other.name
            }

        case gone := <-leaving:
            delete(roster, gone)
            close(gone.outgoing)

        case text := <-messages:
            // Broadcast incoming message to all
            // clients' outgoing message channels.
            for member := range roster {
                member.outgoing <- text
            }
        }
    }
}

//!-broadcaster

//!+handleConn
func handleConn(conn net.Conn) {
    var cli client
    ch := make(chan string) // outgoing client messages
    go clientWriter(conn, ch)

    who := conn.RemoteAddr().String()

    cli.outgoing = ch
    cli.name = who

    ch <- "You are " + who
    messages <- who + " has arrived"
    entering <- cli

    input := bufio.NewScanner(conn)
    for input.Scan() {
        messages <- who + ": " + input.Text()
    }
    // NOTE: ignoring potential errors from input.Err()

    leaving <- cli
    messages <- who + " has left"
    conn.Close()
}

//...

Exercise 8.13: Make the chat server disconnect idle clients, such as those that have sent no messages in the last five minutes. Hint: calling conn.Close() in another goroutine unblocks active Read calls such as the one done by input.Scan().

func handleConn(conn net.Conn) {
    var cli client
    ch := make(chan string) // outgoing client messages
    go clientWriter(conn, ch)

    who := conn.RemoteAddr().String()

    cli.outgoing = ch
    cli.name = who

    ch <- "You are " + who
    messages <- who + " has arrived"
    entering <- cli

    timeout := time.NewTimer(5 * time.Minute)
    go func() {
        <-timeout.C
        conn.Close()
    }()

    input := bufio.NewScanner(conn)
    for input.Scan() {
        timeout.Reset(5 * time.Minute)
        messages <- who + ": " + input.Text()
    }
    // NOTE: ignoring potential errors from input.Err()

    leaving <- cli
    messages <- who + " has left"
    conn.Close()
}

Exercise 8.14: Change the chat server’s network protocol so that each client provides its name on entering. Use that name instead of the network address when prefixing each message with its sender’s identity.

// usage:
// -> ./netcat or telnet
// -> GET /?name=guess HTTP/1.1
// -> HOST:
// -> `enter`
// -> same as before
// ...

// main starts the broadcaster and serves the chat over HTTP on
// localhost:8000; every request is passed to handler.
func main() {
    go broadcaster()

    http.HandleFunc("/", handler) // each request calls handler
    serveErr := http.ListenAndServe("localhost:8000", nil)
    log.Fatal(serveErr)
}

// handler echoes the Path component of the requested URL.
func handler(w http.ResponseWriter, r *http.Request) {
    if err := r.ParseForm(); err != nil {
        log.Print(err)
    }
    var who string

    // get client name
    for k, v := range r.Form {
        if k == "name" {
            who = v[0]
            break
        }
    }
    if who == "" {
        who = r.RemoteAddr
    }

    hi, ok := w.(http.Hijacker)
    if !ok {
        log.Fatalln("Can't Hijack.")
    }
    conn, _, err := hi.Hijack()
    if err != nil {
        log.Fatalln("Hijack error");
    }

    var cli client
    ch := make(chan string) // outgoing client messages
    go clientWriter(conn, ch)

    cli.outgoing = ch
    cli.name = who

    ch <- "You are " + who
    messages <- who + " has arrived"
    entering <- cli

    timeout := time.NewTimer(5 * time.Minute)
    go func() {
        <-timeout.C
        conn.Close()
    }()

    input := bufio.NewScanner(conn)
    for input.Scan() {
        timeout.Reset(5 * time.Minute)
        messages <- who + ": " + input.Text()
    }
    // NOTE: ignoring potential errors from input.Err()

    leaving <- cli
    messages <- who + " has left"
    conn.Close()
}
// ...

Exercise 8.15: Failure of any client program to read data in a timely manner ultimately causes all clients to get stuck. Modify the broadcaster to skip a message rather than wait if a client writer is not ready to accept it. Alternatively, add buffering to each client’s outgoing message channel so that most messages are not dropped; the broadcaster should use a non-blocking send to this channel.

// buffering outgoing message channel of each clients
// ...
ch := make(chan string, 20) // outgoing client messages
// ...

Notes

  • Goroutines vs Thread: The differences between threads and goroutines are essentially quantitative, not qualitative.

results matching ""

    No results matching ""