12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106 |
- package main
- import (
- "bufio"
- "bytes"
- "crypto/sha1"
- "encoding/csv"
- "encoding/json"
- "flag"
- "fmt"
- "io"
- "log"
- "math/rand"
- "net"
- "net/http"
- "os"
- "regexp"
- "strings"
- "time"
- "github.com/BurntSushi/toml"
- "github.com/Songmu/axslogparser"
- "github.com/google/gopacket"
- "github.com/google/gopacket/layers"
- "github.com/google/gopacket/pcap"
- "github.com/hpcloud/tail"
- "github.com/kr/pretty"
- "github.com/nats-io/nats"
- "github.com/nats-io/nats/encoders/protobuf"
- "github.com/satyrius/gonx"
- "git.scraperwall.com/scw/ajp13"
- "git.scraperwall.com/scw/data"
- "git.scraperwall.com/scw/ip"
- )
- var (
- doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
- iface = flag.String("interface", "eth0", "Interface to get packets from")
- snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
- filter = flag.String("filter", "tcp", "PCAP filter expression")
- promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
- natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
- natsUser = flag.String("nats-user", "", "The user for NATS authentication")
- natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
- natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
- natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
- sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
- requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
- protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
- useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
- trace = flag.Bool("trace", false, "Trace the packet capturing")
- apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
- apacheReplay = flag.String("apache-replay", "", "Apache log file to replay into the system")
- nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
- nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
- hostName = flag.String("hostname", "", "Override the captured hostname with this one")
- accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
- configFile = flag.String("config", "", "The location of the TOML config file")
- beQuiet = flag.Bool("quiet", true, "Be quiet")
- doVersion = flag.Bool("version", false, "Show version information")
- natsEC *nats.EncodedConn
- natsJSONEC *nats.EncodedConn
- natsErrorChan chan error
- natsIsAvailable bool
- count uint64
- timeout = -1 * time.Second
- ipPriv *ip.IP
- config Config
- // Version contains the program Version, e.g. 1.0.1
- Version string
- // BuildDate contains the date and time at which the program was compiled
- BuildDate string
- )
// Config contains the program configuration. loadConfig fills it from the
// command-line flags and, when a config file is given, decodes that TOML
// file on top of it.
type Config struct {
	// Live packet capture.
	Live        bool
	Interface   string
	SnapshotLen int
	Filter      string
	Promiscuous bool

	// NATS connection.
	NatsURL      string
	NatsQueue    string
	NatsUser     string
	NatsPassword string
	NatsCA       string

	// Request file replay.
	SleepFor     duration
	RequestsFile string

	// Parsing and output behaviour.
	UseXForwardedAsSource bool
	Quiet                 bool
	Protocol              string
	Trace                 bool

	// Log file sources.
	ApacheLog      string
	ApacheReplay   string
	NginxLog       string
	NginxLogFormat string

	// Overrides and alternative sinks.
	HostName       string
	AccessWatchKey string
}

// duration wraps time.Duration so that it can be decoded from a textual
// value such as "1m30s" through UnmarshalText.
type duration struct {
	time.Duration
}

