123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- package main
- import (
- "context"
- "log"
- "math/rand"
- "time"
- "git.scraperwall.com/scw/data"
- "google.golang.org/grpc"
- pb "git.scraperwall.com/scw/grpc/scw"
- )
- // ScwRPC sends incoming requests to an RPC server
- type ScwRPC struct {
- Connection *grpc.ClientConn
- Client pb.SCWProtosClient
- RChan chan *data.Request
- RChanCap int
- }
- // Connect creates a new connection to the RPC server
- func (s *ScwRPC) Connect(addr string) error {
- var err error
- s.Connection, err = grpc.Dial(addr, grpc.WithInsecure())
- if err != nil {
- return err
- }
- // defer conn.Close()
- s.Client = pb.NewSCWProtosClient(s.Connection)
- return nil
- }
- func (s *ScwRPC) sendRequest(req *data.Request) {
- ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
- _, err := s.Client.SendRequest(ctx, req)
- defer cancel()
- if err != nil {
- if rand.Intn(100) == 0 {
- log.Printf("sending request failed: %s\n", err)
- }
- return
- }
- }
- // NewScwRPC generates a new ScwRPC instance
- func NewScwRPC(addr string) (*ScwRPC, error) {
- s := &ScwRPC{}
- err := s.Connect(addr)
- if err != nil {
- return nil, err
- }
- s.RChanCap = 5000
- s.RChan = make(chan *data.Request, s.RChanCap)
- go func() {
- for r := range s.RChan {
- go s.sendRequest(r)
- }
- }()
- return s, nil
- }
|