瀏覽代碼

added grpc sending

Tobias von Dewitz 6 年之前
父節點
當前提交
a15f4b3308
共有 2 個文件被更改,包括 94 次插入0 次删除
  1. 22 0
      main.go
  2. 72 0
      rpc.go

+ 22 - 0
main.go

@@ -53,12 +53,14 @@ var (
 	hostName              = flag.String("hostname", "", "Override the captured hostname with this one")
 	accessWatchKey        = flag.String("access-watch-key", "", "access.watch API key")
 	configFile            = flag.String("config", "", "The location of the TOML config file")
+	rpcAddr               = flag.String("rpc-address", "", "The address where the RPC server is listening")
 
 	beQuiet   = flag.Bool("quiet", true, "Be quiet")
 	doVersion = flag.Bool("version", false, "Show version information")
 
 	natsEC          *nats.EncodedConn
 	natsJSONEC      *nats.EncodedConn
+	rpcClient       *ScwRPC
 	natsErrorChan   chan error
 	natsIsAvailable bool
 	count           uint64
@@ -98,6 +100,7 @@ type Config struct {
 	NginxReplay           string
 	HostName              string
 	AccessWatchKey        string
+	RPCAddress            string
 }
 
 type duration struct {
@@ -132,6 +135,7 @@ func (c Config) print() {
 	fmt.Printf("AccessWatchKey:        %s\n", c.AccessWatchKey)
 	fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
 	fmt.Printf("Protocol:              %s\n", c.Protocol)
+	fmt.Printf("RPCAddress:            %s\n", c.RPCAddress)
 	fmt.Printf("Quiet:                 %t\n", c.Quiet)
 	fmt.Printf("Trace:                 %t\n", c.Trace)
 }
@@ -177,6 +181,15 @@ func main() {
 
 	go natsWatchdog(natsErrorChan)
 
+	// RPC
+	//
+	if config.RPCAddress != "" {
+		rpcClient, err = NewScwRPC(config.RPCAddress)
+		if err != nil {
+			log.Fatal(err)
+		}
+	}
+
 	// What should I do?
 	if config.RequestsFile != "" {
 		replayFile()
@@ -352,6 +365,14 @@ func publishRequest(queue string, request *data.Request) {
 		return
 	}
 
+	if rpcClient != nil {
+		select {
+		case rpcClient.RChan <- request:
+		default:
+		}
+		return
+	}
+
 	if !natsIsAvailable {
 		if rand.Intn(100) == 0 {
 			log.Println("nats connection is not available")
@@ -687,6 +708,7 @@ func loadConfig() {
 	config.Quiet = *beQuiet
 	config.Trace = *trace
 	config.AccessWatchKey = *accessWatchKey
+	config.RPCAddress = *rpcAddr
 
 	if *configFile == "" {
 		return

+ 72 - 0
rpc.go

@@ -0,0 +1,72 @@
+package main
+
+import (
+	"context"
+	"log"
+	"math/rand"
+	"time"
+
+	"git.scraperwall.com/scw/data"
+	"google.golang.org/grpc"
+
+	pb "git.scraperwall.com/scw/grpc/scw"
+)
+
+// ScwRPC sends incoming requests to an RPC server
+type ScwRPC struct {
+	Connection *grpc.ClientConn
+	Client     pb.SCWProtosClient
+	RChan      chan *data.Request
+	RChanCap   int
+}
+
+// Connect creates a new connection to the RPC server
+func (s *ScwRPC) Connect(addr string) error {
+	var err error
+
+	s.Connection, err = grpc.Dial(addr, grpc.WithInsecure())
+	if err != nil {
+		return err
+	}
+
+	// defer conn.Close()
+
+	s.Client = pb.NewSCWProtosClient(s.Connection)
+
+	return nil
+}
+
+func (s *ScwRPC) sendRequest(req *data.Request) {
+	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+
+	_, err := s.Client.SendRequest(ctx, req)
+	defer cancel()
+	if err != nil {
+		if rand.Intn(100) == 0 {
+			log.Printf("sending request failed: %s\n", err)
+		}
+		return
+	}
+}
+
+// NewScwRPC generates a new ScwRPC instance
+func NewScwRPC(addr string) (*ScwRPC, error) {
+	s := &ScwRPC{}
+
+	err := s.Connect(addr)
+	if err != nil {
+		return nil, err
+	}
+
+	s.RChanCap = 5000
+	s.RChan = make(chan *data.Request, s.RChanCap)
+
+	go func() {
+		for r := range s.RChan {
+			go s.sendRequest(r)
+		}
+
+	}()
+
+	return s, nil
+}