Browse Source

reset live capturing periodically if so desired

Tobias von Dewitz 6 năm trước cách đây
mục cha
commit
9a1c38249e
4 tập tin đã thay đổi với 98 bổ sung9 xóa
  1. 1 1
      Makefile
  2. 5 3
      build-rpm-centos7.sh
  3. 90 5
      main.go
  4. 2 0
      munchclient.toml

+ 1 - 1
Makefile

@@ -2,7 +2,7 @@ VERSION=$(shell git describe --tags | sed 's/^v//' )
 BUILD=$(shell date +%FT%T%z)
 
 all:
-	docker run --rm -v $(PWD):/root/go/src/git.scraperwall.com/scw/munchclient -w /root/go/src/git.scraperwall.com/scw/munchclient centos-build:latest make compile
+	docker run --rm -v $(PWD):/root/go/src/git.scraperwall.com/scw/munchclient -w /root/go/src/git.scraperwall.com/scw/munchclient registry.scw.systems/centos7-build:latest make compile
 
 compile:
 	env GOOS=linux GOARCH=amd64 go build -tags netgo -ldflags "-s -X main.Version=$(VERSION) -X main.BuildDate=$(BUILD)"

+ 5 - 3
build-rpm-centos7.sh

@@ -13,6 +13,8 @@ PKG_TYPE=rpm
 DESCR="ScraperWall traffic collector"
 OUTDIR=./rpms-centos7
 RPM_DIR=$PWD/rpm.scraperwall.com/centos7
+DOCKER_IMAGE=registry.scw.systems/centos7-build:latest
+
 
 
 rm -rf $DESTDIR
@@ -29,7 +31,7 @@ install -v -m 644 $BINARY.toml $DESTDIR/usr/share/$BINARY/
 
 
 
-docker run --rm -i -t -v $PWD:/scw -v $PWD/rpmmacros:/root/.rpmmacros -w /scw centos-build:latest \
+docker run --rm -i -t -v $PWD:/scw -v $PWD/rpmmacros:/root/.rpmmacros -w /scw $DOCKER_IMAGE \
 fpm -s dir -t $PKG_TYPE -C $DESTDIR --name $BINARY \
   --version $VERSION \
   --iteration $ITERATION \
@@ -42,7 +44,7 @@ fpm -s dir -t $PKG_TYPE -C $DESTDIR --name $BINARY \
 
 rpm_file="$OUTDIR/$BINARY-$VERSION-$ITERATION.x86_64.rpm"
 
-docker run --rm -i -t -v $PWD:/scw -v $PWD/rpmmacros:/root/.rpmmacros -w /scw centos-build:latest \
+docker run --rm -i -t -v $PWD:/scw -v $PWD/rpmmacros:/root/.rpmmacros -w /scw $DOCKER_IMAGE \
 /usr/bin/rpm --define "_gpg_name <tobias@scraperwall.com>" --addsign $rpm_file
 
 ok=$?
@@ -53,7 +55,7 @@ if [ $ok -eq 0 -a -f "$rpm_file" ]; then
 	# rm -f "$RPM_DIR/$BINARY-*.rpm"
   cp "$rpm_file" "$RPM_DIR"
 
-	docker run --rm -i -t -v $RPM_DIR:/rpms -w /rpms centos-build:latest \
+	docker run --rm -i -t -v $RPM_DIR:/rpms -w /rpms $DOCKER_IMAGE \
     createrepo -v . 
 
   ok2=$?

+ 90 - 5
main.go

