package main

import (
	"bufio"
	"log"
	"os"
	"regexp"
	"strings"
	"time"

	"git.scraperwall.com/scw/data/v2"
	"github.com/hpcloud/tail"
	"github.com/kr/pretty"
	"github.com/satyrius/gonx"
)

// nginxTimeLayout is the time.Parse layout used for the "time_local" log field.
const nginxTimeLayout = "02/Jan/2006:15:04:05 -0700"

// nginxRequestRe splits the "request" log field into method, URL and protocol.
// It is compiled once at package scope instead of once per capture/replay call.
var nginxRequestRe = regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)

// nginxRemoteAddr returns the client address for a parsed log entry.
//
// The "remote_addr" field is required. When the entry also carries a usable
// "http_x_forwarded_for" field, that value replaces it. Only the first entry
// of a comma-separated list is kept. The second return value is false (and the
// reason has been logged) when no address could be determined.
func nginxRemoteAddr(entry *gonx.Entry) (string, bool) {
	remote, err := entry.Field("remote_addr")
	if err != nil {
		log.Println(err)
		return "", false
	}

	// Field reports a missing field through a non-nil error, so the header is
	// only usable when err is nil. A lone "-" is skipped as well: it is
	// presumably the placeholder nginx writes for an unset header and must not
	// replace a real remote address.
	if xff, err := entry.Field("http_x_forwarded_for"); err == nil && xff != "" && xff != "-" {
		if config.Trace {
			log.Printf("Using XFF: %s\n", xff)
		}
		remote = xff
	}

	// only use the first host in case there are multiple hosts in the log
	if cidx := strings.Index(remote, ","); cidx >= 0 {
		remote = remote[:cidx]
	}
	remote = strings.TrimSpace(remote)

	if remote == "" {
		log.Println("remote is empty: ignoring request.")
		return "", false
	}

	return remote, true
}

// nginxTimestamp parses the "time_local" field of a log entry.
// The second return value is false (and the error has been logged) when the
// field is missing or does not match nginxTimeLayout.
func nginxTimestamp(entry *gonx.Entry) (time.Time, bool) {
	timestampStr, err := entry.Field("time_local")
	if err != nil {
		log.Println(err)
		return time.Time{}, false
	}

	timeStamp, err := time.Parse(nginxTimeLayout, timestampStr)
	if err != nil {
		log.Println(err)
		return time.Time{}, false
	}

	return timeStamp, true
}

// nginxRequestParts splits the "request" field of a log entry.
// On success the returned slice holds the full match at index 0, followed by
// method (1), URL (2) and protocol (3). The second return value is false (and
// the reason has been logged) when the field is missing or malformed.
func nginxRequestParts(entry *gonx.Entry) ([]string, bool) {
	httpRequest, err := entry.Field("request")
	if err != nil {
		log.Println(err)
		return nil, false
	}

	reqData := nginxRequestRe.FindStringSubmatch(httpRequest)
	if len(reqData) < 4 {
		log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
		return nil, false
	}

	return reqData, true
}

