main.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574
  1. package main
  2. import (
  3. "bufio"
  4. "encoding/csv"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net"
  10. "net/http"
  11. "os"
  12. "strings"
  13. "time"
  14. "github.com/BurntSushi/toml"
  15. "github.com/Songmu/axslogparser"
  16. "github.com/google/gopacket"
  17. "github.com/google/gopacket/layers"
  18. "github.com/google/gopacket/pcap"
  19. "github.com/hpcloud/tail"
  20. "github.com/kr/pretty"
  21. "github.com/nats-io/nats"
  22. "github.com/nats-io/nats/encoders/protobuf"
  23. "git.scraperwall.com/scw/ajp13"
  24. "git.scraperwall.com/scw/data"
  25. "git.scraperwall.com/scw/ip"
  26. )
var (
	// Command line flags. loadConfig copies their values into config, where
	// they may be overridden by the TOML file named by -config.
	doLiveCapture         = flag.Bool("live", false, "Capture data in real time from a given interface")
	iface                 = flag.String("interface", "eth0", "Interface to get packets from")
	snapshotLen           = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
	filter                = flag.String("filter", "tcp", "PCAP filter expression")
	promiscuous           = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
	natsURL               = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
	natsQueue             = flag.String("nats-queue", "requests", "The NATS queue name")
	sleepFor              = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
	requestsFile          = flag.String("requests", "", "CSV file containing requests (IP and URL)")
	protocol              = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
	useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
	trace                 = flag.Bool("trace", false, "Trace the packet capturing")
	apacheLog             = flag.String("apache-log", "", "Parse an Apache Log file")
	configFile            = flag.String("config", "", "The location of the TOML config file")
	beQuiet               = flag.Bool("quiet", true, "Be quiet")
	doVersion             = flag.Bool("version", false, "Show version information")

	// natsEC is the encoded NATS connection created in main; every capture
	// mode publishes its requests through it.
	natsEC *nats.EncodedConn

	// count is the number of requests seen since the statistics goroutine in
	// main last reset it.
	// NOTE(review): it is incremented and reset from several goroutines
	// without synchronization, so the reported rate is approximate.
	count uint64

	// timeout is the read timeout handed to pcap.OpenLive in liveCapture.
	// NOTE(review): a negative value presumably means "block forever" - confirm
	// against the pcap package documentation.
	timeout = -1 * time.Second

	// ipPriv is created in liveCapture and used by processPacket to tell
	// private addresses from public ones.
	ipPriv *ip.IP

	// config holds the effective configuration assembled by loadConfig.
	config Config

	// Version contains the program Version, e.g. 1.0.1
	Version string

	// BuildDate contains the date and time at which the program was compiled
	BuildDate string
)
// Config contains the program configuration.
//
// loadConfig fills every field from the matching command line flag and then
// lets the optional TOML file override those values.
type Config struct {
	// Live selects live packet capture (see liveCapture).
	Live bool
	// Interface is the network interface packets are captured from.
	Interface string
	// SnapshotLen is the snapshot length in bytes passed to pcap.OpenLive.
	SnapshotLen int
	// Filter is the BPF filter expression applied to the capture handle.
	Filter string
	// Promiscuous switches the interface into promiscuous mode.
	Promiscuous bool
	// NatsURL is the URL of the NATS server to connect to.
	NatsURL string
	// NatsQueue is the NATS subject the requests are published on.
	NatsQueue string
	// SleepFor is the minimum time spent per request when replaying a file.
	SleepFor duration
	// RequestsFile is the file replayed by replayFile.
	RequestsFile string
	// UseXForwardedAsSource makes the X-Forwarded-For address the request source.
	UseXForwardedAsSource bool
	// Quiet suppresses the configuration dump and the requests-per-second output.
	Quiet bool
	// Protocol selects the payload parser used by processPacket: "http" or "ajp13".
	Protocol string
	// Trace logs the source and URL of every published request.
	Trace bool
	// ApacheLog is the access log followed by apacheLogCapture.
	ApacheLog string
}
  71. type duration struct {
  72. time.Duration
  73. }
  74. func (d *duration) UnmarshalText(text []byte) error {
  75. var err error
  76. d.Duration, err = time.ParseDuration(string(text))
  77. return err
  78. }
  79. func (c Config) print() {
  80. fmt.Printf("Live: %t\n", c.Live)
  81. fmt.Printf("Interface: %s\n", c.Interface)
  82. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  83. fmt.Printf("Filter: %s\n", c.Filter)
  84. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  85. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  86. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  87. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  88. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  89. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  90. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  91. fmt.Printf("Protocol: %s\n", c.Protocol)
  92. fmt.Printf("Quiet: %t\n", c.Quiet)
  93. fmt.Printf("Trace: %t\n", c.Trace)
  94. }
// init parses the command line and registers the protobuf encoder with the
// nats package under the name protobuf.PROTOBUF_ENCODER, which main later
// passes to nats.NewEncodedConn.
//
// NOTE(review): because the flags are parsed here they are parsed before main
// runs; consider moving flag.Parse into main.
func init() {
	flag.Parse()
	nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
}
  99. func main() {
  100. if *doVersion {
  101. version()
  102. os.Exit(0)
  103. }
  104. loadConfig()
  105. // Output how many requests per second were sent
  106. if !config.Quiet {
  107. go func(c *uint64) {
  108. for {
  109. fmt.Printf("%d requests per second\n", *c)
  110. *c = 0
  111. time.Sleep(time.Second)
  112. }
  113. }(&count)
  114. }
  115. // NATS
  116. //
  117. if config.NatsURL == "" {
  118. log.Fatal("No NATS URL specified (-nats-url)!")
  119. }
  120. natsConn, err := nats.Connect(config.NatsURL)
  121. if err != nil {
  122. log.Fatal(err)
  123. }
  124. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  125. if err != nil {
  126. log.Fatalf("Encoded Connection: %v!\n", err)
  127. }
  128. // What should I do?
  129. if config.RequestsFile != "" {
  130. replayFile()
  131. } else if config.ApacheLog != "" {
  132. apacheLogCapture(config.ApacheLog)
  133. } else if config.Live {
  134. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  135. liveCapture()
  136. }
  137. }
  138. func apacheLogCapture(logfile string) {
  139. if _, err := os.Stat(logfile); err != nil {
  140. log.Fatalf("%s: %s", logfile, err)
  141. }
  142. t, err := tail.TailFile(logfile, tail.Config{
  143. Follow: true, // follow the file
  144. ReOpen: true, // reopen log file when it gets closed/rotated
  145. Logger: tail.DiscardingLogger, // don't log anything
  146. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  147. })
  148. if err != nil {
  149. log.Fatalf("%s: %s", logfile, err)
  150. }
  151. var p axslogparser.Parser
  152. parserSet := false
  153. for line := range t.Lines {
  154. l := line.Text
  155. if !parserSet {
  156. p, _, err = axslogparser.GuessParser(l)
  157. if err != nil {
  158. log.Println(err)
  159. continue
  160. }
  161. parserSet = true
  162. }
  163. logEntry, err := p.Parse(l)
  164. if err != nil {
  165. log.Println(err)
  166. continue
  167. }
  168. remote := logEntry.Host
  169. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  170. remote = logEntry.ForwardedFor
  171. }
  172. // only use the first host in case there are multiple hosts in the log
  173. if cidx := strings.Index(remote, ","); cidx >= 0 {
  174. remote = remote[0:cidx]
  175. }
  176. // extract the virtual host
  177. var virtualHost string
  178. vhost := logEntry.VirtualHost
  179. if vhost != "" {
  180. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  181. virtualHost = vhostAndPort[0]
  182. }
  183. request := data.Request{
  184. IpSrc: remote,
  185. IpDst: "127.0.0.1",
  186. PortSrc: 0,
  187. PortDst: 0,
  188. TcpSeq: 0,
  189. CreatedAt: logEntry.Time.UnixNano(),
  190. Url: logEntry.RequestURI,
  191. Method: logEntry.Method,
  192. Host: virtualHost,
  193. Protocol: logEntry.Protocol,
  194. Origin: remote,
  195. Source: remote,
  196. Referer: logEntry.Referer,
  197. XForwardedFor: logEntry.ForwardedFor,
  198. UserAgent: logEntry.UserAgent,
  199. }
  200. if config.Trace {
  201. log.Printf("[%s] %s\n", request.Source, request.Url)
  202. }
  203. natsEC.Publish(config.NatsQueue, &request)
  204. }
  205. }
  206. func liveCapture() {
  207. ipPriv = ip.NewIP()
  208. // PCAP setup
  209. //
  210. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  211. if err != nil {
  212. log.Fatal(err)
  213. }
  214. defer handle.Close()
  215. err = handle.SetBPFFilter(config.Filter)
  216. if err != nil {
  217. log.Fatal(err)
  218. }
  219. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  220. for packet := range packetSource.Packets() {
  221. go processPacket(packet)
  222. }
  223. }
  224. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  225. func processPacket(packet gopacket.Packet) {
  226. hasIPv4 := false
  227. var ipSrc, ipDst string
  228. // IPv4
  229. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  230. ip := ipLayer.(*layers.IPv4)
  231. ipSrc = ip.SrcIP.String()
  232. ipDst = ip.DstIP.String()
  233. hasIPv4 = true
  234. }
  235. // IPv6
  236. if !hasIPv4 {
  237. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  238. ip := ipLayer.(*layers.IPv6)
  239. ipSrc = ip.SrcIP.String()
  240. ipDst = ip.DstIP.String()
  241. }
  242. }
  243. // TCP
  244. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  245. if tcpLayer == nil {
  246. return
  247. }
  248. tcp, _ := tcpLayer.(*layers.TCP)
  249. portSrc := tcp.SrcPort
  250. portDst := tcp.DstPort
  251. sequence := tcp.Seq
  252. applicationLayer := packet.ApplicationLayer()
  253. if applicationLayer == nil {
  254. return
  255. }
  256. count++
  257. if len(applicationLayer.Payload()) < 50 {
  258. log.Println("application layer too small!")
  259. return
  260. }
  261. request := data.Request{
  262. IpSrc: ipSrc,
  263. IpDst: ipDst,
  264. PortSrc: uint32(portSrc),
  265. PortDst: uint32(portDst),
  266. TcpSeq: uint32(sequence),
  267. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  268. }
  269. switch config.Protocol {
  270. case "http":
  271. err := processHTTP(&request, applicationLayer.Payload())
  272. if err != nil {
  273. log.Println(err)
  274. return
  275. }
  276. case "ajp13":
  277. err := processAJP13(&request, applicationLayer.Payload())
  278. if err != nil {
  279. log.Println(err)
  280. return
  281. }
  282. }
  283. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  284. if strings.Contains(request.XForwardedFor, ",") {
  285. ips := strings.Split(request.XForwardedFor, ",")
  286. for i := len(ips) - 1; i >= 0; i-- {
  287. ipRaw := strings.TrimSpace(ips[i])
  288. ipAddr := net.ParseIP(ipRaw)
  289. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  290. request.Source = ipRaw
  291. break
  292. }
  293. }
  294. } else {
  295. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  296. if !ipPriv.IsPrivate(ipAddr) {
  297. request.Source = request.XForwardedFor
  298. }
  299. }
  300. }
  301. if request.Source == request.IpSrc && request.XRealIP != "" {
  302. request.Source = request.XRealIP
  303. }
  304. if config.Trace {
  305. log.Printf("[%s] %s\n", request.Source, request.Url)
  306. }
  307. natsEC.Publish(config.NatsQueue, &request)
  308. }
  309. func processAJP13(request *data.Request, appData []byte) error {
  310. a, err := ajp13.Parse(appData)
  311. if err != nil {
  312. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  313. }
  314. request.Url = a.URI
  315. request.Method = a.Method()
  316. request.Host = a.Server
  317. request.Protocol = a.Version
  318. request.Origin = a.RemoteAddr.String()
  319. request.Source = a.RemoteAddr.String()
  320. if v, ok := a.Header("Referer"); ok {
  321. request.Referer = v
  322. }
  323. if v, ok := a.Header("Connection"); ok {
  324. request.Connection = v
  325. }
  326. if v, ok := a.Header("X-Forwarded-For"); ok {
  327. request.XForwardedFor = v
  328. }
  329. if v, ok := a.Header("X-Real-IP"); ok {
  330. request.XRealIP = v
  331. }
  332. if v, ok := a.Header("X-Requested-With"); ok {
  333. request.XRequestedWith = v
  334. }
  335. if v, ok := a.Header("Accept-Encoding"); ok {
  336. request.AcceptEncoding = v
  337. }
  338. if v, ok := a.Header("Accept-Language"); ok {
  339. request.AcceptLanguage = v
  340. }
  341. if v, ok := a.Header("User-Agent"); ok {
  342. request.UserAgent = v
  343. }
  344. if v, ok := a.Header("Accept"); ok {
  345. request.Accept = v
  346. }
  347. if v, ok := a.Header("Cookie"); ok {
  348. request.Cookie = v
  349. }
  350. if v, ok := a.Header("X-Forwarded-Host"); ok {
  351. if v != request.Host {
  352. request.Host = v
  353. }
  354. }
  355. if false && config.Trace {
  356. log.Println("Request")
  357. pretty.Println(request)
  358. }
  359. return nil
  360. }
  361. func processHTTP(request *data.Request, appData []byte) error {
  362. reader := bufio.NewReader(strings.NewReader(string(appData)))
  363. req, err := http.ReadRequest(reader)
  364. if err != nil {
  365. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  366. }
  367. request.Url = req.URL.String()
  368. request.Method = req.Method
  369. request.Referer = req.Referer()
  370. request.Host = req.Host
  371. request.Protocol = req.Proto
  372. request.Origin = request.Host
  373. if _, ok := req.Header["Connection"]; ok {
  374. request.Connection = req.Header["Connection"][0]
  375. }
  376. if _, ok := req.Header["X-Forwarded-For"]; ok {
  377. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  378. }
  379. if _, ok := req.Header["X-Real-IP"]; ok {
  380. request.XRealIP = req.Header["X-Real-IP"][0]
  381. }
  382. if _, ok := req.Header["X-Requested-With"]; ok {
  383. request.XRequestedWith = req.Header["X-Requested-With"][0]
  384. }
  385. if _, ok := req.Header["Accept-Encoding"]; ok {
  386. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  387. }
  388. if _, ok := req.Header["Accept-Language"]; ok {
  389. request.AcceptLanguage = req.Header["Accept-Language"][0]
  390. }
  391. if _, ok := req.Header["User-Agent"]; ok {
  392. request.UserAgent = req.Header["User-Agent"][0]
  393. }
  394. if _, ok := req.Header["Accept"]; ok {
  395. request.Accept = req.Header["Accept"][0]
  396. }
  397. if _, ok := req.Header["Cookie"]; ok {
  398. request.Cookie = req.Header["Cookie"][0]
  399. }
  400. request.Source = request.IpSrc
  401. return nil
  402. }
  403. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  404. // e.g.
  405. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  406. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  407. func replayFile() {
  408. var req data.Request
  409. var startTs time.Time
  410. var endTs time.Time
  411. for {
  412. fh, err := os.Open(config.RequestsFile)
  413. if err != nil {
  414. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  415. }
  416. c := csv.NewReader(fh)
  417. c.Comma = ' '
  418. for {
  419. if config.SleepFor.Duration > time.Nanosecond {
  420. startTs = time.Now()
  421. }
  422. r, err := c.Read()
  423. if err == io.EOF {
  424. break
  425. }
  426. if err != nil {
  427. log.Println(err)
  428. continue
  429. }
  430. req.IpSrc = r[0]
  431. req.Source = r[0]
  432. req.Url = r[1]
  433. req.UserAgent = "Munch/1.0"
  434. req.Host = "demo.scraperwall.com"
  435. req.CreatedAt = time.Now().UnixNano()
  436. natsEC.Publish(config.NatsQueue, &req)
  437. count++
  438. if config.SleepFor.Duration >= time.Nanosecond {
  439. endTs = time.Now()
  440. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  441. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  442. }
  443. }
  444. }
  445. }
  446. }
  447. func loadConfig() {
  448. // initialize with values from the command line / environment
  449. config.Live = *doLiveCapture
  450. config.Interface = *iface
  451. config.SnapshotLen = *snapshotLen
  452. config.Filter = *filter
  453. config.Promiscuous = *promiscuous
  454. config.NatsURL = *natsURL
  455. config.NatsQueue = *natsQueue
  456. config.SleepFor.Duration = *sleepFor
  457. config.RequestsFile = *requestsFile
  458. config.UseXForwardedAsSource = *useXForwardedAsSource
  459. config.Protocol = *protocol
  460. config.ApacheLog = *apacheLog
  461. config.Quiet = *beQuiet
  462. config.Trace = *trace
  463. if *configFile == "" {
  464. return
  465. }
  466. _, err := os.Stat(*configFile)
  467. if err != nil {
  468. log.Printf("%s: %s\n", *configFile, err)
  469. return
  470. }
  471. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  472. log.Printf("%s: %s\n", *configFile, err)
  473. }
  474. if !config.Quiet {
  475. config.print()
  476. }
  477. }
  478. // version outputs build information
  479. func version() {
  480. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  481. }