rpc.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "math/rand"
  6. "time"
  7. "git.scraperwall.com/scw/data"
  8. "google.golang.org/grpc"
  9. pb "git.scraperwall.com/scw/grpc/scw"
  10. )
  11. // ScwRPC sends incoming requests to an RPC server
  12. type ScwRPC struct {
  13. Connection *grpc.ClientConn
  14. Client pb.SCWProtosClient
  15. RChan chan *data.Request
  16. RChanCap int
  17. }
  18. // Connect creates a new connection to the RPC server
  19. func (s *ScwRPC) Connect(addr string) error {
  20. var err error
  21. s.Connection, err = grpc.Dial(addr, grpc.WithInsecure())
  22. if err != nil {
  23. return err
  24. }
  25. // defer conn.Close()
  26. s.Client = pb.NewSCWProtosClient(s.Connection)
  27. return nil
  28. }
  29. func (s *ScwRPC) sendRequest(req *data.Request) {
  30. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
  31. _, err := s.Client.SendRequest(ctx, req)
  32. defer cancel()
  33. if err != nil {
  34. if rand.Intn(100) == 0 {
  35. log.Printf("sending request failed: %s\n", err)
  36. }
  37. return
  38. }
  39. }
  40. // NewScwRPC generates a new ScwRPC instance
  41. func NewScwRPC(addr string) (*ScwRPC, error) {
  42. s := &ScwRPC{}
  43. err := s.Connect(addr)
  44. if err != nil {
  45. return nil, err
  46. }
  47. s.RChanCap = 5000
  48. s.RChan = make(chan *data.Request, s.RChanCap)
  49. go func() {
  50. for r := range s.RChan {
  51. go s.sendRequest(r)
  52. }
  53. }()
  54. return s, nil
  55. }