main.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. package main
  import (
  	"bufio"
  	"encoding/csv"
  	"flag"
  	"fmt"
  	"io"
  	"log"
  	"net"
  	"net/http"
  	"os"
  	"strings"
  	"sync/atomic"
  	"time"

  	"github.com/BurntSushi/toml"
  	"github.com/google/gopacket"
  	"github.com/google/gopacket/layers"
  	"github.com/google/gopacket/pcap"
  	"github.com/nats-io/nats"
  	"github.com/nats-io/nats/encoders/protobuf"

  	"git.scraperwall.com/scw/data"
  	"git.scraperwall.com/scw/ip"
  )
  23. var (
  24. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  25. iface = flag.String("interface", "eth0", "Interface to get packets from")
  26. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  27. filter = flag.String("filter", "tcp", "PCAP filter expression")
  28. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  29. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  30. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  31. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  32. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  33. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  34. configFile = flag.String("config", "", "The location of the TOML config file")
  35. beQuiet = flag.Bool("quiet", true, "Be quiet")
  36. doVersion = flag.Bool("version", false, "Show version information")
  37. natsEC *nats.EncodedConn
  38. count uint64
  39. timeout = -1 * time.Second
  40. ipPriv *ip.IP
  41. config Config
  42. // Version contains the program Version, e.g. 1.0.1
  43. Version string
  44. // BuildDate contains the date and time at which the program was compiled
  45. BuildDate string
  46. )
  47. // Config contains the program configuration
  48. type Config struct {
  49. Live bool
  50. Interface string
  51. SnapshotLen int
  52. Filter string
  53. Promiscuous bool
  54. NatsURL string
  55. NatsQueue string
  56. SleepFor duration
  57. RequestsFile string
  58. UseXForwardedAsSource bool
  59. Quiet bool
  60. }
  61. type duration struct {
  62. time.Duration
  63. }
  64. func (d *duration) UnmarshalText(text []byte) error {
  65. var err error
  66. d.Duration, err = time.ParseDuration(string(text))
  67. return err
  68. }
  // print writes every configuration field to standard output as one
  // "Name: value" line per field.
  func (c Config) print() {
  	fmt.Printf("Live: %t\n"+
  		"Interface: %s\n"+
  		"SnapshotLen: %d\n"+
  		"Filter: %s\n"+
  		"Promiscuous: %t\n"+
  		"NatsURL: %s\n"+
  		"NatsQueue: %s\n"+
  		"SleepFor: %s\n"+
  		"RequestsFile: %s\n"+
  		"UseXForwardedAsSource: %t\n"+
  		"Quiet: %t\n",
  		c.Live, c.Interface, c.SnapshotLen, c.Filter, c.Promiscuous,
  		c.NatsURL, c.NatsQueue, c.SleepFor.String(), c.RequestsFile,
  		c.UseXForwardedAsSource, c.Quiet)
  }
  // init registers the protobuf encoder with NATS and parses the
  // command-line flags so their values are ready when main runs.
  //
  // NOTE(review): calling flag.Parse from init is discouraged. It breaks
  // `go test`, whose flags are registered only after package init. Consider
  // moving it to the top of main.
  func init() {
  	nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  	flag.Parse()
  }
  // main loads the configuration, connects to NATS and then either replays a
  // request file (-requests, takes precedence) or captures HTTP requests live
  // from a network interface (-live). Flags are parsed in init.
  func main() {
  	if *doVersion {
  		version()
  		os.Exit(0)
  	}

  	loadConfig()

  	// Fail fast when no mode is selected instead of connecting to NATS and
  	// exiting without doing anything.
  	if config.RequestsFile == "" && !config.Live {
  		log.Fatal("Nothing to do: specify a requests file (-requests) or live capture (-live)")
  	}

  	// Output how many requests per second were sent. The counter is
  	// incremented from other goroutines, so it is read and reset in a single
  	// atomic step.
  	if !config.Quiet {
  		go func(c *uint64) {
  			for {
  				fmt.Printf("%d requests per second\n", atomic.SwapUint64(c, 0))
  				time.Sleep(time.Second)
  			}
  		}(&count)
  	}

  	// NATS
  	if config.NatsURL == "" {
  		log.Fatal("No NATS URL specified (-nats-url)!")
  	}
  	natsConn, err := nats.Connect(config.NatsURL)
  	if err != nil {
  		log.Fatal(err)
  	}
  	natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  	if err != nil {
  		log.Fatalf("Encoded Connection: %v!\n", err)
  	}
  	// Close the connection when capture ends so messages still buffered by
  	// the client are not dropped. This assumes Close flushes pending data, as
  	// nats.Conn.Close does; confirm for the vendored nats version.
  	defer natsEC.Close()

  	if config.RequestsFile != "" {
  		replayFile()
  		return
  	}
  	fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  	liveCapture()
  }
  // liveCapture opens config.Interface with pcap, installs config.Filter as
  // a BPF filter and hands every captured packet to processPacket in its own
  // goroutine. It returns once the packet channel is closed. Setup errors
  // are fatal.
  //
  // NOTE(review): one goroutine per packet is unbounded. Under heavy
  // traffic, or if publishing stalls, goroutines pile up. Consider a bounded
  // worker pool.
  func liveCapture() {
  	ipPriv = ip.NewIP()

  	// PCAP setup
  	handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  	if err != nil {
  		log.Fatal(err)
  	}
  	defer handle.Close()

  	if err := handle.SetBPFFilter(config.Filter); err != nil {
  		log.Fatal(err)
  	}

  	packets := gopacket.NewPacketSource(handle, handle.LinkType()).Packets()
  	for p := range packets {
  		go processPacket(p)
  	}
  }
  // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue.
  //
  // Packets are dropped if they are not IPv4/TCP, carry no application
  // payload, or do not parse as an HTTP request. count is incremented only
  // for requests that were actually published.
  func processPacket(packet gopacket.Packet) {
  	ipLayer := packet.Layer(layers.LayerTypeIPv4)
  	if ipLayer == nil {
  		log.Println("No IPv4 Layer!")
  		return
  	}
  	// Named ipv4 (not ip) so the imported ip package is not shadowed.
  	ipv4, ok := ipLayer.(*layers.IPv4)
  	if !ok {
  		return
  	}
  	if ipv4.Protocol != layers.IPProtocolTCP {
  		log.Println("No TCP Protocol!")
  		return
  	}

  	tcpLayer := packet.Layer(layers.LayerTypeTCP)
  	if tcpLayer == nil {
  		return
  	}
  	tcp, ok := tcpLayer.(*layers.TCP)
  	if !ok {
  		return
  	}

  	app := packet.ApplicationLayer()
  	if app == nil {
  		return
  	}
  	httpReq, err := http.ReadRequest(bufio.NewReader(strings.NewReader(string(app.Payload()))))
  	if err != nil {
  		return
  	}

  	// Header.Get canonicalizes the key, so "X-Real-IP" finds the header that
  	// ReadRequest stored under its canonical form "X-Real-Ip". A direct map
  	// index with "X-Real-IP" would miss it.
  	hdr := httpReq.Header
  	rec := data.Request{
  		IpSrc:          ipv4.SrcIP.String(),
  		IpDst:          ipv4.DstIP.String(),
  		PortSrc:        uint32(tcp.SrcPort),
  		PortDst:        uint32(tcp.DstPort),
  		TcpSeq:         tcp.Seq,
  		CreatedAt:      time.Now().UnixNano(),
  		Url:            httpReq.URL.String(),
  		Method:         httpReq.Method,
  		Referer:        httpReq.Referer(),
  		Host:           httpReq.Host,
  		Protocol:       httpReq.Proto,
  		Origin:         httpReq.Host,
  		Connection:     hdr.Get("Connection"),
  		XForwardedFor:  hdr.Get("X-Forwarded-For"),
  		XRealIP:        hdr.Get("X-Real-IP"),
  		XRequestedWith: hdr.Get("X-Requested-With"),
  		AcceptEncoding: hdr.Get("Accept-Encoding"),
  		AcceptLanguage: hdr.Get("Accept-Language"),
  		UserAgent:      hdr.Get("User-Agent"),
  		Accept:         hdr.Get("Accept"),
  		Cookie:         hdr.Get("Cookie"),
  	}

  	rec.Source = rec.IpSrc
  	// Proxy headers are set by whoever sent the request, so they are only
  	// trusted as the source when the operator opted in with -use-x-forwarded.
  	if config.UseXForwardedAsSource {
  		if addr := lastPublicForwardedIP(rec.XForwardedFor); addr != "" {
  			rec.Source = addr
  		} else if realIP := strings.TrimSpace(rec.XRealIP); net.ParseIP(realIP) != nil {
  			rec.Source = realIP
  		}
  	}

  	if err := natsEC.Publish(config.NatsQueue, &rec); err != nil {
  		log.Printf("publish to %s: %v", config.NatsQueue, err)
  		return
  	}
  	atomic.AddUint64(&count, 1)
  }

  // lastPublicForwardedIP returns the right-most entry of an X-Forwarded-For
  // value that parses as an IP address and that ipPriv does not consider
  // private. It returns "" if there is no such entry (including for "").
  func lastPublicForwardedIP(xff string) string {
  	entries := strings.Split(xff, ",")
  	for i := len(entries) - 1; i >= 0; i-- {
  		candidate := strings.TrimSpace(entries[i])
  		if addr := net.ParseIP(candidate); addr != nil && !ipPriv.IsPrivate(addr) {
  			return candidate
  		}
  	}
  	return ""
  }
  237. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  238. // e.g.
  239. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  240. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  241. func replayFile() {
  242. var req data.Request
  243. var startTs time.Time
  244. var endTs time.Time
  245. for {
  246. fh, err := os.Open(config.RequestsFile)
  247. if err != nil {
  248. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  249. }
  250. c := csv.NewReader(fh)
  251. c.Comma = ' '
  252. for {
  253. if config.SleepFor.Duration > time.Nanosecond {
  254. startTs = time.Now()
  255. }
  256. r, err := c.Read()
  257. if err == io.EOF {
  258. break
  259. }
  260. if err != nil {
  261. log.Println(err)
  262. continue
  263. }
  264. req.IpSrc = r[0]
  265. req.Source = r[0]
  266. req.Url = r[1]
  267. req.UserAgent = "Munch/1.0"
  268. req.Host = "demo.scraperwall.com"
  269. req.CreatedAt = time.Now().UnixNano()
  270. natsEC.Publish(config.NatsQueue, &req)
  271. count++
  272. if config.SleepFor.Duration >= time.Nanosecond {
  273. endTs = time.Now()
  274. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  275. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  276. }
  277. }
  278. }
  279. }
  280. }
  // loadConfig fills the global config from the command-line flags. If
  // -config names a TOML file, values from that file then override the flag
  // values.
  //
  // A file that cannot be read or decoded is logged and ignored, so the flag
  // values stay in effect. Keys in the file that match no Config field are
  // logged as warnings. When a config file was given and Quiet is false, the
  // effective configuration is printed.
  func loadConfig() {
  	// Initialize with values from the command line.
  	config = Config{
  		Live:                  *doLiveCapture,
  		Interface:             *iface,
  		SnapshotLen:           *snapshotLen,
  		Filter:                *filter,
  		Promiscuous:           *promiscuous,
  		NatsURL:               *natsURL,
  		NatsQueue:             *natsQueue,
  		SleepFor:              duration{Duration: *sleepFor},
  		RequestsFile:          *requestsFile,
  		UseXForwardedAsSource: *useXForwardedAsSource,
  		Quiet:                 *beQuiet,
  	}

  	if *configFile == "" {
  		return
  	}

  	// Decode into a copy so that a file with a type error part-way through
  	// cannot leave config half-overwritten. A missing or unreadable file is
  	// reported by DecodeFile itself, so no separate os.Stat is needed.
  	fromFile := config
  	md, err := toml.DecodeFile(*configFile, &fromFile)
  	if err != nil {
  		log.Printf("%s: %s\n", *configFile, err)
  	} else {
  		config = fromFile
  		for _, key := range md.Undecoded() {
  			log.Printf("%s: unknown configuration key %q\n", *configFile, key.String())
  		}
  	}

  	if !config.Quiet {
  		config.print()
  	}
  }
  // version prints the program name followed by the Version and BuildDate
  // build metadata on a single line.
  func version() {
  	fmt.Println("munchclient " + Version + ", built on " + BuildDate)
  }