// UnmarshalText parses text with time.ParseDuration and stores the result
// in d. The parse error, if any, is returned; d then holds whatever value
// ParseDuration returned alongside that error.
func (d *duration) UnmarshalText(text []byte) error {
	parsed, err := time.ParseDuration(string(text))
	d.Duration = parsed
	return err
}
- func (c Config) print() {
- fmt.Printf("Live: %t\n", c.Live)
- fmt.Printf("Interface: %s\n", c.Interface)
- fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
- fmt.Printf("Filter: %s\n", c.Filter)
- fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
- fmt.Printf("NatsURL: %s\n", c.NatsURL)
- fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
- fmt.Printf("NatsUser: %s\n", c.NatsUser)
- fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
- fmt.Printf("NatsCA: %s\n", c.NatsCA)
- fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
- fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
- fmt.Printf("Apache Log: %s\n", c.ApacheLog)
- fmt.Printf("Apache Replay: %s\n", c.ApacheReplay)
- fmt.Printf("Nginx Log: %s\n", c.NginxLog)
- fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
- fmt.Printf("HostName: %s\n", c.HostName)
- fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
- fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
- fmt.Printf("Protocol: %s\n", c.Protocol)
- fmt.Printf("Quiet: %t\n", c.Quiet)
- fmt.Printf("Trace: %t\n", c.Trace)
- }
- func init() {
- flag.Parse()
- nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
- }
- func main() {
- if *doVersion {
- version()
- os.Exit(0)
- }
- loadConfig()
- // Output how many requests per second were sent
- if !config.Quiet {
- go func(c *uint64) {
- for {
- fmt.Printf("%d requests per second\n", *c)
- *c = 0
- time.Sleep(time.Second)
- }
- }(&count)
- }
- // NATS
- //
- if config.NatsURL == "" && config.AccessWatchKey == "" {
- log.Fatal("No NATS URL specified (-nats-url)!")
- }
- natsIsAvailable = false
- natsErrorChan = make(chan error, 1)
- err := connectToNATS()
- if err != nil && config.AccessWatchKey == "" {
- log.Fatal(err)
- }
- go natsWatchdog(natsErrorChan)
- // What should I do?
- if config.RequestsFile != "" {
- replayFile()
- } else if config.ApacheReplay != "" {
- apacheLogReplay(config.ApacheReplay)
- } else if config.ApacheLog != "" {
- apacheLogCapture(config.ApacheLog)
- } else if config.Live {
- fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
- liveCapture()
- } else if config.NginxLog != "" && config.NginxLogFormat != "" {
- nginxLogCapture(config.NginxLog, config.NginxLogFormat)
- }
- }
- func natsWatchdog(closedChan chan error) {
- var lastError error
- for err := range closedChan {
- if lastError != err {
- lastError = err
- log.Println(err)
- }
- if err != nats.ErrConnectionClosed {
- continue
- }
- RECONNECT:
- for {
- log.Printf("Reconnecting to NATS at %s\n", *natsURL)
- err := connectToNATS()
- if err == nil {
- break RECONNECT
- }
- time.Sleep(1 * time.Second)
- }
- }
- }
- func connectToNATS() error {
- var natsConn *nats.Conn
- var err error
- if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
- natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
- } else {
- if config.NatsPassword != "" && config.NatsUser != "" {
- natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
- } else {
- natsConn, err = nats.Connect(config.NatsURL)
- }
- }
- if err != nil {
- return err
- }
- natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
- if err != nil {
- return fmt.Errorf("Encoded Connection: %v", err)
- }
- natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
- if err != nil {
- return fmt.Errorf("Encoded Connection: %v", err)
- }
- natsIsAvailable = true
- return nil
- }
- func nginxLogCapture(logfile, format string) {
- if _, err := os.Stat(logfile); err != nil {
- log.Fatalf("%s: %s", logfile, err)
- }
- t, err := tail.TailFile(logfile, tail.Config{
- Follow: true, // follow the file
- ReOpen: true, // reopen log file when it gets closed/rotated
- Logger: tail.DiscardingLogger, // don't log anything
- Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
- })
- if err != nil {
- log.Fatalf("%s: %s", logfile, err)
- }
- // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
- p := gonx.NewParser(format)
- reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
- for line := range t.Lines {
- var remote string
- var err error
- l := line.Text
- logEntry, err := p.ParseString(l)
- if err != nil {
- log.Println(err)
- continue
- }
- if config.Trace {
- pretty.Println(logEntry)
- }
- remote, err = logEntry.Field("remote_addr")
- if err != nil {
- log.Println(err)
- continue
- }
- xff, err := logEntry.Field("http_x_forwarded_for")
- if err != nil && xff != "" {
- if config.Trace {
- log.Printf("Using XFF: %s\n", xff)
- }
- remote = xff
- }
- if remote == "" {
- log.Println("remote is empty: ignoring request.")
- continue
- }
- // only use the first host in case there are multiple hosts in the log
- if cidx := strings.Index(remote, ","); cidx >= 0 {
- remote = remote[0:cidx]
- }
- timestampStr, err := logEntry.Field("time_local")
- if err != nil {
- log.Println(err)
- continue
- }
- timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
- if err != nil {
- log.Println(err)
- continue
- }
- httpRequest, err := logEntry.Field("request")
- if err != nil {
- log.Println(err)
- continue
- }
- reqData := reqRegexp.FindStringSubmatch(httpRequest)
- if len(reqData) < 4 {
- log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
- continue
- }
- host := config.HostName
- if host == "" {
- host = "[not available]"
- }
- request := data.Request{
- IpSrc: remote,
- Origin: remote,
- Source: remote,
- IpDst: "127.0.0.1",
- PortSrc: 0,
- PortDst: 0,
- TcpSeq: 0,
- CreatedAt: timeStamp.Unix(),
- Url: reqData[2],
- Method: reqData[1],
- Host: host,
- Protocol: reqData[3],
- }
- request.Referer, _ = logEntry.Field("http_referer")
- request.UserAgent, _ = logEntry.Field("http_user_agent")
- if config.Trace {
- log.Printf("[%s] %s\n", request.Source, request.Url)
- }
- count++
- publishRequest(config.NatsQueue, &request)
- }
- }
- func apacheLogReplay(logfile string) {
- file, err := os.Open(logfile)
- if err != nil {
- log.Fatalf("%s: %s", logfile, err)
- }
- defer file.Close()
- scanner := bufio.NewScanner(file)
- var p axslogparser.Parser
- parserSet := false
- var tOffset time.Duration
- for scanner.Scan() {
- l := scanner.Text()
- if err := scanner.Err(); err != nil {
- log.Fatal(err)
- }
- if !parserSet {
- p, _, err = axslogparser.GuessParser(l)
- if err != nil {
- log.Println(err)
- continue
- }
- parserSet = true
- }
- logEntry, err := p.Parse(l)
- if err != nil {
- log.Println(err)
- continue
- }
- if tOffset == 0 {
- tOffset = time.Now().Sub(logEntry.Time)
- }
- ts := logEntry.Time.Add(tOffset)
- if ts.After(time.Now()) {
- time.Sleep(ts.Sub(time.Now()))
- }
- // fmt.Println(l)
- remote := logEntry.Host
- if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
- remote = logEntry.ForwardedFor
- }
- // only use the first host in case there are multiple hosts in the log
- if cidx := strings.Index(remote, ","); cidx >= 0 {
- remote = remote[0:cidx]
- }
- // extract the virtual host
- var virtualHost string
- vhost := logEntry.VirtualHost
- if vhost != "" {
- vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
- virtualHost = vhostAndPort[0]
- } else {
- if config.HostName != "" {
- vhost = config.HostName
- } else {
- vhost = "[not available]"
- }
- }
- request := data.Request{
- IpSrc: remote,
- IpDst: "127.0.0.1",
- PortSrc: 0,
- PortDst: 0,
- TcpSeq: 0,
- CreatedAt: (logEntry.Time.Add(tOffset)).UnixNano(),
- Url: logEntry.RequestURI,
- Method: logEntry.Method,
- Host: virtualHost,
- Protocol: logEntry.Protocol,
- Origin: remote,
- Source: remote,
- Referer: logEntry.Referer,
- XForwardedFor: logEntry.ForwardedFor,
- UserAgent: logEntry.UserAgent,
- }
- if config.Trace {
- log.Printf("[%s] %s\n", request.Source, request.Url)
- }
- count++
- publishRequest(config.NatsQueue, &request)
- }
- }
- /*
- func apacheLogReplay(logfile string) {
- if _, err := os.Stat(logfile); err != nil {
- log.Fatalf("%s: %s", logfile, err)
- }
- file, err := os.Open(logfile)
- if err != nil {
- log.Fatalf("%s: %s", logfile, err)
- }
- defer file.Close()
- scanner := bufio.NewScanner(file)
- var p axslogparser.Parser
- parserSet := false
- var tOffset time.Duration
- for scanner.Scan() {
- l := scanner.Text()
- if err := scanner.Err(); err != nil {
- log.Fatal(err)
- }
- if !parserSet {
- p, _, err = axslogparser.GuessParser(l)
- if err != nil {
- log.Println(err)
- continue
- }
- parserSet = true
- }
- logEntry, err := p.Parse(l)
- if err != nil {
- log.Println(err)
- continue
- }
- remote := logEntry.Host
- if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
- remote = logEntry.ForwardedFor
- }
- // only use the first host in case there are multiple hosts in the log
- if cidx := strings.Index(remote, ","); cidx >= 0 {
- remote = remote[0:cidx]
- }
- // extract the virtual host
- var virtualHost string
- vhost := logEntry.VirtualHost
- if vhost != "" {
- vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
- virtualHost = vhostAndPort[0]
- } else {
- if config.HostName != "" {
- vhost = config.HostName
- } else {
- vhost = "[not available]"
- }
- }
- if tOffset == 0 {
- tOffset = time.Now().Sub(logEntry.Time)
- }
- request := data.Request{
- IpSrc: remote,
- IpDst: "127.0.0.1",
- PortSrc: 0,
- PortDst: 0,
- TcpSeq: 0,
- CreatedAt: (logEntry.Time.Add(tOffset)).UnixNano(),
- Url: logEntry.RequestURI,
- Method: logEntry.Method,
- Host: virtualHost,
- Protocol: logEntry.Protocol,
- Origin: remote,
- Source: remote,
- Referer: logEntry.Referer,
- XForwardedFor: logEntry.ForwardedFor,
- UserAgent: logEntry.UserAgent,
- }
- if config.Trace {
- log.Printf("[%s] %s\n", request.Source, request.Url)
- }
- count++
- publishRequest(config.NatsQueue, &request)
- }
- }
- */
- func apacheLogCapture(logfile string) {
- if _, err := os.Stat(logfile); err != nil {
- log.Fatalf("%s: %s", logfile, err)
- }
- t, err := tail.TailFile(logfile, tail.Config{
- Follow: true, // follow the file
- ReOpen: true, // reopen log file when it gets closed/rotated
- Logger: tail.DiscardingLogger, // don't log anything
- Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
- })
- if err != nil {
- log.Fatalf("%s: %s", logfile, err)
- }
- var p axslogparser.Parser
- parserSet := false
- for line := range t.Lines {
- l := line.Text
- if !parserSet {
- p, _, err = axslogparser.GuessParser(l)
- if err != nil {
- log.Println(err)
- continue
- }
- parserSet = true
- }
- logEntry, err := p.Parse(l)
- if err != nil {
- log.Println(err)
- continue
- }
- remote := logEntry.Host
- if config.UseXForwardedAsSource && logEntry.ForwardedFor != "" {
- remote = logEntry.ForwardedFor
- }
- // only use the first host in case there are multiple hosts in the log
- if cidx := strings.Index(remote, ","); cidx >= 0 {
- remote = remote[0:cidx]
- }
- // extract the virtual host
- var virtualHost string
- vhost := logEntry.VirtualHost
- if vhost != "" {
- vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
- virtualHost = vhostAndPort[0]
- } else {
- if config.HostName != "" {
- vhost = config.HostName
- } else {
- vhost = "[not available]"
- }
- }
- request := data.Request{
- IpSrc: remote,
- IpDst: "127.0.0.1",
- PortSrc: 0,
- PortDst: 0,
- TcpSeq: 0,
- CreatedAt: logEntry.Time.UnixNano(),
- Url: logEntry.RequestURI,
- Method: logEntry.Method,
- Host: virtualHost,
- Protocol: logEntry.Protocol,
- Origin: remote,
- Source: remote,
- Referer: logEntry.Referer,
- XForwardedFor: logEntry.ForwardedFor,
- UserAgent: logEntry.UserAgent,
- }
- if config.Trace {
- log.Printf("[%s] %s\n", request.Source, request.Url)
- }
- count++
- publishRequest(config.NatsQueue, &request)
- }
- }
- func liveCapture() {
- ipPriv = ip.NewIP()
- // PCAP setup
- //
- handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
- if err != nil {
- log.Fatal(err)
- }
- defer handle.Close()
- err = handle.SetBPFFilter(config.Filter)
- if err != nil {
- log.Fatal(err)
- }
- packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
- for packet := range packetSource.Packets() {
- go processPacket(packet)
- }
- }
- func writeLogToWatch(r *data.Request) {
- h := map[string]string{}
- if r.AcceptEncoding != "" {
- h["Accept-Encoding"] = r.AcceptEncoding
- }
- if r.Accept != "" {
- h["Accept"] = r.Accept
- }
- if r.AcceptLanguage != "" {
- h["Accept-Language"] = r.AcceptLanguage
- }
- if r.Cookie != "" {
- h["Cookie"] = r.Cookie
- }
- if r.Host != "" {
- h["Host"] = r.Host
- }
- if r.Referer != "" {
- h["Referer"] = r.Referer
- }
- if r.UserAgent != "" {
- h["User-Agent"] = r.UserAgent
- }
- if r.Via != "" {
- h["Via"] = r.Via
- }
- if r.XForwardedFor != "" {
- h["X-Forwarded-For"] = r.XForwardedFor
- }
- if r.XRequestedWith != "" {
- h["X-Requested-With"] = r.XRequestedWith
- }
- data := map[string]interface{}{
- "request": map[string]interface{}{
- "time": time.Unix(0, r.CreatedAt),
- "address": r.Source,
- "protocol": r.Protocol,
- "scheme": "https",
- "method": r.Method,
- "url": r.Url,
- "headers": h,
- },
- "response": map[string]interface{}{"status": "200"},
- }
- jdata, err := json.Marshal(data)
- client := &http.Client{}
- fmt.Println(string(jdata))
- buf := bytes.NewBuffer(jdata)
- req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
- req.Header.Add("Api-Key", config.AccessWatchKey)
- req.Header.Add("Accept", "application/json")
- req.Header.Add("Content-Type", "application/json")
- resp, err := client.Do(req)
- if err != nil {
- log.Println(err)
- }
- resp.Body.Close()
- }
- func publishRequest(queue string, request *data.Request) {
- if config.AccessWatchKey != "" {
- writeLogToWatch(request)
- return
- }
- if !natsIsAvailable {
- return
- }
- if err := natsEC.Publish(config.NatsQueue, request); err != nil {
- natsErrorChan <- err
- if err == nats.ErrConnectionClosed {
- natsIsAvailable = false
- }
- }
- }
- // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
- func processPacket(packet gopacket.Packet) {
- hasIPv4 := false
- var ipSrc, ipDst string
- // IPv4
- if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
- ip := ipLayer.(*layers.IPv4)
- ipSrc = ip.SrcIP.String()
- ipDst = ip.DstIP.String()
- hasIPv4 = true
- }
- // IPv6
- if !hasIPv4 {
- if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
- ip := ipLayer.(*layers.IPv6)
- ipSrc = ip.SrcIP.String()
- ipDst = ip.DstIP.String()
- }
- }
- // TCP
- tcpLayer := packet.Layer(layers.LayerTypeTCP)
- if tcpLayer == nil {
- return
- }
- tcp, _ := tcpLayer.(*layers.TCP)
- portSrc := tcp.SrcPort
- portDst := tcp.DstPort
- sequence := tcp.Seq
- applicationLayer := packet.ApplicationLayer()
- if applicationLayer == nil {
- return
- }
- count++
- if len(applicationLayer.Payload()) < 50 {
- log.Println("application layer too small!")
- return
- }
- request := data.Request{
- IpSrc: ipSrc,
- IpDst: ipDst,
- PortSrc: uint32(portSrc),
- PortDst: uint32(portDst),
- TcpSeq: uint32(sequence),
- CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
- }
- switch config.Protocol {
- case "http":
- err := processHTTP(&request, applicationLayer.Payload())
- if err != nil {
- log.Println(err)
- return
- }
- case "ajp13":
- err := processAJP13(&request, applicationLayer.Payload())
- if err != nil {
- log.Println(err)
- return
- }
- }
- if config.UseXForwardedAsSource && request.XForwardedFor != "" {
- if strings.Contains(request.XForwardedFor, ",") {
- ips := strings.Split(request.XForwardedFor, ",")
- for i := len(ips) - 1; i >= 0; i-- {
- ipRaw := strings.TrimSpace(ips[i])
- ipAddr := net.ParseIP(ipRaw)
- if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
- request.Source = ipRaw
- break
- }
- }
- } else {
- ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
- if !ipPriv.IsPrivate(ipAddr) {
- request.Source = request.XForwardedFor
- }
- }
- }
- if request.Source == request.IpSrc && request.XRealIP != "" {
- request.Source = request.XRealIP
- }
- if config.Trace {
- log.Printf("[%s] %s\n", request.Source, request.Url)
- }
- publishRequest(config.NatsQueue, &request)
- }
- func processAJP13(request *data.Request, appData []byte) error {
- a, err := ajp13.Parse(appData)
- if err != nil {
- return fmt.Errorf("Failed to parse AJP13 request: %s", err)
- }
- request.Url = a.URI
- request.Method = a.Method()
- request.Host = a.Server
- request.Protocol = a.Version
- request.Origin = a.RemoteAddr.String()
- request.Source = a.RemoteAddr.String()
- if v, ok := a.Header("Referer"); ok {
- request.Referer = v
- }
- if v, ok := a.Header("Connection"); ok {
- request.Connection = v
- }
- if v, ok := a.Header("X-Forwarded-For"); ok {
- request.XForwardedFor = v
- }
- if v, ok := a.Header("X-Real-IP"); ok {
- request.XRealIP = v
- }
- if v, ok := a.Header("X-Requested-With"); ok {
- request.XRequestedWith = v
- }
- if v, ok := a.Header("Accept-Encoding"); ok {
- request.AcceptEncoding = v
- }
- if v, ok := a.Header("Accept-Language"); ok {
- request.AcceptLanguage = v
- }
- if v, ok := a.Header("User-Agent"); ok {
- request.UserAgent = v
- }
- if v, ok := a.Header("Accept"); ok {
- request.Accept = v
- }
- if v, ok := a.Header("Cookie"); ok {
- request.Cookie = v
- }
- if v, ok := a.Header("X-Forwarded-Host"); ok {
- if v != request.Host {
- request.Host = v
- }
- }
- return nil
- }
- func processHTTP(request *data.Request, appData []byte) error {
- reader := bufio.NewReader(strings.NewReader(string(appData)))
- req, err := http.ReadRequest(reader)
- if err != nil {
- return fmt.Errorf("Failed to parse HTTP header: %s", err)
- }
- request.Url = req.URL.String()
- request.Method = req.Method
- request.Referer = req.Referer()
- request.Host = req.Host
- request.Protocol = req.Proto
- request.Origin = request.Host
- if _, ok := req.Header["Connection"]; ok {
- request.Connection = req.Header["Connection"][0]
- }
- if _, ok := req.Header["X-Forwarded-For"]; ok {
- request.XForwardedFor = req.Header["X-Forwarded-For"][0]
- }
- // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
- if _, ok := req.Header["True-Client-Ip"]; ok {
- request.XForwardedFor = req.Header["True-Client-Ip"][0]
- }
- if _, ok := req.Header["X-Real-Ip"]; ok {
- request.XRealIP = req.Header["X-Real-Ip"][0]
- }
- if _, ok := req.Header["X-Requested-With"]; ok {
- request.XRequestedWith = req.Header["X-Requested-With"][0]
- }
- if _, ok := req.Header["Accept-Encoding"]; ok {
- request.AcceptEncoding = req.Header["Accept-Encoding"][0]
- }
- if _, ok := req.Header["Accept-Language"]; ok {
- request.AcceptLanguage = req.Header["Accept-Language"][0]
- }
- if _, ok := req.Header["User-Agent"]; ok {
- request.UserAgent = req.Header["User-Agent"][0]
- }
- if _, ok := req.Header["Accept"]; ok {
- request.Accept = req.Header["Accept"][0]
- }
- if _, ok := req.Header["Cookie"]; ok {
- request.Cookie = req.Header["Cookie"][0]
- }
- request.Source = request.IpSrc
- return nil
- }
- // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
- // e.g.
- // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
- // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
- func replayFile() {
- var req data.Request
- var startTs time.Time
- var endTs time.Time
- rand.Seed(time.Now().UnixNano())
- for {
- fh, err := os.Open(config.RequestsFile)
- if err != nil {
- log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
- }
- c := csv.NewReader(fh)
- c.Comma = ' '
- for {
- if config.SleepFor.Duration > time.Nanosecond {
- startTs = time.Now()
- }
- r, err := c.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- log.Println(err)
- continue
- }
- req.IpSrc = r[0]
- req.Source = r[0]
- req.Url = r[1]
- req.UserAgent = "Munch/1.0"
- req.Host = "demo.scraperwall.com"
- req.CreatedAt = time.Now().UnixNano()
- publishRequest(config.NatsQueue, &req)
- if strings.Index(r[1], ".") < 0 {
- hash := sha1.New()
- io.WriteString(hash, r[0])
- fp := data.Fingerprint{
- ClientID: "scw",
- Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
- Remote: r[0],
- Url: r[1],
- Source: r[0],
- CreatedAt: time.Now(),
- }
- if strings.HasPrefix(r[0], "50.31.") {
- fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
- natsJSONEC.Publish("fingerprints_scw", fp)
- } else if rand.Intn(10) < 5 {
- natsJSONEC.Publish("fingerprints_scw", fp)
- }
- }
- count++
- if config.SleepFor.Duration >= time.Nanosecond {
- endTs = time.Now()
- if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
- time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
- }
- }
- }
- }
- }
- func loadConfig() {
- // initialize with values from the command line / environment
- config.Live = *doLiveCapture
- config.Interface = *iface
- config.SnapshotLen = *snapshotLen
- config.Filter = *filter
- config.Promiscuous = *promiscuous
- config.NatsURL = *natsURL
- config.NatsQueue = *natsQueue
- config.NatsUser = *natsUser
- config.NatsPassword = *natsPassword
- config.NatsCA = *natsCA
- config.SleepFor.Duration = *sleepFor
- config.RequestsFile = *requestsFile
- config.UseXForwardedAsSource = *useXForwardedAsSource
- config.Protocol = *protocol
- config.ApacheLog = *apacheLog
- config.ApacheReplay = *apacheReplay
- config.NginxLog = *nginxLog
- config.NginxLogFormat = *nginxFormat
- config.HostName = *hostName
- config.Quiet = *beQuiet
- config.Trace = *trace
- config.AccessWatchKey = *accessWatchKey
- if *configFile == "" {
- return
- }
- _, err := os.Stat(*configFile)
- if err != nil {
- log.Printf("%s: %s\n", *configFile, err)
- return
- }
- if _, err = toml.DecodeFile(*configFile, &config); err != nil {
- log.Printf("%s: %s\n", *configFile, err)
- }
- if !config.Quiet {
- config.print()
- }
- }
- // version outputs build information...
- func version() {
- fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
- }
|