Tobias Begalke 7 yıl önce
işleme
2a1a08754a
2 değiştirilmiş dosya ile 130 ekleme ve 0 silme
  1. 2 0
      Makefile
  2. 128 0
      main.go

+ 2 - 0
Makefile

@@ -0,0 +1,2 @@
+all:
+	go build -ldflags -s -tags netgo

+ 128 - 0
main.go

@@ -0,0 +1,128 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"time"
+
+	"github.com/nats-io/nats"
+	"github.com/nats-io/nats/encoders/protobuf"
+
+	"github.com/pkg/profile"
+
+	"gopkg.in/mgo.v2"
+	"gopkg.in/mgo.v2/bson"
+
+	"git.scraperwall.com/scw/data"
+)
+
+var (
+	nats_url   = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
+	nats_queue = flag.String("nats-queue", "mami_requests", "The NATS queue name")
+	sleepFor   = flag.Duration("sleep", 0, "Sleep this long between sending data")
+	doProfile  = flag.Bool("profile", false, "Profile the program")
+
+	data_chan chan data.Request
+	count     uint64
+)
+
+func init() {
+	flag.Parse()
+
+	nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
+}
+
+func main() {
+
+	if *doProfile {
+		defer profile.Start(profile.CPUProfile).Stop()
+	}
+
+	go func(c *uint64) {
+		for {
+			fmt.Printf("%d items per second\n", *c)
+			*c = 0
+			time.Sleep(time.Second)
+		}
+	}(&count)
+
+	// NATS
+	//
+	if *nats_url == "" {
+		log.Fatal("No NATS URL specified (-nats-url)!")
+	}
+
+	nats_conn, err := nats.Connect(*nats_url)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// nats.RegisterEncoder(encType, enc)
+
+	nats_ec, err := nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER)
+	// _, err = nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER)
+	if err != nil {
+		log.Fatalf("Encoded Connection: %v!\n", err)
+	}
+
+	/*
+		log.Print("binding chan...")
+		data_chan = make(chan data.Request)
+		nats_ec.BindSendChan(*nats_queue, data_chan)
+		log.Println("done")
+	*/
+	/*
+		laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876")
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		conn, err := net.DialUDP("udp", laddr, raddr)
+		if err != nil {
+			log.Fatal(err)
+		}
+	*/
+
+	mongo, err := mgo.Dial("mongodb://127.0.0.1:27017")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	db := mongo.DB("scw")
+	coll := db.C("requests")
+	var req data.RequestData
+	var startTs time.Time
+	var endTs time.Time
+
+	for {
+		res := coll.Find(bson.M{}).Batch(200).Prefetch(0.5)
+		iterator := res.Iter()
+
+		for iterator.Next(&req) {
+			if *sleepFor > time.Microsecond {
+				startTs = time.Now()
+			}
+
+			req.CreatedAt = time.Now()
+			nats_ec.Publish(*nats_queue, req.ToRequest())
+			// data_chan <- *req.ToRequest()
+
+			// nats_conn.Publish(*nats_queue, j)
+
+			// conn.Write(j)
+			count++
+			if *sleepFor >= time.Microsecond {
+				endTs = time.Now()
+				if endTs.Before(startTs.Add(*sleepFor)) {
+					time.Sleep(*sleepFor - endTs.Sub(startTs))
+				}
+			}
+		}
+	}
+}