Commit 7147cd55 authored by Geoff Simmons's avatar Geoff Simmons

Comprehensive refactoring of the Log package, adding Cursor and Query.

VSLQ_Dispatch() is a powerful tool for the C API, but the restrictions
and overhead involved with the C/Go bridge make it too unwieldy to use.
Too much synchronization becomes necessary for a low-level API like
this. The cgocall overhead becomes excessive.

It seems to be difficult, perhaps impossible, to implement callbacks
without creating many objects on the Go heap. Since many objects from
the log are created in hot code, this puts excessive pressure on the
Go garbage collector.

This version uses low-level log read operations, excusively VSL_Next()
and related functions. The C/Go bridge is much smaller and simpler. It
should be possible to create the objects for log data on the Go stack.

The Cursor and Query interfaces reflect the underlying VSL interface
more faithfully. Distinct Cursors are safe for concurrent reads (as
for struct VSL_cursor from the VSL interface).

This means that grouping and VSL queries will have to re-implemented
in Go.
parent 85332d75
/*-
* Copyright (c) 2018 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package log
/*
#cgo pkg-config: varnishapi
#include <stdio.h>
#include <vapi/vsl.h>
*/
import "C"
import "errors"
// A Cursor is used to advance through the records in a Varnish
// log. It can be created from an attached instance of Log with
// NewCursor(), to read from a live Varnish instance, or with
// Log.NewCursorFile(), to read from a binary log file (as produced by
// varnishlog -w).
//
// A Cursor can in turn be used to create a Query object for reading
// aggregated log transactions. Or you can use Cursor's Next() method
// to read disaggregated Records directly from the log (equivalent to
// reading raw transactions via a Query). The results of intermingled
// reads via Next() and via a Query created from the same Cursor are
// undefined.
//
// Distinct Cursors may be used to read a log concurrently. Concurrent
// reads with the same Cursor are not safe.
//
// Because they are low-level operations used frequently in hot code,
// the Cursor methods have no error checking. So it is vital to
// observe these two rules:
//
// The Cursor methods will panic if applied to a nil Cursor, or to a
// Cursor that has not been initialized through one of the New*
// functions.
//
// Call Next() before using one of the methods that read data from the
// log record to which a Cursor is concurrently pointing (Match(),
// Tag(), VXID(), Payload(), Client(), Backend() or Record()). The
// results of calling one of these methods before Next() are
// undefined, and may lead to panic.
type Cursor struct {
cursor *C.struct_VSL_cursor
log *Log
}
// NewCursor creates a Cursor to read from a live Varnish log. Fails
// if the Log object is not attached to a log.
func (log *Log) NewCursor() (*Cursor, error) {
if err := log.checkNil(); err != nil {
return nil, err
}
if log.vsm == nil {
return nil, errors.New("Not attached to a Varnish log")
}
if log.vsm.VSM == nil {
panic("VSM handle is nil")
}
C.VSL_ResetError(log.vsl)
cursor := C.VSL_CursorVSM(log.vsl, (*C.struct_vsm)(log.vsm.VSM),
log.vsmopts)
if cursor == nil {
return nil, log
}
return &Cursor{cursor: cursor, log: log}, nil
}
// NewCursorFile creates a Cursor to read from a binary log file,
// created by varnishlog -w.
func (log *Log) NewCursorFile(path string) (*Cursor, error) {
if err := log.checkNil(); err != nil {
return nil, err
}
C.VSL_ResetError(log.vsl)
cursor := C.VSL_CursorFile(log.vsl, C.CString(path), 0)
if cursor == nil {
return nil, log
}
return &Cursor{cursor: cursor, log: log}, nil
}
// Delete releases native resources associated with the Cursor. You
// should always call Delete when you're done with a Cursor, otherwise
// there is a a risk of resource leakage. It is wise to schedule
// invocation with defer after successfully creating a Cursor with one
// of the New*() functions.
func (c *Cursor) Delete() {
if c == nil || c.cursor == nil {
// return silently in the nil case
return
}
C.VSL_DeleteCursor(c.cursor)
c.cursor = nil
}
// Next advances the Cursor to the next Record in the log. It returns
// a Status to indicate an error condition, the end of the log (EOF or
// EOL), or that there are more records in the log.
//
// Panics if Cursor is nil, or if was not initialized with one of the
// New* functions.
func (c *Cursor) Next() Status {
return Status(C.VSL_Next(c.cursor))
}
// Match returns true if the record at which the Cursor is currently
// pointing should be included, according to the include/exclude
// filters set for the Log object with which the Cursor was created.
//
// Panics if Cursor is nil, or if was not initialized with one of the
// New* functions.
func (c *Cursor) Match() bool {
return C.VSL_Match(c.log.vsl, c.cursor) == 1
}
// Tag returns the Tag of the record at which the Cursor is currently
// pointing.
//
// Panics if Cursor is nil, or if was not initialized with one of the
// New* functions.
func (c *Cursor) Tag() Tag {
return c.cursor.rec.tag()
}
// VXID returns the VXID of the record at which the Cursor is
// currently pointing.
//
// Panics if Cursor is nil, or if was not initialized with one of the
// New* functions.
func (c *Cursor) VXID() uint32 {
return c.cursor.rec.vxid()
}
// Payload returns the payload (message content) of the record at
// which the Cursor is currently pointing.
//
// Panics if Cursor is nil, or if was not initialized with one of the
// New* functions.
func (c *Cursor) Payload() []byte {
return c.cursor.rec.payload()
}
// Client returns true if the record at which the Cursor is currently
// pointing belongs to a client transaction.
//
// Panics if Cursor is nil, or if was not initialized with one of the
// New* functions.
func (c *Cursor) Client() bool {
return c.cursor.rec.client()
}
// Backend returns true if the record at which the Cursor is currently
// pointing belongs to a backend transaction.
//
// Panics if Cursor is nil, or if was not initialized with one of the
// New* functions.
func (c *Cursor) Backend() bool {
return c.cursor.rec.backend()
}
// Record returns structured data for the record at which the Cursor
// is currently pointing.
//
// Panics if Cursor is nil, or if was not initialized with one of the
// New* functions.
func (c *Cursor) Record() Record {
rec := Record{
Tag: c.cursor.rec.tag(),
VXID: uint(c.cursor.rec.vxid()),
Payload: string(c.cursor.rec.payload()),
}
switch {
case c.cursor.rec.backend():
rec.Type = Backend
case c.cursor.rec.client():
rec.Type = Client
default:
rec.Type = None
}
return rec
}
// +build e2e
/*-
* Copyright (c) 2018 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
......@@ -26,93 +28,117 @@
* SUCH DAMAGE.
*/
#ifndef _LOG_H_INCLUDED
#define _LOG_H_INCLUDED
#include <stdio.h>
#include <vapi/vsl.h>
int dispatch(struct VSLQ *vslq, uint64_t key);
// Necessary because the field name is a go keyword.
static inline enum VSL_transaction_e
txtype(struct VSL_transaction *tx)
{
return tx->type;
}
// The rest are necessary due to the use of macros.
static inline int
slt_max()
{
return SLT__MAX;
}
static inline unsigned
vxid(struct VSL_cursor *c)
{
return (VSL_ID(c->rec.ptr));
}
static inline enum VSL_tag_e
tag(struct VSL_cursor *c)
{
return (VSL_TAG(c->rec.ptr));
}
static inline const char *
cdata(struct VSL_cursor *c)
{
return (VSL_CDATA(c->rec.ptr));
}
static inline int
len(struct VSL_cursor *c)
{
// Subtracting one because Go doesn't need the terminating null.
return (VSL_LEN(c->rec.ptr) - 1);
// This test is only run if go test is invoked with a -tags option
// that includes "e2e". It requires that two instances of Varnish are
// running: a default instance (varnishd invoked without the -n
// option), and an instance named "gotest" (varnishd -n "gotest").
package log
import (
"net/http"
"testing"
)
func TestNewCursor(t *testing.T) {
l := New()
defer l.Release()
if err := l.Attach(""); err != nil {
t.Fatal("Attach(default):", err)
return
}
c, err := l.NewCursor()
if err != nil {
t.Fatal("NewCursor():", err)
return
}
defer c.Delete()
ln := New()
defer ln.Release()
if err := ln.Attach("gotest"); err != nil {
t.Fatal("Attach(gotest):", err)
return
}
cn, err := ln.NewCursor()
if err != nil {
t.Fatal("NewCursor() for gotest:", err)
return
}
defer cn.Delete()
}
static inline int
client(struct VSL_cursor *c)
{
return (VSL_CLIENT(c->rec.ptr));
func TestE2ECursor(t *testing.T) {
l := New()
defer l.Release()
if err := l.Attach(""); err != nil {
t.Fatal("Attach(default):", err)
return
}
c, err := l.NewCursor()
if err != nil {
t.Fatal("NewCursor():", err)
return
}
defer c.Delete()
client := &http.Client{}
req, err := http.NewRequest("GET", url("/uncacheable"), nil)
if err != nil {
t.Fatal("/uncacheable Request", err)
}
req.Close = true
stopChan := make(chan bool)
respChan := make(chan bool)
go func() {
sent := false
for {
select {
case <-stopChan:
return
default:
_, err := client.Do(req)
if err != nil {
t.Fatal("GET request:", err)
return
}
if !sent {
respChan <- true
sent = true
}
}
}
}()
<-respChan
var recs []testRec
for i := 0; i < 100; i++ {
status := c.Next()
vxid := c.VXID()
tag := c.Tag()
payload := c.Payload()
backend := c.Backend()
client := c.Client()
rec := testRec{
vxid: int(vxid),
tag: Tag(tag).String(),
payload: string(payload),
}
if backend {
rec.rectype = 'b'
} else if client {
rec.rectype = 'c'
} else {
rec.rectype = '-'
}
recs = append(recs, rec)
if status == EOL {
break
}
}
stopChan <- true
t.Logf("read %d records", len(recs))
t.Log(recs)
}
static inline int
backend(struct VSL_cursor *c)
{
return (VSL_BACKEND(c->rec.ptr));
}
static inline int
binary(enum VSL_tag_e tag)
{
return (VSL_tagflags[tag] & SLT_F_BINARY);
}
static inline int
unsafe(enum VSL_tag_e tag)
{
return (VSL_tagflags[tag] & SLT_F_UNSAFE);
}
static inline unsigned
vsl_opt_batch()
{
return VSL_COPT_BATCH;
}
static inline unsigned
vsl_opt_tail()
{
return VSL_COPT_TAIL;
}
static inline unsigned
vsl_opt_tailstop()
{
return VSL_COPT_TAILSTOP;
}
#endif
/*-
* Copyright (c) 2018 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package log
import "testing"
func TestNewCursorFile(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
}
defer c.Delete()
if _, err := l.NewCursorFile(failPath); err == nil {
t.Error("Expected NewCursorFile(nonexistent) to fail")
}
var n *Log
if _, err := n.NewCursorFile(testFile); err == nil {
t.Error("Expected nil.NewCursorFile() to fail")
}
uninit := &Log{}
if _, err := uninit.NewCursorFile(testFile); err == nil {
t.Error("Expected uninitiailized.NewCursorFile() to fail")
}
}
func TestDelete(t *testing.T) {
// don't panic if the Cursor is nil or uninitialized
var c *Cursor
c.Delete()
uninit := &Cursor{}
uninit.Delete()
}
func TestNext(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
status := c.Next()
// XXX change the status constant, this is vcl_more
if status != More {
t.Errorf("Next() status want=More got=%d\n", uint8(status))
}
}
var expFirstRec = testRec{
vxid: 0,
tag: "Backend_health",
rectype: '-',
payload: "boot.test Still healthy 4---X-RH 1 1 1 0.000402 0.000484 HTTP/1.1 204 No Content",
}
func TestTag(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
status := c.Next()
// XXX
if status != More {
t.Errorf("Next() status want=More got=%d\n", uint8(status))
}
tag := c.Tag()
if tag.String() != expFirstRec.tag {
t.Errorf("Tag() want=%v got=%v\n", expFirstRec.tag,
tag.String())
}
}
func TestVXID(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
status := c.Next()
// XXX
if status != More {
t.Errorf("Next() status want=More got=%d\n", uint8(status))
}
vxid := c.VXID()
if vxid != uint32(expFirstRec.vxid) {
t.Errorf("VXID() want=%v got=%v\n", expFirstRec.vxid, vxid)
}
}
func TestBackend(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
status := c.Next()
// XXX
if status != More {
t.Errorf("Next() status want=More got=%d\n", uint8(status))
}
backend := c.Backend()
if backend != (expFirstRec.rectype == rune(Backend)) {
t.Errorf("Backend() want=%v got=%v\n",
expFirstRec.rectype == rune(Backend), backend)
}
}
func TestClient(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
}
defer c.Delete()
status := c.Next()
// XXX
if status != More {
t.Errorf("Next() status want=More got=%d\n", uint8(status))
}
client := c.Client()
if client != (expFirstRec.rectype == rune(Client)) {
t.Errorf("Client() want=%v got=%v\n",
expFirstRec.rectype == rune(Client), client)
}
}
func TestPayload(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
status := c.Next()
// XXX
if status != More {
t.Errorf("Next() status want=More got=%d\n", uint8(status))
}
payload := c.Payload()
if string(payload) != expFirstRec.payload {
t.Errorf("Payload() want=%v got=%v\n", expFirstRec.payload,
payload)
}
}
func TestNextFileAll(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
for _, rec := range expRawLog {
status := c.Next()
if status == EOF {
return
}
// XXX should be More
if status != More {
t.Fatal("Next() unexpected status:", status)
return
}
vxid := c.VXID()
if vxid != uint32(rec.vxid) {
t.Errorf("VXID want=%v got=%v\n", rec.vxid, vxid)
}
tag := c.Tag()
if tag.String() != rec.tag {
t.Errorf("Tag() want=%v got=%v\n", rec.tag,
tag.String())
}
payload := c.Payload()
if string(payload) != rec.payload {
t.Errorf("Payload() want=%v got=%v\n", rec.payload,
string(payload))
}
backend := c.Backend()
if backend && rec.rectype != 'b' {
t.Errorf("Backend() want=%v got=b\n", rec.rectype)
}
client := c.Client()
if client && rec.rectype != 'c' {
t.Errorf("Client() want=%v got=c\n", rec.rectype)
}
}
}
func TestNextFileConcurrent(t *testing.T) {
l := New()
defer l.Release()
c1, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c1.Delete()
c2, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c2.Delete()
for _, rec := range expRawLog {
status1 := c1.Next()
status2 := c2.Next()
if status1 != status2 {
t.Errorf("Next() differing status: %v %v\n", status1,
status2)
}
if status1 == EOF {
return
}
// XXX
if status1 != More {
t.Fatal("Next() unexpected status:", status1)
return
}
cursors := [2]*Cursor{c1, c2}
for _, c := range cursors {
vxid := c.VXID()
if vxid != uint32(rec.vxid) {
t.Errorf("VXID want=%v got=%v\n", rec.vxid,
vxid)
}
tag := c.Tag()
if tag.String() != rec.tag {
t.Errorf("Tag() want=%v got=%v\n", rec.tag,
tag.String())
}
payload := c.Payload()
if string(payload) != rec.payload {
t.Errorf("Payload() want=%v got=%v\n",
rec.payload, string(payload))
}
backend := c.Backend()
if backend && rec.rectype != 'b' {
t.Errorf("Backend() want=%v got=b\n",
rec.rectype)
}
client := c.Client()
if client && rec.rectype != 'c' {
t.Errorf("Client() want=%v got=c\n",
rec.rectype)
}
}
}
}
func BenchmarkNextFileOne(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
b.ReportAllocs()
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
b.Fatal("NewCursorFile(bin.log):", err)
}
b.StartTimer()
status := c.Next()
b.StopTimer()
if status != More {
b.Fatalf("Next() status want=More got=%d\n",
uint8(status))
}
c.Delete()
}
}
......@@ -32,12 +32,10 @@
// that includes "e2e". It requires that two instances of Varnish are
// running: a default instance (varnishd invoked without the -n
// option), and an instance named "gotest" (varnishd -n "gotest").
package log
import (
"net/http"
"strconv"
"strings"
"testing"
"time"
)
......@@ -97,409 +95,3 @@ func TestAttachInstance(t *testing.T) {
"non-existent instance")
}
}
var tx2recType = map[TxType]RecordType{
Sess: Client,
Req: Client,
BeReq: Backend,
}
const (
undefLen int = int(^uint(0) >> 1)
txTypeUndef TxType = TxUnknown
txReasonUndef Reason = ReasonUnknown
txLevelUndef uint = ^uint(0)
txVXIDUndef uint32 = ^uint32(0)
recTypeUndef RecordType = RecordType(0)
recTagUndef string = "undefined tag string"
recVXIDUndef uint = ^uint(0)
recPayloadUndef string = "undefined payload string"
)
type expRec struct {
tag string
payload string
}
type expTx struct {
typ TxType
reason Reason
level uint
vxid uint32
pvxid uint32
recs []expRec
}
type expTxLen struct {
expRecs int
tx expTx
}
type expTxGrp struct {
expTx int
txGrp []expTxLen
}
var expDefaultRead = map[TxType]expTxGrp{
BeReq: {
expTx: 1,
txGrp: []expTxLen{{
expRecs: undefLen,
tx: expTx{
typ: BeReq,
reason: Fetch,
level: 1,
vxid: txVXIDUndef,
pvxid: txVXIDUndef,
recs: nil,
}}},
},
Req: {
expTx: 1,
txGrp: []expTxLen{{
expRecs: undefLen,
tx: expTx{
typ: Req,
reason: RxReq,
level: 1,
vxid: txVXIDUndef,
pvxid: txVXIDUndef,
recs: nil,
}}},
},
Sess: {
expTx: 1,
txGrp: []expTxLen{{
expRecs: 5,
tx: expTx{
typ: Sess,
reason: HTTP1,
level: 1,
vxid: txVXIDUndef,
pvxid: 0,
recs: []expRec{
{
tag: "Begin",
payload: "sess 0 HTTP/1",
},
{
tag: "SessOpen",
payload: recPayloadUndef,
},
{
tag: "Link",
payload: recPayloadUndef,
},
{
tag: "SessClose",
payload: recPayloadUndef,
},
{
tag: "End",
payload: "",
},
}},
}},
},
}
func checkRec(t *testing.T, rec Record, expRec expRec) {
if expRec.tag != recTagUndef && rec.Tag.String() != expRec.tag {
t.Errorf("record tag want=%v got=%s", expRec.tag, rec.Tag)
}
if expRec.payload != recPayloadUndef && rec.Payload != expRec.payload {
t.Errorf("record payload want=%v got=%s", expRec.payload,
rec.Payload)
}
}
func checkTx(t *testing.T, tx Tx, expTxLen expTxLen) {
expTx := expTxLen.tx
if expTx.typ != txTypeUndef && expTx.typ != tx.Type {
t.Errorf("tx.Type want=%v got=%v", expTx.typ, tx.Type)
}
if expTx.reason != txReasonUndef && expTx.reason != tx.Reason {
t.Errorf("tx.Reason want=%v got=%v", expTx.reason, tx.Reason)
}
if expTx.level != txLevelUndef && expTx.level != tx.Level {
t.Errorf("tx.Level want=%v got=%v", expTx.level, tx.Level)
}
if expTx.vxid != txVXIDUndef && expTx.vxid != tx.VXID {
t.Errorf("tx.VXID want=%v got=%v", expTx.vxid, tx.VXID)
}
if expTx.pvxid != txVXIDUndef && expTx.pvxid != tx.ParentVXID {
t.Errorf("tx.ParentVXID want=%v got=%v", expTx.pvxid,
tx.ParentVXID)
}
if expTxLen.expRecs != undefLen && expTxLen.expRecs != len(tx.Records) {
t.Errorf("number of records want=%v got=%v", expTxLen.expRecs,
len(tx.Records))
}
if tx.Records == nil || len(tx.Records) < 1 {
t.Fatal("tx has no records:", tx)
return
}
if tx.Type == TxRaw {
if len(tx.Records) != 1 {
t.Errorf("tx type raw number of records want=1 got=%v",
len(tx.Records))
}
if tx.Records[0].VXID != uint(tx.VXID) {
t.Errorf("record VXID want=%v got=%v", uint(tx.VXID),
tx.Records[0].VXID)
}
} else {
// These conditions always hold for the records in a
// non-raw transaction.
if tx.Records[0].Tag.String() != "Begin" {
t.Errorf("tx first record tag want=Begin got=%s",
tx.Records[0].Tag)
}
lastIdx := len(tx.Records) - 1
if tx.Records[lastIdx].Tag.String() != "End" {
t.Errorf("tx last record tag want=End got=%s",
tx.Records[lastIdx].Tag)
}
if tx.Records[lastIdx].Payload != "" {
t.Errorf("tx last record tag want=%q got=%q", "",
tx.Records[lastIdx].Payload)
}
expType, ok := tx2recType[tx.Type]
if !ok {
t.Fatal("tx2recType no value for", tx.Type)
t.FailNow()
return
}
for _, rec := range tx.Records {
if rec.Type != expType {
t.Errorf("record type want=%c got=%c", expType,
rec.Type)
}
if rec.VXID != uint(tx.VXID) {
t.Errorf("record VXID want=%v got=%v",
uint(tx.VXID), rec.VXID)
}
}
}
if expTx.recs != nil {
for i, rec := range tx.Records {
checkRec(t, rec, expTx.recs[i])
}
}
}
func checkTxGrp(t *testing.T, txGrp []Tx, expTxGrp expTxGrp) {
if expTxGrp.expTx != undefLen && len(txGrp) != expTxGrp.expTx {
t.Fatalf("number of tx in group got=%v want=%v",
len(txGrp), expTxGrp.expTx)
return
}
for i, tx := range txGrp {
checkTx(t, tx, expTxGrp.txGrp[i])
}
}
func checkBereq(t *testing.T, tx Tx, req http.Request) {
if tx.Type != BeReq {
t.Fatalf("checkBereq() tx.Type want=BeReq got=%s", tx.Type)
t.FailNow()
return
}
for _, rec := range tx.Records {
if rec.Tag.String() != "BereqURL" {
continue
}
if rec.Payload != req.URL.Path {
t.Errorf("bereq URL want=%v got=%v", req.URL.Path,
rec.Payload)
}
break
}
}
func checkReqResp(t *testing.T, tx Tx, req http.Request, resp http.Response) {
if tx.Type != Req {
t.Fatalf("checkReqResp() tx.Type want=Req got=%s", tx.Type)
t.FailNow()
return
}
respHdr := make(http.Header)
for _, rec := range tx.Records {
switch rec.Tag.String() {
case "ReqMethod":
if rec.Payload != req.Method {
t.Errorf("request method want=%v got=%v",
req.Method, rec.Payload)
}
case "ReqURL":
if rec.Payload != req.URL.Path {
t.Errorf("request URL want=%v got=%v",
req.URL.Path, rec.Payload)
}
case "ReqProtocol":
if rec.Payload != req.Proto {
t.Errorf("request protocol want=%v got=%v",
req.Proto, rec.Payload)
}
case "ReqHeader":
if !strings.HasPrefix(rec.Payload, "Host:") {
continue
}
host := rec.Payload[len("Host: "):]
if host != req.Host {
t.Errorf("request Host want=%v got=%v",
req.Host, host)
}
case "RespProtocol":
if rec.Payload != resp.Proto {
t.Errorf("response protocol want=%v got=%v",
resp.Proto, rec.Payload)
}
case "RespStatus":
expStatus := strconv.FormatInt(int64(resp.StatusCode),
10)
if rec.Payload != expStatus {
t.Errorf("response status want=%d got=%v",
resp.StatusCode, rec.Payload)
}
case "RespReason":
expReason := resp.Status[4:]
if rec.Payload != expReason {
t.Errorf("response reason want=%v got=%v",
expReason, rec.Payload)
}
case "RespHeader":
colonIdx := strings.Index(rec.Payload, ": ")
if colonIdx < 0 {
t.Fatal("cannot parse response header:",
rec.Payload)
continue
}
hdr := rec.Payload[:colonIdx]
val := rec.Payload[colonIdx+2:]
respHdr.Set(hdr, val)
default:
continue
}
}
// http.Response apparently removes the Connection header, but
// sets its Close field to true if it was present and set to
// "close".
if resp.Close {
conn := respHdr.Get("Connection")
if conn != "close" {
t.Errorf("response Connection:close not found")
}
if conn != "" {
respHdr.Del("Connection")
}
}
// Won't worry about multiple values for a header
if len(resp.Header) != len(respHdr) {
t.Errorf("response headers want=%v got=%v", len(resp.Header),
len(respHdr))
return
}
for h, _ := range resp.Header {
if respHdr.Get(h) != resp.Header.Get(h) {
t.Errorf("response header %v want=%v got=%v", h,
resp.Header.Get(h), respHdr.Get(h))
}
}
}
const target = "http://localhost:8080"
func url(path string) string {
return target + path
}
func TestE2EDefaultRead(t *testing.T) {
l := New()
defer l.Release()
if err := l.Attach(""); err != nil {
t.Fatal("Attach(default):", err)
}
var status Status
var txGrps [][]Tx
rdHndlr := func(txGrp []Tx, rdstatus Status) bool {
switch rdstatus {
case More:
txGrps = append(txGrps, txGrp)
if len(txGrps) == 3 {
return false
}
return true
default:
status = rdstatus
return false
}
}
stopChan := make(chan bool)
readFailed := false
go func() {
err := l.Read(rdHndlr, nil)
if err != nil {
t.Fatal("Read(): " + err.Error())
readFailed = true
}
stopChan <- true
}()
client := &http.Client{}
req, err := http.NewRequest("GET", url("/uncacheable"), nil)
if err != nil {
t.Fatal("/uncacheable Request", err)
}
req.Close = true
resp, err := client.Do(req)
if err != nil {
t.Fatal("/uncacheable Response", err)
}
<-stopChan
if readFailed {
return
}
if status != Stopped {
t.Errorf("End read status want=%v got=%v", Stopped, status)
}
if len(txGrps) != 3 {
t.Errorf("number of tx groups want=3 got=%v", len(txGrps))
}
for _, txGrp := range txGrps {
tx := txGrp[0]
checkTxGrp(t, txGrp, expDefaultRead[tx.Type])
if tx.Type == BeReq {
checkBereq(t, tx, *req)
}
if tx.Type == Req {
checkReqResp(t, tx, *req, *resp)
}
}
}
func TestStartAtHead(t *testing.T) {
l := New()
defer l.Release()
if err := l.Attach(""); err != nil {
t.Fatal("Attach(default):", err)
}
if err := l.StartAtHead(true); err != nil {
t.Error("StartAtHead(true):", err)
}
if err := l.StartAtHead(false); err != nil {
t.Error("StartAtHead(false):", err)
}
var n *Log
if err := n.StartAtHead(true); err == nil {
t.Error("expected nil.StartAtHead() to fail")
}
uninit := new(Log)
if err := uninit.StartAtHead(true); err == nil {
t.Error("expected uninitialized.StartAtHead() to fail")
}
}
This diff is collapsed.
......@@ -28,111 +28,7 @@
package log
import (
"bufio"
"errors"
"fmt"
"os"
"regexp"
"strconv"
"testing"
)
const (
testFile = "testdata/bin.log"
vxidLog = "testdata/vxid.log"
failPath = "ifAFileWithThisNameReallyExistsThenTestsFail"
)
type testRec struct {
vxid int
tag string
rectype rune
payload string
}
type testTx struct {
level uint
vxid uint32
txtype string
recs []testRec
}
var (
txline = regexp.MustCompile(`^(\*+)\s+<<\s+(\w+)\s+>>\s+(\d+)`)
recline = regexp.MustCompile(`^-+\s+(\d+)\s+(\w+)\s+([b|c|-])\s+(.*)$`)
begin = regexp.MustCompile(`^\w+\s+(\d+)\s+(\S+)$`)
)
func scrapeLog(file string) ([]testTx, error) {
var txn []testTx
f, err := os.Open(file)
if err != nil {
return nil, err
}
lines := bufio.NewScanner(f)
TX:
for lines.Scan() {
flds := txline.FindStringSubmatch(lines.Text())
if flds == nil {
return nil, errors.New("Cannot parse tx line: " +
lines.Text())
}
txVxid, err := strconv.Atoi(flds[3])
if err != nil {
return nil, errors.New("Cannot parse vxid " +
flds[4] + " in tx line: " + lines.Text())
}
tx := testTx{}
tx.vxid = uint32(txVxid)
// NB: this works only for levels up to 3
tx.level = uint(len(flds[1]))
tx.txtype = flds[2]
for lines.Scan() {
// XXX: currently does not work for grouped txn
if lines.Text() == "" {
txn = append(txn, tx)
continue TX
}
flds = recline.FindStringSubmatch(lines.Text())
if flds == nil {
return nil, errors.New("Cannot parse record: " +
lines.Text())
}
rec := testRec{}
rec.vxid, err = strconv.Atoi(flds[1])
if err != nil {
return nil, errors.New("Cannot parse vxid " +
flds[1] + " in rec line: " +
lines.Text())
}
rec.tag = flds[2]
rec.rectype = rune(flds[3][0])
rec.payload = flds[4]
tx.recs = append(tx.recs, rec)
}
}
return txn, nil
}
var expVxidLog []testTx
func TestMain(m *testing.M) {
files := []string{testFile, vxidLog}
for _, file := range files {
if _, err := os.Stat(file); err != nil {
fmt.Fprintln(os.Stderr, "Cannot stat "+file+":", err)
os.Exit(1)
}
}
var err error
if expVxidLog, err = scrapeLog(vxidLog); err != nil {
fmt.Fprintln(os.Stderr, "Cannot parse "+vxidLog+":", err)
os.Exit(1)
}
os.Exit(m.Run())
}
import "testing"
var expTxTypeStr = map[TxType]string{
TxUnknown: "Unknown",
......@@ -176,7 +72,7 @@ func TestStatusString(t *testing.T) {
// Just make sure it doesn't panic, don't want tests to depend
// on specific values of the strings.
stati := [...]Status{
WriteErr, IOErr, Overrun, Abandoned, EOF, EOL, More, Stopped,
WriteErr, IOErr, Overrun, Abandoned, EOF, EOL, More,
}
for _, v := range stati {
_ = v.String()
......@@ -213,49 +109,6 @@ func TestNewRelease(t *testing.T) {
uninit.Release()
}
func TestAttachFile(t *testing.T) {
l := New()
defer l.Release()
err := l.AttachFile(failPath)
if err == nil {
t.Error("Expected AttachFile() to fail for " + failPath)
}
err = l.AttachFile(testFile)
if err != nil {
t.Fatal("Cannot attach to " + testFile + ": " + err.Error())
}
var n *Log
if err := n.AttachFile(testFile); err == nil {
t.Error("expected nil.AttachFile() to fail")
}
uninit := &Log{}
if err := uninit.AttachFile(testFile); err == nil {
t.Error("expected uninitialized.AttachFile() to fail")
}
}
func TestRead(t *testing.T) {
rdHndlr := func(txGrp []Tx, rdstatus Status) bool {
return false
}
var n *Log
if err := n.Read(rdHndlr, nil); err == nil {
t.Error("expected nil.Read() to fail")
}
uninit := &Log{}
if err := uninit.Read(rdHndlr, nil); err == nil {
t.Error("expected uninitialized.Read() to fail")
}
l := New()
defer l.Release()
if err := l.Read(rdHndlr, nil); err == nil {
t.Error("expected l.Read() before attach to fail")
}
}
func TestError(t *testing.T) {
// just make sure there is no panic for the nil or "no error" cases
var n *Log
......@@ -267,222 +120,3 @@ func TestError(t *testing.T) {
defer l.Release()
_ = l.Error()
}
func checkTxGroups(t *testing.T, txGrps [][]Tx, expTxn []testTx) {
if len(txGrps) != len(expTxn) {
t.Fatalf("number of transaction groups expected=%v got=%v",
len(expTxn), len(txGrps))
return
}
for i, txGrp := range txGrps {
if len(txGrp) != 1 {
t.Fatalf("transactions in group expected=1 got=%v",
len(txGrp))
}
tx := txGrp[0]
expTx := expTxn[i]
if tx.Level != expTx.level {
t.Errorf("tx Level expected=%v got=%v", expTx.level,
tx.Level)
}
if tx.VXID != expTx.vxid {
t.Errorf("tx VXID expected=%v got=%v", expTx.vxid,
tx.VXID)
}
if tx.Type.String() != expTx.txtype {
t.Errorf("tx Type expected=%v got=%v", expTx.txtype,
tx.Type.String())
}
if tx.Type != TxRaw {
if len(expTx.recs) == 0 {
t.Error("scraped tx has no records")
break
}
if expTx.recs[0].tag != "Begin" {
t.Error("scraped tx does not begin with Begin")
}
payload := expTx.recs[0].payload
flds := begin.FindStringSubmatch(payload)
if len(flds) != 3 {
t.Errorf("could not parse Begin: " + payload)
break
}
if tx.Reason.String() != flds[2] {
t.Errorf("tx reason expected=%v got=%s",
flds[2], tx.Reason)
}
// XXX test parent vxid in grouped txn
}
if len(tx.Records) != len(expTx.recs) {
t.Errorf("tx number of records expected=%v got=%v",
len(expTx.recs), len(tx.Records))
continue
}
for j, rec := range tx.Records {
expRec := expTx.recs[j]
if rec.VXID != uint(expRec.vxid) {
t.Errorf("rec vxid expected=%v got=%v",
expRec.vxid, rec.VXID)
}
if rec.Tag.String() != expRec.tag {
t.Errorf("rec tag expected=%v got=%s",
expRec.tag, rec.Tag)
}
if rune(rec.Type) != expRec.rectype {
t.Errorf("rec type expected=%v got=%v",
expRec.rectype, rune(rec.Type))
}
if len(rec.Payload) != len(expRec.payload) {
t.Errorf("rec payload length "+
"expected=%v got=%v",
len(expRec.payload), len(rec.Payload))
}
if rec.Payload != expRec.payload {
t.Errorf("rec payload expected='%v' got='%v'",
expRec.payload, rec.Payload)
}
}
}
}
func TestDefaultRead(t *testing.T) {
var txGrps [][]Tx
l := New()
defer l.Release()
err := l.AttachFile(testFile)
if err != nil {
t.Fatal("Cannot attach to " + testFile + ": " + err.Error())
}
// Test coverage for this case
if err := l.Read(nil, nil); err == nil {
t.Error("expected l.Read() with nil ReadHandler to fail")
}
var status Status
rdHndlr := func(txGrp []Tx, rdstatus Status) bool {
if rdstatus != More {
status = rdstatus
return false
}
txGrps = append(txGrps, txGrp)
return true
}
err = l.Read(rdHndlr, nil)
if err != nil {
t.Fatal("Read(): " + err.Error())
}
// More test coverage
if err := l.Read(rdHndlr, nil); err == nil {
t.Error("expected l.Read() with read already running to fail")
}
if status != Status(EOF) {
t.Errorf("expected EOF status got: %s", status.Error())
}
checkTxGroups(t, txGrps, expVxidLog)
}
func readHndl(txGrp []Tx, status Status, txGrps *[][]Tx, end *Status) bool {
if status != More {
*end = status
return false
}
*txGrps = append(*txGrps, txGrp)
return true
}
func TestConcurrentRead(t *testing.T) {
var txGrps1, txGrps2 [][]Tx
l1 := New()
defer l1.Release()
err := l1.AttachFile(testFile)
if err != nil {
t.Fatal("l1 attach to " + testFile + ": " + err.Error())
}
l2 := New()
defer l2.Release()
err = l2.AttachFile(testFile)
if err != nil {
t.Fatal("l2 attach to " + testFile + ": " + err.Error())
}
var endStatus1 Status
var endStatus2 Status
hndlr1 := func(txGrp []Tx, rdstatus Status) bool {
return readHndl(txGrp, rdstatus, &txGrps1, &endStatus1)
}
hndlr2 := func(txGrp []Tx, rdstatus Status) bool {
return readHndl(txGrp, rdstatus, &txGrps2, &endStatus2)
}
err1 := l1.Read(hndlr1, nil)
err2 := l2.Read(hndlr2, nil)
if err1 != nil {
t.Fatal("l1.Read(): " + err.Error())
}
if err2 != nil {
t.Fatal("l2.Read(): " + err.Error())
}
if endStatus1 != Status(EOF) {
t.Errorf("expected EOF status got: %s", endStatus1.Error())
}
if endStatus2 != Status(EOF) {
t.Errorf("expected EOF status got: %s", endStatus2.Error())
}
checkTxGroups(t, txGrps1, expVxidLog)
checkTxGroups(t, txGrps2, expVxidLog)
}
func BenchmarkReadFileOneTxGrp(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
b.ReportAllocs()
l := New()
defer l.Release()
if err := l.AttachFile(testFile); err != nil {
b.Fatal("AttachFile():", err)
return
}
rdHndlr := func(txGrp []Tx, rdstatus Status) bool {
return false
}
b.StartTimer()
if err := l.Read(rdHndlr, nil); err != nil {
b.Fatal("Read():", err)
}
}
}
func BenchmarkReadFileAllTxGrps(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
b.ReportAllocs()
l := New()
defer l.Release()
if err := l.AttachFile(testFile); err != nil {
b.Fatal("AttachFile():", err)
return
}
rdHndlr := func(txGrp []Tx, rdstatus Status) bool {
return true
}
b.StartTimer()
if err := l.Read(rdHndlr, nil); err != nil {
b.Fatal("Read():", err)
}
}
}
This diff is collapsed.
/*-
* Copyright (c) 2018 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package log
/*
#cgo pkg-config: varnishapi
#include <stdio.h>
#include <vapi/vsl.h>
*/
import "C"
import (
"errors"
"strconv"
"strings"
)
// A Query provides the means to read aggregated transactions from the
// log. A Query must be created from a Cursor using the NewQuery
// function.
type Query struct {
cursor *Cursor
incomplete map[uint32]*Tx
grp Grouping
}
var str2type = map[string]TxType{
"sess": Sess,
"req": Req,
"bereq": BeReq,
}
var str2reason = map[string]Reason{
"HTTP/1": HTTP1,
"rxreq": RxReq,
"esi": ESI,
"restart": Restart,
"pass": Pass,
"fetch": Fetch,
"bgfetch": BgFetch,
"pipe": Pipe,
}
func (c *Cursor) rawTxGrp() []Tx {
rec := c.Record()
return []Tx{Tx{
Type: TxRaw,
Reason: ReasonUnknown,
Level: 0,
VXID: uint32(rec.VXID),
ParentVXID: 0,
Records: []Record{rec},
}}
}
// NewQuery creates a Query from a Cursor. The Grouping argument indicates
// how transactions from the log should be aggregated: VXID, Request, Session
// or Raw grouping.
//
// XXX: request and session grouping not yet implement, nor are VSL
// queries
func (c *Cursor) NewQuery(grp Grouping, query string) (*Query, error) {
if c == nil || c.cursor == nil {
return nil, errors.New("Cursor is nil or uninitialized")
}
if err := c.log.checkNil(); err != nil {
return nil, err
}
txMap := make(map[uint32]*Tx)
return &Query{cursor: c, grp: grp, incomplete: txMap}, nil
}
// NextTxGroup returns the next group of transactions from the
// log. Transactions are aggregated according to the Grouping argument
// given in NewQuery().
//
// The Status return values indicates whether there was an error
// reading the log, whether there was an end condition (EOL or EOF),
// or if there are more transactions to be read from the log.
func (q *Query) NextTxGroup() ([]Tx, Status) {
for {
status := q.cursor.Next()
if status != More {
return nil, status
}
if q.grp == GRaw {
return q.cursor.rawTxGrp(), status
}
backend := q.cursor.Backend()
client := q.cursor.Client()
if !backend && !client {
continue
}
rec := q.cursor.Record()
vxid32 := uint32(rec.VXID)
incmplTx, exists := q.incomplete[vxid32]
if exists {
incmplTx.Records = append(incmplTx.Records, rec)
if rec.Tag == Tag(C.SLT_End) {
delete(q.incomplete, vxid32)
txGrp := []Tx{*incmplTx}
return txGrp, status
}
continue
}
tx := Tx{
VXID: vxid32,
Type: TxUnknown,
Reason: ReasonUnknown,
Level: 1,
Records: []Record{rec},
}
if rec.Tag == Tag(C.SLT_Begin) {
begin := strings.Split(rec.Payload, " ")
if txtype, exists := str2type[begin[0]]; exists {
tx.Type = txtype
}
if pvxid, err := strconv.Atoi(begin[1]); err != nil {
tx.ParentVXID = uint32(pvxid)
}
if reason, exists := str2reason[begin[2]]; exists {
tx.Reason = reason
}
}
q.incomplete[vxid32] = &tx
}
}
// +build e2e
/*-
* Copyright (c) 2018 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
// This test is only run if go test is invoked with a -tags option
// that includes "e2e". It requires that two instances of Varnish are
// running: a default instance (varnishd invoked without the -n
// option), and an instance named "gotest" (varnishd -n "gotest").
package log
import (
"net/http"
"testing"
)
// XXX remove the duplication with the test helpers in main_test.go
var tx2recType = map[TxType]RecordType{
Sess: Client,
Req: Client,
BeReq: Backend,
}
const (
undefLen int = int(^uint(0) >> 1)
txTypeUndef TxType = TxUnknown
txReasonUndef Reason = ReasonUnknown
txLevelUndef uint = ^uint(0)
txVXIDUndef uint32 = ^uint32(0)
recTypeUndef RecordType = RecordType(0)
recTagUndef string = "undefined tag string"
recVXIDUndef uint = ^uint(0)
recPayloadUndef string = "undefined payload string"
)
type expRec struct {
tag string
payload string
}
type expTx struct {
typ TxType
reason Reason
level uint
vxid uint32
pvxid uint32
recs []expRec
}
type expTxLen struct {
expRecs int
tx expTx
}
type expTxGrp struct {
expTx int
txGrp []expTxLen
}
var expDefaultRead = map[TxType]expTxGrp{
BeReq: {
expTx: 1,
txGrp: []expTxLen{{
expRecs: undefLen,
tx: expTx{
typ: BeReq,
reason: Fetch,
level: 1,
vxid: txVXIDUndef,
pvxid: txVXIDUndef,
recs: nil,
}}},
},
Req: {
expTx: 1,
txGrp: []expTxLen{{
expRecs: undefLen,
tx: expTx{
typ: Req,
reason: RxReq,
level: 1,
vxid: txVXIDUndef,
pvxid: txVXIDUndef,
recs: nil,
}}},
},
Sess: {
expTx: 1,
txGrp: []expTxLen{{
expRecs: 5,
tx: expTx{
typ: Sess,
reason: HTTP1,
level: 1,
vxid: txVXIDUndef,
pvxid: 0,
recs: []expRec{
{
tag: "Begin",
payload: "sess 0 HTTP/1",
},
{
tag: "SessOpen",
payload: recPayloadUndef,
},
{
tag: "Link",
payload: recPayloadUndef,
},
{
tag: "SessClose",
payload: recPayloadUndef,
},
{
tag: "End",
payload: "",
},
}},
}},
},
}
func checkRec(t *testing.T, rec Record, expRec expRec) {
if expRec.tag != recTagUndef && rec.Tag.String() != expRec.tag {
t.Errorf("record tag want=%v got=%s", expRec.tag, rec.Tag)
}
if expRec.payload != recPayloadUndef && rec.Payload != expRec.payload {
t.Errorf("record payload want=%v got=%s", expRec.payload,
rec.Payload)
}
}
func checkTx(t *testing.T, tx Tx, expTxLen expTxLen) {
expTx := expTxLen.tx
if expTx.typ != txTypeUndef && expTx.typ != tx.Type {
t.Errorf("tx.Type want=%v got=%v", expTx.typ, tx.Type)
}
if expTx.reason != txReasonUndef && expTx.reason != tx.Reason {
t.Errorf("tx.Reason want=%v got=%v", expTx.reason, tx.Reason)
}
if expTx.level != txLevelUndef && expTx.level != tx.Level {
t.Errorf("tx.Level want=%v got=%v", expTx.level, tx.Level)
}
if expTx.vxid != txVXIDUndef && expTx.vxid != tx.VXID {
t.Errorf("tx.VXID want=%v got=%v", expTx.vxid, tx.VXID)
}
if expTx.pvxid != txVXIDUndef && expTx.pvxid != tx.ParentVXID {
t.Errorf("tx.ParentVXID want=%v got=%v", expTx.pvxid,
tx.ParentVXID)
}
if expTxLen.expRecs != undefLen && expTxLen.expRecs != len(tx.Records) {
t.Errorf("number of records want=%v got=%v", expTxLen.expRecs,
len(tx.Records))
}
if tx.Records == nil || len(tx.Records) < 1 {
t.Fatal("tx has no records:", tx)
return
}
if tx.Type == TxRaw {
if len(tx.Records) != 1 {
t.Errorf("tx type raw number of records want=1 got=%v",
len(tx.Records))
}
if tx.Records[0].VXID != uint(tx.VXID) {
t.Errorf("record VXID want=%v got=%v", uint(tx.VXID),
tx.Records[0].VXID)
}
} else {
// These conditions always hold for the records in a
// non-raw transaction.
if tx.Records[0].Tag.String() != "Begin" {
t.Errorf("tx first record tag want=Begin got=%s",
tx.Records[0].Tag)
}
lastIdx := len(tx.Records) - 1
if tx.Records[lastIdx].Tag.String() != "End" {
t.Errorf("tx last record tag want=End got=%s",
tx.Records[lastIdx].Tag)
}
if tx.Records[lastIdx].Payload != "" {
t.Errorf("tx last record tag want=%q got=%q", "",
tx.Records[lastIdx].Payload)
}
expType, ok := tx2recType[tx.Type]
if !ok {
t.Fatal("tx2recType no value for", tx.Type)
t.FailNow()
return
}
for _, rec := range tx.Records {
if rec.Type != expType {
t.Errorf("record type want=%c got=%c", expType,
rec.Type)
}
if rec.VXID != uint(tx.VXID) {
t.Errorf("record VXID want=%v got=%v",
uint(tx.VXID), rec.VXID)
}
}
}
if expTx.recs != nil {
for i, rec := range tx.Records {
checkRec(t, rec, expTx.recs[i])
}
}
}
func checkTxGrp(t *testing.T, txGrp []Tx, expTxGrp expTxGrp) {
if expTxGrp.expTx != undefLen && len(txGrp) != expTxGrp.expTx {
t.Fatalf("number of tx in group got=%v want=%v",
len(txGrp), expTxGrp.expTx)
return
}
for i, tx := range txGrp {
checkTx(t, tx, expTxGrp.txGrp[i])
}
}
func TestAttachQuery(t *testing.T) {
l := New()
defer l.Release()
if err := l.Attach(""); err != nil {
t.Fatal("Attach(default):", err)
return
}
c, err := l.NewCursor()
if err != nil {
t.Fatal("NewCursor():", err)
return
}
defer c.Delete()
q, err := c.NewQuery(GRaw, "")
if err != nil {
t.Fatal("NewQuery():", err)
return
}
if q == nil {
t.Fatal("NewQuery() returned nil")
return
}
ln := New()
defer ln.Release()
if err := ln.Attach("gotest"); err != nil {
t.Fatal("Attach(gotest):", err)
return
}
cn, err := ln.NewCursor()
if err != nil {
t.Fatal("NewCursor() for gotest:", err)
return
}
defer cn.Delete()
qn, err := cn.NewQuery(GRaw, "")
if err != nil {
t.Fatal("NewQuery():", err)
return
}
if qn == nil {
t.Fatal("NewQuery() for gotest returned nil")
return
}
}
func TestE2EQueryRaw(t *testing.T) {
l := New()
defer l.Release()
if err := l.Attach(""); err != nil {
t.Fatal("Attach(default):", err)
return
}
c, err := l.NewCursor()
if err != nil {
t.Fatal("NewCursor():", err)
return
}
defer c.Delete()
q, err := c.NewQuery(GRaw, "")
if err != nil {
t.Fatal("NewQuery():", err)
return
}
client := &http.Client{}
req, err := http.NewRequest("GET", url("/uncacheable"), nil)
if err != nil {
t.Fatal("/uncacheable Request", err)
}
req.Close = true
stopChan := make(chan bool)
respChan := make(chan bool)
go func() {
sent := false
for {
select {
case <-stopChan:
return
default:
_, err := client.Do(req)
if err != nil {
t.Fatal("GET request:", err)
return
}
if !sent {
respChan <- true
sent = true
}
}
}
}()
<-respChan
var txGrps [][]Tx
for i := 0; i < 100; i++ {
txGrp, status := q.NextTxGroup()
txGrps = append(txGrps, txGrp)
if status == EOL {
break
}
}
stopChan <- true
t.Logf("read %d txGrps", len(txGrps))
t.Log(txGrps)
}
func TestE2EQueryVXID(t *testing.T) {
l := New()
defer l.Release()
if err := l.Attach(""); err != nil {
t.Fatal("Attach(default):", err)
return
}
c, err := l.NewCursor()
if err != nil {
t.Fatal("NewCursor():", err)
return
}
defer c.Delete()
q, err := c.NewQuery(VXID, "")
if err != nil {
t.Fatal("NewQuery():", err)
return
}
var status Status
var txGrps [][]Tx
stopChan := make(chan bool)
go func() {
for len(txGrps) < 3 {
txGrp, txStatus := q.NextTxGroup()
status = txStatus
if txStatus == EOL {
continue
}
if txStatus != More {
break
}
txGrps = append(txGrps, txGrp)
}
stopChan <- true
}()
client := &http.Client{}
req, err := http.NewRequest("GET", url("/uncacheable"), nil)
if err != nil {
t.Fatal("/uncacheable Request", err)
}
req.Close = true
resp, err := client.Do(req)
if err != nil {
t.Fatal("/uncacheable Response", err)
}
<-stopChan
if status != More && status != EOL {
t.Error("Read status:", status)
}
if len(txGrps) != 3 {
t.Errorf("number of tx groups want=3 got=%v", len(txGrps))
}
for _, txGrp := range txGrps {
tx := txGrp[0]
checkTxGrp(t, txGrp, expDefaultRead[tx.Type])
if tx.Type == BeReq {
checkBereq(t, tx, *req)
}
if tx.Type == Req {
checkReqResp(t, tx, *req, *resp)
}
}
}
/*-
* Copyright (c) 2018 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package log
import "testing"
func TestNewQuery(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
q, err := c.NewQuery(GRaw, "")
if err != nil {
t.Fatal("NewQuery():", err)
return
}
if q == nil {
t.Fatal("NewQuery() returned nil")
return
}
var n *Cursor
if _, err := n.NewQuery(GRaw, ""); err == nil {
t.Error("Expected nil.NewQuery() to fail")
}
uninit := &Cursor{}
if _, err := uninit.NewQuery(GRaw, ""); err == nil {
t.Error("Expected uninitiailized.NewQuery() to fail")
}
}
var expFirstTx = testTx{
level: 0,
txtype: "Record",
vxid: 0,
recs: []testRec{expFirstRec},
}
func TestNextTxGroupRawOne(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
q, err := c.NewQuery(GRaw, "")
if err != nil {
t.Fatal("NewQuery():", err)
return
}
txn, status := q.NextTxGroup()
// XXX change the status constant, this is vsl_more
if status != More {
t.Errorf("NextTxGroup() status want=More got=%d\n",
uint8(status))
}
if len(txn) != 1 {
t.Errorf("len(Txn) from NextTxGroup want=1 got=%v\n",
len(txn))
}
tx := txn[0]
checkExpTx(t, tx, expFirstTx)
if tx.Reason != ReasonUnknown {
t.Errorf("tx Reason want=unknown got=%v\n", tx.Reason)
}
if tx.ParentVXID != 0 {
t.Errorf("tx ParentVXID want=0 got=%v\n", tx.ParentVXID)
}
}
func TestNextTxGroupRawAll(t *testing.T) {
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
q, err := c.NewQuery(GRaw, "")
if err != nil {
t.Fatal("NewQuery():", err)
return
}
for _, rec := range expRawLog {
txn, status := q.NextTxGroup()
// XXX should be More
if status != More {
t.Errorf("NextTxGroup() status want=More got=%d\n",
uint8(status))
}
if len(txn) != 1 {
t.Errorf("len(Txn) from NextTxGroup want=1 got=%v\n",
len(txn))
}
tx := txn[0]
expTx := testTx{
level: 0,
txtype: "Record",
vxid: uint32(rec.vxid),
recs: []testRec{rec},
}
checkExpTx(t, tx, expTx)
if tx.Reason != ReasonUnknown {
t.Errorf("tx Reason want=unknown got=%v\n", tx.Reason)
}
if tx.ParentVXID != 0 {
t.Errorf("tx ParentVXID want=0 got=%v\n", tx.ParentVXID)
}
}
}
func TestNextTxGroupDefault(t *testing.T) {
var txGrps [][]Tx
l := New()
defer l.Release()
c, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c.Delete()
q, err := c.NewQuery(VXID, "")
if err != nil {
t.Fatal("NewQuery():", err)
return
}
var status Status
for {
txn, rdstatus := q.NextTxGroup()
if rdstatus != More {
status = rdstatus
break
}
txGrps = append(txGrps, txn)
}
if status != Status(EOF) {
t.Errorf("expected EOF status got: %s", status.Error())
}
checkTxGroups(t, txGrps, expVxidLog)
}
func TestNextTxGroupConcurrent(t *testing.T) {
var txGrps1, txGrps2 [][]Tx
l := New()
defer l.Release()
c1, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c1.Delete()
q1, err := c1.NewQuery(VXID, "")
if err != nil {
t.Fatal("NewQuery():", err)
return
}
c2, err := l.NewCursorFile(testFile)
if err != nil {
t.Fatal("NewCursorFile(bin.log):", err)
return
}
defer c2.Delete()
q2, err := c2.NewQuery(VXID, "")
if err != nil {
t.Fatal("NewQuery():", err)
return
}
for {
txGrp1, status1 := q1.NextTxGroup()
txGrp2, status2 := q2.NextTxGroup()
if status1 == EOF && status2 == EOF {
break
}
// XXX
if status1 != More {
t.Fatal("NextTxGroup() unexpected status:", status1)
return
}
if status2 != More {
t.Fatal("NextTxGroup() unexpected status:", status2)
return
}
txGrps1 = append(txGrps1, txGrp1)
txGrps2 = append(txGrps2, txGrp2)
}
checkTxGroups(t, txGrps1, expVxidLog)
checkTxGroups(t, txGrps2, expVxidLog)
}
This diff is collapsed.
......@@ -26,19 +26,55 @@
* SUCH DAMAGE.
*/
#include "log.h"
#include "_cgo_export.h"
static int
dispatch_wrapped(struct VSL_data *vsl, struct VSL_transaction * const trans[],
void *priv)
{
(void) vsl;
return (publish((void *)trans, *((uint64_t *)priv)));
package log
/*
#cgo pkg-config: varnishapi
#include <stdio.h>
#include <vapi/vsl.h>
*/
import "C"
import "unsafe"
const (
identmask = ^uint32(3 << 30)
lenmask = uint32(0x0ffff)
clientmarker = uint32(1 << 30)
backendmarker = uint32(1 << 31)
maxRecLen = 4096
)
func (rec *C.struct_VSLC_ptr) tag() Tag {
ptr := (*uint32)(unsafe.Pointer(rec.ptr))
return Tag(uint8(*ptr >> 24))
}
func (rec *C.struct_VSLC_ptr) vxid() uint32 {
ptr := (*[2]uint32)(unsafe.Pointer(rec.ptr))
return ptr[1] & identmask
}
func (rec *C.struct_VSLC_ptr) length() uint32 {
ptr := (*uint32)(unsafe.Pointer(rec.ptr))
return *ptr & lenmask
}
func (rec *C.struct_VSLC_ptr) payload() []byte {
length := rec.length() - 1
ptr := (*[maxRecLen]byte)(unsafe.Pointer(uintptr(unsafe.Pointer(rec.ptr)) + 2*unsafe.Sizeof(*rec.ptr)))
b := ptr[:length]
bytes := make([]byte, length, length)
copy(bytes, b)
return bytes
}
func (rec *C.struct_VSLC_ptr) client() bool {
ptr := (*[2]uint32)(unsafe.Pointer(rec.ptr))
return ptr[1]&clientmarker != 0
}
int
dispatch(struct VSLQ *vslq, uint64_t key)
{
return VSLQ_Dispatch(vslq, dispatch_wrapped, (void *)&key);
func (rec *C.struct_VSLC_ptr) backend() bool {
ptr := (*[2]uint32)(unsafe.Pointer(rec.ptr))
return ptr[1]&backendmarker != 0
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment