瀏覽代碼

head merge

Tobias von Dewitz 7 年之前
父節點
當前提交
28ed9c53c4
共有 10 個文件被更改,包括 447 次插入92 次删除
  1. 4 0
      .gitignore
  2. 5 1
      Makefile
  3. 3 0
      after-install-trigger.sh
  4. 3 0
      before-uninstall-trigger.sh
  5. 52 0
      build-rpm.sh
  6. 8 0
      defaults/munchclient
  7. 90 0
      init.d/munchclient
  8. 10 0
      logrotate.d/munchclient
  9. 261 91
      main.go
  10. 11 0
      munchclient.toml

+ 4 - 0
.gitignore

@@ -0,0 +1,4 @@
+/munchclient
+/*.csv
+/m
+*.rpm

+ 5 - 1
Makefile

@@ -1,2 +1,6 @@
+VERSION=`git describe --tags`
+BUILD=`date +%FT%T%z`
+
+
 all:
-	go build -ldflags -s -tags netgo
+	go build -tags netgo -ldflags "-s -X main.Version=$(VERSION) -X main.BuildDate=$(BUILD)"

+ 3 - 0
after-install-trigger.sh

@@ -0,0 +1,3 @@
+#!/bin/sh
+
+chkconfig --add munchclient

+ 3 - 0
before-uninstall-trigger.sh

@@ -0,0 +1,3 @@
+#!/bin/sh
+
+chkconfig --del munchclient

+ 52 - 0
build-rpm.sh

@@ -0,0 +1,52 @@
+#!/bin/bash
+
+DESTDIR=./dist
+VERSION=`git describe --tag | sed 's/^v//'`
+ITERATION=2
+BINARY=munchclient
+DEFAULTS_FILE="$BINARY"
+CONFIG_FILE="$BINARY.toml"
+SERVICE_FILE="$BINARY.service"
+PKG_TYPE=rpm
+DESCR="ScraperWall traffic collector"
+RPM_DIR=/opt/rpm.scraperwall.com/centos6
+
+
+rm -rf $DESTDIR
+install -d $DESTDIR/{usr/bin,etc/init.d,etc/default,etc/logrotate.d}
+make
+
+install -v -m 755 $BINARY $DESTDIR/usr/bin/
+install -v -m 644 defaults/$DEFAULTS_FILE $DESTDIR/etc/default/
+install -v -m 644 logrotate.d/munchclient $DESTDIR/etc/logrotate.d/
+install -v -m 644 munchclient.toml $DESTDIR/etc/
+
+
+fpm -s dir -t $PKG_TYPE -C $DESTDIR --name $BINARY \
+  --version $VERSION \
+  --iteration $ITERATION \
+  --description "$DESCR" \
+  --config-files "etc/default/$BINARY" \
+  --config-files "etc/$CONFIG_FILE" \
+  --rpm-init "init.d/$BINARY" \
+  --rpm-trigger-after-install "[]munchclient: ./after-install-trigger.sh" \
+  --rpm-trigger-before-uninstall "[]munchclient: ./before-uninstall-trigger.sh" \
+  -p rpms \
+  -d go-daemon \
+  --rpm-sign
+
+ok=$?
+
+rm -rf $DESTDIR
+
+exit
+
+rpm_file="munch-$VERSION-$ITERATION.x86_64.rpm"
+if [ $ok -eq 0 -a -f "$rpm_file" ]; then
+	rm -f "$RPM_DIR/munch-*.rpm"
+  cp "$rpm_file" "$RPM_DIR"
+
+	(cd "$RPM_DIR" && \
+   createrepo -v . && \
+   rsync -av --progress --delete . fender.spyz.org:/srv/http-vhosts/rpm.scraperwall.com/)
+fi

+ 8 - 0
defaults/munchclient

@@ -0,0 +1,8 @@
+FILTER="tcp port 80 and host 10.1.1.1"
+INTERFACE="eth0"
+LIVE=yes
+NATS_QUEUE=requests
+NATS_URL=nats://127.0.0.1:4222
+PROMISCUOUS=false
+USE_X_FORWARDED=true
+

+ 90 - 0
init.d/munchclient

@@ -0,0 +1,90 @@
+#!/bin/bash
+#
+# munchclient                     Startup script for munchclient
+#
+# chkconfig: 2345 12 88
+# description: Munchclient collects HTTP requests and sends them to the ScraperWall queue
+### BEGIN INIT INFO
+# Provides: munchclient
+# Required-Start: $local_fs $network
+# Required-Stop: $local_fs $network
+# Default-Start:  2 3 4 5
+# Default-Stop: 0 1 6
+# Short-Description: ScraperWall HTTP request collector
+# Description: Munchclient is the ScraperWall HTTP-request collector
+### END INIT INFO
+
+# Source function library.
+. /etc/init.d/functions
+
+RETVAL=0
+PIDFILE=/var/run/munchclient.pid
+
+prog=munchclient
+exec=/usr/bin/$prog
+cmd="/usr/bin/god --nohup --logfile /var/log/munchclient.log --pidfile $PIDFILE --rundir /tmp -- /usr/bin/munchclient -config /etc/munchclient.toml"
+lockfile=/var/lock/subsys/$prog
+
+
+# Source config
+if [ -f /etc/default/$prog ] ; then
+    source /etc/default/$prog
+fi
+
+# start launches the command line stored in $cmd via the "daemon" helper
+# (presumably provided by /etc/init.d/functions, sourced above) and creates
+# the lock file when that succeeds.
+# Exits with status 5 when $exec is not executable; otherwise returns the
+# status of the daemon call.
+start() {
+	[ -x $exec ] || exit 5
+
+	# Files created from here on are accessible by the owner only.
+	umask 077
+
+        echo -n $"Starting ScraperWall request collector: "
+        daemon --pidfile="$PIDFILE" $cmd
+        RETVAL=$?
+        echo
+        [ $RETVAL -eq 0 ] && touch $lockfile
+        return $RETVAL
+}
+# stop terminates the process recorded in $PIDFILE via killproc and removes
+# the lock file when that succeeds. Returns the status of killproc.
+stop() {
+        echo -n $"Shutting down ScraperWall request collector: "
+        killproc -p "$PIDFILE" $exec
+        RETVAL=$?
+        echo
+        [ $RETVAL -eq 0 ] && rm -f $lockfile
+        return $RETVAL
+}
+# rhstatus reports the state of the service by passing the pid file, the
+# lock file name ($prog) and the binary path to the "status" helper.
+rhstatus() {
+        status -p "$PIDFILE" -l $prog $exec
+}
+# restart stops the service and starts it again; start runs regardless of
+# the result of stop.
+restart() {
+        stop
+        start
+}
+
+# Dispatch on the action given as first argument. The script exits with the
+# status of the selected action; "reload" and unknown actions exit with 3.
+case "$1" in
+  start)
+        start
+        ;;
+  stop)
+        stop
+        ;;
+  restart)
+        restart
+        ;;
+  reload)
+        # No reload handling is implemented; exit with status 3.
+        exit 3
+        ;;
+  force-reload)
+        restart
+        ;;
+  status)
+        rhstatus
+        ;;
+  condrestart|try-restart)
+        # Only restart a service that is currently running.
+        rhstatus >/dev/null 2>&1 || exit 0
+        restart
+        ;;
+  *)
+        echo $"Usage: $0 {start|stop|restart|condrestart|try-restart|reload|force-reload|status}"
+        exit 3
+esac
+
+exit $?

+ 10 - 0
logrotate.d/munchclient

@@ -0,0 +1,10 @@
+/var/log/munchclient.log {
+    weekly
+    missingok
+    notifempty
+    rotate 4 
+    minsize 10000000
+    postrotate
+    /etc/init.d/munchclient restart || true
+    endscript
+}

+ 261 - 91
main.go

@@ -1,37 +1,86 @@
 package main
 
 import (
+	"bufio"
 	"encoding/csv"
 	"flag"
 	"fmt"
 	"io"
 	"log"
+	"net"
+	"net/http"
 	"os"
+	"strings"
 	"time"
 
+	"github.com/BurntSushi/toml"
+	"github.com/google/gopacket"
+	"github.com/google/gopacket/layers"
+	"github.com/google/gopacket/pcap"
 	"github.com/nats-io/nats"
 	"github.com/nats-io/nats/encoders/protobuf"
 
-	"github.com/pkg/profile"
-
-	"gopkg.in/mgo.v2"
-	"gopkg.in/mgo.v2/bson"
-
 	"git.scraperwall.com/scw/data"
+	"git.scraperwall.com/scw/ip"
 )
 
 var (
-	natsURL      = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
-	natsQueue    = flag.String("nats-queue", "mami_requests", "The NATS queue name")
-	sleepFor     = flag.Duration("sleep", 0, "Sleep this long between sending data")
-	requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
-	doProfile    = flag.Bool("profile", false, "Profile the program")
-
-	dataChan chan data.Request
-	natsEC   *nats.EncodedConn
-	count    uint64
+	doLiveCapture         = flag.Bool("live", false, "Capture data in real time from a given interface")
+	iface                 = flag.String("interface", "eth0", "Interface to get packets from")
+	snapshotLen           = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
+	filter                = flag.String("filter", "tcp", "PCAP filter expression")
+	promiscuous           = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
+	natsURL               = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
+	natsQueue             = flag.String("nats-queue", "requests", "The NATS queue name")
+	sleepFor              = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
+	requestsFile          = flag.String("requests", "", "CSV file containing requests (IP and URL)")
+	useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
+	configFile            = flag.String("config", "", "The location of the TOML config file")
+	beQuiet               = flag.Bool("quiet", true, "Be quiet")
+	doVersion             = flag.Bool("version", false, "Show version information")
+
+	natsEC  *nats.EncodedConn
+	count   uint64
+	timeout = -1 * time.Second
+	ipPriv  *ip.IP
+	config  Config
+
+	// Version contains the program Version, e.g. 1.0.1
+	Version string
+
+	// BuildDate contains the date and time at which the program was compiled
+	BuildDate string
 )
 
+// Config contains the program configuration. loadConfig fills it from the
+// command line flags and optionally overlays values from a TOML file.
+type Config struct {
+	// Live enables capturing data in real time from Interface.
+	Live                  bool
+	// Interface is the network interface to get packets from.
+	Interface             string
+	// SnapshotLen is the snapshot length in bytes.
+	SnapshotLen           int
+	// Filter is the PCAP filter expression.
+	Filter                string
+	// Promiscuous switches the interface into promiscuous mode.
+	Promiscuous           bool
+	// NatsURL is the URL of the NATS server.
+	NatsURL               string
+	// NatsQueue is the NATS queue name requests are published to.
+	NatsQueue             string
+	// SleepFor is the pause between two requests when replaying a file.
+	SleepFor              time.Duration
+	// RequestsFile is the CSV file containing requests (IP and URL).
+	RequestsFile          string
+	// UseXForwardedAsSource uses the IP address in X-Forwarded-For as source.
+	UseXForwardedAsSource bool
+	// Quiet suppresses the per-second statistics and the configuration dump.
+	Quiet                 bool
+}
+
+// print writes every configuration value to standard output, one
+// "Name: value" line per field, in the order the fields are declared.
+func (cfg Config) print() {
+	fmt.Printf(""+
+		"Live:                  %t\n"+
+		"Interface:             %s\n"+
+		"SnapshotLen:           %d\n"+
+		"Filter:                %s\n"+
+		"Promiscuous:           %t\n"+
+		"NatsURL:               %s\n"+
+		"NatsQueue:             %s\n"+
+		"SleepFor:              %s\n"+
+		"RequestsFile:          %s\n"+
+		"UseXForwardedAsSource: %t\n"+
+		"Quiet:                 %t\n",
+		cfg.Live,
+		cfg.Interface,
+		cfg.SnapshotLen,
+		cfg.Filter,
+		cfg.Promiscuous,
+		cfg.NatsURL,
+		cfg.NatsQueue,
+		cfg.SleepFor.String(),
+		cfg.RequestsFile,
+		cfg.UseXForwardedAsSource,
+		cfg.Quiet,
+	)
+}
+
 func init() {
 	flag.Parse()
 
@@ -39,83 +88,205 @@ func init() {
 }
 
+// main prints the version information and exits when -version is given.
+// Otherwise it loads the configuration, connects to NATS with the protobuf
+// encoder and then either replays a requests file or captures live traffic.
 func main() {
-
-	if *doProfile {
-		defer profile.Start(profile.CPUProfile).Stop()
+	if *doVersion {
+		version()
+		os.Exit(0)
 	}
 
-	go func(c *uint64) {
-		for {
-			fmt.Printf("%d items per second\n", *c)
-			*c = 0
-			time.Sleep(time.Second)
-		}
-	}(&count)
+	loadConfig()
+
+	// Output how many requests per second were sent
+	// NOTE(review): count is read and reset here without synchronization
+	// while other goroutines increment it.
+	if !config.Quiet {
+		go func(c *uint64) {
+			for {
+				fmt.Printf("%d requests per second\n", *c)
+				*c = 0
+				time.Sleep(time.Second)
+			}
+		}(&count)
+	}
 
 	// NATS
 	//
-	if *natsURL == "" {
+	if config.NatsURL == "" {
 		log.Fatal("No NATS URL specified (-nats-url)!")
 	}
 
-	natsConn, err := nats.Connect(*natsURL)
+	natsConn, err := nats.Connect(config.NatsURL)
 	if err != nil {
 		log.Fatal(err)
 	}
 
-	// nats.RegisterEncoder(encType, enc)
 	natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
-	// _, err = nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER)
 	if err != nil {
 		log.Fatalf("Encoded Connection: %v!\n", err)
 	}
 
-	/*
-		log.Print("binding chan...")
-		data_chan = make(chan data.Request)
-		nats_ec.BindSendChan(*nats_queue, data_chan)
-		log.Println("done")
-	*/
-	/*
-		laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
-		if err != nil {
-			log.Fatal(err)
-		}
+	// What should I do?
+	// A requests file takes precedence over live capture; when neither is
+	// configured main simply returns.
+	if config.RequestsFile != "" {
+		replayFile()
+	} else if config.Live {
+		fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
+		liveCapture()
+	}
+}
 
-		raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876")
-		if err != nil {
-			log.Fatal(err)
-		}
+// liveCapture opens config.Interface for live capturing, installs
+// config.Filter as BPF filter and hands every captured packet to
+// processPacket in a goroutine of its own. Failing to open the interface or
+// to install the filter terminates the program via log.Fatal.
+func liveCapture() {
+	ipPriv = ip.NewIP()
+
+	// PCAP setup
+	//
+	capture, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer capture.Close()
+
+	if filterErr := capture.SetBPFFilter(config.Filter); filterErr != nil {
+		log.Fatal(filterErr)
+	}
+
+	packets := gopacket.NewPacketSource(capture, capture.LinkType()).Packets()
+	for pkt := range packets {
+		go processPacket(pkt)
+	}
+}
+
+// processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue.
+//
+// Packets that carry no IPv4 layer, no TCP layer, no application payload or
+// whose payload does not parse as an HTTP request are dropped.
+//
+// Source defaults to the packet's source address. When
+// config.UseXForwardedAsSource is set, the right-most X-Forwarded-For entry
+// that parses as an IP address and for which ipPriv.IsPrivate reports false
+// replaces it. If Source still equals the packet source and X-Real-IP is
+// present, X-Real-IP is used.
+//
+// NOTE(review): count is incremented without synchronization although
+// liveCapture runs this function in one goroutine per packet.
+func processPacket(packet gopacket.Packet) {
+	count++
+
+	ipLayer := packet.Layer(layers.LayerTypeIPv4)
+	if ipLayer == nil {
+		log.Println("No IPv4 Layer!")
+		return
+	}
+
+	// Verify the assertion instead of discarding its result; a failed
+	// assertion would yield a nil pointer that is dereferenced below.
+	ipv4, ok := ipLayer.(*layers.IPv4)
+	if !ok {
+		log.Println("No IPv4 Layer!")
+		return
+	}
+
+	if ipv4.Protocol != layers.IPProtocolTCP {
+		log.Println("No TCP Protocol!")
+		return
+	}
+
+	tcpLayer := packet.Layer(layers.LayerTypeTCP)
+	if tcpLayer == nil {
+		return
+	}
+
+	tcp, ok := tcpLayer.(*layers.TCP)
+	if !ok {
+		return
+	}
+
+	applicationLayer := packet.ApplicationLayer()
+	if applicationLayer == nil {
+		return
+	}
+
+	reader := bufio.NewReader(strings.NewReader(string(applicationLayer.Payload())))
+
+	req, err := http.ReadRequest(reader)
+	if err != nil {
+		return
+	}
+
+	request := data.Request{}
+	request.IpSrc = ipv4.SrcIP.String()
+	request.IpDst = ipv4.DstIP.String()
+	request.PortSrc = uint32(tcp.SrcPort)
+	request.PortDst = uint32(tcp.DstPort)
+	request.TcpSeq = uint32(tcp.Seq)
+	request.CreatedAt = time.Now().UnixNano()
+	request.Url = req.URL.String()
+	request.Method = req.Method
+	request.Referer = req.Referer()
+	request.Host = req.Host
+	request.Protocol = req.Proto
+	request.Origin = request.Host
+
+	// Header.Get canonicalizes the key before the lookup. Indexing the map
+	// with "X-Real-IP" never matched, because http.ReadRequest stores that
+	// header under its canonical form "X-Real-Ip". Get returns the first
+	// value of the header, or "" when the header is absent.
+	request.Connection = req.Header.Get("Connection")
+	request.XForwardedFor = req.Header.Get("X-Forwarded-For")
+	request.XRealIP = req.Header.Get("X-Real-IP")
+	request.XRequestedWith = req.Header.Get("X-Requested-With")
+	request.AcceptEncoding = req.Header.Get("Accept-Encoding")
+	request.AcceptLanguage = req.Header.Get("Accept-Language")
+	request.UserAgent = req.Header.Get("User-Agent")
+	request.Accept = req.Header.Get("Accept")
+	request.Cookie = req.Header.Get("Cookie")
+	request.Source = request.IpSrc
+
+	if config.UseXForwardedAsSource && request.XForwardedFor != "" {
+		// Split also covers a header holding a single address, so one code
+		// path handles both cases and entries that do not parse are skipped.
+		ips := strings.Split(request.XForwardedFor, ",")
+		for i := len(ips) - 1; i >= 0; i-- {
+			ipRaw := strings.TrimSpace(ips[i])
+			ipAddr := net.ParseIP(ipRaw)
+			if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
+				request.Source = ipRaw
+				break
+			}
 		}
-	*/
+	}
 
