main.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/nats-io/nats"
  8. "github.com/nats-io/nats/encoders/protobuf"
  9. "github.com/pkg/profile"
  10. "gopkg.in/mgo.v2"
  11. "gopkg.in/mgo.v2/bson"
  12. "git.scraperwall.com/scw/data"
  13. )
  14. var (
  15. nats_url = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  16. nats_queue = flag.String("nats-queue", "mami_requests", "The NATS queue name")
  17. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data")
  18. doProfile = flag.Bool("profile", false, "Profile the program")
  19. data_chan chan data.Request
  20. count uint64
  21. )
  22. func init() {
  23. flag.Parse()
  24. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  25. }
  26. func main() {
  27. if *doProfile {
  28. defer profile.Start(profile.CPUProfile).Stop()
  29. }
  30. go func(c *uint64) {
  31. for {
  32. fmt.Printf("%d items per second\n", *c)
  33. *c = 0
  34. time.Sleep(time.Second)
  35. }
  36. }(&count)
  37. // NATS
  38. //
  39. if *nats_url == "" {
  40. log.Fatal("No NATS URL specified (-nats-url)!")
  41. }
  42. nats_conn, err := nats.Connect(*nats_url)
  43. if err != nil {
  44. log.Fatal(err)
  45. }
  46. // nats.RegisterEncoder(encType, enc)
  47. nats_ec, err := nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER)
  48. // _, err = nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER)
  49. if err != nil {
  50. log.Fatalf("Encoded Connection: %v!\n", err)
  51. }
  52. /*
  53. log.Print("binding chan...")
  54. data_chan = make(chan data.Request)
  55. nats_ec.BindSendChan(*nats_queue, data_chan)
  56. log.Println("done")
  57. */
  58. /*
  59. laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
  60. if err != nil {
  61. log.Fatal(err)
  62. }
  63. raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876")
  64. if err != nil {
  65. log.Fatal(err)
  66. }
  67. conn, err := net.DialUDP("udp", laddr, raddr)
  68. if err != nil {
  69. log.Fatal(err)
  70. }
  71. */
  72. mongo, err := mgo.Dial("mongodb://127.0.0.1:27017")
  73. if err != nil {
  74. log.Fatal(err)
  75. }
  76. db := mongo.DB("scw")
  77. coll := db.C("requests")
  78. var req data.RequestData
  79. var startTs time.Time
  80. var endTs time.Time
  81. for {
  82. res := coll.Find(bson.M{}).Batch(200).Prefetch(0.5)
  83. iterator := res.Iter()
  84. for iterator.Next(&req) {
  85. if *sleepFor > time.Microsecond {
  86. startTs = time.Now()
  87. }
  88. req.CreatedAt = time.Now()
  89. nats_ec.Publish(*nats_queue, req.ToRequest())
  90. // data_chan <- *req.ToRequest()
  91. // nats_conn.Publish(*nats_queue, j)
  92. // conn.Write(j)
  93. count++
  94. if *sleepFor >= time.Microsecond {
  95. endTs = time.Now()
  96. if endTs.Before(startTs.Add(*sleepFor)) {
  97. time.Sleep(*sleepFor - endTs.Sub(startTs))
  98. }
  99. }
  100. }
  101. }
  102. }