Browse Source

recover from a closed connection

Tobias Begalke 6 years ago
parent
commit
ad4a8271b6
1 changed files with 41 additions and 44 deletions
  1. 41 44
      main.go

+ 41 - 44
main.go

@@ -21,7 +21,6 @@ import (
 	"github.com/google/gopacket/layers"
 	"github.com/google/gopacket/pcap"
 	"github.com/hpcloud/tail"
-	"github.com/kr/pretty"
 	"github.com/nats-io/nats"
 	"github.com/nats-io/nats/encoders/protobuf"
 
@@ -52,14 +51,14 @@ var (
 	beQuiet   = flag.Bool("quiet", true, "Be quiet")
 	doVersion = flag.Bool("version", false, "Show version information")
 
-	natsEC            *nats.EncodedConn
-	natsJsonEC        *nats.EncodedConn
-	natsClosedChan    chan bool
-	natsIsUnavailable bool
-	count             uint64
-	timeout           = -1 * time.Second
-	ipPriv            *ip.IP
-	config            Config
+	natsEC          *nats.EncodedConn
+	natsJsonEC      *nats.EncodedConn
+	natsErrorChan   chan error
+	natsIsAvailable bool
+	count           uint64
+	timeout         = -1 * time.Second
+	ipPriv          *ip.IP
+	config          Config
 
 	// Version contains the program Version, e.g. 1.0.1
 	Version string
@@ -150,15 +149,15 @@ func main() {
 		log.Fatal("No NATS URL specified (-nats-url)!")
 	}
 
-	natsIsUnavailable = true
-	natsClosedChan = make(chan bool, 10000)
+	natsIsAvailable = false
+	natsErrorChan = make(chan error, 1)
 
 	err := connectToNATS()
 	if err != nil {
 		log.Fatal(err)
 	}
 
-	go natsWatchdog(natsClosedChan)
+	go natsWatchdog(natsErrorChan)
 
 	// What should I do?
 	if config.RequestsFile != "" {
@@ -171,8 +170,19 @@ func main() {
 	}
 }
 
-func natsWatchdog(closedChan chan bool) {
-	for range closedChan {
+func natsWatchdog(closedChan chan error) {
+	var lastError error
+
+	for err := range closedChan {
+		if lastError != err {
+			lastError = err
+			log.Println(err)
+		}
+
+		if err != nats.ErrConnectionClosed {
+			continue
+		}
+
 	RECONNECT:
 		for {
 			log.Printf("Reconnecting to NATS at %s\n", *natsURL)
@@ -212,7 +222,7 @@ func connectToNATS() error {
 		return fmt.Errorf("Encoded Connection: %v", err)
 	}
 
-	natsIsUnavailable = false
+	natsIsAvailable = true
 	return nil
 }
 
@@ -292,14 +302,7 @@ func apacheLogCapture(logfile string) {
 			log.Printf("[%s] %s\n", request.Source, request.Url)
 		}
 
-		if natsIsUnavailable != true {
-			if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
-				if err == nats.ErrConnectionClosed {
-					natsIsUnavailable = true
-					natsClosedChan <- true
-				}
-			}
-		}
+		publishRequest(config.NatsQueue, &request)
 	}
 }
 
@@ -326,6 +329,19 @@ func liveCapture() {
 	}
 }
 
+func publishRequest(queue string, request *data.Request) {
+	if !natsIsAvailable {
+		return
+	}
+
+	if err := natsEC.Publish(config.NatsQueue, request); err != nil {
+		natsErrorChan <- err
+		if err == nats.ErrConnectionClosed {
+			natsIsAvailable = false
+		}
+	}
+}
+
 // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
 func processPacket(packet gopacket.Packet) {
 
@@ -425,14 +441,7 @@ func processPacket(packet gopacket.Packet) {
 		log.Printf("[%s] %s\n", request.Source, request.Url)
 	}
 
-	if natsIsUnavailable != true {
-		if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
-			if err == nats.ErrConnectionClosed {
-				natsIsUnavailable = true
-				natsClosedChan <- true
-			}
-		}
-	}
+	publishRequest(config.NatsQueue, &request)
 }
 
 func processAJP13(request *data.Request, appData []byte) error {
@@ -495,11 +504,6 @@ func processAJP13(request *data.Request, appData []byte) error {
 		}
 	}
 
-	if false && config.Trace {
-		log.Println("Request")
-		pretty.Println(request)
-	}
-
 	return nil
 }
 
@@ -597,14 +601,7 @@ func replayFile() {
 			req.Host = "demo.scraperwall.com"
 			req.CreatedAt = time.Now().UnixNano()
 
-			if natsIsUnavailable != true {
-				if err := natsEC.Publish(config.NatsQueue, &req); err != nil {
-					if err == nats.ErrConnectionClosed {
-						natsIsUnavailable = true
-						natsClosedChan <- true
-					}
-				}
-			}
+			publishRequest(config.NatsQueue, &req)
 
 			if strings.Index(r[1], ".") < 0 {
 				hash := sha1.New()