main.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. package main
import (
	"bufio"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
	"github.com/BurntSushi/toml"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"
)
  23. var (
  24. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  25. iface = flag.String("interface", "eth0", "Interface to get packets from")
  26. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  27. filter = flag.String("filter", "tcp", "PCAP filter expression")
  28. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  29. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  30. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  31. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  32. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  33. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  34. configFile = flag.String("config", "", "The location of the TOML config file")
  35. beQuiet = flag.Bool("quiet", true, "Be quiet")
  36. doVersion = flag.Bool("version", false, "Show version information")
  37. natsEC *nats.EncodedConn
  38. count uint64
  39. timeout = -1 * time.Second
  40. ipPriv *ip.IP
  41. config Config
  42. // Version contains the program Version, e.g. 1.0.1
  43. Version string
  44. // BuildDate contains the date and time at which the program was compiled
  45. BuildDate string
  46. )
  47. // Config contains the program configuration
  48. type Config struct {
  49. Live bool
  50. Interface string
  51. SnapshotLen int
  52. Filter string
  53. Promiscuous bool
  54. NatsURL string
  55. NatsQueue string
  56. SleepFor time.Duration
  57. RequestsFile string
  58. UseXForwardedAsSource bool
  59. Quiet bool
  60. }
  61. func (c Config) print() {
  62. fmt.Printf("Live: %t\n", c.Live)
  63. fmt.Printf("Interface: %s\n", c.Interface)
  64. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  65. fmt.Printf("Filter: %s\n", c.Filter)
  66. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  67. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  68. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  69. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  70. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  71. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  72. fmt.Printf("Quiet: %t\n", c.Quiet)
  73. }
  74. func init() {
  75. flag.Parse()
  76. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  77. }
  78. func main() {
  79. if *doVersion {
  80. version()
  81. os.Exit(0)
  82. }
  83. loadConfig()
  84. // Output how many requests per second were sent
  85. if !config.Quiet {
  86. go func(c *uint64) {
  87. for {
  88. fmt.Printf("%d requests per second\n", *c)
  89. *c = 0
  90. time.Sleep(time.Second)
  91. }
  92. }(&count)
  93. }
  94. // NATS
  95. //
  96. if config.NatsURL == "" {
  97. log.Fatal("No NATS URL specified (-nats-url)!")
  98. }
  99. natsConn, err := nats.Connect(config.NatsURL)
  100. if err != nil {
  101. log.Fatal(err)
  102. }
  103. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  104. if err != nil {
  105. log.Fatalf("Encoded Connection: %v!\n", err)
  106. }
  107. // What should I do?
  108. if config.RequestsFile != "" {
  109. replayFile()
  110. } else if config.Live {
  111. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  112. liveCapture()
  113. }
  114. }
  115. func liveCapture() {
  116. ipPriv = ip.NewIP()
  117. // PCAP setup
  118. //
  119. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  120. if err != nil {
  121. log.Fatal(err)
  122. }
  123. defer handle.Close()
  124. err = handle.SetBPFFilter(config.Filter)
  125. if err != nil {
  126. log.Fatal(err)
  127. }
  128. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  129. for packet := range packetSource.Packets() {
  130. go processPacket(packet)
  131. }
  132. }
  133. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  134. func processPacket(packet gopacket.Packet) {
  135. count++
  136. ipLayer := packet.Layer(layers.LayerTypeIPv4)
  137. if ipLayer == nil {
  138. log.Println("No IPv4 Layer!")
  139. return
  140. }
  141. ip, _ := ipLayer.(*layers.IPv4)
  142. if ip.Protocol != layers.IPProtocolTCP {
  143. log.Println("No TCP Protocol!")
  144. return
  145. }
  146. ipSrc := ip.SrcIP.String()
  147. ipDst := ip.DstIP.String()
  148. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  149. if tcpLayer == nil {
  150. return
  151. }
  152. tcp, _ := tcpLayer.(*layers.TCP)
  153. portSrc := tcp.SrcPort
  154. portDst := tcp.DstPort
  155. sequence := tcp.Seq
  156. applicationLayer := packet.ApplicationLayer()
  157. if applicationLayer == nil {
  158. return
  159. }
  160. data := data.Request{}
  161. reader := bufio.NewReader(strings.NewReader(string(applicationLayer.Payload())))
  162. req, err := http.ReadRequest(reader)
  163. if err != nil {
  164. return
  165. }
  166. data.IpSrc = ipSrc
  167. data.IpDst = ipDst
  168. data.PortSrc = uint32(portSrc)
  169. data.PortDst = uint32(portDst)
  170. data.TcpSeq = uint32(sequence)
  171. data.CreatedAt = time.Now().UnixNano()
  172. data.Url = req.URL.String()
  173. data.Method = req.Method
  174. data.Referer = req.Referer()
  175. data.Host = req.Host
  176. data.Protocol = req.Proto
  177. data.Origin = data.Host
  178. if _, ok := req.Header["Connection"]; ok {
  179. data.Connection = req.Header["Connection"][0]
  180. }
  181. if _, ok := req.Header["X-Forwarded-For"]; ok {
  182. data.XForwardedFor = req.Header["X-Forwarded-For"][0]
  183. }
  184. if _, ok := req.Header["X-Real-IP"]; ok {
  185. data.XRealIP = req.Header["X-Real-IP"][0]
  186. }
  187. if _, ok := req.Header["X-Requested-With"]; ok {
  188. data.XRequestedWith = req.Header["X-Requested-With"][0]
  189. }
  190. if _, ok := req.Header["Accept-Encoding"]; ok {
  191. data.AcceptEncoding = req.Header["Accept-Encoding"][0]
  192. }
  193. if _, ok := req.Header["Accept-Language"]; ok {
  194. data.AcceptLanguage = req.Header["Accept-Language"][0]
  195. }
  196. if _, ok := req.Header["User-Agent"]; ok {
  197. data.UserAgent = req.Header["User-Agent"][0]
  198. }
  199. if _, ok := req.Header["Accept"]; ok {
  200. data.Accept = req.Header["Accept"][0]
  201. }
  202. if _, ok := req.Header["Cookie"]; ok {
  203. data.Cookie = req.Header["Cookie"][0]
  204. }
  205. data.Source = data.IpSrc
  206. if config.UseXForwardedAsSource && data.XForwardedFor != "" {
  207. if strings.Contains(data.XForwardedFor, ",") {
  208. ips := strings.Split(data.XForwardedFor, ",")
  209. for i := len(ips) - 1; i >= 0; i-- {
  210. ipRaw := strings.TrimSpace(ips[i])
  211. ipAddr := net.ParseIP(ipRaw)
  212. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  213. data.Source = ipRaw
  214. break
  215. }
  216. }
  217. } else {
  218. ipAddr := net.ParseIP(strings.TrimSpace(data.XForwardedFor))
  219. if !ipPriv.IsPrivate(ipAddr) {
  220. data.Source = data.XForwardedFor
  221. }
  222. }
  223. }
  224. if data.Source == data.IpSrc && data.XRealIP != "" {
  225. data.Source = data.XRealIP
  226. }
  227. natsEC.Publish(config.NatsQueue, &data)
  228. }
  229. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  230. // e.g.
  231. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  232. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  233. func replayFile() {
  234. var req data.Request
  235. var startTs time.Time
  236. var endTs time.Time
  237. for {
  238. fh, err := os.Open(config.RequestsFile)
  239. if err != nil {
  240. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  241. }
  242. c := csv.NewReader(fh)
  243. c.Comma = ' '
  244. for {
  245. if config.SleepFor > time.Nanosecond {
  246. startTs = time.Now()
  247. }
  248. r, err := c.Read()
  249. if err == io.EOF {
  250. break
  251. }
  252. if err != nil {
  253. log.Println(err)
  254. continue
  255. }
  256. req.IpSrc = r[0]
  257. req.Source = r[0]
  258. req.Url = r[1]
  259. req.UserAgent = "Munch/1.0"
  260. req.Host = "demo.scraperwall.com"
  261. req.CreatedAt = time.Now().UnixNano()
  262. natsEC.Publish(config.NatsQueue, &req)
  263. count++
  264. if config.SleepFor >= time.Nanosecond {
  265. endTs = time.Now()
  266. if endTs.Before(startTs.Add(config.SleepFor)) {
  267. time.Sleep(config.SleepFor - endTs.Sub(startTs))
  268. }
  269. }
  270. }
  271. }
  272. }
  273. func loadConfig() {
  274. // initialize with values from the command line / environment
  275. config.Live = *doLiveCapture
  276. config.Interface = *iface
  277. config.SnapshotLen = *snapshotLen
  278. config.Filter = *filter
  279. config.Promiscuous = *promiscuous
  280. config.NatsURL = *natsURL
  281. config.NatsQueue = *natsQueue
  282. config.SleepFor = *sleepFor
  283. config.RequestsFile = *requestsFile
  284. config.UseXForwardedAsSource = *useXForwardedAsSource
  285. config.Quiet = *beQuiet
  286. if *configFile == "" {
  287. return
  288. }
  289. _, err := os.Stat(*configFile)
  290. if err != nil {
  291. log.Printf("%s: %s\n", *configFile, err)
  292. return
  293. }
  294. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  295. log.Printf("%s: %s\n", *configFile, err)
  296. }
  297. if !config.Quiet {
  298. config.print()
  299. }
  300. }
  301. // version outputs build information
  302. func version() {
  303. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  304. }