package main import ( "bufio" "encoding/csv" "flag" "fmt" "io" "log" "net" "net/http" "os" "strings" "time" "github.com/BurntSushi/toml" "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" "github.com/kr/pretty" "github.com/nats-io/nats" "github.com/nats-io/nats/encoders/protobuf" "git.scraperwall.com/scw/ajp13" "git.scraperwall.com/scw/data" "git.scraperwall.com/scw/ip" ) var ( doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface") iface = flag.String("interface", "eth0", "Interface to get packets from") snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes") filter = flag.String("filter", "tcp", "PCAP filter expression") promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?") natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server") natsQueue = flag.String("nats-queue", "requests", "The NATS queue name") sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)") requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)") protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13") useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source") trace = flag.Bool("trace", false, "Trace the packet capturing") configFile = flag.String("config", "", "The location of the TOML config file") beQuiet = flag.Bool("quiet", true, "Be quiet") doVersion = flag.Bool("version", false, "Show version information") natsEC *nats.EncodedConn count uint64 timeout = -1 * time.Second ipPriv *ip.IP config Config // Version contains the program Version, e.g. 1.0.1 Version string // BuildDate contains the date and time at which the program was compiled BuildDate string ) // Config contains the program configuration type Config struct { Live bool Interface string SnapshotLen int Filter string Promiscuous bool NatsURL string NatsQueue string SleepFor duration RequestsFile string UseXForwardedAsSource bool Quiet bool Protocol string Trace bool } type duration struct { time.Duration } func (d *duration) UnmarshalText(text []byte) error { var err error d.Duration, err = time.ParseDuration(string(text)) return err } func (c Config) print() { fmt.Printf("Live: %t\n", c.Live) fmt.Printf("Interface: %s\n", c.Interface) fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen) fmt.Printf("Filter: %s\n", c.Filter) fmt.Printf("Promiscuous: %t\n", c.Promiscuous) fmt.Printf("NatsURL: %s\n", c.NatsURL) fmt.Printf("NatsQueue: %s\n", c.NatsQueue) fmt.Printf("SleepFor: %s\n", c.SleepFor.String()) fmt.Printf("RequestsFile: %s\n", c.RequestsFile) fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource) fmt.Printf("Protocol: %s\n", c.Protocol) fmt.Printf("Quiet: %t\n", c.Quiet) fmt.Printf("Trace: %t\n", c.Trace) } func init() { flag.Parse() nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{}) } func main() { if *doVersion { version() os.Exit(0) } loadConfig() // Output how many requests per second were sent if !config.Quiet { go func(c *uint64) { for { fmt.Printf("%d requests per second\n", *c) *c = 0 time.Sleep(time.Second) } }(&count) } // NATS // if config.NatsURL == "" { log.Fatal("No NATS URL specified (-nats-url)!") } natsConn, err := nats.Connect(config.NatsURL) if err != nil { log.Fatal(err) } natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER) if err != nil { log.Fatalf("Encoded Connection: %v!\n", err) } // What should I do? if config.RequestsFile != "" { replayFile() } else if config.Live { fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL) liveCapture() } } func liveCapture() { ipPriv = ip.NewIP() // PCAP setup // handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout) if err != nil { log.Fatal(err) } defer handle.Close() err = handle.SetBPFFilter(config.Filter) if err != nil { log.Fatal(err) } packetSource := gopacket.NewPacketSource(handle, handle.LinkType()) for packet := range packetSource.Packets() { go processPacket(packet) } } // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue func processPacket(packet gopacket.Packet) { hasIPv4 := false var ipSrc, ipDst string // IPv4 if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil { ip := ipLayer.(*layers.IPv4) ipSrc = ip.SrcIP.String() ipDst = ip.DstIP.String() hasIPv4 = true } // IPv6 if !hasIPv4 { if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil { ip := ipLayer.(*layers.IPv6) ipSrc = ip.SrcIP.String() ipDst = ip.DstIP.String() } } // TCP tcpLayer := packet.Layer(layers.LayerTypeTCP) if tcpLayer == nil { return } tcp, _ := tcpLayer.(*layers.TCP) portSrc := tcp.SrcPort portDst := tcp.DstPort sequence := tcp.Seq applicationLayer := packet.ApplicationLayer() if applicationLayer == nil { return } count++ if len(applicationLayer.Payload()) < 50 { log.Println("application layer too small!") return } request := data.Request{ IpSrc: ipSrc, IpDst: ipDst, PortSrc: uint32(portSrc), PortDst: uint32(portDst), TcpSeq: uint32(sequence), CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(), } switch config.Protocol { case "http": err := processHTTP(&request, applicationLayer.Payload()) if err != nil { log.Println(err) return } case "ajp13": err := processAJP13(&request, applicationLayer.Payload()) if err != nil { log.Println(err) return } } if config.UseXForwardedAsSource && request.XForwardedFor != "" { if strings.Contains(request.XForwardedFor, ",") { ips := strings.Split(request.XForwardedFor, ",") for i := len(ips) - 1; i >= 0; i-- { ipRaw := strings.TrimSpace(ips[i]) ipAddr := net.ParseIP(ipRaw) if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) { request.Source = ipRaw break } } } else { ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor)) if !ipPriv.IsPrivate(ipAddr) { request.Source = request.XForwardedFor } } } if request.Source == request.IpSrc && request.XRealIP != "" { request.Source = request.XRealIP } if config.Trace { log.Printf("[%s] %s\n", request.Source, request.Url) } natsEC.Publish(config.NatsQueue, &request) } func processAJP13(request *data.Request, appData []byte) error { a, err := ajp13.Parse(appData) if err != nil { return fmt.Errorf("Failed to parse AJP13 request: %s", err) } request.Url = a.URI request.Method = a.Method() request.Host = a.Server request.Protocol = a.Version request.Origin = a.RemoteAddr.String() request.Source = a.RemoteAddr.String() if v, ok := a.Header("Referer"); ok { request.Referer = v } if v, ok := a.Header("Connection"); ok { request.Connection = v } if v, ok := a.Header("X-Forwarded-For"); ok { request.XForwardedFor = v } if v, ok := a.Header("X-Real-IP"); ok { request.XRealIP = v } if v, ok := a.Header("X-Requested-With"); ok { request.XRequestedWith = v } if v, ok := a.Header("Accept-Encoding"); ok { request.AcceptEncoding = v } if v, ok := a.Header("Accept-Language"); ok { request.AcceptLanguage = v } if v, ok := a.Header("User-Agent"); ok { request.UserAgent = v } if v, ok := a.Header("Accept"); ok { request.Accept = v } if v, ok := a.Header("Cookie"); ok { request.Cookie = v } if v, ok := a.Header("X-Forwarded-Host"); ok { if v != request.Host { request.Host = v } } if false && config.Trace { log.Println("Request") pretty.Println(request) } return nil } func processHTTP(request *data.Request, appData []byte) error { reader := bufio.NewReader(strings.NewReader(string(appData))) req, err := http.ReadRequest(reader) if err != nil { return fmt.Errorf("Failed to parse HTTP header: %s", err) } request.Url = req.URL.String() request.Method = req.Method request.Referer = req.Referer() request.Host = req.Host request.Protocol = req.Proto request.Origin = request.Host if _, ok := req.Header["Connection"]; ok { request.Connection = req.Header["Connection"][0] } if _, ok := req.Header["X-Forwarded-For"]; ok { request.XForwardedFor = req.Header["X-Forwarded-For"][0] } if _, ok := req.Header["X-Real-IP"]; ok { request.XRealIP = req.Header["X-Real-IP"][0] } if _, ok := req.Header["X-Requested-With"]; ok { request.XRequestedWith = req.Header["X-Requested-With"][0] } if _, ok := req.Header["Accept-Encoding"]; ok { request.AcceptEncoding = req.Header["Accept-Encoding"][0] } if _, ok := req.Header["Accept-Language"]; ok { request.AcceptLanguage = req.Header["Accept-Language"][0] } if _, ok := req.Header["User-Agent"]; ok { request.UserAgent = req.Header["User-Agent"][0] } if _, ok := req.Header["Accept"]; ok { request.Accept = req.Header["Accept"][0] } if _, ok := req.Header["Cookie"]; ok { request.Cookie = req.Header["Cookie"][0] } request.Source = request.IpSrc return nil } // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests // e.g. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh func replayFile() { var req data.Request var startTs time.Time var endTs time.Time for { fh, err := os.Open(config.RequestsFile) if err != nil { log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err) } c := csv.NewReader(fh) c.Comma = ' ' for { if config.SleepFor.Duration > time.Nanosecond { startTs = time.Now() } r, err := c.Read() if err == io.EOF { break } if err != nil { log.Println(err) continue } req.IpSrc = r[0] req.Source = r[0] req.Url = r[1] req.UserAgent = "Munch/1.0" req.Host = "demo.scraperwall.com" req.CreatedAt = time.Now().UnixNano() natsEC.Publish(config.NatsQueue, &req) count++ if config.SleepFor.Duration >= time.Nanosecond { endTs = time.Now() if endTs.Before(startTs.Add(config.SleepFor.Duration)) { time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs)) } } } } } func loadConfig() { // initialize with values from the command line / environment config.Live = *doLiveCapture config.Interface = *iface config.SnapshotLen = *snapshotLen config.Filter = *filter config.Promiscuous = *promiscuous config.NatsURL = *natsURL config.NatsQueue = *natsQueue config.SleepFor.Duration = *sleepFor config.RequestsFile = *requestsFile config.UseXForwardedAsSource = *useXForwardedAsSource config.Protocol = *protocol config.Quiet = *beQuiet config.Trace = *trace if *configFile == "" { return } _, err := os.Stat(*configFile) if err != nil { log.Printf("%s: %s\n", *configFile, err) return } if _, err = toml.DecodeFile(*configFile, &config); err != nil { log.Printf("%s: %s\n", *configFile, err) } if !config.Quiet { config.print() } } // version outputs build information func version() { fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate) }