// nginxLogCapture tails logfile, parses every new line using the gonx log
// format given in format and publishes one data.Request per valid line to
// config.NatsQueue.
//
// serverHost is used as the request host when a log line has no "host" field.
// Lines that cannot be parsed are logged and skipped. The function terminates
// the process via log.Fatalf when logfile cannot be stat'ed or tailed, and
// otherwise runs for as long as the tail's Lines channel stays open.
func nginxLogCapture(logfile, format, serverHost string) {
	if _, err := os.Stat(logfile); err != nil {
		log.Fatalf("%s: %s", logfile, err)
	}

	t, err := tail.TailFile(logfile, tail.Config{
		Follow:   true,                                 // follow the file
		ReOpen:   true,                                 // reopen log file when it gets closed/rotated
		Poll:     config.TailPoll,                      // use file polling to detect a file rollover
		Logger:   tail.DiscardingLogger,                // don't log anything
		Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
	})
	if err != nil {
		log.Fatalf("%s: %s", logfile, err)
	}

	// `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
	p := gonx.NewParser(format)

	for line := range t.Lines {
		logEntry, err := p.ParseString(line.Text)
		if err != nil {
			log.Println(err)
			continue
		}

		remote, ok := nginxRemoteAddr(logEntry)
		if !ok {
			continue
		}

		timeStamp, ok := nginxTimestamp(logEntry)
		if !ok {
			continue
		}

		reqData, ok := nginxRequestParts(logEntry)
		if !ok {
			continue
		}

		// Prefer the host recorded in the log line; fall back to the
		// configured server host, then to a placeholder.
		host, err := logEntry.Field("host")
		if err != nil {
			host = serverHost
			if host == "" {
				host = "[not available]"
			}
		}

		customer, err := logEntry.Field("sc_customer")
		if err != nil {
			customer = "[not set]"
		}

		request := data.Request{
			IpSrc:   remote,
			Origin:  remote,
			Source:  remote,
			IpDst:   "127.0.0.1",
			PortSrc: 0,
			PortDst: 0,
			TcpSeq:  0,
			// NOTE(review): seconds here, but nginxLogReplay stores
			// nanoseconds (UnixNano) in the same field - confirm which
			// unit the consumers of data.Request expect.
			CreatedAt: timeStamp.Unix(),
			Url:       reqData[2],
			Method:    reqData[1],
			Host:      host,
			Protocol:  reqData[3],
			Customer:  customer,
		}

		// Referer and user agent are optional; a missing field leaves them empty.
		request.Referer, _ = logEntry.Field("http_referer")
		request.UserAgent, _ = logEntry.Field("http_user_agent")

		if config.Trace {
			log.Printf("[%s] %s\n", request.Source, request.Url)
		}

		count++
		publishRequest(config.NatsQueue, &request)
	}
}

// nginxLogReplay reads logfile from start to end, parses every line using the
// gonx log format given in format and publishes one data.Request per valid
// line to config.NatsQueue.
//
// The timestamps are shifted so that the first line with a valid timestamp
// maps to "now"; the function sleeps between lines to reproduce the original
// spacing of the requests. Lines that cannot be parsed are logged and skipped.
// The function terminates the process via log.Fatalf when logfile cannot be
// opened or when reading it fails.
func nginxLogReplay(logfile, format string) {
	file, err := os.Open(logfile)
	if err != nil {
		log.Fatalf("%s: %s", logfile, err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)

	// tOffset is the distance between the recorded timestamps and the wall
	// clock; zero means "not determined yet".
	var tOffset time.Duration

	// `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
	p := gonx.NewParser(format)

	for scanner.Scan() {
		logEntry, err := p.ParseString(scanner.Text())
		if err != nil {
			log.Println(err)
			continue
		}

		if config.Trace {
			pretty.Println(logEntry)
		}

		remote, ok := nginxRemoteAddr(logEntry)
		if !ok {
			continue
		}

		timeStamp, ok := nginxTimestamp(logEntry)
		if !ok {
			continue
		}

		if tOffset == 0 {
			tOffset = time.Since(timeStamp)
		}

		// Wait until the shifted timestamp of this line has been reached.
		replayAt := timeStamp.Add(tOffset)
		if wait := time.Until(replayAt); wait > 0 {
			time.Sleep(wait)
		}

		reqData, ok := nginxRequestParts(logEntry)
		if !ok {
			continue
		}

		host := config.HostName
		if host == "" {
			host = "[not available]"
		}

		request := data.Request{
			IpSrc:   remote,
			Origin:  remote,
			Source:  remote,
			IpDst:   "127.0.0.1",
			PortSrc: 0,
			PortDst: 0,
			TcpSeq:  0,
			// NOTE(review): nanoseconds here, but nginxLogCapture stores
			// seconds (Unix) in the same field - confirm which unit the
			// consumers of data.Request expect.
			CreatedAt: replayAt.UnixNano(),
			Url:       reqData[2],
			Method:    reqData[1],
			Host:      host,
			Protocol:  reqData[3],
		}

		// Referer and user agent are optional; a missing field leaves them empty.
		request.Referer, _ = logEntry.Field("http_referer")
		request.UserAgent, _ = logEntry.Field("http_user_agent")

		if config.Trace {
			log.Printf("[%s] %s\n", request.Source, request.Url)
		}

		count++
		publishRequest(config.NatsQueue, &request)
	}

	// Scan returns false both at EOF and on a read error (for example a line
	// that exceeds the scanner's buffer); Err tells the two apart and is only
	// meaningful once the loop has ended.
	if err := scanner.Err(); err != nil {
		log.Fatalf("%s: %s", logfile, err)
	}
}