Prechádzať zdrojové kódy

replay request files

Tobias Begalke 7 rokov pred
rodič
commit
e64538ff3e
1 zmenil súbory, kde vykonal 75 pridanie a 12 odobranie
  1. 75 12
      main.go

+ 75 - 12
main.go

@@ -1,9 +1,12 @@
 package main
 
 import (
+	"encoding/csv"
 	"flag"
 	"fmt"
+	"io"
 	"log"
+	"os"
 	"time"
 
 	"github.com/nats-io/nats"
@@ -18,13 +21,15 @@ import (
 )
 
 var (
-	nats_url   = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
-	nats_queue = flag.String("nats-queue", "mami_requests", "The NATS queue name")
-	sleepFor   = flag.Duration("sleep", 0, "Sleep this long between sending data")
-	doProfile  = flag.Bool("profile", false, "Profile the program")
-
-	data_chan chan data.Request
-	count     uint64
+	natsURL      = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
+	natsQueue    = flag.String("nats-queue", "mami_requests", "The NATS queue name")
+	sleepFor     = flag.Duration("sleep", 0, "Sleep this long between sending data")
+	requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
+	doProfile    = flag.Bool("profile", false, "Profile the program")
+
+	dataChan chan data.Request
+	natsEC   *nats.EncodedConn
+	count    uint64
 )
 
 func init() {
@@ -49,18 +54,17 @@ func main() {
 
 	// NATS
 	//
-	if *nats_url == "" {
+	if *natsURL == "" {
 		log.Fatal("No NATS URL specified (-nats-url)!")
 	}
 
-	nats_conn, err := nats.Connect(*nats_url)
+	natsConn, err := nats.Connect(*natsURL)
 	if err != nil {
 		log.Fatal(err)
 	}
 
 	// nats.RegisterEncoder(encType, enc)
-
-	nats_ec, err := nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER)
+	natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
 	// _, err = nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER)
 	if err != nil {
 		log.Fatalf("Encoded Connection: %v!\n", err)
@@ -89,6 +93,65 @@ func main() {
 		}
 	*/
 
+	if *requestsFile != "" {
+		replayFile()
+	} else {
+		replayMongoDB()
+	}
+}
+
+func replayFile() {
+	var req data.Request
+	var startTs time.Time
+	var endTs time.Time
+
+	for {
+		fh, err := os.Open(*requestsFile)
+		if err != nil {
+			log.Fatalf("Failed to open request file '%s': %s", *requestsFile, err)
+		}
+
+		c := csv.NewReader(fh)
+		c.Comma = ' '
+
+		for {
+			if *sleepFor > time.Nanosecond {
+				startTs = time.Now()
+			}
+
+			r, err := c.Read()
+
+			if err == io.EOF {
+				break
+			}
+
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+
+			req.IpSrc = r[0]
+			req.Source = r[0]
+			req.Url = r[1]
+			req.UserAgent = "Munch/1.0"
+			req.Host = "www.marktplatz-mittelstand.de"
+			req.CreatedAt = time.Now().UnixNano()
+
+			// fmt.Printf("%s: %s\n", req.IpSrc, req.Url)
+			natsEC.Publish(*natsQueue, &req)
+
+			count++
+			if *sleepFor >= time.Nanosecond {
+				endTs = time.Now()
+				if endTs.Before(startTs.Add(*sleepFor)) {
+					time.Sleep(*sleepFor - endTs.Sub(startTs))
+				}
+			}
+		}
+	}
+}
+
+func replayMongoDB() {
 	mongo, err := mgo.Dial("mongodb://127.0.0.1:27017")
 	if err != nil {
 		log.Fatal(err)
@@ -110,7 +173,7 @@ func main() {
 			}
 
 			req.CreatedAt = time.Now()
-			nats_ec.Publish(*nats_queue, req.ToRequest())
+			natsEC.Publish(*natsQueue, req.ToRequest())
 			// data_chan <- *req.ToRequest()
 
 			// nats_conn.Publish(*nats_queue, j)