-	if *requestsFile != "" {
-		replayFile()
-	} else {
-		replayMongoDB()
+	if request.Source == request.IpSrc && request.XRealIP != "" {
+		request.Source = request.XRealIP
 	}
+
+	// Report a failed publish instead of silently losing the request.
+	if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
+		log.Printf("publishing request to %q: %v", config.NatsQueue, err)
+	}
 }
 
+// replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
+// e.g.
+// 157.55.39.229 /gross-gerau/12012260-beate-anstatt
+// 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
 func replayFile() {
 	var req data.Request
 	var startTs time.Time
 	var endTs time.Time
 
 	for {
-		fh, err := os.Open(*requestsFile)
+		fh, err := os.Open(config.RequestsFile)
 		if err != nil {
-			log.Fatalf("Failed to open request file '%s': %s", *requestsFile, err)
+			log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
 		}
 
 		c := csv.NewReader(fh)
 		c.Comma = ' '
 
 		for {
-			if *sleepFor > time.Nanosecond {
+			if config.SleepFor > time.Nanosecond {
 				startTs = time.Now()
 			}
 
@@ -137,55 +308,54 @@ func replayFile() {
 			req.Host = "demo.scraperwall.com"
 			req.CreatedAt = time.Now().UnixNano()
 
-			// fmt.Printf("%s: %s\n", req.IpSrc, req.Url)
-			natsEC.Publish(*natsQueue, &req)
+			natsEC.Publish(config.NatsQueue, &req)
 
 			count++
-			if *sleepFor >= time.Nanosecond {
+			if config.SleepFor >= time.Nanosecond {
 				endTs = time.Now()
-				if endTs.Before(startTs.Add(*sleepFor)) {
-					time.Sleep(*sleepFor - endTs.Sub(startTs))
+				if endTs.Before(startTs.Add(config.SleepFor)) {
+					time.Sleep(config.SleepFor - endTs.Sub(startTs))
 				}
 			}
 		}
 	}
 }
 
-func replayMongoDB() {
-	mongo, err := mgo.Dial("mongodb://127.0.0.1:27017")
-	if err != nil {
-		log.Fatal(err)
+// loadConfig fills the global config from the command line flags. When
+// -config names a file, the values decoded from that TOML file are written
+// into config on top of the flag values. Errors while accessing or decoding
+// the file are logged and do not abort the program.
+func loadConfig() {
+
+	// initialize with the values of the command line flags
+	config.Live = *doLiveCapture
+	config.Interface = *iface
+	config.SnapshotLen = *snapshotLen
+	config.Filter = *filter
+	config.Promiscuous = *promiscuous
+	config.NatsURL = *natsURL
+	config.NatsQueue = *natsQueue
+	config.SleepFor = *sleepFor
+	config.RequestsFile = *requestsFile
+	config.UseXForwardedAsSource = *useXForwardedAsSource
+	config.Quiet = *beQuiet
+
+	// Without a config file the flag values are final.
+	if *configFile == "" {
+		return
 	}
 
-	db := mongo.DB("scw")
-	coll := db.C("requests")
-	var req data.RequestData
-	var startTs time.Time
-	var endTs time.Time
-
-	for {
-		res := coll.Find(bson.M{}).Batch(200).Prefetch(0.5)
-		iterator := res.Iter()
-
-		for iterator.Next(&req) {
-			if *sleepFor > time.Microsecond {
-				startTs = time.Now()
-			}
-
-			req.CreatedAt = time.Now()
-			natsEC.Publish(*natsQueue, req.ToRequest())
-			// data_chan <- *req.ToRequest()
+	_, err := os.Stat(*configFile)
+	if err != nil {
+		log.Printf("%s: %s\n", *configFile, err)
+		return
 	}
 
-			// nats_conn.Publish(*nats_queue, j)
+	// A decoding error is only logged; execution continues with whatever
+	// config holds afterwards.
+	if _, err = toml.DecodeFile(*configFile, &config); err != nil {
+		log.Printf("%s: %s\n", *configFile, err)
 	}
 
-			// conn.Write(j)
-			count++
-			if *sleepFor >= time.Microsecond {
-				endTs = time.Now()
-				if endTs.Before(startTs.Add(*sleepFor)) {
-					time.Sleep(*sleepFor - endTs.Sub(startTs))
-				}
-			}
-		}
+	// The effective configuration is only printed on this path, i.e. when a
+	// config file was given and could be accessed.
+	if !config.Quiet {
+		config.print()
 	}
 }
+
+// version outputs build information
+func version() {
+	fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
+}

+ 11 - 0
munchclient.toml

@@ -0,0 +1,11 @@
+# Live = true
+# Interface = "eth0"
+# SnapshotLen = 8192
+# Filter = "tcp port 80 and dst 10.1.1.1"
+# Promiscuous = false
+# NatsURL = "nats://192.168.122.1:4222"
+# NatsQueue = "requests"
+# UseXForwardedAsSource = true
+# Quiet = true
+# SleepFor = 100000 # nanoseconds (100µs)
+# RequestsFile = "requests.csv"