package main import ( "encoding/csv" "flag" "fmt" "io" "log" "os" "time" "github.com/nats-io/nats" "github.com/nats-io/nats/encoders/protobuf" "github.com/pkg/profile" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" "git.scraperwall.com/scw/data" ) var ( natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server") natsQueue = flag.String("nats-queue", "mami_requests", "The NATS queue name") sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data") requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)") doProfile = flag.Bool("profile", false, "Profile the program") dataChan chan data.Request natsEC *nats.EncodedConn count uint64 ) func init() { flag.Parse() nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{}) } func main() { if *doProfile { defer profile.Start(profile.CPUProfile).Stop() } go func(c *uint64) { for { fmt.Printf("%d items per second\n", *c) *c = 0 time.Sleep(time.Second) } }(&count) // NATS // if *natsURL == "" { log.Fatal("No NATS URL specified (-nats-url)!") } natsConn, err := nats.Connect(*natsURL) if err != nil { log.Fatal(err) } // nats.RegisterEncoder(encType, enc) natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER) // _, err = nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER) if err != nil { log.Fatalf("Encoded Connection: %v!\n", err) } /* log.Print("binding chan...") data_chan = make(chan data.Request) nats_ec.BindSendChan(*nats_queue, data_chan) log.Println("done") */ /* laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") if err != nil { log.Fatal(err) } raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876") if err != nil { log.Fatal(err) } conn, err := net.DialUDP("udp", laddr, raddr) if err != nil { log.Fatal(err) } */ if *requestsFile != "" { replayFile() } else { replayMongoDB() } } func replayFile() { var req data.Request var startTs time.Time var endTs time.Time for { fh, err := os.Open(*requestsFile) if err != nil { log.Fatalf("Failed to open request file '%s': %s", *requestsFile, err) } c := csv.NewReader(fh) c.Comma = ' ' for { if *sleepFor > time.Nanosecond { startTs = time.Now() } r, err := c.Read() if err == io.EOF { break } if err != nil { log.Println(err) continue } req.IpSrc = r[0] req.Source = r[0] req.Url = r[1] req.UserAgent = "Munch/1.0" req.Host = "www.marktplatz-mittelstand.de" req.CreatedAt = time.Now().UnixNano() // fmt.Printf("%s: %s\n", req.IpSrc, req.Url) natsEC.Publish(*natsQueue, &req) count++ if *sleepFor >= time.Nanosecond { endTs = time.Now() if endTs.Before(startTs.Add(*sleepFor)) { time.Sleep(*sleepFor - endTs.Sub(startTs)) } } } } } func replayMongoDB() { mongo, err := mgo.Dial("mongodb://127.0.0.1:27017") if err != nil { log.Fatal(err) } db := mongo.DB("scw") coll := db.C("requests") var req data.RequestData var startTs time.Time var endTs time.Time for { res := coll.Find(bson.M{}).Batch(200).Prefetch(0.5) iterator := res.Iter() for iterator.Next(&req) { if *sleepFor > time.Microsecond { startTs = time.Now() } req.CreatedAt = time.Now() natsEC.Publish(*natsQueue, req.ToRequest()) // data_chan <- *req.ToRequest() // nats_conn.Publish(*nats_queue, j) // conn.Write(j) count++ if *sleepFor >= time.Microsecond { endTs = time.Now() if endTs.Before(startTs.Add(*sleepFor)) { time.Sleep(*sleepFor - endTs.Sub(startTs)) } } } } }