package main import ( "flag" "fmt" "log" "time" "github.com/nats-io/nats" "github.com/nats-io/nats/encoders/protobuf" "github.com/pkg/profile" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" "git.scraperwall.com/scw/data" ) var ( nats_url = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server") nats_queue = flag.String("nats-queue", "mami_requests", "The NATS queue name") sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data") doProfile = flag.Bool("profile", false, "Profile the program") data_chan chan data.Request count uint64 ) func init() { flag.Parse() nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{}) } func main() { if *doProfile { defer profile.Start(profile.CPUProfile).Stop() } go func(c *uint64) { for { fmt.Printf("%d items per second\n", *c) *c = 0 time.Sleep(time.Second) } }(&count) // NATS // if *nats_url == "" { log.Fatal("No NATS URL specified (-nats-url)!") } nats_conn, err := nats.Connect(*nats_url) if err != nil { log.Fatal(err) } // nats.RegisterEncoder(encType, enc) nats_ec, err := nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER) // _, err = nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER) if err != nil { log.Fatalf("Encoded Connection: %v!\n", err) } /* log.Print("binding chan...") data_chan = make(chan data.Request) nats_ec.BindSendChan(*nats_queue, data_chan) log.Println("done") */ /* laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") if err != nil { log.Fatal(err) } raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876") if err != nil { log.Fatal(err) } conn, err := net.DialUDP("udp", laddr, raddr) if err != nil { log.Fatal(err) } */ mongo, err := mgo.Dial("mongodb://127.0.0.1:27017") if err != nil { log.Fatal(err) } db := mongo.DB("scw") coll := db.C("requests") var req data.RequestData var startTs time.Time var endTs time.Time for { res := coll.Find(bson.M{}).Batch(200).Prefetch(0.5) iterator := res.Iter() for iterator.Next(&req) { if *sleepFor > time.Microsecond { startTs = time.Now() } req.CreatedAt = time.Now() nats_ec.Publish(*nats_queue, req.ToRequest()) // data_chan <- *req.ToRequest() // nats_conn.Publish(*nats_queue, j) // conn.Write(j) count++ if *sleepFor >= time.Microsecond { endTs = time.Now() if endTs.Before(startTs.Add(*sleepFor)) { time.Sleep(*sleepFor - endTs.Sub(startTs)) } } } } }