Ver Fonte

added nginxLogReplay

Tobias von Dewitz há 6 anos atrás
pai
commit
c96e419972
3 ficheiros alterados com 597 adições e 402 exclusões
  1. 198 0
      apache.go
  2. 6 402
      main.go
  3. 393 0
      nginx.go

+ 198 - 0
apache.go

@@ -0,0 +1,198 @@
+package main
+
+import (
+	"bufio"
+	"log"
+	"os"
+	"strings"
+	"time"
+
+	"git.scraperwall.com/scw/data"
+	"github.com/Songmu/axslogparser"
+	"github.com/hpcloud/tail"
+)
+
+func apacheLogReplay(logfile string) {
+	file, err := os.Open(logfile)
+	if err != nil {
+		log.Fatalf("%s: %s", logfile, err)
+	}
+	defer file.Close()
+
+	scanner := bufio.NewScanner(file)
+
+	var p axslogparser.Parser
+	parserSet := false
+
+	var tOffset time.Duration
+
+	for scanner.Scan() {
+		l := scanner.Text()
+
+		if err := scanner.Err(); err != nil {
+			log.Fatal(err)
+		}
+
+		if !parserSet {
+			p, _, err = axslogparser.GuessParser(l)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+			parserSet = true
+		}
+
+		logEntry, err := p.Parse(l)
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		if tOffset == 0 {
+			tOffset = time.Now().Sub(logEntry.Time)
+		}
+
+		ts := logEntry.Time.Add(tOffset)
+		if ts.After(time.Now()) {
+			time.Sleep(ts.Sub(time.Now()))
+		}
+
+		// fmt.Println(l)
+		remote := logEntry.Host
+		if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
+			remote = logEntry.ForwardedFor
+		}
+
+		// only use the first host in case there are multiple hosts in the log
+		if cidx := strings.Index(remote, ","); cidx >= 0 {
+			remote = remote[0:cidx]
+		}
+
+		// extract the virtual host
+		var virtualHost string
+		vhost := logEntry.VirtualHost
+		if vhost != "" {
+			vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
+			virtualHost = vhostAndPort[0]
+		} else {
+			if config.HostName != "" {
+				vhost = config.HostName
+			} else {
+				vhost = "[not available]"
+			}
+		}
+
+		request := data.Request{
+			IpSrc:         remote,
+			IpDst:         "127.0.0.1",
+			PortSrc:       0,
+			PortDst:       0,
+			TcpSeq:        0,
+			CreatedAt:     (logEntry.Time.Add(tOffset)).UnixNano(),
+			Url:           logEntry.RequestURI,
+			Method:        logEntry.Method,
+			Host:          virtualHost,
+			Protocol:      logEntry.Protocol,
+			Origin:        remote,
+			Source:        remote,
+			Referer:       logEntry.Referer,
+			XForwardedFor: logEntry.ForwardedFor,
+			UserAgent:     logEntry.UserAgent,
+		}
+
+		if config.Trace {
+			log.Printf("[%s] %s\n", request.Source, request.Url)
+		}
+
+		count++
+		publishRequest(config.NatsQueue, &request)
+
+	}
+}
+
+func apacheLogCapture(logfile string) {
+	if _, err := os.Stat(logfile); err != nil {
+		log.Fatalf("%s: %s", logfile, err)
+	}
+
+	t, err := tail.TailFile(logfile, tail.Config{
+		Follow:   true,                                 // follow the file
+		ReOpen:   true,                                 // reopen log file when it gets closed/rotated
+		Logger:   tail.DiscardingLogger,                // don't log anything
+		Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
+	})
+	if err != nil {
+		log.Fatalf("%s: %s", logfile, err)
+	}
+
+	var p axslogparser.Parser
+	parserSet := false
+
+	for line := range t.Lines {
+		l := line.Text
+
+		if !parserSet {
+			p, _, err = axslogparser.GuessParser(l)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+			parserSet = true
+		}
+
+		logEntry, err := p.Parse(l)
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		remote := logEntry.Host
+		if config.UseXForwardedAsSource && logEntry.ForwardedFor != "" {
+			remote = logEntry.ForwardedFor
+		}
+
+		// only use the first host in case there are multiple hosts in the log
+		if cidx := strings.Index(remote, ","); cidx >= 0 {
+			remote = remote[0:cidx]
+		}
+
+		// extract the virtual host
+		var virtualHost string
+		vhost := logEntry.VirtualHost
+		if vhost != "" {
+			vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
+			virtualHost = vhostAndPort[0]
+		} else {
+			if config.HostName != "" {
+				vhost = config.HostName
+			} else {
+				vhost = "[not available]"
+			}
+		}
+
+		request := data.Request{
+			IpSrc:         remote,
+			IpDst:         "127.0.0.1",
+			PortSrc:       0,
+			PortDst:       0,
+			TcpSeq:        0,
+			CreatedAt:     logEntry.Time.UnixNano(),
+			Url:           logEntry.RequestURI,
+			Method:        logEntry.Method,
+			Host:          virtualHost,
+			Protocol:      logEntry.Protocol,
+			Origin:        remote,
+			Source:        remote,
+			Referer:       logEntry.Referer,
+			XForwardedFor: logEntry.ForwardedFor,
+			UserAgent:     logEntry.UserAgent,
+		}
+
+		if config.Trace {
+			log.Printf("[%s] %s\n", request.Source, request.Url)
+		}
+
+		count++
+		publishRequest(config.NatsQueue, &request)
+	}
+}

