// Command munchclient publishes HTTP request metadata to a NATS subject.
//
// It has two modes of operation:
//
//   - live capture (-live): packets are read from a network interface via
//     pcap, parsed as HTTP requests and published as data.Request messages;
//   - replay (-requests): a space-separated file of "IP URL" records is
//     published over and over again.
package main

import (
	"bufio"
	"bytes"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"

	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
)

var (
	doLiveCapture         = flag.Bool("live", false, "Capture data in real time from a given interface")
	iface                 = flag.String("interface", "eth0", "Interface to get packets from")
	snapshotLen           = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
	filter                = flag.String("filter", "tcp", "PCAP filter expression")
	promiscuous           = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
	natsURL               = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
	natsQueue             = flag.String("nats-queue", "requests", "The NATS queue name")
	sleepFor              = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
	requestsFile          = flag.String("requests", "", "CSV file containing requests (IP and URL)")
	useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
	doVersion             = flag.Bool("version", false, "Show version information")

	// natsEC is the encoded NATS connection every request is published on.
	natsEC *nats.EncodedConn

	// count is the number of packets/records handled since the last report.
	// It is touched from several goroutines and must only be accessed
	// through sync/atomic.
	count uint64

	// timeout is the (negative) read timeout handed to pcap.OpenLive.
	timeout = -1 * time.Second

	// ipPriv classifies addresses as private or public; it is set up by
	// liveCapture before any packet is processed.
	ipPriv *ip.IP

	// Version contains the program Version, e.g. 1.0.1
	Version string

	// BuildDate contains the date and time at which the program was compiled
	BuildDate string
)

func init() {
	nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
}

func main() {
	// Parsed here rather than in init() so that flags registered by other
	// packages (e.g. the testing package) exist before parsing happens.
	flag.Parse()

	if *doVersion {
		version()
		os.Exit(0)
	}

	// Print and reset the throughput counter once per second. The swap is
	// atomic so increments arriving between "read" and "reset" are not lost.
	go func() {
		for {
			fmt.Printf("%d requests per second\n", atomic.SwapUint64(&count, 0))
			time.Sleep(time.Second)
		}
	}()

	// NATS
	//
	if *natsURL == "" {
		log.Fatal("No NATS URL specified (-nats-url)!")
	}

	natsConn, err := nats.Connect(*natsURL)
	if err != nil {
		log.Fatal(err)
	}

	natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
	if err != nil {
		log.Fatalf("Encoded Connection: %v!\n", err)
	}
	// Release the connection when a mode returns normally.
	defer natsEC.Close()

	// What should I do?
	switch {
	case *requestsFile != "":
		replayFile()
	case *doLiveCapture:
		fmt.Printf("live capture (%s, %s) to %s\n", *iface, *filter, *natsURL)
		liveCapture()
	default:
		log.Println("Nothing to do: specify -requests <file> or -live")
	}
}

// liveCapture opens the configured interface with pcap, applies the BPF
// filter and hands every captured packet to processPacket in its own
// goroutine. It returns once the packet source is exhausted.
func liveCapture() {
	ipPriv = ip.NewIP()

	// PCAP setup
	//
	handle, err := pcap.OpenLive(*iface, int32(*snapshotLen), *promiscuous, timeout)
	if err != nil {
		log.Fatal(err)
	}
	defer handle.Close()

	if err := handle.SetBPFFilter(*filter); err != nil {
		log.Fatal(err)
	}

	packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
	for packet := range packetSource.Packets() {
		go processPacket(packet)
	}
}

// processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
//
// Packets that are not IPv4/TCP, carry no application payload, or whose
// payload does not parse as an HTTP request are dropped.
func processPacket(packet gopacket.Packet) {
	// Called concurrently (one goroutine per packet), hence the atomic add.
	atomic.AddUint64(&count, 1)

	ipLayer := packet.Layer(layers.LayerTypeIPv4)
	if ipLayer == nil {
		log.Println("No IPv4 Layer!")
		return
	}
	ipv4, ok := ipLayer.(*layers.IPv4)
	if !ok {
		log.Println("No IPv4 Layer!")
		return
	}
	if ipv4.Protocol != layers.IPProtocolTCP {
		log.Println("No TCP Protocol!")
		return
	}

	tcpLayer := packet.Layer(layers.LayerTypeTCP)
	if tcpLayer == nil {
		return
	}
	tcp, ok := tcpLayer.(*layers.TCP)
	if !ok {
		return
	}

	applicationLayer := packet.ApplicationLayer()
	if applicationLayer == nil {
		return
	}

	httpReq, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(applicationLayer.Payload())))
	if err != nil {
		// Not (the start of) an HTTP request.
		return
	}

	// http.ReadRequest stores header keys in canonical MIME form (e.g.
	// "X-Real-Ip"), so headers are read through Header.Get, which
	// canonicalizes the name, instead of indexing the map directly.
	hdr := httpReq.Header
	req := data.Request{
		IpSrc:          ipv4.SrcIP.String(),
		IpDst:          ipv4.DstIP.String(),
		PortSrc:        uint32(tcp.SrcPort),
		PortDst:        uint32(tcp.DstPort),
		TcpSeq:         uint32(tcp.Seq),
		CreatedAt:      time.Now().UnixNano(),
		Url:            httpReq.URL.String(),
		Method:         httpReq.Method,
		Referer:        httpReq.Referer(),
		Host:           httpReq.Host,
		Protocol:       httpReq.Proto,
		Origin:         httpReq.Host,
		Connection:     hdr.Get("Connection"),
		XForwardedFor:  hdr.Get("X-Forwarded-For"),
		XRealIP:        hdr.Get("X-Real-IP"),
		XRequestedWith: hdr.Get("X-Requested-With"),
		AcceptEncoding: hdr.Get("Accept-Encoding"),
		AcceptLanguage: hdr.Get("Accept-Language"),
		UserAgent:      hdr.Get("User-Agent"),
		Accept:         hdr.Get("Accept"),
		Cookie:         hdr.Get("Cookie"),
	}

	// Source defaults to the packet's source address. It may be replaced by
	// an X-Forwarded-For entry (only with -use-x-forwarded) and, if still
	// unchanged, by X-Real-IP.
	req.Source = req.IpSrc
	if *useXForwardedAsSource && req.XForwardedFor != "" {
		if src := forwardedSource(req.XForwardedFor); src != "" {
			req.Source = src
		}
	}
	if req.Source == req.IpSrc && req.XRealIP != "" {
		req.Source = req.XRealIP
	}

	if err := natsEC.Publish(*natsQueue, &req); err != nil {
		log.Printf("publishing request: %v", err)
	}
}

// forwardedSource returns the right-most entry of an X-Forwarded-For header
// value that parses as an IP address and is not reported as private by
// ipPriv. It returns "" when no entry qualifies.
func forwardedSource(xff string) string {
	hops := strings.Split(xff, ",")
	for i := len(hops) - 1; i >= 0; i-- {
		candidate := strings.TrimSpace(hops[i])
		if addr := net.ParseIP(candidate); addr != nil && !ipPriv.IsPrivate(addr) {
			return candidate
		}
	}
	return ""
}

// replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
//
// e.g.
// 157.55.39.229 /gross-gerau/12012260-beate-anstatt
// 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
//
// The file is replayed in an endless loop; the function only terminates (via
// log.Fatalf) when the file cannot be opened.
func replayFile() {
	for {
		fh, err := os.Open(*requestsFile)
		if err != nil {
			log.Fatalf("Failed to open request file '%s': %s", *requestsFile, err)
		}

		replayRequests(fh)

		// Close after every pass; the file is reopened on the next one.
		if err := fh.Close(); err != nil {
			log.Printf("closing request file '%s': %s", *requestsFile, err)
		}
	}
}

// replayRequests publishes one request per space-separated record read from
// r. When -sleep is set, each record is padded so that reading and
// publishing it takes at least that long.
func replayRequests(r io.Reader) {
	records := csv.NewReader(r)
	records.Comma = ' '

	for {
		start := time.Now()

		rec, err := records.Read()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Println(err)
			if _, ok := err.(*csv.ParseError); ok {
				// A malformed record: skip it and carry on with the next.
				continue
			}
			// Any other read error will not go away by retrying on the
			// same reader; end this pass instead of spinning on it.
			return
		}
		if len(rec) < 2 {
			log.Printf("skipping record with %d field(s), want IP and URL", len(rec))
			continue
		}

		req := data.Request{
			IpSrc:     rec[0],
			Source:    rec[0],
			Url:       rec[1],
			UserAgent: "Munch/1.0",
			Host:      "www.scraperwall.com",
			CreatedAt: time.Now().UnixNano(),
		}
		if err := natsEC.Publish(*natsQueue, &req); err != nil {
			log.Printf("publishing request: %v", err)
		}
		atomic.AddUint64(&count, 1)

		if *sleepFor > 0 {
			if elapsed := time.Since(start); elapsed < *sleepFor {
				time.Sleep(*sleepFor - elapsed)
			}
		}
	}
}

// version outputs build information
func version() {
	fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
}