|
@@ -40,8 +40,8 @@ var (
|
|
|
natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
|
|
|
natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
|
|
|
natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
|
|
|
- reconnectToNatsAfter = flag.Duration("reconnect-to-nats-after", 0, "reconnect to nats after this long periodically")
|
|
|
- resetLiveCapAfter = flag.Duration("rest-live-cap-after", 0, "reset the live capture setup after this amount of time")
|
|
|
+ reconnectToNatsAfter = flag.Duration("reconnect-to-nats-after", 0, "reconnect to nats after this time periodically")
|
|
|
+ resetLiveCapAfter = flag.Duration("reset-live-cap-after", 613*time.Second, "reset the live capture setup after this amount of time")
|
|
|
sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
|
|
|
requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
|
|
|
protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
|
|
@@ -187,9 +187,12 @@ func main() {
|
|
|
|
|
|
// reconnect to nats periodically
|
|
|
//
|
|
|
- if *reconnectToNatsAfter > 0 {
|
|
|
+ if *reconnectToNatsAfter > time.Minute {
|
|
|
go func(interval time.Duration) {
|
|
|
for range time.Tick(interval) {
|
|
|
+ if config.Trace {
|
|
|
+ log.Printf("reconnecting to NATS")
|
|
|
+ }
|
|
|
natsEC.Conn.Close()
|
|
|
natsJSONEC.Conn.Close()
|
|
|
}
|
|
@@ -290,27 +293,25 @@ type liveCap struct {
|
|
|
}
|
|
|
|
|
|
func newLiveCap(device string, filter string, snapshotLen int, promisc bool) (*liveCap, error) {
|
|
|
- lc := liveCap{
|
|
|
+ lc := &liveCap{
|
|
|
filter: filter,
|
|
|
device: device,
|
|
|
promisc: promisc,
|
|
|
snapshotLen: snapshotLen,
|
|
|
}
|
|
|
|
|
|
- err := lc.Setup()
|
|
|
+ err := lc.SetupCap()
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- return &lc, nil
|
|
|
+ return lc, nil
|
|
|
}
|
|
|
|
|
|
-func (lc *liveCap) Setup() error {
|
|
|
- /*
|
|
|
- if lc.packetChan != nil {
|
|
|
- close(lc.packetChan)
|
|
|
- }
|
|
|
- */
|
|
|
+func (lc *liveCap) SetupCap() error {
|
|
|
+ if !config.Quiet {
|
|
|
+ log.Printf("reading incoming HTTP requests on %s %s\n", config.Interface, config.Filter)
|
|
|
+ }
|
|
|
|
|
|
if lc.handle != nil {
|
|
|
lc.handle.Close()
|
|
@@ -344,48 +345,18 @@ func liveCapture() {
|
|
|
log.Fatal(err)
|
|
|
}
|
|
|
|
|
|
- closeChan := make(chan bool, 1)
|
|
|
- go func(cc chan bool) {
|
|
|
- for range time.Tick(1 * time.Minute) {
|
|
|
- cc <- true
|
|
|
- }
|
|
|
- }(closeChan)
|
|
|
+ closeChan := time.Tick(config.ResetLiveCaptureAfter.Duration)
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
case <-closeChan:
|
|
|
- livecap.Setup()
|
|
|
- case packet := <-livecap.packetChan:
|
|
|
- go processPacket(packet)
|
|
|
+ livecap.SetupCap()
|
|
|
+ case p := <-livecap.packetChan:
|
|
|
+ go processPacket(p)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func setupLiveCapture() (chan gopacket.Packet, error) {
|
|
|
- if !config.Quiet {
|
|
|
- log.Printf("reading incoming HTTP requests on %s %s\n", config.Interface, config.Filter)
|
|
|
- }
|
|
|
-
|
|
|
- // PCAP setup
|
|
|
- //
|
|
|
- handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- // log.Fatal(err)
|
|
|
- }
|
|
|
- // defer handle.Close()
|
|
|
-
|
|
|
- err = handle.SetBPFFilter(config.Filter)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- // log.Fatal(err)
|
|
|
- }
|
|
|
-
|
|
|
- packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
|
|
|
-
|
|
|
- return packetSource.Packets(), nil
|
|
|
-}
|
|
|
-
|
|
|
func writeLogToWatch(r *data.Request) {
|
|
|
h := map[string]string{}
|
|
|
|