+ 6 - 402
main.go

@@ -14,20 +14,15 @@ import (
 	"net"
 	"net/http"
 	"os"
-	"regexp"
 	"strings"
 	"time"
 
 	"github.com/BurntSushi/toml"
-	"github.com/Songmu/axslogparser"
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/layers"
 	"github.com/google/gopacket/pcap"
-	"github.com/hpcloud/tail"
-	"github.com/kr/pretty"
 	"github.com/nats-io/nats"
 	"github.com/nats-io/nats/encoders/protobuf"
-	"github.com/satyrius/gonx"
 
 	"git.scraperwall.com/scw/ajp13"
 	"git.scraperwall.com/scw/data"
@@ -54,6 +49,7 @@ var (
 	apacheReplay          = flag.String("apache-replay", "", "Apache log file to replay into the system")
 	nginxLog              = flag.String("nginx-log", "", "Nginx log file to tail")
 	nginxFormat           = flag.String("nginx-format", "", "The nginx log file format")
+	nginxReplay           = flag.String("nginx-replay", "", "Replay this nginx logfile")
 	hostName              = flag.String("hostname", "", "Override the captured hostname with this one")
 	accessWatchKey        = flag.String("access-watch-key", "", "access.watch API key")
 	configFile            = flag.String("config", "", "The location of the TOML config file")
@@ -99,6 +95,7 @@ type Config struct {
 	ApacheReplay          string
 	NginxLog              string
 	NginxLogFormat        string
+	NginxReplay           string
 	HostName              string
 	AccessWatchKey        string
 }
@@ -130,6 +127,7 @@ func (c Config) print() {
 	fmt.Printf("Apache Replay:         %s\n", c.ApacheReplay)
 	fmt.Printf("Nginx Log:             %s\n", c.NginxLog)
 	fmt.Printf("Nginx Log Format:      %s\n", c.NginxLogFormat)
+	fmt.Printf("NginxReplay:           %s\n", c.NginxReplay)
 	fmt.Printf("HostName:              %s\n", c.HostName)
 	fmt.Printf("AccessWatchKey:        %s\n", c.AccessWatchKey)
 	fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
@@ -184,6 +182,8 @@ func main() {
 		replayFile()
 	} else if config.ApacheReplay != "" {
 		apacheLogReplay(config.ApacheReplay)
+	} else if config.NginxReplay != "" {
+		nginxLogReplay(config.NginxReplay, config.NginxLogFormat)
 	} else if config.ApacheLog != "" {
 		apacheLogCapture(config.ApacheLog)
 	} else if config.Live {
@@ -250,403 +250,6 @@ func connectToNATS() error {
 	return nil
 }
 
-func nginxLogCapture(logfile, format string) {
-	if _, err := os.Stat(logfile); err != nil {
-		log.Fatalf("%s: %s", logfile, err)
-	}
-
-	t, err := tail.TailFile(logfile, tail.Config{
-		Follow:   true,                                 // follow the file
-		ReOpen:   true,                                 // reopen log file when it gets closed/rotated
-		Logger:   tail.DiscardingLogger,                // don't log anything
-		Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
-	})
-	if err != nil {
-		log.Fatalf("%s: %s", logfile, err)
-	}
-
-	// `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
-	p := gonx.NewParser(format)
-	reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
-
-	for line := range t.Lines {
-		var remote string
-		var err error
-
-		l := line.Text
-
-		logEntry, err := p.ParseString(l)
-		if err != nil {
-			log.Println(err)
-			continue
-		}
-
-		if config.Trace {
-			pretty.Println(logEntry)
-		}
-
-		remote, err = logEntry.Field("remote_addr")
-		if err != nil {
-			log.Println(err)
-			continue
-		}
-
-		xff, err := logEntry.Field("http_x_forwarded_for")
-		if err != nil && xff != "" {
-			if config.Trace {
-				log.Printf("Using XFF: %s\n", xff)
-			}
-
-			remote = xff
-		}
-
-		if remote == "" {
-			log.Println("remote is empty: ignoring request.")
-			continue
-		}
-
-		// only use the first host in case there are multiple hosts in the log
-		if cidx := strings.Index(remote, ","); cidx >= 0 {
-			remote = remote[0:cidx]
-		}
-
-		timestampStr, err := logEntry.Field("time_local")
-		if err != nil {
-			log.Println(err)
-			continue
-		}
-		timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
-		if err != nil {
-			log.Println(err)
-			continue
-		}
-
-		httpRequest, err := logEntry.Field("request")
-		if err != nil {
-			log.Println(err)
-			continue
-		}
-
-		reqData := reqRegexp.FindStringSubmatch(httpRequest)
-		if len(reqData) < 4 {
-			log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
-			continue
-		}
-
-		host := config.HostName
-		if host == "" {
-			host = "[not available]"
-		}
-
-		request := data.Request{
-			IpSrc:     remote,
-			Origin:    remote,
-			Source:    remote,
-			IpDst:     "127.0.0.1",
-			PortSrc:   0,
-			PortDst:   0,
-			TcpSeq:    0,
-			CreatedAt: timeStamp.Unix(),
-			Url:       reqData[2],
-			Method:    reqData[1],
-			Host:      host,
-			Protocol:  reqData[3],
-		}
-
-		request.Referer, _ = logEntry.Field("http_referer")
-		request.UserAgent, _ = logEntry.Field("http_user_agent")
-
-		if config.Trace {
-			log.Printf("[%s] %s\n", request.Source, request.Url)
-		}
-
-		count++
-		publishRequest(config.NatsQueue, &request)
-	}
-}
-
-func apacheLogReplay(logfile string) {
-	file, err := os.Open(logfile)
-	if err != nil {
-		log.Fatalf("%s: %s", logfile, err)
-	}
-	defer file.Close()
-
-	scanner := bufio.NewScanner(file)
-
-	var p axslogparser.Parser
-	parserSet := false
-
-	var tOffset time.Duration
-
-	for scanner.Scan() {
-		l := scanner.Text()
-
-		if err := scanner.Err(); err != nil {
-			log.Fatal(err)
-		}
-
-		if !parserSet {
-			p, _, err = axslogparser.GuessParser(l)
-			if err != nil {
-				log.Println(err)
-				continue
-			}
-			parserSet = true
-		}
-
-		logEntry, err := p.Parse(l)
-		if err != nil {
-			log.Println(err)
-			continue
-		}
-
-		if tOffset == 0 {
-			tOffset = time.Now().Sub(logEntry.Time)
-		}
-
-		ts := logEntry.Time.Add(tOffset)
-		if ts.After(time.Now()) {
-			time.Sleep(ts.Sub(time.Now()))
-		}
-
-		// fmt.Println(l)
-		remote := logEntry.Host
-		if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
-			remote = logEntry.ForwardedFor
-		}
-
-		// only use the first host in case there are multiple hosts in the log
-		if cidx := strings.Index(remote, ","); cidx >= 0 {
-			remote = remote[0:cidx]
-		}
-
-		// extract the virtual host
-		var virtualHost string
-		vhost := logEntry.VirtualHost
-		if vhost != "" {
-			vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
-			virtualHost = vhostAndPort[0]
-		} else {
-			if config.HostName != "" {
-				vhost = config.HostName
-			} else {
-				vhost = "[not available]"
-			}
-		}
-
-		request := data.Request{
-			IpSrc:         remote,
-			IpDst:         "127.0.0.1",
-			PortSrc:       0,
-			PortDst:       0,
-			TcpSeq:        0,
-			CreatedAt:     (logEntry.Time.Add(tOffset)).UnixNano(),
-			Url:           logEntry.RequestURI,
-			Method:        logEntry.Method,
-			Host:          virtualHost,
-			Protocol:      logEntry.Protocol,
-			Origin:        remote,
-			Source:        remote,
-			Referer:       logEntry.Referer,
-			XForwardedFor: logEntry.ForwardedFor,
-			UserAgent:     logEntry.UserAgent,
-		}
-
-		if config.Trace {
-			log.Printf("[%s] %s\n", request.Source, request.Url)
-		}
-
-		count++
-		publishRequest(config.NatsQueue, &request)
-
-	}
-}
-
-/*
-func apacheLogReplay(logfile string) {
-	if _, err := os.Stat(logfile); err != nil {
-		log.Fatalf("%s: %s", logfile, err)
-	}
-
-	file, err := os.Open(logfile)
-	if err != nil {
-		log.Fatalf("%s: %s", logfile, err)
-	}
-	defer file.Close()
-
-	scanner := bufio.NewScanner(file)
-
-	var p axslogparser.Parser
-	parserSet := false
-
-	var tOffset time.Duration
-
-	for scanner.Scan() {
-		l := scanner.Text()
-
-		if err := scanner.Err(); err != nil {
-			log.Fatal(err)
-		}
-
-		if !parserSet {
-			p, _, err = axslogparser.GuessParser(l)
-			if err != nil {
-				log.Println(err)
-				continue
-			}
-			parserSet = true
-		}
-
-		logEntry, err := p.Parse(l)
-		if err != nil {
-			log.Println(err)
-			continue
-		}
-
-		remote := logEntry.Host
-		if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
-			remote = logEntry.ForwardedFor
-		}
-
-		// only use the first host in case there are multiple hosts in the log
-		if cidx := strings.Index(remote, ","); cidx >= 0 {
-			remote = remote[0:cidx]
-		}
-
-		// extract the virtual host
-		var virtualHost string
-		vhost := logEntry.VirtualHost
-		if vhost != "" {
-			vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
-			virtualHost = vhostAndPort[0]
-		} else {
-			if config.HostName != "" {
-				vhost = config.HostName
-			} else {
-				vhost = "[not available]"
-			}
-		}
-
-		if tOffset == 0 {
-			tOffset = time.Now().Sub(logEntry.Time)
-		}
-
-		request := data.Request{
-			IpSrc:         remote,
-			IpDst:         "127.0.0.1",
-			PortSrc:       0,
-			PortDst:       0,
-			TcpSeq:        0,
-			CreatedAt:     (logEntry.Time.Add(tOffset)).UnixNano(),
-			Url:           logEntry.RequestURI,
-			Method:        logEntry.Method,
-			Host:          virtualHost,
-			Protocol:      logEntry.Protocol,
-			Origin:        remote,
-			Source:        remote,
-			Referer:       logEntry.Referer,
-			XForwardedFor: logEntry.ForwardedFor,
-			UserAgent:     logEntry.UserAgent,
-		}
-
-		if config.Trace {
-			log.Printf("[%s] %s\n", request.Source, request.Url)
-		}
-
-		count++
-		publishRequest(config.NatsQueue, &request)
-	}
-}
-*/
-
-func apacheLogCapture(logfile string) {
-	if _, err := os.Stat(logfile); err != nil {
-		log.Fatalf("%s: %s", logfile, err)
-	}
-
-	t, err := tail.TailFile(logfile, tail.Config{
-		Follow:   true,                                 // follow the file
-		ReOpen:   true,                                 // reopen log file when it gets closed/rotated
-		Logger:   tail.DiscardingLogger,                // don't log anything
-		Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
-	})
-	if err != nil {
-		log.Fatalf("%s: %s", logfile, err)
-	}
-
-	var p axslogparser.Parser
-	parserSet := false
-
-	for line := range t.Lines {
-		l := line.Text
-
-		if !parserSet {
-			p, _, err = axslogparser.GuessParser(l)
-			if err != nil {
-				log.Println(err)
-				continue
-			}
-			parserSet = true
-		}
-
-		logEntry, err := p.Parse(l)
-		if err != nil {
-			log.Println(err)
-			continue
-		}
-
-		remote := logEntry.Host
-		if config.UseXForwardedAsSource && logEntry.ForwardedFor != "" {
-			remote = logEntry.ForwardedFor
-		}
-
-		// only use the first host in case there are multiple hosts in the log
-		if cidx := strings.Index(remote, ","); cidx >= 0 {
-			remote = remote[0:cidx]
-		}
-
-		// extract the virtual host
-		var virtualHost string
-		vhost := logEntry.VirtualHost
-		if vhost != "" {
-			vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
-			virtualHost = vhostAndPort[0]
-		} else {
-			if config.HostName != "" {
-				vhost = config.HostName
-			} else {
-				vhost = "[not available]"
-			}
-		}
-
-		request := data.Request{
-			IpSrc:         remote,
-			IpDst:         "127.0.0.1",
-			PortSrc:       0,
-			PortDst:       0,
-			TcpSeq:        0,
-			CreatedAt:     logEntry.Time.UnixNano(),
-			Url:           logEntry.RequestURI,
-			Method:        logEntry.Method,
-			Host:          virtualHost,
-			Protocol:      logEntry.Protocol,
-			Origin:        remote,
-			Source:        remote,
-			Referer:       logEntry.Referer,
-			XForwardedFor: logEntry.ForwardedFor,
-			UserAgent:     logEntry.UserAgent,
-		}
-
-		if config.Trace {
-			log.Printf("[%s] %s\n", request.Source, request.Url)
-		}
-
-		count++
-		publishRequest(config.NatsQueue, &request)
-	}
-}
-
 func liveCapture() {
 	ipPriv = ip.NewIP()
 
@@ -1076,6 +679,7 @@ func loadConfig() {
 	config.ApacheReplay = *apacheReplay
 	config.NginxLog = *nginxLog
 	config.NginxLogFormat = *nginxFormat
+	config.NginxReplay = *nginxReplay
 	config.HostName = *hostName
 	config.Quiet = *beQuiet
 	config.Trace = *trace

+ 393 - 0
nginx.go

@@ -0,0 +1,393 @@
+package main
+
+import (
+	"bufio"
+	"log"
+	"os"
+	"regexp"
+	"strings"
+	"time"
+
+	"git.scraperwall.com/scw/data"
+	"github.com/hpcloud/tail"
+	"github.com/kr/pretty"
+	"github.com/satyrius/gonx"
+)
+
+func nginxLogCapture(logfile, format string) {
+	if _, err := os.Stat(logfile); err != nil {
+		log.Fatalf("%s: %s", logfile, err)
+	}
+
+	t, err := tail.TailFile(logfile, tail.Config{
+		Follow:   true,                                 // follow the file
+		ReOpen:   true,                                 // reopen log file when it gets closed/rotated
+		Logger:   tail.DiscardingLogger,                // don't log anything
+		Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
+	})
+	if err != nil {
+		log.Fatalf("%s: %s", logfile, err)
+	}
+
+	// `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
+	p := gonx.NewParser(format)
+	reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
+
+	for line := range t.Lines {
+		var remote string
+		var err error
+
+		l := line.Text
+
+		logEntry, err := p.ParseString(l)
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		if config.Trace {
+			pretty.Println(logEntry)
+		}
+
+		remote, err = logEntry.Field("remote_addr")
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		xff, err := logEntry.Field("http_x_forwarded_for")
+		if err != nil && xff != "" {
+			if config.Trace {
+				log.Printf("Using XFF: %s\n", xff)
+			}
+
+			remote = xff
+		}
+
+		if remote == "" {
+			log.Println("remote is empty: ignoring request.")
+			continue
+		}
+
+		// only use the first host in case there are multiple hosts in the log
+		if cidx := strings.Index(remote, ","); cidx >= 0 {
+			remote = remote[0:cidx]
+		}
+
+		timestampStr, err := logEntry.Field("time_local")
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+		timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		httpRequest, err := logEntry.Field("request")
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		reqData := reqRegexp.FindStringSubmatch(httpRequest)
+		if len(reqData) < 4 {
+			log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
+			continue
+		}
+
+		host := config.HostName
+		if host == "" {
+			host = "[not available]"
+		}
+
+		request := data.Request{
+			IpSrc:     remote,
+			Origin:    remote,
+			Source:    remote,
+			IpDst:     "127.0.0.1",
+			PortSrc:   0,
+			PortDst:   0,
+			TcpSeq:    0,
+			CreatedAt: timeStamp.Unix(),
+			Url:       reqData[2],
+			Method:    reqData[1],
+			Host:      host,
+			Protocol:  reqData[3],
+		}
+
+		request.Referer, _ = logEntry.Field("http_referer")
+		request.UserAgent, _ = logEntry.Field("http_user_agent")
+
+		if config.Trace {
+			log.Printf("[%s] %s\n", request.Source, request.Url)
+		}
+
+		count++
+		publishRequest(config.NatsQueue, &request)
+	}
+}
+
+func nginx2LogCapture(logfile, format string) {
+	if _, err := os.Stat(logfile); err != nil {
+		log.Fatalf("%s: %s", logfile, err)
+	}
+
+	t, err := tail.TailFile(logfile, tail.Config{
+		Follow:   true,                                 // follow the file
+		ReOpen:   true,                                 // reopen log file when it gets closed/rotated
+		Logger:   tail.DiscardingLogger,                // don't log anything
+		Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
+	})
+	if err != nil {
+		log.Fatalf("%s: %s", logfile, err)
+	}
+
+	// `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
+	p := gonx.NewParser(format)
+	reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
+	var tOffset time.Duration
+	var ts time.Time
+
+	for line := range t.Lines {
+		var remote string
+		var err error
+
+		l := line.Text
+
+		logEntry, err := p.ParseString(l)
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		if tOffset == 0 {
+			tsStr, err := logEntry.Field("time_local")
+			if err != nil {
+				log.Printf("%s: %s\n", tsStr, err)
+				continue
+			}
+
+			ts, err = time.Parse("02/Jan/2006 15:04:05 -0700", tsStr)
+			if err != nil {
+				log.Printf("%s: %s\n", tsStr, err)
+				continue
+			}
+
+			tOffset = time.Now().Sub(ts)
+		}
+
+		tsCheck := ts.Add(tOffset)
+		if tsCheck.After(time.Now()) {
+			time.Sleep(tsCheck.Sub(time.Now()))
+		}
+
+		if config.Trace {
+			pretty.Println(logEntry)
+		}
+
+		remote, err = logEntry.Field("remote_addr")
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		xff, err := logEntry.Field("http_x_forwarded_for")
+		if err != nil && xff != "" {
+			if config.Trace {
+				log.Printf("Using XFF: %s\n", xff)
+			}
+
+			remote = xff
+		}
+
+		if remote == "" {
+			log.Println("remote is empty: ignoring request.")
+			continue
+		}
+
+		// only use the first host in case there are multiple hosts in the log
+		if cidx := strings.Index(remote, ","); cidx >= 0 {
+			remote = remote[0:cidx]
+		}
+
+		/*
+			timestampStr, err := logEntry.Field("time_local")
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+				timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
+				if err != nil {
+					log.Println(err)
+					continue
+				}
+		*/
+
+		httpRequest, err := logEntry.Field("request")
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		reqData := reqRegexp.FindStringSubmatch(httpRequest)
+		if len(reqData) < 4 {
+			log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
+			continue
+		}
+
+		host := config.HostName
+		if host == "" {
+			host = "[not available]"
+		}
+
+		request := data.Request{
+			IpSrc:     remote,
+			Origin:    remote,
+			Source:    remote,
+			IpDst:     "127.0.0.1",
+			PortSrc:   0,
+			PortDst:   0,
+			TcpSeq:    0,
+			CreatedAt: tsCheck.UnixNano(),
+			Url:       reqData[2],
+			Method:    reqData[1],
+			Host:      host,
+			Protocol:  reqData[3],
+		}
+
+		request.Referer, _ = logEntry.Field("http_referer")
+		request.UserAgent, _ = logEntry.Field("http_user_agent")
+
+		if config.Trace {
+			log.Printf("[%s] %s\n", request.Source, request.Url)
+		}
+
+		count++
+		publishRequest(config.NatsQueue, &request)
+	}
+}
+
+func nginxLogReplay(logfile, format string) {
+	file, err := os.Open(logfile)
+	if err != nil {
+		log.Fatalf("%s: %s", logfile, err)
+	}
+	defer file.Close()
+
+	scanner := bufio.NewScanner(file)
+	var tOffset time.Duration
+
+	// `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
+	p := gonx.NewParser(format)
+	reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
+
+	for scanner.Scan() {
+		l := scanner.Text()
+
+		if err := scanner.Err(); err != nil {
+			log.Fatal(err)
+		}
+
+		var remote string
+		var err error
+
+		logEntry, err := p.ParseString(l)
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		if config.Trace {
+			pretty.Println(logEntry)
+		}
+
+		remote, err = logEntry.Field("remote_addr")
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		xff, err := logEntry.Field("http_x_forwarded_for")
+		if err != nil && xff != "" {
+			if config.Trace {
+				log.Printf("Using XFF: %s\n", xff)
+			}
+
+			remote = xff
+		}
+
+		if remote == "" {
+			log.Println("remote is empty: ignoring request.")
+			continue
+		}
+
+		// only use the first host in case there are multiple hosts in the log
+		if cidx := strings.Index(remote, ","); cidx >= 0 {
+			remote = remote[0:cidx]
+		}
+
+		timestampStr, err := logEntry.Field("time_local")
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+		timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		if tOffset == 0 {
+			tOffset = time.Now().Sub(timeStamp)
+		}
+
+		if timeStamp.Add(tOffset).After(time.Now()) {
+			time.Sleep(timeStamp.Add(tOffset).Sub(time.Now()))
+		}
+
+		httpRequest, err := logEntry.Field("request")
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		reqData := reqRegexp.FindStringSubmatch(httpRequest)
+		if len(reqData) < 4 {
+			log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
+			continue
+		}
+
+		host := config.HostName
+		if host == "" {
+			host = "[not available]"
+		}
+
+		request := data.Request{
+			IpSrc:     remote,
+			Origin:    remote,
+			Source:    remote,
+			IpDst:     "127.0.0.1",
+			PortSrc:   0,
+			PortDst:   0,
+			TcpSeq:    0,
+			CreatedAt: timeStamp.Add(tOffset).UnixNano(),
+			Url:       reqData[2],
+			Method:    reqData[1],
+			Host:      host,
+			Protocol:  reqData[3],
+		}
+
+		request.Referer, _ = logEntry.Field("http_referer")
+		request.UserAgent, _ = logEntry.Field("http_user_agent")
+
+		if config.Trace {
+			log.Printf("[%s] %s\n", request.Source, request.Url)
+		}
+
+		count++
+		publishRequest(config.NatsQueue, &request)
+
+	}
+}