package main

import (
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"time"

	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"

	"github.com/pkg/profile"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"

	"git.scraperwall.com/scw/data"
)

var (
	natsURL      = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
	natsQueue    = flag.String("nats-queue", "mami_requests", "The NATS queue name")
	sleepFor     = flag.Duration("sleep", 0, "Sleep this long between sending data")
	requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
	doProfile    = flag.Bool("profile", false, "Profile the program")
	doVersion    = flag.Bool("version", false, "Show version information")

	dataChan chan data.Request
	natsEC   *nats.EncodedConn
	count    uint64

	Version   string
	BuildDate string
)

func init() {
	flag.Parse()

	nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
}

func main() {

	if *doVersion {
		version()
		os.Exit(0)
	}

	if *doProfile {
		defer profile.Start(profile.CPUProfile).Stop()
	}

	go func(c *uint64) {
		for {
			fmt.Printf("%d items per second\n", *c)
			*c = 0
			time.Sleep(time.Second)
		}
	}(&count)

	// NATS
	//
	if *natsURL == "" {
		log.Fatal("No NATS URL specified (-nats-url)!")
	}

	natsConn, err := nats.Connect(*natsURL)
	if err != nil {
		log.Fatal(err)
	}

	// nats.RegisterEncoder(encType, enc)
	natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
	// _, err = nats.NewEncodedConn(nats_conn, protobuf.PROTOBUF_ENCODER)
	if err != nil {
		log.Fatalf("Encoded Connection: %v!\n", err)
	}

	/*
		log.Print("binding chan...")
		data_chan = make(chan data.Request)
		nats_ec.BindSendChan(*nats_queue, data_chan)
		log.Println("done")
	*/
	/*
		laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
		if err != nil {
			log.Fatal(err)
		}

		raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876")
		if err != nil {
			log.Fatal(err)
		}

		conn, err := net.DialUDP("udp", laddr, raddr)
		if err != nil {
			log.Fatal(err)
		}
	*/

	if *requestsFile != "" {
		replayFile()
	} else {
		replayMongoDB()
	}
}

func replayFile() {
	var req data.Request
	var startTs time.Time
	var endTs time.Time

	for {
		fh, err := os.Open(*requestsFile)
		if err != nil {
			log.Fatalf("Failed to open request file '%s': %s", *requestsFile, err)
		}

		c := csv.NewReader(fh)
		c.Comma = ' '

		for {
			if *sleepFor > time.Nanosecond {
				startTs = time.Now()
			}

			r, err := c.Read()

			if err == io.EOF {
				break
			}

			if err != nil {
				log.Println(err)
				continue
			}

			req.IpSrc = r[0]
			req.Source = r[0]
			req.Url = r[1]
			req.UserAgent = "Munch/1.0"
			req.Host = "www.marktplatz-mittelstand.de"
			req.CreatedAt = time.Now().UnixNano()

			// fmt.Printf("%s: %s\n", req.IpSrc, req.Url)
			natsEC.Publish(*natsQueue, &req)

			count++
			if *sleepFor >= time.Nanosecond {
				endTs = time.Now()
				if endTs.Before(startTs.Add(*sleepFor)) {
					time.Sleep(*sleepFor - endTs.Sub(startTs))
				}
			}
		}
	}
}

func replayMongoDB() {
	mongo, err := mgo.Dial("mongodb://127.0.0.1:27017")
	if err != nil {
		log.Fatal(err)
	}

	db := mongo.DB("scw")
	coll := db.C("requests")
	var req data.RequestData
	var startTs time.Time
	var endTs time.Time

	for {
		res := coll.Find(bson.M{}).Batch(200).Prefetch(0.5)
		iterator := res.Iter()

		for iterator.Next(&req) {
			if *sleepFor > time.Microsecond {
				startTs = time.Now()
			}

			req.CreatedAt = time.Now()
			natsEC.Publish(*natsQueue, req.ToRequest())
			// data_chan <- *req.ToRequest()

			// nats_conn.Publish(*nats_queue, j)

			// conn.Write(j)
			count++
			if *sleepFor >= time.Microsecond {
				endTs = time.Now()
				if endTs.Before(startTs.Add(*sleepFor)) {
					time.Sleep(*sleepFor - endTs.Sub(startTs))
				}
			}
		}
	}
}

func version() {
	fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
}