main.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. package main
import (
	"bufio"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
	"git.scraperwall.com/scw/pidfile"
	"github.com/BurntSushi/toml"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"
)
  26. var (
  27. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  28. iface = flag.String("interface", "eth0", "Interface to get packets from")
  29. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  30. filter = flag.String("filter", "tcp", "PCAP filter expression")
  31. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  32. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  33. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  34. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  35. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  36. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  37. pidFile = flag.String("pidfile", "/var/run/munchclient.pid", "The location of the PID file")
  38. configFile = flag.String("config", "", "The location of the TOML config file")
  39. beQuiet = flag.Bool("quiet", true, "Be quiet")
  40. doVersion = flag.Bool("version", false, "Show version information")
  41. natsEC *nats.EncodedConn
  42. count uint64
  43. timeout = -1 * time.Second
  44. ipPriv *ip.IP
  45. config Config
  46. // Version contains the program Version, e.g. 1.0.1
  47. Version string
  48. // BuildDate contains the date and time at which the program was compiled
  49. BuildDate string
  50. )
// Config contains the program configuration.
//
// loadConfig first fills it from the command-line flags and then, when
// -config names a readable file, decodes that TOML file on top of it.
type Config struct {
	// Live enables packet capture from Interface (-live). main only uses it
	// when RequestsFile is empty.
	Live bool
	// Interface is the network interface packets are captured from (-interface).
	Interface string
	// SnapshotLen is the snapshot length in bytes passed to pcap (-snapshot-len).
	SnapshotLen int
	// Filter is the BPF filter expression applied to the capture (-filter).
	Filter string
	// Promiscuous switches the interface into promiscuous mode (-promiscuous).
	Promiscuous bool
	// NatsURL is the URL of the NATS server (-nats-url); it must not be empty.
	NatsURL string
	// NatsQueue is the NATS subject requests are published to (-nats-queue).
	NatsQueue string
	// SleepFor throttles file replay: each replayed request takes at least
	// this long (-sleep). Not used for live capture.
	SleepFor time.Duration
	// RequestsFile is a space-separated "IP URL" file to replay (-requests);
	// when set it takes precedence over Live.
	RequestsFile string
	// UseXForwardedAsSource makes live capture derive the request source from
	// X-Forwarded-For (-use-x-forwarded).
	UseXForwardedAsSource bool
	// PidFile is the location of the PID file (-pidfile).
	PidFile string
	// Quiet suppresses the per-second rate output (-quiet).
	Quiet bool
}
  66. func init() {
  67. flag.Parse()
  68. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  69. }
  70. func loadConfig() {
  71. // initialize with values from the command line / environment
  72. config.Live = *doLiveCapture
  73. config.Interface = *iface
  74. config.SnapshotLen = *snapshotLen
  75. config.Filter = *filter
  76. config.Promiscuous = *promiscuous
  77. config.NatsURL = *natsURL
  78. config.NatsQueue = *natsQueue
  79. config.SleepFor = *sleepFor
  80. config.RequestsFile = *requestsFile
  81. config.UseXForwardedAsSource = *useXForwardedAsSource
  82. config.PidFile = *pidFile
  83. config.Quiet = *beQuiet
  84. if *configFile == "" {
  85. return
  86. }
  87. _, err := os.Stat(*configFile)
  88. if err != nil {
  89. log.Printf("%s: %s\n", *configFile, err)
  90. return
  91. }
  92. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  93. log.Printf("%s: %s\n", *configFile, err)
  94. }
  95. }
  96. func main() {
  97. if *doVersion {
  98. version()
  99. os.Exit(0)
  100. }
  101. loadConfig()
  102. // Clean up before exiting
  103. //
  104. sigs := make(chan os.Signal, 1)
  105. signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  106. go func() {
  107. <-sigs
  108. pidfile.Remove(config.PidFile)
  109. os.Exit(0)
  110. }()
  111. // Write the PID file
  112. //
  113. err := pidfile.Write(config.PidFile)
  114. // if the PID file is stale remove it and create a new one with the current PID
  115. if err == pidfile.ErrFileStale {
  116. err2 := os.Remove(config.PidFile)
  117. if err2 != nil {
  118. log.Fatalf("%s: %s\n", config.PidFile, err)
  119. }
  120. } else if err != nil { // permission denied or program is still running
  121. log.Fatalf("%s: %s\n", config.PidFile, err)
  122. }
  123. // Output how many requests per second were sent
  124. if !config.Quiet {
  125. go func(c *uint64) {
  126. for {
  127. fmt.Printf("%d requests per second\n", *c)
  128. *c = 0
  129. time.Sleep(time.Second)
  130. }
  131. }(&count)
  132. }
  133. // NATS
  134. //
  135. if config.NatsURL == "" {
  136. log.Fatal("No NATS URL specified (-nats-url)!")
  137. }
  138. natsConn, err := nats.Connect(config.NatsURL)
  139. if err != nil {
  140. log.Fatal(err)
  141. }
  142. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  143. if err != nil {
  144. log.Fatalf("Encoded Connection: %v!\n", err)
  145. }
  146. // What should I do?
  147. if config.RequestsFile != "" {
  148. replayFile()
  149. } else if config.Live {
  150. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  151. liveCapture()
  152. }
  153. }
  154. func liveCapture() {
  155. ipPriv = ip.NewIP()
  156. // PCAP setup
  157. //
  158. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  159. if err != nil {
  160. log.Fatal(err)
  161. }
  162. defer handle.Close()
  163. err = handle.SetBPFFilter(config.Filter)
  164. if err != nil {
  165. log.Fatal(err)
  166. }
  167. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  168. for packet := range packetSource.Packets() {
  169. go processPacket(packet)
  170. }
  171. }
  172. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  173. func processPacket(packet gopacket.Packet) {
  174. count++
  175. ipLayer := packet.Layer(layers.LayerTypeIPv4)
  176. if ipLayer == nil {
  177. log.Println("No IPv4 Layer!")
  178. return
  179. }
  180. ip, _ := ipLayer.(*layers.IPv4)
  181. if ip.Protocol != layers.IPProtocolTCP {
  182. log.Println("No TCP Protocol!")
  183. return
  184. }
  185. ipSrc := ip.SrcIP.String()
  186. ipDst := ip.DstIP.String()
  187. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  188. if tcpLayer == nil {
  189. return
  190. }
  191. tcp, _ := tcpLayer.(*layers.TCP)
  192. portSrc := tcp.SrcPort
  193. portDst := tcp.DstPort
  194. sequence := tcp.Seq
  195. applicationLayer := packet.ApplicationLayer()
  196. if applicationLayer == nil {
  197. return
  198. }
  199. data := data.Request{}
  200. reader := bufio.NewReader(strings.NewReader(string(applicationLayer.Payload())))
  201. req, err := http.ReadRequest(reader)
  202. if err != nil {
  203. return
  204. }
  205. data.IpSrc = ipSrc
  206. data.IpDst = ipDst
  207. data.PortSrc = uint32(portSrc)
  208. data.PortDst = uint32(portDst)
  209. data.TcpSeq = uint32(sequence)
  210. data.CreatedAt = time.Now().UnixNano()
  211. data.Url = req.URL.String()
  212. data.Method = req.Method
  213. data.Referer = req.Referer()
  214. data.Host = req.Host
  215. data.Protocol = req.Proto
  216. data.Origin = data.Host
  217. if _, ok := req.Header["Connection"]; ok {
  218. data.Connection = req.Header["Connection"][0]
  219. }
  220. if _, ok := req.Header["X-Forwarded-For"]; ok {
  221. data.XForwardedFor = req.Header["X-Forwarded-For"][0]
  222. }
  223. if _, ok := req.Header["X-Real-IP"]; ok {
  224. data.XRealIP = req.Header["X-Real-IP"][0]
  225. }
  226. if _, ok := req.Header["X-Requested-With"]; ok {
  227. data.XRequestedWith = req.Header["X-Requested-With"][0]
  228. }
  229. if _, ok := req.Header["Accept-Encoding"]; ok {
  230. data.AcceptEncoding = req.Header["Accept-Encoding"][0]
  231. }
  232. if _, ok := req.Header["Accept-Language"]; ok {
  233. data.AcceptLanguage = req.Header["Accept-Language"][0]
  234. }
  235. if _, ok := req.Header["User-Agent"]; ok {
  236. data.UserAgent = req.Header["User-Agent"][0]
  237. }
  238. if _, ok := req.Header["Accept"]; ok {
  239. data.Accept = req.Header["Accept"][0]
  240. }
  241. if _, ok := req.Header["Cookie"]; ok {
  242. data.Cookie = req.Header["Cookie"][0]
  243. }
  244. data.Source = data.IpSrc
  245. if config.UseXForwardedAsSource && data.XForwardedFor != "" {
  246. if strings.Contains(data.XForwardedFor, ",") {
  247. ips := strings.Split(data.XForwardedFor, ",")
  248. for i := len(ips) - 1; i >= 0; i-- {
  249. ipRaw := strings.TrimSpace(ips[i])
  250. ipAddr := net.ParseIP(ipRaw)
  251. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  252. data.Source = ipRaw
  253. break
  254. }
  255. }
  256. } else {
  257. ipAddr := net.ParseIP(strings.TrimSpace(data.XForwardedFor))
  258. if !ipPriv.IsPrivate(ipAddr) {
  259. data.Source = data.XForwardedFor
  260. }
  261. }
  262. }
  263. if data.Source == data.IpSrc && data.XRealIP != "" {
  264. data.Source = data.XRealIP
  265. }
  266. natsEC.Publish(config.NatsQueue, &data)
  267. }
  268. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  269. // e.g.
  270. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  271. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  272. func replayFile() {
  273. var req data.Request
  274. var startTs time.Time
  275. var endTs time.Time
  276. for {
  277. fh, err := os.Open(config.RequestsFile)
  278. if err != nil {
  279. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  280. }
  281. c := csv.NewReader(fh)
  282. c.Comma = ' '
  283. for {
  284. if config.SleepFor > time.Nanosecond {
  285. startTs = time.Now()
  286. }
  287. r, err := c.Read()
  288. if err == io.EOF {
  289. break
  290. }
  291. if err != nil {
  292. log.Println(err)
  293. continue
  294. }
  295. req.IpSrc = r[0]
  296. req.Source = r[0]
  297. req.Url = r[1]
  298. req.UserAgent = "Munch/1.0"
  299. req.Host = "www.scraperwall.com"
  300. req.CreatedAt = time.Now().UnixNano()
  301. natsEC.Publish(config.NatsQueue, &req)
  302. count++
  303. if config.SleepFor >= time.Nanosecond {
  304. endTs = time.Now()
  305. if endTs.Before(startTs.Add(config.SleepFor)) {
  306. time.Sleep(config.SleepFor - endTs.Sub(startTs))
  307. }
  308. }
  309. }
  310. }
  311. }
  312. // version outputs build information
  313. func version() {
  314. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  315. }