Tobias von Dewitz пре 6 година
родитељ
комит
a10ee1209f
1 измењених фајлова са 68 додато и 24 уклоњено
  1. 68 24
      main.go

+ 68 - 24
main.go

@@ -52,12 +52,14 @@ var (
 	beQuiet   = flag.Bool("quiet", true, "Be quiet")
 	doVersion = flag.Bool("version", false, "Show version information")
 
-	natsEC     *nats.EncodedConn
-	natsJsonEC *nats.EncodedConn
-	count      uint64
-	timeout    = -1 * time.Second
-	ipPriv     *ip.IP
-	config     Config
+	natsEC            *nats.EncodedConn
+	natsJsonEC        *nats.EncodedConn
+	natsClosedChan    chan bool
+	natsIsUnavailable bool
+	count             uint64
+	timeout           = -1 * time.Second
+	ipPriv            *ip.IP
+	config            Config
 
 	// Version contains the program Version, e.g. 1.0.1
 	Version string
@@ -148,6 +150,39 @@ func main() {
 		log.Fatal("No NATS URL specified (-nats-url)!")
 	}
 
+	natsIsUnavailable = true
+	natsClosedChan = make(chan bool, 10000)
+	err := connectToNATS()
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// What should I do?
+	if config.RequestsFile != "" {
+		replayFile()
+	} else if config.ApacheLog != "" {
+		apacheLogCapture(config.ApacheLog)
+	} else if config.Live {
+		fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
+		liveCapture()
+	}
+}
+
+func natsWatchdog(closedChan chan bool) {
+	for range closedChan {
+	RECONNECT:
+		for {
+			log.Printf("Reconnecting to NATS at %s\n", *natsURL)
+			err := connectToNATS()
+			if err == nil {
+				break RECONNECT
+			}
+			time.Sleep(1 * time.Second)
+		}
+	}
+}
+
+func connectToNATS() error {
 	var natsConn *nats.Conn
 	var err error
 
@@ -161,28 +196,21 @@ func main() {
 		}
 	}
 	if err != nil {
-		log.Fatal(err)
+		return err
 	}
 
 	natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
 	if err != nil {
-		log.Fatalf("Encoded Connection: %v!\n", err)
+		return fmt.Errorf("Encoded Connection: %v", err)
 	}
 
 	natsJsonEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
 	if err != nil {
-		log.Fatalf("Encoded Connection: %v!\n", err)
+		return fmt.Errorf("Encoded Connection: %v", err)
 	}
 
-	// What should I do?
-	if config.RequestsFile != "" {
-		replayFile()
-	} else if config.ApacheLog != "" {
-		apacheLogCapture(config.ApacheLog)
-	} else if config.Live {
-		fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
-		liveCapture()
-	}
+	natsIsUnavailable = false
+	return nil
 }
 
 func apacheLogCapture(logfile string) {
@@ -261,8 +289,13 @@ func apacheLogCapture(logfile string) {
 			log.Printf("[%s] %s\n", request.Source, request.Url)
 		}
 
-		if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
-			log.Println(err)
+		if natsIsUnavailable != true {
+			if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
+				if err == nats.ErrConnectionClosed {
+					natsIsUnavailable = true
+					natsClosedChan <- true
+				}
+			}
 		}
 	}
 }
@@ -389,7 +422,14 @@ func processPacket(packet gopacket.Packet) {
 		log.Printf("[%s] %s\n", request.Source, request.Url)
 	}
 
-	natsEC.Publish(config.NatsQueue, &request)
+	if natsIsUnavailable != true {
+		if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
+			if err == nats.ErrConnectionClosed {
+				natsIsUnavailable = true
+				natsClosedChan <- true
+			}
+		}
+	}
 }
 
 func processAJP13(request *data.Request, appData []byte) error {
@@ -554,9 +594,13 @@ func replayFile() {
 			req.Host = "demo.scraperwall.com"
 			req.CreatedAt = time.Now().UnixNano()
 
-			err = natsEC.Publish(config.NatsQueue, &req)
-			if err != nil {
-				log.Println(err)
+			if natsIsUnavailable != true {
+				if err := natsEC.Publish(config.NatsQueue, &req); err != nil {
+					if err == nats.ErrConnectionClosed {
+						natsIsUnavailable = true
+						natsClosedChan <- true
+					}
+				}
 			}
 
 			if strings.Index(r[1], ".") < 0 {