main.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. package main
  import (
  	"bufio"
  	"encoding/csv"
  	"flag"
  	"fmt"
  	"io"
  	"log"
  	"net"
  	"net/http"
  	"os"
  	"strings"
  	"sync/atomic"
  	"time"

  	"github.com/google/gopacket"
  	"github.com/google/gopacket/layers"
  	"github.com/google/gopacket/pcap"
  	"github.com/nats-io/nats"
  	"github.com/nats-io/nats/encoders/protobuf"

  	"git.scraperwall.com/scw/data"
  	"git.scraperwall.com/scw/ip"
  	"git.scraperwall.com/scw/pidfile"
  )
  23. var (
  24. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  25. iface = flag.String("interface", "eth0", "Interface to get packets from")
  26. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  27. filter = flag.String("filter", "tcp", "PCAP filter expression")
  28. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  29. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  30. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  31. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  32. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  33. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  34. pidFile = flag.String("pidfile", "/var/run/munchclient.pid", "The location of the PID file")
  35. doVersion = flag.Bool("version", false, "Show version information")
  36. natsEC *nats.EncodedConn
  37. count uint64
  38. timeout = -1 * time.Second
  39. ipPriv *ip.IP
  40. // Version contains the program Version, e.g. 1.0.1
  41. Version string
  42. // BuildDate contains the date and time at which the program was compiled
  43. BuildDate string
  44. )
  45. func init() {
  46. flag.Parse()
  47. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  48. }
  49. func main() {
  50. if *doVersion {
  51. version()
  52. os.Exit(0)
  53. }
  54. if err := pidfile.Write(*pidFile); err != nil {
  55. log.Fatal("munchclient is already running, exiting!")
  56. os.Exit(1)
  57. }
  58. defer pidfile.Remove(*pidFile)
  59. go func(c *uint64) {
  60. for {
  61. fmt.Printf("%d requests per second\n", *c)
  62. *c = 0
  63. time.Sleep(time.Second)
  64. }
  65. }(&count)
  66. // NATS
  67. //
  68. if *natsURL == "" {
  69. log.Fatal("No NATS URL specified (-nats-url)!")
  70. }
  71. natsConn, err := nats.Connect(*natsURL)
  72. if err != nil {
  73. log.Fatal(err)
  74. }
  75. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  76. if err != nil {
  77. log.Fatalf("Encoded Connection: %v!\n", err)
  78. }
  79. // What should I do?
  80. if *requestsFile != "" {
  81. replayFile()
  82. } else if *doLiveCapture {
  83. fmt.Printf("live capture (%s, %s) to %s\n", *iface, *filter, *natsURL)
  84. liveCapture()
  85. }
  86. }
  87. func liveCapture() {
  88. ipPriv = ip.NewIP()
  89. // PCAP setup
  90. //
  91. handle, err := pcap.OpenLive(*iface, int32(*snapshotLen), *promiscuous, timeout)
  92. if err != nil {
  93. log.Fatal(err)
  94. }
  95. defer handle.Close()
  96. err = handle.SetBPFFilter(*filter)
  97. if err != nil {
  98. log.Fatal(err)
  99. }
  100. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  101. for packet := range packetSource.Packets() {
  102. go processPacket(packet)
  103. }
  104. }
  105. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  106. func processPacket(packet gopacket.Packet) {
  107. count++
  108. ipLayer := packet.Layer(layers.LayerTypeIPv4)
  109. if ipLayer == nil {
  110. log.Println("No IPv4 Layer!")
  111. return
  112. }
  113. ip, _ := ipLayer.(*layers.IPv4)
  114. if ip.Protocol != layers.IPProtocolTCP {
  115. log.Println("No TCP Protocol!")
  116. return
  117. }
  118. ipSrc := ip.SrcIP.String()
  119. ipDst := ip.DstIP.String()
  120. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  121. if tcpLayer == nil {
  122. return
  123. }
  124. tcp, _ := tcpLayer.(*layers.TCP)
  125. portSrc := tcp.SrcPort
  126. portDst := tcp.DstPort
  127. sequence := tcp.Seq
  128. applicationLayer := packet.ApplicationLayer()
  129. if applicationLayer == nil {
  130. return
  131. }
  132. data := data.Request{}
  133. reader := bufio.NewReader(strings.NewReader(string(applicationLayer.Payload())))
  134. req, err := http.ReadRequest(reader)
  135. if err != nil {
  136. return
  137. }
  138. data.IpSrc = ipSrc
  139. data.IpDst = ipDst
  140. data.PortSrc = uint32(portSrc)
  141. data.PortDst = uint32(portDst)
  142. data.TcpSeq = uint32(sequence)
  143. data.CreatedAt = time.Now().UnixNano()
  144. data.Url = req.URL.String()
  145. data.Method = req.Method
  146. data.Referer = req.Referer()
  147. data.Host = req.Host
  148. data.Protocol = req.Proto
  149. data.Origin = data.Host
  150. if _, ok := req.Header["Connection"]; ok {
  151. data.Connection = req.Header["Connection"][0]
  152. }
  153. if _, ok := req.Header["X-Forwarded-For"]; ok {
  154. data.XForwardedFor = req.Header["X-Forwarded-For"][0]
  155. }
  156. if _, ok := req.Header["X-Real-IP"]; ok {
  157. data.XRealIP = req.Header["X-Real-IP"][0]
  158. }
  159. if _, ok := req.Header["X-Requested-With"]; ok {
  160. data.XRequestedWith = req.Header["X-Requested-With"][0]
  161. }
  162. if _, ok := req.Header["Accept-Encoding"]; ok {
  163. data.AcceptEncoding = req.Header["Accept-Encoding"][0]
  164. }
  165. if _, ok := req.Header["Accept-Language"]; ok {
  166. data.AcceptLanguage = req.Header["Accept-Language"][0]
  167. }
  168. if _, ok := req.Header["User-Agent"]; ok {
  169. data.UserAgent = req.Header["User-Agent"][0]
  170. }
  171. if _, ok := req.Header["Accept"]; ok {
  172. data.Accept = req.Header["Accept"][0]
  173. }
  174. if _, ok := req.Header["Cookie"]; ok {
  175. data.Cookie = req.Header["Cookie"][0]
  176. }
  177. data.Source = data.IpSrc
  178. if *useXForwardedAsSource && data.XForwardedFor != "" {
  179. if strings.Contains(data.XForwardedFor, ",") {
  180. ips := strings.Split(data.XForwardedFor, ",")
  181. for i := len(ips) - 1; i >= 0; i-- {
  182. ipRaw := strings.TrimSpace(ips[i])
  183. ipAddr := net.ParseIP(ipRaw)
  184. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  185. data.Source = ipRaw
  186. break
  187. }
  188. }
  189. } else {
  190. ipAddr := net.ParseIP(strings.TrimSpace(data.XForwardedFor))
  191. if !ipPriv.IsPrivate(ipAddr) {
  192. data.Source = data.XForwardedFor
  193. }
  194. }
  195. }
  196. if data.Source == data.IpSrc && data.XRealIP != "" {
  197. data.Source = data.XRealIP
  198. }
  199. natsEC.Publish(*natsQueue, &data)
  200. }
  201. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  202. // e.g.
  203. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  204. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  205. func replayFile() {
  206. var req data.Request
  207. var startTs time.Time
  208. var endTs time.Time
  209. for {
  210. fh, err := os.Open(*requestsFile)
  211. if err != nil {
  212. log.Fatalf("Failed to open request file '%s': %s", *requestsFile, err)
  213. }
  214. c := csv.NewReader(fh)
  215. c.Comma = ' '
  216. for {
  217. if *sleepFor > time.Nanosecond {
  218. startTs = time.Now()
  219. }
  220. r, err := c.Read()
  221. if err == io.EOF {
  222. break
  223. }
  224. if err != nil {
  225. log.Println(err)
  226. continue
  227. }
  228. req.IpSrc = r[0]
  229. req.Source = r[0]
  230. req.Url = r[1]
  231. req.UserAgent = "Munch/1.0"
  232. req.Host = "www.scraperwall.com"
  233. req.CreatedAt = time.Now().UnixNano()
  234. natsEC.Publish(*natsQueue, &req)
  235. count++
  236. if *sleepFor >= time.Nanosecond {
  237. endTs = time.Now()
  238. if endTs.Before(startTs.Add(*sleepFor)) {
  239. time.Sleep(*sleepFor - endTs.Sub(startTs))
  240. }
  241. }
  242. }
  243. }
  244. }
  245. // version outputs build information
  246. func version() {
  247. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  248. }