main.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package main
import (
	"bufio"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"

	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
)
  22. var (
  23. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  24. iface = flag.String("interface", "eth0", "Interface to get packets from")
  25. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  26. filter = flag.String("filter", "tcp", "PCAP filter expression")
  27. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  28. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  29. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  30. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  31. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  32. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  33. doVersion = flag.Bool("version", false, "Show version information")
  34. natsEC *nats.EncodedConn
  35. count uint64
  36. timeout = -1 * time.Second
  37. ipPriv *ip.IP
  38. // Version contains the program Version, e.g. 1.0.1
  39. Version string
  40. // BuildDate contains the date and time at which the program was compiled
  41. BuildDate string
  42. )
  43. func init() {
  44. flag.Parse()
  45. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  46. }
  47. func main() {
  48. if *doVersion {
  49. version()
  50. os.Exit(0)
  51. }
  52. go func(c *uint64) {
  53. for {
  54. fmt.Printf("%d items per second\n", *c)
  55. *c = 0
  56. time.Sleep(time.Second)
  57. }
  58. }(&count)
  59. // NATS
  60. //
  61. if *natsURL == "" {
  62. log.Fatal("No NATS URL specified (-nats-url)!")
  63. }
  64. natsConn, err := nats.Connect(*natsURL)
  65. if err != nil {
  66. log.Fatal(err)
  67. }
  68. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  69. if err != nil {
  70. log.Fatalf("Encoded Connection: %v!\n", err)
  71. }
  72. // What should I do?
  73. if *requestsFile != "" {
  74. replayFile()
  75. } else if *doLiveCapture {
  76. fmt.Printf("live capture (%s, %s) to %s\n", *iface, *filter, *natsURL)
  77. liveCapture()
  78. }
  79. }
  80. func liveCapture() {
  81. ipPriv = ip.NewIP()
  82. // PCAP setup
  83. //
  84. handle, err := pcap.OpenLive(*iface, int32(*snapshotLen), *promiscuous, timeout)
  85. if err != nil {
  86. log.Fatal(err)
  87. }
  88. defer handle.Close()
  89. err = handle.SetBPFFilter(*filter)
  90. if err != nil {
  91. log.Fatal(err)
  92. }
  93. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  94. for packet := range packetSource.Packets() {
  95. go processPacket(packet)
  96. }
  97. }
  98. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  99. func processPacket(packet gopacket.Packet) {
  100. count++
  101. ipLayer := packet.Layer(layers.LayerTypeIPv4)
  102. if ipLayer == nil {
  103. log.Println("No IPv4 Layer!")
  104. return
  105. }
  106. ip, _ := ipLayer.(*layers.IPv4)
  107. if ip.Protocol != layers.IPProtocolTCP {
  108. log.Println("No TCP Protocol!")
  109. return
  110. }
  111. ipSrc := ip.SrcIP.String()
  112. ipDst := ip.DstIP.String()
  113. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  114. if tcpLayer == nil {
  115. return
  116. }
  117. tcp, _ := tcpLayer.(*layers.TCP)
  118. portSrc := tcp.SrcPort
  119. portDst := tcp.DstPort
  120. sequence := tcp.Seq
  121. applicationLayer := packet.ApplicationLayer()
  122. if applicationLayer == nil {
  123. return
  124. }
  125. data := data.Request{}
  126. reader := bufio.NewReader(strings.NewReader(string(applicationLayer.Payload())))
  127. req, err := http.ReadRequest(reader)
  128. if err != nil {
  129. return
  130. }
  131. data.IpSrc = ipSrc
  132. data.IpDst = ipDst
  133. data.PortSrc = uint32(portSrc)
  134. data.PortDst = uint32(portDst)
  135. data.TcpSeq = uint32(sequence)
  136. data.CreatedAt = time.Now().UnixNano()
  137. data.Url = req.URL.String()
  138. data.Method = req.Method
  139. data.Referer = req.Referer()
  140. data.Host = req.Host
  141. data.Protocol = req.Proto
  142. data.Origin = data.Host
  143. if _, ok := req.Header["Connection"]; ok {
  144. data.Connection = req.Header["Connection"][0]
  145. }
  146. if _, ok := req.Header["X-Forwarded-For"]; ok {
  147. data.XForwardedFor = req.Header["X-Forwarded-For"][0]
  148. }
  149. if _, ok := req.Header["X-Real-IP"]; ok {
  150. data.XRealIP = req.Header["X-Real-IP"][0]
  151. }
  152. if _, ok := req.Header["X-Requested-With"]; ok {
  153. data.XRequestedWith = req.Header["X-Requested-With"][0]
  154. }
  155. if _, ok := req.Header["Accept-Encoding"]; ok {
  156. data.AcceptEncoding = req.Header["Accept-Encoding"][0]
  157. }
  158. if _, ok := req.Header["Accept-Language"]; ok {
  159. data.AcceptLanguage = req.Header["Accept-Language"][0]
  160. }
  161. if _, ok := req.Header["User-Agent"]; ok {
  162. data.UserAgent = req.Header["User-Agent"][0]
  163. }
  164. if _, ok := req.Header["Accept"]; ok {
  165. data.Accept = req.Header["Accept"][0]
  166. }
  167. if _, ok := req.Header["Cookie"]; ok {
  168. data.Cookie = req.Header["Cookie"][0]
  169. }
  170. data.Source = data.IpSrc
  171. if *useXForwardedAsSource && data.XForwardedFor != "" {
  172. if strings.Contains(data.XForwardedFor, ",") {
  173. ips := strings.Split(data.XForwardedFor, ",")
  174. for i := len(ips) - 1; i >= 0; i-- {
  175. ipRaw := strings.TrimSpace(ips[i])
  176. ipAddr := net.ParseIP(ipRaw)
  177. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  178. data.Source = ipRaw
  179. break
  180. }
  181. }
  182. } else {
  183. ipAddr := net.ParseIP(strings.TrimSpace(data.XForwardedFor))
  184. if !ipPriv.IsPrivate(ipAddr) {
  185. data.Source = data.XForwardedFor
  186. }
  187. }
  188. }
  189. if data.Source == data.IpSrc && data.XRealIP != "" {
  190. data.Source = data.XRealIP
  191. }
  192. natsEC.Publish(*natsQueue, &data)
  193. }
  194. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  195. // e.g.
  196. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  197. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  198. func replayFile() {
  199. var req data.Request
  200. var startTs time.Time
  201. var endTs time.Time
  202. for {
  203. fh, err := os.Open(*requestsFile)
  204. if err != nil {
  205. log.Fatalf("Failed to open request file '%s': %s", *requestsFile, err)
  206. }
  207. c := csv.NewReader(fh)
  208. c.Comma = ' '
  209. for {
  210. if *sleepFor > time.Nanosecond {
  211. startTs = time.Now()
  212. }
  213. r, err := c.Read()
  214. if err == io.EOF {
  215. break
  216. }
  217. if err != nil {
  218. log.Println(err)
  219. continue
  220. }
  221. req.IpSrc = r[0]
  222. req.Source = r[0]
  223. req.Url = r[1]
  224. req.UserAgent = "Munch/1.0"
  225. req.Host = "www.scraperwall.com"
  226. req.CreatedAt = time.Now().UnixNano()
  227. natsEC.Publish(*natsQueue, &req)
  228. count++
  229. if *sleepFor >= time.Nanosecond {
  230. endTs = time.Now()
  231. if endTs.Before(startTs.Add(*sleepFor)) {
  232. time.Sleep(*sleepFor - endTs.Sub(startTs))
  233. }
  234. }
  235. }
  236. }
  237. }
  238. // version outputs build information
  239. func version() {
  240. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  241. }