123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package main
import (
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"sync/atomic"
	"time"

	"git.scraperwall.com/scw/data"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"
	"github.com/pkg/profile"
	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)
- var (
- natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
- natsQueue = flag.String("nats-queue", "mami_requests", "The NATS queue name")
- sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data")
- requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
- doProfile = flag.Bool("profile", false, "Profile the program")
- dataChan chan data.Request
- natsEC *nats.EncodedConn
- count uint64
- )
- func init() {
- flag.Parse()
- nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
- }
- func main() {
- if *doProfile {
- defer profile.Start(profile.CPUProfile).Stop()
- }
- go func(c *uint64) {
- for {
- fmt.Printf("%d items per second\n", *c)
- *c = 0
- time.Sleep(time.Second)
- }
- }(&count)
- // NATS
- //
- if *natsURL == "" {
- log.Fatal("No NATS URL specified (-nats-url)!")
- }
- natsConn, err := nats.Connect(*natsURL)
- if err != nil {
- log.Fatal(err)
- }
- // nats.RegisterEncoder(encType, enc)
- natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
- // _, err = nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER)
- if err != nil {
- log.Fatalf("Encoded Connection: %v!\n", err)
- }
- /*
- log.Print("binding chan...")
- data_chan = make(chan data.Request)
- nats_ec.BindSendChan(*nats_queue, data_chan)
- log.Println("done")
- */
- /*
- laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
- if err != nil {
- log.Fatal(err)
- }
- raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876")
- if err != nil {
- log.Fatal(err)
- }
- conn, err := net.DialUDP("udp", laddr, raddr)
- if err != nil {
- log.Fatal(err)
- }
- */
- if *requestsFile != "" {
- replayFile()
- } else {
- replayMongoDB()
- }
- }
- func replayFile() {
- var req data.Request
- var startTs time.Time
- var endTs time.Time
- for {
- fh, err := os.Open(*requestsFile)
- if err != nil {
- log.Fatalf("Failed to open request file '%s': %s", *requestsFile, err)
- }
- c := csv.NewReader(fh)
- c.Comma = ' '
- for {
- if *sleepFor > time.Nanosecond {
- startTs = time.Now()
- }
- r, err := c.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- log.Println(err)
- continue
- }
- req.IpSrc = r[0]
- req.Source = r[0]
- req.Url = r[1]
- req.UserAgent = "Munch/1.0"
- req.Host = "demo.scraperwall.com"
- req.CreatedAt = time.Now().UnixNano()
- // fmt.Printf("%s: %s\n", req.IpSrc, req.Url)
- natsEC.Publish(*natsQueue, &req)
- count++
- if *sleepFor >= time.Nanosecond {
- endTs = time.Now()
- if endTs.Before(startTs.Add(*sleepFor)) {
- time.Sleep(*sleepFor - endTs.Sub(startTs))
- }
- }
- }
- }
- }
- func replayMongoDB() {
- mongo, err := mgo.Dial("mongodb://127.0.0.1:27017")
- if err != nil {
- log.Fatal(err)
- }
- db := mongo.DB("scw")
- coll := db.C("requests")
- var req data.RequestData
- var startTs time.Time
- var endTs time.Time
- for {
- res := coll.Find(bson.M{}).Batch(200).Prefetch(0.5)
- iterator := res.Iter()
- for iterator.Next(&req) {
- if *sleepFor > time.Microsecond {
- startTs = time.Now()
- }
- req.CreatedAt = time.Now()
- natsEC.Publish(*natsQueue, req.ToRequest())
- // data_chan <- *req.ToRequest()
- // nats_conn.Publish(*nats_queue, j)
- // conn.Write(j)
- count++
- if *sleepFor >= time.Microsecond {
- endTs = time.Now()
- if endTs.Before(startTs.Add(*sleepFor)) {
- time.Sleep(*sleepFor - endTs.Sub(startTs))
- }
- }
- }
- }
- }
|