rpc.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package main
  2. /*
  3. import (
  4. "context"
  5. "log"
  6. "math/rand"
  7. "time"
  8. "git.scraperwall.com/scw/data"
  9. "google.golang.org/grpc"
  10. pb "git.scraperwall.com/scw/grpc/scw"
  11. )
  12. // ScwRPC sends incoming requests to an RPC server
  13. type ScwRPC struct {
  14. Connection *grpc.ClientConn
  15. Client pb.SCWProtosClient
  16. RChan chan *data.Request
  17. RChanCap int
  18. }
  19. // Connect creates a new connection to the RPC server
  20. func (s *ScwRPC) Connect(addr string) error {
  21. var err error
  22. s.Connection, err = grpc.Dial(addr, grpc.WithInsecure())
  23. if err != nil {
  24. return err
  25. }
  26. // defer conn.Close()
  27. s.Client = pb.NewSCWProtosClient(s.Connection)
  28. return nil
  29. }
  30. func (s *ScwRPC) sendRequest(req *data.Request) {
  31. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
  32. _, err := s.Client.SendRequest(ctx, req)
  33. defer cancel()
  34. if err != nil {
  35. if rand.Intn(100) == 0 {
  36. log.Printf("sending request failed: %s\n", err)
  37. }
  38. return
  39. }
  40. }
  41. // NewScwRPC generates a new ScwRPC instance
  42. func NewScwRPC(addr string) (*ScwRPC, error) {
  43. s := &ScwRPC{}
  44. err := s.Connect(addr)
  45. if err != nil {
  46. return nil, err
  47. }
  48. s.RChanCap = 5000
  49. s.RChan = make(chan *data.Request, s.RChanCap)
  50. go func() {
  51. for r := range s.RChan {
  52. go s.sendRequest(r)
  53. }
  54. }()
  55. return s, nil
  56. }
  57. */