Commit 4803ddbf authored by Geoff Simmons's avatar Geoff Simmons

Refactor concurrency in govarnishlog.

parent bcf25f55
...@@ -38,6 +38,7 @@ import ( ...@@ -38,6 +38,7 @@ import (
"os/signal" "os/signal"
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"sync"
"syscall" "syscall"
"time" "time"
...@@ -64,6 +65,7 @@ var ( ...@@ -64,6 +65,7 @@ var (
totIdle = time.Duration(0) totIdle = time.Duration(0)
tRdrStart time.Time tRdrStart time.Time
seen = uint64(0) seen = uint64(0)
writes = uint64(0)
eol = uint64(0) eol = uint64(0)
printTxFunc = printTxTerse printTxFunc = printTxTerse
printRecFunc = printRecTerse printRecFunc = printRecTerse
...@@ -87,13 +89,22 @@ func newIdle(seen uint64, seenLast uint64, t time.Time, ...@@ -87,13 +89,22 @@ func newIdle(seen uint64, seenLast uint64, t time.Time,
} }
func txReader(q *log.Query, txChan chan []log.Tx, statusChan chan log.Status, func txReader(q *log.Query, txChan chan []log.Tx, statusChan chan log.Status,
sigChan chan os.Signal) { stopChan chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
seenLast := uint64(0) seenLast := uint64(0)
idle := maxIdle idle := maxIdle
tRdrStart = time.Now() tRdrStart = time.Now()
tLast := tRdrStart tLast := tRdrStart
for { for {
select {
case <-stopChan:
close(txChan)
return
default:
break
}
txGrp, status := q.NextTxGroup() txGrp, status := q.NextTxGroup()
if status == log.EOL { if status == log.EOL {
eol++ eol++
...@@ -167,6 +178,14 @@ func print(txGrp []log.Tx) { ...@@ -167,6 +178,14 @@ func print(txGrp []log.Tx) {
out.Flush() out.Flush()
} }
func txWriter(txChan chan []log.Tx, wg *sync.WaitGroup) {
defer wg.Done()
for txGrp := range txChan {
print(txGrp)
writes++
}
}
func main() { func main() {
flag.Parse() flag.Parse()
...@@ -203,6 +222,7 @@ func main() { ...@@ -203,6 +222,7 @@ func main() {
statusChan := make(chan log.Status) statusChan := make(chan log.Status)
signalChan := make(chan os.Signal, 1) signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT) signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
stopChan := make(chan struct{})
if *cpuprof != "" { if *cpuprof != "" {
f, err := os.Create(*cpuprof) f, err := os.Create(*cpuprof)
...@@ -217,25 +237,22 @@ func main() { ...@@ -217,25 +237,22 @@ func main() {
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
var wg sync.WaitGroup
wg.Add(2)
start := time.Now() start := time.Now()
go txReader(q, txChan, statusChan, signalChan) go txReader(q, txChan, statusChan, stopChan, &wg)
go txWriter(txChan, &wg)
n := uint64(0)
LOOP: select {
for { case status := <-statusChan:
select { fmt.Fprintln(os.Stderr, "status:", status)
case status := <-statusChan: break
fmt.Fprintln(os.Stderr, "status:", status) case sig := <-signalChan:
break LOOP fmt.Fprintln(os.Stderr, "received signal:", sig)
case sig := <-signalChan: stopChan <- struct{}{}
fmt.Fprintln(os.Stderr, "received signal:", sig)
break LOOP
case txGrp := <-txChan:
print(txGrp)
n++
}
} }
wg.Wait()
stop := time.Now() stop := time.Now()
out.Flush() out.Flush()
...@@ -256,7 +273,7 @@ LOOP: ...@@ -256,7 +273,7 @@ LOOP:
fmt.Fprintf(os.Stderr, "%v wall clock time\n", fmt.Fprintf(os.Stderr, "%v wall clock time\n",
stop.Sub(start)) stop.Sub(start))
fmt.Fprintf(os.Stderr, "%d tx grps read\n", seen) fmt.Fprintf(os.Stderr, "%d tx grps read\n", seen)
fmt.Fprintf(os.Stderr, "%d tx grps written\n", n) fmt.Fprintf(os.Stderr, "%d tx grps written\n", writes)
fmt.Fprintf(os.Stderr, "%d r/w channel high watermark\n", fmt.Fprintf(os.Stderr, "%d r/w channel high watermark\n",
chanHi) chanHi)
fmt.Fprintf(os.Stderr, "%d times reader at eol\n", eol) fmt.Fprintf(os.Stderr, "%d times reader at eol\n", eol)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment