main.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package main
  2. import (
  3. "encoding/csv"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "log"
  8. "os"
  9. "time"
  10. "github.com/nats-io/nats"
  11. "github.com/nats-io/nats/encoders/protobuf"
  12. "github.com/pkg/profile"
  13. "gopkg.in/mgo.v2"
  14. "gopkg.in/mgo.v2/bson"
  15. "git.scraperwall.com/scw/data"
  16. )
  17. var (
  18. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  19. natsQueue = flag.String("nats-queue", "mami_requests", "The NATS queue name")
  20. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data")
  21. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  22. doProfile = flag.Bool("profile", false, "Profile the program")
  23. doVersion = flag.Bool("version", false, "Show version information")
  24. dataChan chan data.Request
  25. natsEC *nats.EncodedConn
  26. count uint64
  27. Version string
  28. BuildDate string
  29. )
  30. func init() {
  31. flag.Parse()
  32. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  33. }
  34. func main() {
  35. if *doProfile {
  36. defer profile.Start(profile.CPUProfile).Stop()
  37. }
  38. go func(c *uint64) {
  39. for {
  40. fmt.Printf("%d items per second\n", *c)
  41. *c = 0
  42. time.Sleep(time.Second)
  43. }
  44. }(&count)
  45. // NATS
  46. //
  47. if *natsURL == "" {
  48. log.Fatal("No NATS URL specified (-nats-url)!")
  49. }
  50. natsConn, err := nats.Connect(*natsURL)
  51. if err != nil {
  52. log.Fatal(err)
  53. }
  54. // nats.RegisterEncoder(encType, enc)
  55. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  56. // _, err = nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER)
  57. if err != nil {
  58. log.Fatalf("Encoded Connection: %v!\n", err)
  59. }
  60. /*
  61. log.Print("binding chan...")
  62. data_chan = make(chan data.Request)
  63. nats_ec.BindSendChan(*nats_queue, data_chan)
  64. log.Println("done")
  65. */
  66. /*
  67. laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
  68. if err != nil {
  69. log.Fatal(err)
  70. }
  71. raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876")
  72. if err != nil {
  73. log.Fatal(err)
  74. }
  75. conn, err := net.DialUDP("udp", laddr, raddr)
  76. if err != nil {
  77. log.Fatal(err)
  78. }
  79. */
  80. if *requestsFile != "" {
  81. replayFile()
  82. } else {
  83. replayMongoDB()
  84. }
  85. }
  86. func replayFile() {
  87. var req data.Request
  88. var startTs time.Time
  89. var endTs time.Time
  90. for {
  91. fh, err := os.Open(*requestsFile)
  92. if err != nil {
  93. log.Fatalf("Failed to open request file '%s': %s", *requestsFile, err)
  94. }
  95. c := csv.NewReader(fh)
  96. c.Comma = ' '
  97. for {
  98. if *sleepFor > time.Nanosecond {
  99. startTs = time.Now()
  100. }
  101. r, err := c.Read()
  102. if err == io.EOF {
  103. break
  104. }
  105. if err != nil {
  106. log.Println(err)
  107. continue
  108. }
  109. req.IpSrc = r[0]
  110. req.Source = r[0]
  111. req.Url = r[1]
  112. req.UserAgent = "Munch/1.0"
  113. req.Host = "www.marktplatz-mittelstand.de"
  114. req.CreatedAt = time.Now().UnixNano()
  115. // fmt.Printf("%s: %s\n", req.IpSrc, req.Url)
  116. natsEC.Publish(*natsQueue, &req)
  117. count++
  118. if *sleepFor >= time.Nanosecond {
  119. endTs = time.Now()
  120. if endTs.Before(startTs.Add(*sleepFor)) {
  121. time.Sleep(*sleepFor - endTs.Sub(startTs))
  122. }
  123. }
  124. }
  125. }
  126. }
  127. func replayMongoDB() {
  128. mongo, err := mgo.Dial("mongodb://127.0.0.1:27017")
  129. if err != nil {
  130. log.Fatal(err)
  131. }
  132. db := mongo.DB("scw")
  133. coll := db.C("requests")
  134. var req data.RequestData
  135. var startTs time.Time
  136. var endTs time.Time
  137. for {
  138. res := coll.Find(bson.M{}).Batch(200).Prefetch(0.5)
  139. iterator := res.Iter()
  140. for iterator.Next(&req) {
  141. if *sleepFor > time.Microsecond {
  142. startTs = time.Now()
  143. }
  144. req.CreatedAt = time.Now()
  145. natsEC.Publish(*natsQueue, req.ToRequest())
  146. // data_chan <- *req.ToRequest()
  147. // nats_conn.Publish(*nats_queue, j)
  148. // conn.Write(j)
  149. count++
  150. if *sleepFor >= time.Microsecond {
  151. endTs = time.Now()
  152. if endTs.Before(startTs.Add(*sleepFor)) {
  153. time.Sleep(*sleepFor - endTs.Sub(startTs))
  154. }
  155. }
  156. }
  157. }
  158. }
  159. func version() {
  160. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  161. }