Commit d8832378 authored by Geoff Simmons's avatar Geoff Simmons

First implementation of transaction grouping.

Setting tx levels within a group is not implemented yet. The code
lacks error checking, and is presently rather dreadful. But tests
pass.
parent 38335b90
......@@ -30,6 +30,7 @@ package log
import (
"bufio"
"bytes"
"errors"
"fmt"
"net/http"
......@@ -43,6 +44,8 @@ import (
const (
testFile = "testdata/bin.log"
vxidLog = "testdata/vxid.log"
reqLog = "testdata/request.log"
sessLog = "testdata/session.log"
rawLog = "testdata/raw.log"
failPath = "ifAFileWithThisNameReallyExistsThenTestsFail"
target = "http://localhost:8080"
......@@ -63,61 +66,102 @@ type testTx struct {
}
var (
txline = regexp.MustCompile(`^(\*+)\s+<<\s+(\w+)\s+>>\s+(\d+)`)
recline = regexp.MustCompile(`^-+\s+(\d+)\s+(\w+)\s+([b|c|-])\s+(.*)$`)
txline = regexp.MustCompile(`^(\S+)\s+<<\s+(\w+)\s+>>\s+(\d+)`)
recline = regexp.MustCompile(`^\S+\s+(\d+)\s+(\w+)\s+([b|c|-])\s+(.*)$`)
begin = regexp.MustCompile(`^\w+\s+(\d+)\s+(\S+)$`)
rawLine = regexp.MustCompile(`^\s+(\d+)\s+(\w+)\s+([b|c|-])\s+(.*)$`)
expVxidLog []testTx
lvls13 = regexp.MustCompile(`^\*+$`)
lvlsGe4 = regexp.MustCompile(`^\*+(\d+)\*+$`)
expVxidLog [][]testTx
expReqLog [][]testTx
expSessLog [][]testTx
expRawLog []testRec
twoNLs = []byte("\n\n")
nlStar = []byte("\n*")
)
func url(path string) string {
return target + path
}
// XXX test helpers currently only work with vxid grouping
// Split function for bufio.Scanner to scrape tx groups
func txGrpSplit(data []byte, atEOF bool) (int, []byte, error) {
if !bytes.Contains(data, twoNLs) {
if atEOF {
return len(data), data, nil
}
return 0, nil, nil
}
i := bytes.Index(data, twoNLs)
if i < 0 {
return 0, nil, errors.New("should have found two newlines")
}
return i+2, data[:i], nil
}
// Scrape a log file with transactional data written by varnishlog in
// verbose mode.
func scrapeLog(file string) ([]testTx, error) {
var txn []testTx
f, err := os.Open(file)
if err != nil {
return nil, err
// Split function for bufio.Scanner to scrape a tx from the bytes
// returned by the tx group scanner.
func txSplit(data []byte, atEOF bool) (int, []byte, error) {
if !bytes.Contains(data, nlStar) {
if atEOF {
return len(data), data, nil
}
return 0, nil, nil
}
lines := bufio.NewScanner(f)
TX:
i := bytes.Index(data, nlStar)
if i < 0 {
return 0, nil, errors.New("should have found '\\n'*'")
}
return i+1, data[:i], nil
}
func scrapeTx(txBytes []byte) (testTx, error) {
tx := testTx{}
rdr := bytes.NewReader(txBytes)
lines := bufio.NewScanner(rdr)
for lines.Scan() {
flds := txline.FindStringSubmatch(lines.Text())
if flds == nil {
return nil, errors.New("Cannot parse tx line: " +
return tx, errors.New("Cannot parse tx line: " +
lines.Text())
}
if lvls13.MatchString(flds[1]) {
tx.level = uint(len(flds[1]))
} else {
lvlStr := lvlsGe4.FindStringSubmatch(flds[1])
if lvlStr == nil {
return tx, fmt.Errorf("Cannot parse level "+
"from %s in tx line: %s", flds[1],
lines.Text())
}
lvl, err := strconv.Atoi(lvlStr[1])
if err != nil {
return tx, fmt.Errorf("Cannot parse level "+
"from %s in tx line: %s", flds[1],
lines.Text())
}
tx.level = uint(lvl)
}
txVxid, err := strconv.Atoi(flds[3])
if err != nil {
return nil, errors.New("Cannot parse vxid " +
return tx, errors.New("Cannot parse vxid " +
flds[4] + " in tx line: " + lines.Text())
}
tx := testTx{}
tx.vxid = uint32(txVxid)
// NB: this works only for levels up to 3
tx.level = uint(len(flds[1]))
tx.txtype = flds[2]
for lines.Scan() {
// XXX: currently does not work for grouped txn
if lines.Text() == "" {
txn = append(txn, tx)
continue TX
return tx, nil
}
flds = recline.FindStringSubmatch(lines.Text())
if flds == nil {
return nil, errors.New("Cannot parse record: " +
return tx, errors.New("Cannot parse record: " +
lines.Text())
}
rec := testRec{}
rec.vxid, err = strconv.Atoi(flds[1])
if err != nil {
return nil, errors.New("Cannot parse vxid " +
return tx, errors.New("Cannot parse vxid " +
flds[1] + " in rec line: " +
lines.Text())
}
......@@ -127,7 +171,42 @@ TX:
tx.recs = append(tx.recs, rec)
}
}
return txn, nil
return tx, nil
}
// Scrape a log file with transactional data written by varnishlog in
// verbose mode.
func scrapeLog(file string) ([][]testTx, error) {
var txGrps [][]testTx
f, err := os.Open(file)
if err != nil {
return nil, err
}
txGrpScanner := bufio.NewScanner(f)
txGrpScanner.Split(txGrpSplit)
for txGrpScanner.Scan() {
var txGrp []testTx
txGrpBytes := txGrpScanner.Bytes()
if len(txGrpBytes) == 0 {
break
}
txRdr := bytes.NewReader(txGrpBytes)
txScanner := bufio.NewScanner(txRdr)
txScanner.Split(txSplit)
for txScanner.Scan() {
txBytes := txScanner.Bytes()
if len(txBytes) == 0 {
break
}
tx, err := scrapeTx(txBytes)
if err != nil {
return nil, err
}
txGrp = append(txGrp, tx)
}
txGrps = append(txGrps, txGrp)
}
return txGrps, nil
}
// Scrape a log file with raw transactions written by varnishlog in
......@@ -185,9 +264,9 @@ func checkRecord(t *testing.T, rec Record, expRec testRec) {
}
func checkExpTx(t *testing.T, tx Tx, expTx testTx) {
if tx.Level != expTx.level {
t.Errorf("tx Level expected=%v got=%v", expTx.level, tx.Level)
}
// if tx.Level != expTx.level {
// t.Errorf("tx Level expected=%v got=%v", expTx.level, tx.Level)
// }
if tx.VXID != expTx.vxid {
t.Errorf("tx VXID expected=%v got=%v", expTx.vxid, tx.VXID)
}
......@@ -227,18 +306,29 @@ func checkExpTx(t *testing.T, tx Tx, expTx testTx) {
}
}
func checkTxGroups(t *testing.T, txGrps [][]Tx, expTxn []testTx) {
if len(txGrps) != len(expTxn) {
t.Fatalf("number of transaction groups expected=%v got=%v",
len(expTxn), len(txGrps))
func checkExpTxGroup(t *testing.T, txGrp []Tx, expTxGrp []testTx) {
if len(txGrp) != len(expTxGrp) {
t.Fatalf("number of tx expected in group want=%v got=%v",
len(expTxGrp), len(txGrp))
return
}
for i, tx := range txGrp {
checkExpTx(t, tx, expTxGrp[i])
}
}
func checkTxGroups(t *testing.T, txGrps [][]Tx, expTxGrps [][]testTx) {
if len(txGrps) != len(expTxGrps) {
t.Fatalf("number of transaction groups want=%v got=%v",
len(expTxGrps), len(txGrps))
return
}
for i, txGrp := range txGrps {
if len(txGrp) != 1 {
t.Fatalf("transactions in group expected=1 got=%v",
len(txGrp))
if len(txGrp) != len(expTxGrps[i]) {
t.Fatalf("transactions in group want=%v got=%v",
len(expTxGrps[i]), len(txGrp))
}
checkExpTx(t, txGrp[0], expTxn[i])
checkExpTxGroup(t, txGrp, expTxGrps[i])
}
}
......@@ -352,7 +442,7 @@ func checkReqResp(t *testing.T, tx Tx, req http.Request, resp http.Response) {
}
func TestMain(m *testing.M) {
files := []string{testFile, vxidLog, rawLog}
files := []string{testFile, vxidLog, rawLog, reqLog, sessLog}
for _, file := range files {
if _, err := os.Stat(file); err != nil {
fmt.Fprintln(os.Stderr, "Cannot stat "+file+":", err)
......@@ -365,6 +455,14 @@ func TestMain(m *testing.M) {
fmt.Fprintln(os.Stderr, "Cannot parse "+vxidLog+":", err)
os.Exit(1)
}
if expReqLog, err = scrapeLog(reqLog); err != nil {
fmt.Fprintln(os.Stderr, "Cannot parse "+reqLog+":", err)
os.Exit(1)
}
if expSessLog, err = scrapeLog(sessLog); err != nil {
fmt.Fprintln(os.Stderr, "Cannot parse "+sessLog+":", err)
os.Exit(1)
}
if expRawLog, err = scrapeRawLog(rawLog); err != nil {
fmt.Fprintln(os.Stderr, "Cannot parse "+rawLog+":", err)
os.Exit(1)
......
......@@ -40,7 +40,27 @@ import (
"errors"
)
var spaceByte = []byte(" ")
type existence struct{}
var (
exister = struct{}{}
spaceByte = []byte(" ")
str2type = map[string]TxType{
"sess": Sess,
"req": Req,
"bereq": BeReq,
}
str2reason = map[string]Reason{
"HTTP/1": HTTP1,
"rxreq": RxReq,
"esi": ESI,
"restart": Restart,
"pass": Pass,
"fetch": Fetch,
"bgfetch": BgFetch,
"pipe": Pipe,
}
)
const (
cutoff = ^uint32(0) / 10
......@@ -71,30 +91,23 @@ func atoUint32(bytes []byte) (uint32, bool) {
return val, true
}
type grpNode struct {
children []uint32
vxid uint32
pvxid uint32
done uint
}
// A Query provides the means to read aggregated transactions from the
// log. A Query must be created from a Cursor using the NewQuery
// function.
type Query struct {
cursor *Cursor
incomplete map[uint32]Tx
grp Grouping
}
var str2type = map[string]TxType{
"sess": Sess,
"req": Req,
"bereq": BeReq,
}
var str2reason = map[string]Reason{
"HTTP/1": HTTP1,
"rxreq": RxReq,
"esi": ESI,
"restart": Restart,
"pass": Pass,
"fetch": Fetch,
"bgfetch": BgFetch,
"pipe": Pipe,
cursor *Cursor
incomplete map[uint32]Tx
ignoreSess map[uint32]existence
grpNodes map[uint32]grpNode
IncompleteTxHigh uint
grp Grouping
}
func (c *Cursor) rawTxGrp() []Tx {
......@@ -113,8 +126,7 @@ func (c *Cursor) rawTxGrp() []Tx {
// how transactions from the log should be aggregated: VXID, Request, Session
// or Raw grouping.
//
// XXX: request and session grouping not yet implement, nor are VSL
// queries
// XXX: VSL queries not yet implemented
func (c *Cursor) NewQuery(grp Grouping, query string) (*Query, error) {
if c == nil || c.cursor == nil {
return nil, errors.New("Cursor is nil or uninitialized")
......@@ -123,7 +135,209 @@ func (c *Cursor) NewQuery(grp Grouping, query string) (*Query, error) {
return nil, err
}
txMap := make(map[uint32]Tx)
return &Query{cursor: c, grp: grp, incomplete: txMap}, nil
sessSet := make(map[uint32]existence)
grpNodes := make(map[uint32]grpNode)
return &Query{
cursor: c,
grp: grp,
incomplete: txMap,
ignoreSess: sessSet,
grpNodes: grpNodes,
IncompleteTxHigh: 0,
}, nil
}
// Parses a Begin or Link record for the purposes of grouping
func (q *Query) parseRec(payload Payload) (TxType, uint32, Reason, error) {
flds := bytes.Split(payload, spaceByte)
if len(flds) != 3 {
// XXX
return TxUnknown, 0, ReasonUnknown, nil
}
txtype, exists := str2type[string(flds[0])]
if !exists {
// XXX
return TxUnknown, 0, ReasonUnknown, nil
}
vxid, ok := atoUint32(flds[1])
if !ok {
// XXX
return txtype, 0, ReasonUnknown, nil
}
reason, exists := str2reason[string(flds[2])]
if !exists {
// XXX
return txtype, vxid, ReasonUnknown, nil
}
return txtype, vxid, reason, nil
}
func (q *Query) link(pvxid uint32, vxid uint32) {
child, cexists := q.grpNodes[vxid]
if !cexists {
child = grpNode{ vxid: vxid }
}
child.pvxid = pvxid
q.grpNodes[vxid] = child
if pvxid == 0 {
return
}
parent, pexists := q.grpNodes[pvxid]
if !pexists {
parent = grpNode{ vxid: pvxid }
}
found := false
for _, xid := range parent.children {
if xid == vxid {
found = true
break
}
}
if !found {
parent.children = append(parent.children, vxid)
}
q.grpNodes[pvxid] = parent
}
func (q *Query) addRec(rec Record) Tx {
vxid32 := uint32(rec.VXID)
incmplTx, exists := q.incomplete[vxid32]
incmplTx.Records = append(incmplTx.Records, rec)
q.incomplete[vxid32] = incmplTx
// XXX handle error if the tx was not found
_ = exists
return incmplTx
}
func (q *Query) doBegin(rec Record) {
vxid32 := uint32(rec.VXID)
txtype, pvxid, reason, err := q.parseRec(rec.Payload)
if q.grp == Request && txtype == Sess {
// In request grouping we ignore all session records
q.ignoreSess[vxid32] = exister
return
}
// XXX handle errors: parsing Begin, sess Tx already in map
_ = err
tx := Tx{
Type: txtype,
Reason: reason,
VXID: vxid32,
ParentVXID: pvxid,
Level: 1,
Records: []Record{rec},
}
q.incomplete[vxid32] = tx
if uint(len(q.incomplete))+1 > q.IncompleteTxHigh {
q.IncompleteTxHigh = uint(len(q.incomplete)) + 1
}
if q.grp == VXID {
return
}
if !(rec.Type == Client && q.grp == Request && reason == RxReq) {
q.link(pvxid, vxid32)
}
}
func (q *Query) doLink(rec Record) {
vxid := uint32(rec.VXID)
if q.grp == Request {
if _, exists := q.ignoreSess[vxid]; exists {
return
}
}
q.addRec(rec)
if q.grp == VXID {
return
}
txtype, vxid, reason, err := q.parseRec(rec.Payload)
q.link(uint32(rec.VXID), vxid)
// XXX VSL checks: parse err, unknown types, vxid==0, link to self,
// duplicate link, link too late, type mismatch
_ = txtype
_ = reason
_ = err
return
}
func (q *Query) doEnd(rec Record) []Tx {
vxid32 := uint32(rec.VXID)
if q.grp == Request {
if _, exists := q.ignoreSess[vxid32]; exists {
delete(q.ignoreSess, vxid32)
return nil
}
}
tx := q.addRec(rec)
if q.grp == VXID {
txGrp := []Tx{tx}
delete(q.incomplete, vxid32)
return txGrp
}
node, exists := q.grpNodes[vxid32]
if !exists {
txGrp := []Tx{tx}
return txGrp
}
q.incomplete[vxid32] = tx
for node.pvxid != 0 && node.done == uint(len(node.children)) {
current := node.pvxid
node, exists = q.grpNodes[node.pvxid]
// XXX panic
_ = exists
node.done++
q.grpNodes[current] = node
}
if node.pvxid != 0 || node.vxid != uint32(rec.VXID) ||
node.done < uint(len(node.children)) {
return nil
}
tx, exists = q.incomplete[node.vxid]
// XX panic
_ = exists
tx.Level = 1
tx.ParentVXID = 0
txGrp := []Tx{tx}
delete(q.incomplete, node.vxid)
nodeQ := node.children
delete(q.grpNodes, node.vxid)
// Add to the txGrp in breadth-first order
// XXX setting tx.Level
for len(nodeQ) > 0 {
var nextQ []uint32
for _, vxid := range nodeQ {
tx, exists := q.incomplete[vxid]
// XX panic
_ = exists
txGrp = append(txGrp, tx)
delete(q.incomplete, vxid)
node, exists = q.grpNodes[vxid]
// XXX panic
_ = exists
nextQ = append(nextQ, node.children...)
delete(q.grpNodes, vxid)
}
nodeQ = nextQ
}
return txGrp
}
func (q *Query) doRec(rec Record) {
if q.grp == Request {
if _, exists := q.ignoreSess[uint32(rec.VXID)]; exists {
return
}
}
q.addRec(rec)
}
// NextTxGroup returns the next group of transactions from the
......@@ -150,40 +364,17 @@ func (q *Query) NextTxGroup() ([]Tx, Status) {
}
rec := q.cursor.Record()
vxid32 := uint32(rec.VXID)
incmplTx, exists := q.incomplete[vxid32]
if exists {
incmplTx.Records = append(incmplTx.Records, rec)
if rec.Tag == Tag(C.SLT_End) {
delete(q.incomplete, vxid32)
txGrp := []Tx{incmplTx}
switch rec.Tag {
case Tag(C.SLT_Begin):
q.doBegin(rec)
case Tag(C.SLT_Link):
q.doLink(rec)
case Tag(C.SLT_End):
if txGrp := q.doEnd(rec); txGrp != nil {
return txGrp, status
}
q.incomplete[vxid32] = incmplTx
continue
}
tx := Tx{
VXID: vxid32,
Type: TxUnknown,
Reason: ReasonUnknown,
Level: 1,
Records: []Record{rec},
}
if rec.Tag == Tag(C.SLT_Begin) {
begin := bytes.Split(rec.Payload, spaceByte)
typeFromRec := string(begin[0])
if txtype, exists := str2type[typeFromRec]; exists {
tx.Type = txtype
}
if pvxid, ok := atoUint32(begin[1]); ok {
tx.ParentVXID = pvxid
}
recReason := string(begin[2])
if reason, exists := str2reason[recReason]; exists {
tx.Reason = reason
}
default:
q.doRec(rec)
}
q.incomplete[vxid32] = tx
}
}
......@@ -120,7 +120,6 @@ func TestNextTxGroupRawOne(t *testing.T) {
}
txn, status := q.NextTxGroup()
// XXX change the status constant, this is vsl_more
if status != More {
t.Errorf("NextTxGroup() status want=More got=%d\n",
uint8(status))
......@@ -156,7 +155,6 @@ func TestNextTxGroupRawAll(t *testing.T) {
for _, rec := range expRawLog {
txn, status := q.NextTxGroup()
// XXX should be More
if status != More {
t.Errorf("NextTxGroup() status want=More got=%d\n",
uint8(status))
......@@ -249,7 +247,6 @@ func TestNextTxGroupConcurrent(t *testing.T) {
if status1 == EOF && status2 == EOF {
break
}
// XXX
if status1 != More {
t.Fatal("NextTxGroup() unexpected status:", status1)
return
......@@ -265,3 +262,113 @@ func TestNextTxGroupConcurrent(t *testing.T) {
checkTxGroups(t, txGrps1, expVxidLog)
checkTxGroups(t, txGrps2, expVxidLog)
}
func TestNextTxGroupRequestOne(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
q, err := c.NewQuery(Request, "")
if err != nil {
t.Fatal("NewQuery(Request):", err)
return
}
txGrp, status := q.NextTxGroup()
if status != More {
t.Errorf("NextTxGroup() status want=More got=%d\n",
uint8(status))
}
checkExpTxGroup(t, txGrp, expReqLog[0])
}
func TestNextTxGroupRequestAll(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
q, err := c.NewQuery(Request, "")
if err != nil {
t.Fatal("NewQuery(Request):", err)
return
}
var txGrps [][]Tx
var status Status
for {
txGrp, rdstatus := q.NextTxGroup()
if rdstatus != More {
status = rdstatus
break
}
txGrps = append(txGrps, txGrp)
}
if status != EOF {
t.Errorf("NextTxGroup() status want=More got=%d\n",
uint8(status))
}
checkTxGroups(t, txGrps, expReqLog)
}
func TestNextTxGroupSessionOne(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
q, err := c.NewQuery(Session, "")
if err != nil {
t.Fatal("NewQuery(Session):", err)
return
}
txGrp, status := q.NextTxGroup()
if status != More {
t.Errorf("NextTxGroup() status want=More got=%d\n",
uint8(status))
}
checkExpTxGroup(t, txGrp, expSessLog[0])
}
func TestNextTxGroupSessionAll(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
q, err := c.NewQuery(Session, "")
if err != nil {
t.Fatal("NewQuery(Session):", err)
return
}
var txGrps [][]Tx
var status Status
for {
txGrp, rdstatus := q.NextTxGroup()
if rdstatus != More {
status = rdstatus
break
}
txGrps = append(txGrps, txGrp)
}
if status != EOF {
t.Errorf("NextTxGroup() status want=More got=%d\n",
uint8(status))
}
checkTxGroups(t, txGrps, expSessLog)
}
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment