Commit 93d65c49 authored by Geoff Simmons's avatar Geoff Simmons

Refactor Log.Read().

No longer starting a goroutine -- users decide if they want one.

Native reads are locked per Log object, since they are not safe for
concurrency with the same native handles.

The global callback map is keyed with an atomically incremented uint64.
parent 20c6af8a
......@@ -439,18 +439,20 @@ func TestE2EDefaultRead(t *testing.T) {
case More:
txChan <- txGrp
return true
case Stopped:
return false
default:
statusChan <- rdstatus
return false
}
}
err := l.Read(rdHndlr, nil)
if err != nil {
t.Fatal("Read(): " + err.Error())
return
}
readFailed := false
go func() {
err := l.Read(rdHndlr, nil)
if err != nil {
t.Fatal("Read(): " + err.Error())
readFailed = true
return
}
}()
client := &http.Client{}
req, err := http.NewRequest("GET", url("/uncacheable"), nil)
......@@ -479,6 +481,10 @@ TX:
}
}
if readFailed {
return
}
checkTxGrps(t, txGrps, expDefaultRead)
for _, txGrp := range txGrps {
tx := txGrp[0]
......
......@@ -33,11 +33,12 @@ static int
dispatch_wrapped(struct VSL_data *vsl, struct VSL_transaction * const trans[],
void *priv)
{
return (publish((void *)trans, priv));
(void) vsl;
return (publish((void *)trans, *((uint64_t *)priv)));
}
int
dispatch(struct VSLQ *vslq, int n)
dispatch(struct VSLQ *vslq, uint64_t key)
{
return VSLQ_Dispatch(vslq, dispatch_wrapped, (void *)&n);
return VSLQ_Dispatch(vslq, dispatch_wrapped, (void *)&key);
}
......@@ -44,6 +44,7 @@ import (
"os"
"regexp"
"sync"
"sync/atomic"
"time"
"unsafe"
)
......@@ -232,17 +233,12 @@ func initTags() []string {
return tags
}
type rdData struct {
hndler ReadHandler
cancel chan bool
wg *sync.WaitGroup
}
var (
tags = initTags()
nonprint = regexp.MustCompile("[^[:print:]]")
cbMap = make(map[int]*rdData)
cbLock = new(sync.RWMutex)
mapCtr = uint64(0)
cbMap = make(map[uint64]ReadHandler)
cbLock sync.RWMutex
)
// A Tag is a classifier for the contents of a Record's payload,
......@@ -348,7 +344,7 @@ type Log struct {
cursor *C.struct_VSL_cursor
grp C.enum_VSL_grouping_e
query *C.char
cbIdx int
rdLock sync.Mutex
}
// New returns a new and initialized instance of Log. Log instances
......@@ -368,7 +364,6 @@ func New() *Log {
log.vsmopts = C.vsl_opt_batch() | C.vsl_opt_tail()
log.grp = C.VSL_g_vxid
log.query = nil
log.cbIdx = -1
return log
}
......@@ -382,10 +377,9 @@ func (log *Log) checkNil() error {
return nil
}
// Release stops any goroutine started by the Read method, and frees
// native resources associated with this client. You should always
// call Release when you're done using a Log client, otherwise there
// is risk of resource leakage.
// Release frees native resources associated with this client. You
// should always call Release when you're done using a Log client,
// otherwise there is risk of resource leakage.
//
// It is wise to make use of Go's defer mechanism:
//
......@@ -397,20 +391,6 @@ func (log *Log) Release() {
if log == nil || log.vsl == nil {
return
}
// Stop any running read goroutine, and delete the cbMap entry.
cbLock.Lock()
if log.cbIdx != -1 {
rddata, ok := cbMap[log.cbIdx]
if !ok {
panic("read goroutine data not found")
}
if rddata.cancel != nil {
rddata.cancel <- true
}
rddata.wg.Wait()
delete(cbMap, log.cbIdx)
}
cbLock.Unlock()
if log.cursor != nil {
C.VSL_DeleteCursor(log.cursor)
}
......@@ -510,21 +490,20 @@ func quote(s string) string {
}
//export publish
func publish(trans unsafe.Pointer, priv unsafe.Pointer) C.int {
func publish(trans unsafe.Pointer, key uint64) C.int {
txn := (*[1 << 30]*C.struct_VSL_transaction)(unsafe.Pointer(trans))
if txn[0] == nil {
return 0
}
var gtxn []Tx
n := int(*(*C.int)(priv))
cbLock.RLock()
rddata, ok := cbMap[n]
cb, ok := cbMap[key]
cbLock.RUnlock()
if !ok {
panic("unmapped read data")
panic("unmapped read handler")
}
cb := rddata.hndler
var gtxn []Tx
status := C.int(0)
for i := 0; txn[i] != nil; i++ {
var tx Tx
......@@ -645,8 +624,7 @@ func (status Status) Error() string {
}
// ReadHandler is the type of a callback function that is passed to
// the Read method, and is invoked by a goroutine while the log is
// being read.
// the Read method, and is invoked while the log is being read.
//
// When the log read has aggregated a group of transactions, they are
// passed to the handler as a slice of Tx objects. The second
......@@ -660,30 +638,23 @@ func (status Status) Error() string {
// The handler returns true or false to signal whether the log read
// should continue. If the handler returns false, it is invoked one
// more time with the Status set to Stopped, and nil for the
// transaction slice, and the goroutine initiated by the Read method
// is stopped.
//
// If the read goroutine is still running when Log.Release() is
// called, then the handler is invoked with status Stopped and nil for
// the transaction slice.
// transaction slice, and the Read method finishes.
//
// All of the data passed into the paramters of the handler (the slice
// of Tx and the Status) were allocated as local variables during the
// execution of the goroutine initiated by Read. They go out of scope
// (and hence are eligible for garbage collection) when handler
// finishes execution.
// execution of Read. They go out of scope (and hence are eligible for
// garbage collection) when the handler finishes execution.
type ReadHandler func([]Tx, Status) bool
// EOLHandler is the type of a function that is passed to the Read
// method. It is executed when the end-of-log condition is encountered
// while the log of a live Varnish instance is being read during the
// goroutine initiated by Read. This means that the end of the log
// buffer has been reached, and there are currently no new
// transactions.
// execution of Read. This means that the end of the log buffer has
// been reached, and there are currently no new transactions.
//
// If the handler returns true, then log reads are continued in the
// same goroutine. If it returns false, then log reads are ended, and
// the goroutine finishes.
// Read invocation. If it returns false, then log reads are ended, and
// Read finishes.
//
// If nil is passed to Read as the handler, then a default EOLHandler
// is executed that sleeps for 10 milliseconds (as is done by default
......@@ -707,9 +678,8 @@ func defaultEOLHndlr() bool {
return true
}
// Read starts a goroutine that reads from the currently attached log
// source, and invokes rdHndler with data from the log, as documented
// for ReadHandler.
// Read reads from the currently attached log source, and invokes
// rdHndler with data from the log, as documented for ReadHandler.
//
// You can also pass in an EOLHandler that is executed when the log
// reaches the EOL condition reading from the log of a live Varnish
......@@ -717,21 +687,23 @@ func defaultEOLHndlr() bool {
// default EOLHandler is used. The EOLHandler is never executed for
// reads from a binary log file.
//
// Note that the native read executed during Read is locked per Log
// object, because it is not safe for concurrency with the same
// internal handles. If Read is continuously reading from a live
// Varnish log, it will be locked the entire time. For concurrent
// Reads, initialize different Log instances that are attached
// separately to the log sources.
//
// Returns an error if rdHndlr is nil, if the Log object is not
// attached to a log source, or if the native library could not
// initialize the read.
//
// Example:
//
// // set up a channel to block waiting for end or error conditions
// statusChan := make(chan log.Status)
//
// // this callback will be passed to the Read method
// rdHndlr := func(txGrp []log.Tx, rdstatus log.Status) bool {
// // if the status is not log.More, send it on the status
// // channel and discontinue reads
// // discontinue reads if the status is not log.More
// if rdstatus != log.More {
// statusChan <- rdstatus
// return false
// }
// for i, tx := range txGrp {
......@@ -747,59 +719,45 @@ func defaultEOLHndlr() bool {
// if err != nil {
// // error handling ...
// }
//
// // block waiting for the signal from the status channel
// status := <- statusChan
func (log *Log) Read(rdHndlr ReadHandler, eolHndlr EOLHandler) error {
if err := log.checkNil(); err != nil {
return err
}
if log.cbIdx != -1 {
return errors.New("read already running for this Log instance")
}
if log.vsm == nil && log.cursor == nil {
return errors.New("not attached to a log source")
}
if rdHndlr == nil {
return errors.New("callback is nil")
}
if eolHndlr == nil {
eolHndlr = defaultEOLHndlr
}
vslq := C.VSLQ_New(log.vsl, &log.cursor, log.grp, log.query)
if vslq == nil {
return log
}
if eolHndlr == nil {
eolHndlr = defaultEOLHndlr
}
defer C.VSLQ_Delete(&vslq)
var wg sync.WaitGroup
var rddata = rdData{ rdHndlr, make(chan bool), &wg }
key := atomic.AddUint64(&mapCtr, 1)
cbLock.Lock()
n := len(cbMap)
cbMap[n] = &rddata
log.cbIdx = n
cbMap[key] = rdHndlr
cbLock.Unlock()
go func(vslq *C.struct_VSLQ, n int, rddata *rdData, eol EOLHandler) {
var status C.int
rddata.wg.Add(1)
defer rddata.wg.Done()
cancelChan := rddata.cancel
defer close(cancelChan)
for status = C.vsl_more; status == C.vsl_more; {
select {
case <- cancelChan:
status = C.int(Stopped)
break
default:
status = C.dispatch(vslq, C.int(n))
if status == C.vsl_end && eol() {
status = C.vsl_more
}
continue
}
status := C.int(C.vsl_more)
log.rdLock.Lock()
for status == C.vsl_more {
status = C.dispatch(vslq, C.uint64_t(key))
if status == C.vsl_end && eolHndlr() {
status = C.vsl_more
}
rddata.cancel = nil
rdHndlr(nil, Status(status))
C.VSLQ_Delete(&vslq)
}(vslq, n, &rddata, eolHndlr)
}
rdHndlr(nil, Status(status))
log.rdLock.Unlock()
cbLock.Lock()
delete(cbMap, key)
cbLock.Unlock()
return nil
}
......@@ -32,7 +32,7 @@
#include <stdio.h>
#include <vapi/vsl.h>
int dispatch(struct VSLQ *vslq, int n);
int dispatch(struct VSLQ *vslq, uint64_t key);
// Necessary because the field name is a go keyword.
static inline enum VSL_transaction_e
......
......@@ -176,7 +176,8 @@ func TestStatusString(t *testing.T) {
// Just make sure it doesn't panic, don't want tests to depend
// on specific values of the strings.
stati := [...]Status{
WriteErr, IOErr, Overrun, Abandoned, EOF, EOL, More, Stopped }
WriteErr, IOErr, Overrun, Abandoned, EOF, EOL, More, Stopped,
}
for _, v := range stati {
_ = v.String()
_ = v.Error()
......@@ -360,10 +361,10 @@ func TestDefaultRead(t *testing.T) {
t.Error("expected l.Read() with nil ReadHandler to fail")
}
statusChan := make(chan Status)
var status Status
rdHndlr := func(txGrp []Tx, rdstatus Status) bool {
if rdstatus != More {
statusChan <- rdstatus
status = rdstatus
return false
}
txGrps = append(txGrps, txGrp)
......@@ -379,7 +380,6 @@ func TestDefaultRead(t *testing.T) {
t.Error("expected l.Read() with read already running to fail")
}
status := <-statusChan
if status != Status(EOF) {
t.Errorf("expected EOF status got: %s", status.Error())
}
......@@ -387,9 +387,9 @@ func TestDefaultRead(t *testing.T) {
checkTxGroups(t, txGrps, expVxidLog)
}
func readHndl(txGrp []Tx, status Status, txGrps *[][]Tx, ch chan Status) bool {
func readHndl(txGrp []Tx, status Status, txGrps *[][]Tx, end *Status) bool {
if status != More {
ch <- status
*end = status
return false
}
*txGrps = append(*txGrps, txGrp)
......@@ -412,13 +412,13 @@ func TestConcurrentRead(t *testing.T) {
t.Fatal("l2 attach to " + testFile + ": " + err.Error())
}
chan1 := make(chan Status)
chan2 := make(chan Status)
var endStatus1 Status
var endStatus2 Status
hndlr1 := func(txGrp []Tx, rdstatus Status) bool {
return readHndl(txGrp, rdstatus, &txGrps1, chan1)
return readHndl(txGrp, rdstatus, &txGrps1, &endStatus1)
}
hndlr2 := func(txGrp []Tx, rdstatus Status) bool {
return readHndl(txGrp, rdstatus, &txGrps2, chan2)
return readHndl(txGrp, rdstatus, &txGrps2, &endStatus2)
}
err1 := l1.Read(hndlr1, nil)
......@@ -430,13 +430,11 @@ func TestConcurrentRead(t *testing.T) {
t.Fatal("l2.Read(): " + err.Error())
}
status := <-chan1
if status != Status(EOF) {
t.Errorf("expected EOF status got: %s", status.Error())
if endStatus1 != Status(EOF) {
t.Errorf("expected EOF status got: %s", endStatus1.Error())
}
status = <-chan2
if status != Status(EOF) {
t.Errorf("expected EOF status got: %s", status.Error())
if endStatus2 != Status(EOF) {
t.Errorf("expected EOF status got: %s", endStatus2.Error())
}
checkTxGroups(t, txGrps1, expVxidLog)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment