main.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. package main
import (
	"bufio"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
	"git.scraperwall.com/scw/pidfile"
	"github.com/BurntSushi/toml"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/kr/pretty"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"
)
  25. var (
  26. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  27. iface = flag.String("interface", "eth0", "Interface to get packets from")
  28. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  29. filter = flag.String("filter", "tcp", "PCAP filter expression")
  30. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  31. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  32. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  33. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  34. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  35. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  36. pidFile = flag.String("pidfile", "/var/run/munchclient.pid", "The location of the PID file")
  37. configFile = flag.String("config", "", "The location of the TOML config file")
  38. doVersion = flag.Bool("version", false, "Show version information")
  39. natsEC *nats.EncodedConn
  40. count uint64
  41. timeout = -1 * time.Second
  42. ipPriv *ip.IP
  43. config Config
  44. // Version contains the program Version, e.g. 1.0.1
  45. Version string
  46. // BuildDate contains the date and time at which the program was compiled
  47. BuildDate string
  48. )
  49. // Config contains the program configuration
  50. type Config struct {
  51. Live bool
  52. Interface string
  53. SnapshotLen int
  54. Filter string
  55. Promiscuous bool
  56. NatsURL string
  57. NatsQueue string
  58. SleepFor time.Duration
  59. RequestsFile string
  60. UseXForwardedAsSource bool
  61. PidFile string
  62. }
  63. func init() {
  64. flag.Parse()
  65. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  66. }
  67. func loadConfig() {
  68. // initialize with values from the command line / environment
  69. config.Live = *doLiveCapture
  70. config.Interface = *iface
  71. config.SnapshotLen = *snapshotLen
  72. config.Filter = *filter
  73. config.Promiscuous = *promiscuous
  74. config.NatsURL = *natsURL
  75. config.NatsQueue = *natsQueue
  76. config.SleepFor = *sleepFor
  77. config.RequestsFile = *requestsFile
  78. config.UseXForwardedAsSource = *useXForwardedAsSource
  79. config.PidFile = *pidFile
  80. if *configFile == "" {
  81. return
  82. }
  83. _, err := os.Stat(*configFile)
  84. if err != nil {
  85. log.Printf("%s: %s\n", *configFile, err)
  86. return
  87. }
  88. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  89. log.Printf("%s: %s\n", *configFile, err)
  90. }
  91. pretty.Println(config)
  92. }
  93. func main() {
  94. if *doVersion {
  95. version()
  96. os.Exit(0)
  97. }
  98. loadConfig()
  99. if err := pidfile.Write(config.PidFile); err != nil {
  100. log.Fatalf("munchclient: %s\n", err)
  101. os.Exit(1)
  102. }
  103. defer pidfile.Remove(config.PidFile)
  104. go func(c *uint64) {
  105. for {
  106. fmt.Printf("%d requests per second\n", *c)
  107. *c = 0
  108. time.Sleep(time.Second)
  109. }
  110. }(&count)
  111. // NATS
  112. //
  113. if config.NatsURL == "" {
  114. log.Fatal("No NATS URL specified (-nats-url)!")
  115. }
  116. natsConn, err := nats.Connect(config.NatsURL)
  117. if err != nil {
  118. log.Fatal(err)
  119. }
  120. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  121. if err != nil {
  122. log.Fatalf("Encoded Connection: %v!\n", err)
  123. }
  124. // What should I do?
  125. if config.RequestsFile != "" {
  126. replayFile()
  127. } else if config.Live {
  128. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  129. liveCapture()
  130. }
  131. }
  132. func liveCapture() {
  133. ipPriv = ip.NewIP()
  134. // PCAP setup
  135. //
  136. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  137. if err != nil {
  138. log.Fatal(err)
  139. }
  140. defer handle.Close()
  141. err = handle.SetBPFFilter(config.Filter)
  142. if err != nil {
  143. log.Fatal(err)
  144. }
  145. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  146. for packet := range packetSource.Packets() {
  147. go processPacket(packet)
  148. }
  149. }
  150. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  151. func processPacket(packet gopacket.Packet) {
  152. count++
  153. ipLayer := packet.Layer(layers.LayerTypeIPv4)
  154. if ipLayer == nil {
  155. log.Println("No IPv4 Layer!")
  156. return
  157. }
  158. ip, _ := ipLayer.(*layers.IPv4)
  159. if ip.Protocol != layers.IPProtocolTCP {
  160. log.Println("No TCP Protocol!")
  161. return
  162. }
  163. ipSrc := ip.SrcIP.String()
  164. ipDst := ip.DstIP.String()
  165. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  166. if tcpLayer == nil {
  167. return
  168. }
  169. tcp, _ := tcpLayer.(*layers.TCP)
  170. portSrc := tcp.SrcPort
  171. portDst := tcp.DstPort
  172. sequence := tcp.Seq
  173. applicationLayer := packet.ApplicationLayer()
  174. if applicationLayer == nil {
  175. return
  176. }
  177. data := data.Request{}
  178. reader := bufio.NewReader(strings.NewReader(string(applicationLayer.Payload())))
  179. req, err := http.ReadRequest(reader)
  180. if err != nil {
  181. return
  182. }
  183. data.IpSrc = ipSrc
  184. data.IpDst = ipDst
  185. data.PortSrc = uint32(portSrc)
  186. data.PortDst = uint32(portDst)
  187. data.TcpSeq = uint32(sequence)
  188. data.CreatedAt = time.Now().UnixNano()
  189. data.Url = req.URL.String()
  190. data.Method = req.Method
  191. data.Referer = req.Referer()
  192. data.Host = req.Host
  193. data.Protocol = req.Proto
  194. data.Origin = data.Host
  195. if _, ok := req.Header["Connection"]; ok {
  196. data.Connection = req.Header["Connection"][0]
  197. }
  198. if _, ok := req.Header["X-Forwarded-For"]; ok {
  199. data.XForwardedFor = req.Header["X-Forwarded-For"][0]
  200. }
  201. if _, ok := req.Header["X-Real-IP"]; ok {
  202. data.XRealIP = req.Header["X-Real-IP"][0]
  203. }
  204. if _, ok := req.Header["X-Requested-With"]; ok {
  205. data.XRequestedWith = req.Header["X-Requested-With"][0]
  206. }
  207. if _, ok := req.Header["Accept-Encoding"]; ok {
  208. data.AcceptEncoding = req.Header["Accept-Encoding"][0]
  209. }
  210. if _, ok := req.Header["Accept-Language"]; ok {
  211. data.AcceptLanguage = req.Header["Accept-Language"][0]
  212. }
  213. if _, ok := req.Header["User-Agent"]; ok {
  214. data.UserAgent = req.Header["User-Agent"][0]
  215. }
  216. if _, ok := req.Header["Accept"]; ok {
  217. data.Accept = req.Header["Accept"][0]
  218. }
  219. if _, ok := req.Header["Cookie"]; ok {
  220. data.Cookie = req.Header["Cookie"][0]
  221. }
  222. data.Source = data.IpSrc
  223. if config.UseXForwardedAsSource && data.XForwardedFor != "" {
  224. if strings.Contains(data.XForwardedFor, ",") {
  225. ips := strings.Split(data.XForwardedFor, ",")
  226. for i := len(ips) - 1; i >= 0; i-- {
  227. ipRaw := strings.TrimSpace(ips[i])
  228. ipAddr := net.ParseIP(ipRaw)
  229. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  230. data.Source = ipRaw
  231. break
  232. }
  233. }
  234. } else {
  235. ipAddr := net.ParseIP(strings.TrimSpace(data.XForwardedFor))
  236. if !ipPriv.IsPrivate(ipAddr) {
  237. data.Source = data.XForwardedFor
  238. }
  239. }
  240. }
  241. if data.Source == data.IpSrc && data.XRealIP != "" {
  242. data.Source = data.XRealIP
  243. }
  244. natsEC.Publish(config.NatsQueue, &data)
  245. }
  246. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  247. // e.g.
  248. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  249. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  250. func replayFile() {
  251. var req data.Request
  252. var startTs time.Time
  253. var endTs time.Time
  254. for {
  255. fh, err := os.Open(config.RequestsFile)
  256. if err != nil {
  257. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  258. }
  259. c := csv.NewReader(fh)
  260. c.Comma = ' '
  261. for {
  262. if config.SleepFor > time.Nanosecond {
  263. startTs = time.Now()
  264. }
  265. r, err := c.Read()
  266. if err == io.EOF {
  267. break
  268. }
  269. if err != nil {
  270. log.Println(err)
  271. continue
  272. }
  273. req.IpSrc = r[0]
  274. req.Source = r[0]
  275. req.Url = r[1]
  276. req.UserAgent = "Munch/1.0"
  277. req.Host = "www.scraperwall.com"
  278. req.CreatedAt = time.Now().UnixNano()
  279. natsEC.Publish(config.NatsQueue, &req)
  280. count++
  281. if config.SleepFor >= time.Nanosecond {
  282. endTs = time.Now()
  283. if endTs.Before(startTs.Add(config.SleepFor)) {
  284. time.Sleep(config.SleepFor - endTs.Sub(startTs))
  285. }
  286. }
  287. }
  288. }
  289. }
  290. // version outputs build information
  291. func version() {
  292. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  293. }