main.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package main
import (
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"sync/atomic"
	"time"

	"git.scraperwall.com/scw/data"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"
	"github.com/pkg/profile"
	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)
  17. var (
  18. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  19. natsQueue = flag.String("nats-queue", "mami_requests", "The NATS queue name")
  20. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data")
  21. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  22. doProfile = flag.Bool("profile", false, "Profile the program")
  23. dataChan chan data.Request
  24. natsEC *nats.EncodedConn
  25. count uint64
  26. )
  27. func init() {
  28. flag.Parse()
  29. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  30. }
  31. func main() {
  32. if *doProfile {
  33. defer profile.Start(profile.CPUProfile).Stop()
  34. }
  35. go func(c *uint64) {
  36. for {
  37. fmt.Printf("%d items per second\n", *c)
  38. *c = 0
  39. time.Sleep(time.Second)
  40. }
  41. }(&count)
  42. // NATS
  43. //
  44. if *natsURL == "" {
  45. log.Fatal("No NATS URL specified (-nats-url)!")
  46. }
  47. natsConn, err := nats.Connect(*natsURL)
  48. if err != nil {
  49. log.Fatal(err)
  50. }
  51. // nats.RegisterEncoder(encType, enc)
  52. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  53. // _, err = nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER)
  54. if err != nil {
  55. log.Fatalf("Encoded Connection: %v!\n", err)
  56. }
  57. /*
  58. log.Print("binding chan...")
  59. data_chan = make(chan data.Request)
  60. nats_ec.BindSendChan(*nats_queue, data_chan)
  61. log.Println("done")
  62. */
  63. /*
  64. laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
  65. if err != nil {
  66. log.Fatal(err)
  67. }
  68. raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876")
  69. if err != nil {
  70. log.Fatal(err)
  71. }
  72. conn, err := net.DialUDP("udp", laddr, raddr)
  73. if err != nil {
  74. log.Fatal(err)
  75. }
  76. */
  77. if *requestsFile != "" {
  78. replayFile()
  79. } else {
  80. replayMongoDB()
  81. }
  82. }
  83. func replayFile() {
  84. var req data.Request
  85. var startTs time.Time
  86. var endTs time.Time
  87. for {
  88. fh, err := os.Open(*requestsFile)
  89. if err != nil {
  90. log.Fatalf("Failed to open request file '%s': %s", *requestsFile, err)
  91. }
  92. c := csv.NewReader(fh)
  93. c.Comma = ' '
  94. for {
  95. if *sleepFor > time.Nanosecond {
  96. startTs = time.Now()
  97. }
  98. r, err := c.Read()
  99. if err == io.EOF {
  100. break
  101. }
  102. if err != nil {
  103. log.Println(err)
  104. continue
  105. }
  106. req.IpSrc = r[0]
  107. req.Source = r[0]
  108. req.Url = r[1]
  109. req.UserAgent = "Munch/1.0"
  110. req.Host = "demo.scraperwall.com"
  111. req.CreatedAt = time.Now().UnixNano()
  112. // fmt.Printf("%s: %s\n", req.IpSrc, req.Url)
  113. natsEC.Publish(*natsQueue, &req)
  114. count++
  115. if *sleepFor >= time.Nanosecond {
  116. endTs = time.Now()
  117. if endTs.Before(startTs.Add(*sleepFor)) {
  118. time.Sleep(*sleepFor - endTs.Sub(startTs))
  119. }
  120. }
  121. }
  122. }
  123. }
  124. func replayMongoDB() {
  125. mongo, err := mgo.Dial("mongodb://127.0.0.1:27017")
  126. if err != nil {
  127. log.Fatal(err)
  128. }
  129. db := mongo.DB("scw")
  130. coll := db.C("requests")
  131. var req data.RequestData
  132. var startTs time.Time
  133. var endTs time.Time
  134. for {
  135. res := coll.Find(bson.M{}).Batch(200).Prefetch(0.5)
  136. iterator := res.Iter()
  137. for iterator.Next(&req) {
  138. if *sleepFor > time.Microsecond {
  139. startTs = time.Now()
  140. }
  141. req.CreatedAt = time.Now()
  142. natsEC.Publish(*natsQueue, req.ToRequest())
  143. // data_chan <- *req.ToRequest()
  144. // nats_conn.Publish(*nats_queue, j)
  145. // conn.Write(j)
  146. count++
  147. if *sleepFor >= time.Microsecond {
  148. endTs = time.Now()
  149. if endTs.Before(startTs.Add(*sleepFor)) {
  150. time.Sleep(*sleepFor - endTs.Sub(startTs))
  151. }
  152. }
  153. }
  154. }
  155. }