main.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package main
  import (
  	"encoding/csv"
  	"flag"
  	"fmt"
  	"io"
  	"log"
  	"os"
  	"sync/atomic"
  	"time"

  	"git.scraperwall.com/scw/data"
  	"github.com/nats-io/nats"
  	"github.com/nats-io/nats/encoders/protobuf"
  	"github.com/pkg/profile"
  	"gopkg.in/mgo.v2"
  	"gopkg.in/mgo.v2/bson"
  )
  17. var (
  18. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  19. natsQueue = flag.String("nats-queue", "mami_requests", "The NATS queue name")
  20. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data")
  21. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  22. doProfile = flag.Bool("profile", false, "Profile the program")
  23. doVersion = flag.Bool("version", false, "Show version information")
  24. dataChan chan data.Request
  25. natsEC *nats.EncodedConn
  26. count uint64
  27. Version string
  28. BuildDate string
  29. )
  30. func init() {
  31. flag.Parse()
  32. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  33. }
  34. func main() {
  35. if *doVersion {
  36. version()
  37. os.Exit(0)
  38. }
  39. if *doProfile {
  40. defer profile.Start(profile.CPUProfile).Stop()
  41. }
  42. go func(c *uint64) {
  43. for {
  44. fmt.Printf("%d items per second\n", *c)
  45. *c = 0
  46. time.Sleep(time.Second)
  47. }
  48. }(&count)
  49. // NATS
  50. //
  51. if *natsURL == "" {
  52. log.Fatal("No NATS URL specified (-nats-url)!")
  53. }
  54. natsConn, err := nats.Connect(*natsURL)
  55. if err != nil {
  56. log.Fatal(err)
  57. }
  58. // nats.RegisterEncoder(encType, enc)
  59. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  60. // _, err = nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER)
  61. if err != nil {
  62. log.Fatalf("Encoded Connection: %v!\n", err)
  63. }
  64. /*
  65. log.Print("binding chan...")
  66. data_chan = make(chan data.Request)
  67. nats_ec.BindSendChan(*nats_queue, data_chan)
  68. log.Println("done")
  69. */
  70. /*
  71. laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
  72. if err != nil {
  73. log.Fatal(err)
  74. }
  75. raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876")
  76. if err != nil {
  77. log.Fatal(err)
  78. }
  79. conn, err := net.DialUDP("udp", laddr, raddr)
  80. if err != nil {
  81. log.Fatal(err)
  82. }
  83. */
  84. if *requestsFile != "" {
  85. replayFile()
  86. } else {
  87. replayMongoDB()
  88. }
  89. }
  90. func replayFile() {
  91. var req data.Request
  92. var startTs time.Time
  93. var endTs time.Time
  94. for {
  95. fh, err := os.Open(*requestsFile)
  96. if err != nil {
  97. log.Fatalf("Failed to open request file '%s': %s", *requestsFile, err)
  98. }
  99. c := csv.NewReader(fh)
  100. c.Comma = ' '
  101. for {
  102. if *sleepFor > time.Nanosecond {
  103. startTs = time.Now()
  104. }
  105. r, err := c.Read()
  106. if err == io.EOF {
  107. break
  108. }
  109. if err != nil {
  110. log.Println(err)
  111. continue
  112. }
  113. req.IpSrc = r[0]
  114. req.Source = r[0]
  115. req.Url = r[1]
  116. req.UserAgent = "Munch/1.0"
  117. req.Host = "www.marktplatz-mittelstand.de"
  118. req.CreatedAt = time.Now().UnixNano()
  119. // fmt.Printf("%s: %s\n", req.IpSrc, req.Url)
  120. natsEC.Publish(*natsQueue, &req)
  121. count++
  122. if *sleepFor >= time.Nanosecond {
  123. endTs = time.Now()
  124. if endTs.Before(startTs.Add(*sleepFor)) {
  125. time.Sleep(*sleepFor - endTs.Sub(startTs))
  126. }
  127. }
  128. }
  129. }
  130. }
  131. func replayMongoDB() {
  132. mongo, err := mgo.Dial("mongodb://127.0.0.1:27017")
  133. if err != nil {
  134. log.Fatal(err)
  135. }
  136. db := mongo.DB("scw")
  137. coll := db.C("requests")
  138. var req data.RequestData
  139. var startTs time.Time
  140. var endTs time.Time
  141. for {
  142. res := coll.Find(bson.M{}).Batch(200).Prefetch(0.5)
  143. iterator := res.Iter()
  144. for iterator.Next(&req) {
  145. if *sleepFor > time.Microsecond {
  146. startTs = time.Now()
  147. }
  148. req.CreatedAt = time.Now()
  149. natsEC.Publish(*natsQueue, req.ToRequest())
  150. // data_chan <- *req.ToRequest()
  151. // nats_conn.Publish(*nats_queue, j)
  152. // conn.Write(j)
  153. count++
  154. if *sleepFor >= time.Microsecond {
  155. endTs = time.Now()
  156. if endTs.Before(startTs.Add(*sleepFor)) {
  157. time.Sleep(*sleepFor - endTs.Sub(startTs))
  158. }
  159. }
  160. }
  161. }
  162. }
  163. func version() {
  164. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  165. }