@@ -41,6 +41,7 @@ var (
 	natsQueue             = flag.String("nats-queue", "requests", "The NATS queue name")
 	natsCA                = flag.String("nats-ca", "", "CA chain for NATS TLS")
 	reconnectToNatsAfter  = flag.Duration("reconnect-to-nats-after", 0, "reconnect to nats after this long periodically")
+	resetLiveCapAfter     = flag.Duration("rest-live-cap-after", 0, "reset the live capture setup after this amount of time")
 	sleepFor              = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
 	requestsFile          = flag.String("requests", "", "CSV file containing requests (IP and URL)")
 	protocol              = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
@@ -103,6 +104,7 @@ type Config struct {
 	AccessWatchKey        string
 	RPCAddress            string
 	ReconnectToNatsAfter  duration
+	ResetLiveCaptureAfter duration
 }
 
 type duration struct {
@@ -138,6 +140,7 @@ func (c Config) print() {
 	fmt.Printf("AccessWatchKey:        %s\n", c.AccessWatchKey)
 	fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
 	fmt.Printf("Protocol:              %s\n", c.Protocol)
+	fmt.Printf("Reset Live Cap After:  %s\n", c.ResetLiveCaptureAfter.String())
 	fmt.Printf("RPCAddress:            %s\n", c.RPCAddress)
 	fmt.Printf("Quiet:                 %t\n", c.Quiet)
 	fmt.Printf("Trace:                 %t\n", c.Trace)
@@ -277,27 +280,108 @@ func connectToNATS() error {
 	return nil
 }
 
+type liveCap struct {
+	filter      string
+	device      string
+	promisc     bool
+	snapshotLen int
+	handle      *pcap.Handle
+	packetChan  chan gopacket.Packet
+}
+
+func newLiveCap(device string, filter string, snapshotLen int, promisc bool) (*liveCap, error) {
+	lc := liveCap{
+		filter:      filter,
+		device:      device,
+		promisc:     promisc,
+		snapshotLen: snapshotLen,
+	}
+
+	err := lc.Setup()
+	if err != nil {
+		return nil, err
+	}
+
+	return &lc, nil
+}
+
+func (lc *liveCap) Setup() error {
+	if lc.packetChan != nil {
+		close(lc.packetChan)
+	}
+
+	if lc.handle != nil {
+		lc.handle.Close()
+	}
+
+	// PCAP setup
+	//
+	handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
+	if err != nil {
+		return err
+	}
+	// defer handle.Close()
+	lc.handle = handle
+
+	err = lc.handle.SetBPFFilter(config.Filter)
+	if err != nil {
+		return err
+	}
+
+	packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
+	lc.packetChan = packetSource.Packets()
+
+	return nil
+}
+
 func liveCapture() {
 	ipPriv = ip.NewIP()
 
+	livecap, err := newLiveCap(config.Interface, config.Filter, config.SnapshotLen, config.Promiscuous)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	closeChan := make(chan bool, 1)
+	go func(cc chan bool) {
+		for range time.Tick(1 * time.Minute) {
+			cc <- true
+		}
+	}(closeChan)
+
+	for {
+		if !config.Quiet {
+			log.Printf("reading incoming HTTP requests on %s %s\n", config.Interface, config.Filter)
+		}
+
+		select {
+		case <-closeChan:
+			livecap.Setup()
+		case packet := <-livecap.packetChan:
+			go processPacket(packet)
+		}
+	}
+}
+
+func setupLiveCapture() (chan gopacket.Packet, error) {
 	// PCAP setup
 	//
 	handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
 	if err != nil {
-		log.Fatal(err)
+		return nil, err
+		// log.Fatal(err)
 	}
 	defer handle.Close()
 
 	err = handle.SetBPFFilter(config.Filter)
 	if err != nil {
-		log.Fatal(err)
+		return nil, err
+		// log.Fatal(err)
 	}
 
 	packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
 
-	for packet := range packetSource.Packets() {
-		go processPacket(packet)
-	}
+	return packetSource.Packets(), nil
 }
 
 func writeLogToWatch(r *data.Request) {
@@ -724,6 +808,7 @@ func loadConfig() {
 	config.AccessWatchKey = *accessWatchKey
 	config.RPCAddress = *rpcAddr
 	config.ReconnectToNatsAfter.Duration = *reconnectToNatsAfter
+	config.ResetLiveCaptureAfter.Duration = *resetLiveCapAfter
 
 	if *configFile == "" {
 		return

+ 2 - 0
munchclient.toml

@@ -17,3 +17,5 @@
 # Protocol = ajp13
 # Trace = false
 # ReconnectToNatsAfter = 1m
+# RPCAddress = 127.0.0.1:4994
+# ResetLiveCaptureAfter = 1m