123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479 |
- package main
import (
	"bufio"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/kr/pretty"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
)
- var (
- doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
- iface = flag.String("interface", "eth0", "Interface to get packets from")
- snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
- filter = flag.String("filter", "tcp", "PCAP filter expression")
- promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
- natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
- natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
- sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
- requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
- protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
- useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
- trace = flag.Bool("trace", false, "Trace the packet capturing")
- configFile = flag.String("config", "", "The location of the TOML config file")
- beQuiet = flag.Bool("quiet", true, "Be quiet")
- doVersion = flag.Bool("version", false, "Show version information")
- natsEC *nats.EncodedConn
- count uint64
- timeout = -1 * time.Second
- ipPriv *ip.IP
- config Config
- // Version contains the program Version, e.g. 1.0.1
- Version string
- // BuildDate contains the date and time at which the program was compiled
- BuildDate string
- )
- // Config contains the program configuration
- type Config struct {
- Live bool
- Interface string
- SnapshotLen int
- Filter string
- Promiscuous bool
- NatsURL string
- NatsQueue string
- SleepFor duration
- RequestsFile string
- UseXForwardedAsSource bool
- Quiet bool
- Protocol string
- Trace bool
- }
- type duration struct {
- time.Duration
- }
- func (d *duration) UnmarshalText(text []byte) error {
- var err error
- d.Duration, err = time.ParseDuration(string(text))
- return err
- }
- func (c Config) print() {
- fmt.Printf("Live: %t\n", c.Live)
- fmt.Printf("Interface: %s\n", c.Interface)
- fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
- fmt.Printf("Filter: %s\n", c.Filter)
- fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
- fmt.Printf("NatsURL: %s\n", c.NatsURL)
- fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
- fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
- fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
- fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
- fmt.Printf("Protocol: %s\n", c.Protocol)
- fmt.Printf("Quiet: %t\n", c.Quiet)
- fmt.Printf("Trace: %t\n", c.Trace)
- }
- func init() {
- flag.Parse()
- nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
- }
- func main() {
- if *doVersion {
- version()
- os.Exit(0)
- }
- loadConfig()
- // Output how many requests per second were sent
- if !config.Quiet {
- go func(c *uint64) {
- for {
- fmt.Printf("%d requests per second\n", *c)
- *c = 0
- time.Sleep(time.Second)
- }
- }(&count)
- }
- // NATS
- //
- if config.NatsURL == "" {
- log.Fatal("No NATS URL specified (-nats-url)!")
- }
- natsConn, err := nats.Connect(config.NatsURL)
- if err != nil {
- log.Fatal(err)
- }
- natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
- if err != nil {
- log.Fatalf("Encoded Connection: %v!\n", err)
- }
- // What should I do?
- if config.RequestsFile != "" {
- replayFile()
- } else if config.Live {
- fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
- liveCapture()
- }
- }
- func liveCapture() {
- ipPriv = ip.NewIP()
- // PCAP setup
- //
- handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
- if err != nil {
- log.Fatal(err)
- }
- defer handle.Close()
- err = handle.SetBPFFilter(config.Filter)
- if err != nil {
- log.Fatal(err)
- }
- packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
- for packet := range packetSource.Packets() {
- go processPacket(packet)
- }
- }
- // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
- func processPacket(packet gopacket.Packet) {
- count++
- hasIPv4 := false
- var ipSrc, ipDst string
- // IPv4
- if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
- ip := ipLayer.(*layers.IPv4)
- ipSrc = ip.SrcIP.String()
- ipDst = ip.DstIP.String()
- hasIPv4 = true
- }
- // IPv6
- if !hasIPv4 {
- if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
- ip := ipLayer.(*layers.IPv6)
- ipSrc = ip.SrcIP.String()
- ipDst = ip.DstIP.String()
- }
- }
- // TCP
- tcpLayer := packet.Layer(layers.LayerTypeTCP)
- if tcpLayer == nil {
- return
- }
- tcp, _ := tcpLayer.(*layers.TCP)
- portSrc := tcp.SrcPort
- portDst := tcp.DstPort
- sequence := tcp.Seq
- applicationLayer := packet.ApplicationLayer()
- if applicationLayer == nil {
- return
- }
- request := data.Request{
- IpSrc: ipSrc,
- IpDst: ipDst,
- PortSrc: uint32(portSrc),
- PortDst: uint32(portDst),
- TcpSeq: uint32(sequence),
- CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
- }
- switch *protocol {
- case "http":
- processHTTP(&request, applicationLayer.Payload())
- case "ajp13":
- processAJP13(&request, applicationLayer.Payload())
- }
- if config.UseXForwardedAsSource && request.XForwardedFor != "" {
- if strings.Contains(request.XForwardedFor, ",") {
- ips := strings.Split(request.XForwardedFor, ",")
- for i := len(ips) - 1; i >= 0; i-- {
- ipRaw := strings.TrimSpace(ips[i])
- ipAddr := net.ParseIP(ipRaw)
- if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
- request.Source = ipRaw
- break
- }
- }
- } else {
- ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
- if !ipPriv.IsPrivate(ipAddr) {
- request.Source = request.XForwardedFor
- }
- }
- }
- if request.Source == request.IpSrc && request.XRealIP != "" {
- request.Source = request.XRealIP
- }
- if *trace {
- fmt.Println("Request for NATS")
- pretty.Println(request)
- }
- natsEC.Publish(config.NatsQueue, &request)
- }
- func processAJP13(request *data.Request, appData []byte) error {
- if *trace {
- fmt.Printf("packet: %v\n", appData)
- }
- a, err := ajp13.Parse(appData)
- if err != nil {
- return fmt.Errorf("Failed to parse AJP13 request: %s", err)
- }
- if *trace {
- fmt.Println("AJP13")
- pretty.Println(a)
- }
- request.Url = a.URI
- request.Method = a.Method()
- request.Host = a.Server
- request.Protocol = a.Version
- request.Origin = a.RemoteAddr.String()
- if v, ok := a.Header("Referer"); ok {
- request.Referer = v
- }
- if v, ok := a.Header("Connection"); ok {
- request.Connection = v
- }
- if v, ok := a.Header("X-Forwarded-For"); ok {
- request.XForwardedFor = v
- }
- if v, ok := a.Header("X-Real-IP"); ok {
- request.XRealIP = v
- }
- if v, ok := a.Header("X-Requested-With"); ok {
- request.XRequestedWith = v
- }
- if v, ok := a.Header("Accept-Encoding"); ok {
- request.AcceptEncoding = v
- }
- if v, ok := a.Header("Accept-Language"); ok {
- request.AcceptLanguage = v
- }
- if v, ok := a.Header("User-Agent"); ok {
- request.UserAgent = v
- }
- if v, ok := a.Header("Accept"); ok {
- request.Accept = v
- }
- if v, ok := a.Header("Cookie"); ok {
- request.Cookie = v
- }
- if v, ok := a.Header("X-Forwarded-Host"); ok {
- if v != request.Host {
- request.Host = v
- }
- }
- if *trace {
- fmt.Println("Request")
- pretty.Println(request)
- }
- return nil
- }
- func processHTTP(request *data.Request, appData []byte) error {
- reader := bufio.NewReader(strings.NewReader(string(appData)))
- req, err := http.ReadRequest(reader)
- if err != nil {
- return fmt.Errorf("Failed to parse HTTP header: %s", err)
- }
- request.Url = req.URL.String()
- request.Method = req.Method
- request.Referer = req.Referer()
- request.Host = req.Host
- request.Protocol = req.Proto
- request.Origin = request.Host
- if _, ok := req.Header["Connection"]; ok {
- request.Connection = req.Header["Connection"][0]
- }
- if _, ok := req.Header["X-Forwarded-For"]; ok {
- request.XForwardedFor = req.Header["X-Forwarded-For"][0]
- }
- if _, ok := req.Header["X-Real-IP"]; ok {
- request.XRealIP = req.Header["X-Real-IP"][0]
- }
- if _, ok := req.Header["X-Requested-With"]; ok {
- request.XRequestedWith = req.Header["X-Requested-With"][0]
- }
- if _, ok := req.Header["Accept-Encoding"]; ok {
- request.AcceptEncoding = req.Header["Accept-Encoding"][0]
- }
- if _, ok := req.Header["Accept-Language"]; ok {
- request.AcceptLanguage = req.Header["Accept-Language"][0]
- }
- if _, ok := req.Header["User-Agent"]; ok {
- request.UserAgent = req.Header["User-Agent"][0]
- }
- if _, ok := req.Header["Accept"]; ok {
- request.Accept = req.Header["Accept"][0]
- }
- if _, ok := req.Header["Cookie"]; ok {
- request.Cookie = req.Header["Cookie"][0]
- }
- request.Source = request.IpSrc
- return nil
- }
- // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
- // e.g.
- // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
- // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
- func replayFile() {
- var req data.Request
- var startTs time.Time
- var endTs time.Time
- for {
- fh, err := os.Open(config.RequestsFile)
- if err != nil {
- log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
- }
- c := csv.NewReader(fh)
- c.Comma = ' '
- for {
- if config.SleepFor.Duration > time.Nanosecond {
- startTs = time.Now()
- }
- r, err := c.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- log.Println(err)
- continue
- }
- req.IpSrc = r[0]
- req.Source = r[0]
- req.Url = r[1]
- req.UserAgent = "Munch/1.0"
- req.Host = "demo.scraperwall.com"
- req.CreatedAt = time.Now().UnixNano()
- natsEC.Publish(config.NatsQueue, &req)
- count++
- if config.SleepFor.Duration >= time.Nanosecond {
- endTs = time.Now()
- if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
- time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
- }
- }
- }
- }
- }
- func loadConfig() {
- // initialize with values from the command line / environment
- config.Live = *doLiveCapture
- config.Interface = *iface
- config.SnapshotLen = *snapshotLen
- config.Filter = *filter
- config.Promiscuous = *promiscuous
- config.NatsURL = *natsURL
- config.NatsQueue = *natsQueue
- config.SleepFor.Duration = *sleepFor
- config.RequestsFile = *requestsFile
- config.UseXForwardedAsSource = *useXForwardedAsSource
- config.Protocol = *protocol
- config.Quiet = *beQuiet
- if *configFile == "" {
- return
- }
- _, err := os.Stat(*configFile)
- if err != nil {
- log.Printf("%s: %s\n", *configFile, err)
- return
- }
- if _, err = toml.DecodeFile(*configFile, &config); err != nil {
- log.Printf("%s: %s\n", *configFile, err)
- }
- if !config.Quiet {
- config.print()
- }
- }
- // version outputs build information
- func version() {
- fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
- }
|