main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. package main
import (
	"bufio"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
)
  24. var (
  25. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  26. iface = flag.String("interface", "eth0", "Interface to get packets from")
  27. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  28. filter = flag.String("filter", "tcp", "PCAP filter expression")
  29. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  30. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  31. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  32. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  33. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  34. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  35. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  36. configFile = flag.String("config", "", "The location of the TOML config file")
  37. beQuiet = flag.Bool("quiet", true, "Be quiet")
  38. doVersion = flag.Bool("version", false, "Show version information")
  39. natsEC *nats.EncodedConn
  40. count uint64
  41. timeout = -1 * time.Second
  42. ipPriv *ip.IP
  43. config Config
  44. // Version contains the program Version, e.g. 1.0.1
  45. Version string
  46. // BuildDate contains the date and time at which the program was compiled
  47. BuildDate string
  48. )
  49. // Config contains the program configuration
  50. type Config struct {
  51. Live bool
  52. Interface string
  53. SnapshotLen int
  54. Filter string
  55. Promiscuous bool
  56. NatsURL string
  57. NatsQueue string
  58. SleepFor duration
  59. RequestsFile string
  60. UseXForwardedAsSource bool
  61. Quiet bool
  62. Protocol string
  63. }
  64. type duration struct {
  65. time.Duration
  66. }
  67. func (d *duration) UnmarshalText(text []byte) error {
  68. var err error
  69. d.Duration, err = time.ParseDuration(string(text))
  70. return err
  71. }
  72. func (c Config) print() {
  73. fmt.Printf("Live: %t\n", c.Live)
  74. fmt.Printf("Interface: %s\n", c.Interface)
  75. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  76. fmt.Printf("Filter: %s\n", c.Filter)
  77. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  78. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  79. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  80. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  81. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  82. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  83. fmt.Printf("Protocol: %s\n", c.Protocol)
  84. fmt.Printf("Quiet: %t\n", c.Quiet)
  85. }
  86. func init() {
  87. flag.Parse()
  88. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  89. }
  90. func main() {
  91. if *doVersion {
  92. version()
  93. os.Exit(0)
  94. }
  95. loadConfig()
  96. // Output how many requests per second were sent
  97. if !config.Quiet {
  98. go func(c *uint64) {
  99. for {
  100. fmt.Printf("%d requests per second\n", *c)
  101. *c = 0
  102. time.Sleep(time.Second)
  103. }
  104. }(&count)
  105. }
  106. // NATS
  107. //
  108. if config.NatsURL == "" {
  109. log.Fatal("No NATS URL specified (-nats-url)!")
  110. }
  111. natsConn, err := nats.Connect(config.NatsURL)
  112. if err != nil {
  113. log.Fatal(err)
  114. }
  115. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  116. if err != nil {
  117. log.Fatalf("Encoded Connection: %v!\n", err)
  118. }
  119. // What should I do?
  120. if config.RequestsFile != "" {
  121. replayFile()
  122. } else if config.Live {
  123. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  124. liveCapture()
  125. }
  126. }
  127. func liveCapture() {
  128. ipPriv = ip.NewIP()
  129. // PCAP setup
  130. //
  131. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  132. if err != nil {
  133. log.Fatal(err)
  134. }
  135. defer handle.Close()
  136. err = handle.SetBPFFilter(config.Filter)
  137. if err != nil {
  138. log.Fatal(err)
  139. }
  140. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  141. for packet := range packetSource.Packets() {
  142. go processPacket(packet)
  143. }
  144. }
  145. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  146. func processPacket(packet gopacket.Packet) {
  147. count++
  148. hasIPv4 := false
  149. var ipSrc, ipDst string
  150. // IPv4
  151. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  152. ip := ipLayer.(*layers.IPv4)
  153. ipSrc = ip.SrcIP.String()
  154. ipDst = ip.DstIP.String()
  155. hasIPv4 = true
  156. }
  157. // IPv6
  158. if !hasIPv4 {
  159. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  160. ip := ipLayer.(*layers.IPv6)
  161. ipSrc = ip.SrcIP.String()
  162. ipDst = ip.DstIP.String()
  163. }
  164. }
  165. // TCP
  166. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  167. if tcpLayer == nil {
  168. return
  169. }
  170. tcp, _ := tcpLayer.(*layers.TCP)
  171. portSrc := tcp.SrcPort
  172. portDst := tcp.DstPort
  173. sequence := tcp.Seq
  174. applicationLayer := packet.ApplicationLayer()
  175. if applicationLayer == nil {
  176. return
  177. }
  178. request := data.Request{
  179. IpSrc: ipSrc,
  180. IpDst: ipDst,
  181. PortSrc: uint32(portSrc),
  182. PortDst: uint32(portDst),
  183. TcpSeq: uint32(sequence),
  184. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  185. }
  186. switch *protocol {
  187. case "http":
  188. processHTTP(&request, applicationLayer.Payload())
  189. case "ajp13":
  190. processAJP13(&request, applicationLayer.Payload())
  191. }
  192. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  193. if strings.Contains(request.XForwardedFor, ",") {
  194. ips := strings.Split(request.XForwardedFor, ",")
  195. for i := len(ips) - 1; i >= 0; i-- {
  196. ipRaw := strings.TrimSpace(ips[i])
  197. ipAddr := net.ParseIP(ipRaw)
  198. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  199. request.Source = ipRaw
  200. break
  201. }
  202. }
  203. } else {
  204. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  205. if !ipPriv.IsPrivate(ipAddr) {
  206. request.Source = request.XForwardedFor
  207. }
  208. }
  209. }
  210. if request.Source == request.IpSrc && request.XRealIP != "" {
  211. request.Source = request.XRealIP
  212. }
  213. natsEC.Publish(config.NatsQueue, &request)
  214. }
  215. func processAJP13(request *data.Request, appData []byte) error {
  216. a, err := ajp13.Parse(appData)
  217. if err != nil {
  218. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  219. }
  220. request.Url = a.URI
  221. request.Method = a.Method()
  222. request.Host = a.Server
  223. request.Protocol = a.Version
  224. request.Origin = a.RemoteAddr.String()
  225. if v, ok := a.Header("Referer"); ok {
  226. request.Referer = v
  227. }
  228. if v, ok := a.Header("Connection"); ok {
  229. request.Connection = v
  230. }
  231. if v, ok := a.Header("X-Forwarded-For"); ok {
  232. request.XForwardedFor = v
  233. }
  234. if v, ok := a.Header("X-Real-IP"); ok {
  235. request.XRealIP = v
  236. }
  237. if v, ok := a.Header("X-Requested-With"); ok {
  238. request.XRequestedWith = v
  239. }
  240. if v, ok := a.Header("Accept-Encoding"); ok {
  241. request.AcceptEncoding = v
  242. }
  243. if v, ok := a.Header("Accept-Language"); ok {
  244. request.AcceptLanguage = v
  245. }
  246. if v, ok := a.Header("User-Agent"); ok {
  247. request.UserAgent = v
  248. }
  249. if v, ok := a.Header("Accept"); ok {
  250. request.Accept = v
  251. }
  252. if v, ok := a.Header("Cookie"); ok {
  253. request.Cookie = v
  254. }
  255. if v, ok := a.Header("X-Forwarded-Host"); ok {
  256. if v != request.Host {
  257. request.Host = v
  258. }
  259. }
  260. return nil
  261. }
  262. func processHTTP(request *data.Request, appData []byte) error {
  263. reader := bufio.NewReader(strings.NewReader(string(appData)))
  264. req, err := http.ReadRequest(reader)
  265. if err != nil {
  266. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  267. }
  268. request.Url = req.URL.String()
  269. request.Method = req.Method
  270. request.Referer = req.Referer()
  271. request.Host = req.Host
  272. request.Protocol = req.Proto
  273. request.Origin = request.Host
  274. if _, ok := req.Header["Connection"]; ok {
  275. request.Connection = req.Header["Connection"][0]
  276. }
  277. if _, ok := req.Header["X-Forwarded-For"]; ok {
  278. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  279. }
  280. if _, ok := req.Header["X-Real-IP"]; ok {
  281. request.XRealIP = req.Header["X-Real-IP"][0]
  282. }
  283. if _, ok := req.Header["X-Requested-With"]; ok {
  284. request.XRequestedWith = req.Header["X-Requested-With"][0]
  285. }
  286. if _, ok := req.Header["Accept-Encoding"]; ok {
  287. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  288. }
  289. if _, ok := req.Header["Accept-Language"]; ok {
  290. request.AcceptLanguage = req.Header["Accept-Language"][0]
  291. }
  292. if _, ok := req.Header["User-Agent"]; ok {
  293. request.UserAgent = req.Header["User-Agent"][0]
  294. }
  295. if _, ok := req.Header["Accept"]; ok {
  296. request.Accept = req.Header["Accept"][0]
  297. }
  298. if _, ok := req.Header["Cookie"]; ok {
  299. request.Cookie = req.Header["Cookie"][0]
  300. }
  301. request.Source = request.IpSrc
  302. return nil
  303. }
  304. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  305. // e.g.
  306. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  307. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  308. func replayFile() {
  309. var req data.Request
  310. var startTs time.Time
  311. var endTs time.Time
  312. for {
  313. fh, err := os.Open(config.RequestsFile)
  314. if err != nil {
  315. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  316. }
  317. c := csv.NewReader(fh)
  318. c.Comma = ' '
  319. for {
  320. if config.SleepFor.Duration > time.Nanosecond {
  321. startTs = time.Now()
  322. }
  323. r, err := c.Read()
  324. if err == io.EOF {
  325. break
  326. }
  327. if err != nil {
  328. log.Println(err)
  329. continue
  330. }
  331. req.IpSrc = r[0]
  332. req.Source = r[0]
  333. req.Url = r[1]
  334. req.UserAgent = "Munch/1.0"
  335. req.Host = "demo.scraperwall.com"
  336. req.CreatedAt = time.Now().UnixNano()
  337. natsEC.Publish(config.NatsQueue, &req)
  338. count++
  339. if config.SleepFor.Duration >= time.Nanosecond {
  340. endTs = time.Now()
  341. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  342. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  343. }
  344. }
  345. }
  346. }
  347. }
  348. func loadConfig() {
  349. // initialize with values from the command line / environment
  350. config.Live = *doLiveCapture
  351. config.Interface = *iface
  352. config.SnapshotLen = *snapshotLen
  353. config.Filter = *filter
  354. config.Promiscuous = *promiscuous
  355. config.NatsURL = *natsURL
  356. config.NatsQueue = *natsQueue
  357. config.SleepFor.Duration = *sleepFor
  358. config.RequestsFile = *requestsFile
  359. config.UseXForwardedAsSource = *useXForwardedAsSource
  360. config.Protocol = *protocol
  361. config.Quiet = *beQuiet
  362. if *configFile == "" {
  363. return
  364. }
  365. _, err := os.Stat(*configFile)
  366. if err != nil {
  367. log.Printf("%s: %s\n", *configFile, err)
  368. return
  369. }
  370. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  371. log.Printf("%s: %s\n", *configFile, err)
  372. }
  373. if !config.Quiet {
  374. config.print()
  375. }
  376. }
  377. // version outputs build information
  378. func version() {
  379. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  380. }