main.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  1. package main
import (
	"bufio"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
	"github.com/BurntSushi/toml"
	"github.com/Songmu/axslogparser"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/hpcloud/tail"
	"github.com/kr/pretty"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"
)
  27. var (
  28. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  29. iface = flag.String("interface", "eth0", "Interface to get packets from")
  30. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  31. filter = flag.String("filter", "tcp", "PCAP filter expression")
  32. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  33. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  34. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  35. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  36. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  37. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  38. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  39. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  40. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  41. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  42. trace = flag.Bool("trace", false, "Trace the packet capturing")
  43. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  44. configFile = flag.String("config", "", "The location of the TOML config file")
  45. beQuiet = flag.Bool("quiet", true, "Be quiet")
  46. doVersion = flag.Bool("version", false, "Show version information")
  47. natsEC *nats.EncodedConn
  48. count uint64
  49. timeout = -1 * time.Second
  50. ipPriv *ip.IP
  51. config Config
  52. // Version contains the program Version, e.g. 1.0.1
  53. Version string
  54. // BuildDate contains the date and time at which the program was compiled
  55. BuildDate string
  56. )
  57. // Config contains the program configuration
  58. type Config struct {
  59. Live bool
  60. Interface string
  61. SnapshotLen int
  62. Filter string
  63. Promiscuous bool
  64. NatsURL string
  65. NatsQueue string
  66. NatsUser string
  67. NatsPassword string
  68. NatsCA string
  69. SleepFor duration
  70. RequestsFile string
  71. UseXForwardedAsSource bool
  72. Quiet bool
  73. Protocol string
  74. Trace bool
  75. ApacheLog string
  76. }
  77. type duration struct {
  78. time.Duration
  79. }
  80. func (d *duration) UnmarshalText(text []byte) error {
  81. var err error
  82. d.Duration, err = time.ParseDuration(string(text))
  83. return err
  84. }
  85. func (c Config) print() {
  86. fmt.Printf("Live: %t\n", c.Live)
  87. fmt.Printf("Interface: %s\n", c.Interface)
  88. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  89. fmt.Printf("Filter: %s\n", c.Filter)
  90. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  91. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  92. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  93. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  94. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  95. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  96. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  97. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  98. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  99. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  100. fmt.Printf("Protocol: %s\n", c.Protocol)
  101. fmt.Printf("Quiet: %t\n", c.Quiet)
  102. fmt.Printf("Trace: %t\n", c.Trace)
  103. }
  104. func init() {
  105. flag.Parse()
  106. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  107. }
  108. func main() {
  109. if *doVersion {
  110. version()
  111. os.Exit(0)
  112. }
  113. loadConfig()
  114. // Output how many requests per second were sent
  115. if !config.Quiet {
  116. go func(c *uint64) {
  117. for {
  118. fmt.Printf("%d requests per second\n", *c)
  119. *c = 0
  120. time.Sleep(time.Second)
  121. }
  122. }(&count)
  123. }
  124. // NATS
  125. //
  126. if config.NatsURL == "" {
  127. log.Fatal("No NATS URL specified (-nats-url)!")
  128. }
  129. var natsConn *nats.Conn
  130. var err error
  131. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  132. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  133. } else {
  134. natsConn, err = nats.Connect(config.NatsURL)
  135. }
  136. if err != nil {
  137. log.Fatal(err)
  138. }
  139. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  140. if err != nil {
  141. log.Fatalf("Encoded Connection: %v!\n", err)
  142. }
  143. // What should I do?
  144. if config.RequestsFile != "" {
  145. replayFile()
  146. } else if config.ApacheLog != "" {
  147. apacheLogCapture(config.ApacheLog)
  148. } else if config.Live {
  149. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  150. liveCapture()
  151. }
  152. }
  153. func apacheLogCapture(logfile string) {
  154. if _, err := os.Stat(logfile); err != nil {
  155. log.Fatalf("%s: %s", logfile, err)
  156. }
  157. t, err := tail.TailFile(logfile, tail.Config{
  158. Follow: true, // follow the file
  159. ReOpen: true, // reopen log file when it gets closed/rotated
  160. Logger: tail.DiscardingLogger, // don't log anything
  161. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  162. })
  163. if err != nil {
  164. log.Fatalf("%s: %s", logfile, err)
  165. }
  166. var p axslogparser.Parser
  167. parserSet := false
  168. for line := range t.Lines {
  169. l := line.Text
  170. if !parserSet {
  171. p, _, err = axslogparser.GuessParser(l)
  172. if err != nil {
  173. log.Println(err)
  174. continue
  175. }
  176. parserSet = true
  177. }
  178. logEntry, err := p.Parse(l)
  179. if err != nil {
  180. log.Println(err)
  181. continue
  182. }
  183. remote := logEntry.Host
  184. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  185. remote = logEntry.ForwardedFor
  186. }
  187. // only use the first host in case there are multiple hosts in the log
  188. if cidx := strings.Index(remote, ","); cidx >= 0 {
  189. remote = remote[0:cidx]
  190. }
  191. // extract the virtual host
  192. var virtualHost string
  193. vhost := logEntry.VirtualHost
  194. if vhost != "" {
  195. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  196. virtualHost = vhostAndPort[0]
  197. }
  198. request := data.Request{
  199. IpSrc: remote,
  200. IpDst: "127.0.0.1",
  201. PortSrc: 0,
  202. PortDst: 0,
  203. TcpSeq: 0,
  204. CreatedAt: logEntry.Time.UnixNano(),
  205. Url: logEntry.RequestURI,
  206. Method: logEntry.Method,
  207. Host: virtualHost,
  208. Protocol: logEntry.Protocol,
  209. Origin: remote,
  210. Source: remote,
  211. Referer: logEntry.Referer,
  212. XForwardedFor: logEntry.ForwardedFor,
  213. UserAgent: logEntry.UserAgent,
  214. }
  215. if config.Trace {
  216. log.Printf("[%s] %s\n", request.Source, request.Url)
  217. }
  218. natsEC.Publish(config.NatsQueue, &request)
  219. }
  220. }
  221. func liveCapture() {
  222. ipPriv = ip.NewIP()
  223. // PCAP setup
  224. //
  225. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  226. if err != nil {
  227. log.Fatal(err)
  228. }
  229. defer handle.Close()
  230. err = handle.SetBPFFilter(config.Filter)
  231. if err != nil {
  232. log.Fatal(err)
  233. }
  234. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  235. for packet := range packetSource.Packets() {
  236. go processPacket(packet)
  237. }
  238. }
  239. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  240. func processPacket(packet gopacket.Packet) {
  241. hasIPv4 := false
  242. var ipSrc, ipDst string
  243. // IPv4
  244. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  245. ip := ipLayer.(*layers.IPv4)
  246. ipSrc = ip.SrcIP.String()
  247. ipDst = ip.DstIP.String()
  248. hasIPv4 = true
  249. }
  250. // IPv6
  251. if !hasIPv4 {
  252. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  253. ip := ipLayer.(*layers.IPv6)
  254. ipSrc = ip.SrcIP.String()
  255. ipDst = ip.DstIP.String()
  256. }
  257. }
  258. // TCP
  259. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  260. if tcpLayer == nil {
  261. return
  262. }
  263. tcp, _ := tcpLayer.(*layers.TCP)
  264. portSrc := tcp.SrcPort
  265. portDst := tcp.DstPort
  266. sequence := tcp.Seq
  267. applicationLayer := packet.ApplicationLayer()
  268. if applicationLayer == nil {
  269. return
  270. }
  271. count++
  272. if len(applicationLayer.Payload()) < 50 {
  273. log.Println("application layer too small!")
  274. return
  275. }
  276. request := data.Request{
  277. IpSrc: ipSrc,
  278. IpDst: ipDst,
  279. PortSrc: uint32(portSrc),
  280. PortDst: uint32(portDst),
  281. TcpSeq: uint32(sequence),
  282. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  283. }
  284. switch config.Protocol {
  285. case "http":
  286. err := processHTTP(&request, applicationLayer.Payload())
  287. if err != nil {
  288. log.Println(err)
  289. return
  290. }
  291. case "ajp13":
  292. err := processAJP13(&request, applicationLayer.Payload())
  293. if err != nil {
  294. log.Println(err)
  295. return
  296. }
  297. }
  298. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  299. if strings.Contains(request.XForwardedFor, ",") {
  300. ips := strings.Split(request.XForwardedFor, ",")
  301. for i := len(ips) - 1; i >= 0; i-- {
  302. ipRaw := strings.TrimSpace(ips[i])
  303. ipAddr := net.ParseIP(ipRaw)
  304. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  305. request.Source = ipRaw
  306. break
  307. }
  308. }
  309. } else {
  310. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  311. if !ipPriv.IsPrivate(ipAddr) {
  312. request.Source = request.XForwardedFor
  313. }
  314. }
  315. }
  316. if request.Source == request.IpSrc && request.XRealIP != "" {
  317. request.Source = request.XRealIP
  318. }
  319. if config.Trace {
  320. log.Printf("[%s] %s\n", request.Source, request.Url)
  321. }
  322. natsEC.Publish(config.NatsQueue, &request)
  323. }
  324. func processAJP13(request *data.Request, appData []byte) error {
  325. a, err := ajp13.Parse(appData)
  326. if err != nil {
  327. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  328. }
  329. request.Url = a.URI
  330. request.Method = a.Method()
  331. request.Host = a.Server
  332. request.Protocol = a.Version
  333. request.Origin = a.RemoteAddr.String()
  334. request.Source = a.RemoteAddr.String()
  335. if v, ok := a.Header("Referer"); ok {
  336. request.Referer = v
  337. }
  338. if v, ok := a.Header("Connection"); ok {
  339. request.Connection = v
  340. }
  341. if v, ok := a.Header("X-Forwarded-For"); ok {
  342. request.XForwardedFor = v
  343. }
  344. if v, ok := a.Header("X-Real-IP"); ok {
  345. request.XRealIP = v
  346. }
  347. if v, ok := a.Header("X-Requested-With"); ok {
  348. request.XRequestedWith = v
  349. }
  350. if v, ok := a.Header("Accept-Encoding"); ok {
  351. request.AcceptEncoding = v
  352. }
  353. if v, ok := a.Header("Accept-Language"); ok {
  354. request.AcceptLanguage = v
  355. }
  356. if v, ok := a.Header("User-Agent"); ok {
  357. request.UserAgent = v
  358. }
  359. if v, ok := a.Header("Accept"); ok {
  360. request.Accept = v
  361. }
  362. if v, ok := a.Header("Cookie"); ok {
  363. request.Cookie = v
  364. }
  365. if v, ok := a.Header("X-Forwarded-Host"); ok {
  366. if v != request.Host {
  367. request.Host = v
  368. }
  369. }
  370. if false && config.Trace {
  371. log.Println("Request")
  372. pretty.Println(request)
  373. }
  374. return nil
  375. }
  376. func processHTTP(request *data.Request, appData []byte) error {
  377. reader := bufio.NewReader(strings.NewReader(string(appData)))
  378. req, err := http.ReadRequest(reader)
  379. if err != nil {
  380. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  381. }
  382. request.Url = req.URL.String()
  383. request.Method = req.Method
  384. request.Referer = req.Referer()
  385. request.Host = req.Host
  386. request.Protocol = req.Proto
  387. request.Origin = request.Host
  388. if _, ok := req.Header["Connection"]; ok {
  389. request.Connection = req.Header["Connection"][0]
  390. }
  391. if _, ok := req.Header["X-Forwarded-For"]; ok {
  392. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  393. }
  394. if _, ok := req.Header["X-Real-IP"]; ok {
  395. request.XRealIP = req.Header["X-Real-IP"][0]
  396. }
  397. if _, ok := req.Header["X-Requested-With"]; ok {
  398. request.XRequestedWith = req.Header["X-Requested-With"][0]
  399. }
  400. if _, ok := req.Header["Accept-Encoding"]; ok {
  401. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  402. }
  403. if _, ok := req.Header["Accept-Language"]; ok {
  404. request.AcceptLanguage = req.Header["Accept-Language"][0]
  405. }
  406. if _, ok := req.Header["User-Agent"]; ok {
  407. request.UserAgent = req.Header["User-Agent"][0]
  408. }
  409. if _, ok := req.Header["Accept"]; ok {
  410. request.Accept = req.Header["Accept"][0]
  411. }
  412. if _, ok := req.Header["Cookie"]; ok {
  413. request.Cookie = req.Header["Cookie"][0]
  414. }
  415. request.Source = request.IpSrc
  416. return nil
  417. }
  418. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  419. // e.g.
  420. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  421. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  422. func replayFile() {
  423. var req data.Request
  424. var startTs time.Time
  425. var endTs time.Time
  426. for {
  427. fh, err := os.Open(config.RequestsFile)
  428. if err != nil {
  429. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  430. }
  431. c := csv.NewReader(fh)
  432. c.Comma = ' '
  433. for {
  434. if config.SleepFor.Duration > time.Nanosecond {
  435. startTs = time.Now()
  436. }
  437. r, err := c.Read()
  438. if err == io.EOF {
  439. break
  440. }
  441. if err != nil {
  442. log.Println(err)
  443. continue
  444. }
  445. req.IpSrc = r[0]
  446. req.Source = r[0]
  447. req.Url = r[1]
  448. req.UserAgent = "Munch/1.0"
  449. req.Host = "demo.scraperwall.com"
  450. req.CreatedAt = time.Now().UnixNano()
  451. natsEC.Publish(config.NatsQueue, &req)
  452. count++
  453. if config.SleepFor.Duration >= time.Nanosecond {
  454. endTs = time.Now()
  455. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  456. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  457. }
  458. }
  459. }
  460. }
  461. }
  462. func loadConfig() {
  463. // initialize with values from the command line / environment
  464. config.Live = *doLiveCapture
  465. config.Interface = *iface
  466. config.SnapshotLen = *snapshotLen
  467. config.Filter = *filter
  468. config.Promiscuous = *promiscuous
  469. config.NatsURL = *natsURL
  470. config.NatsQueue = *natsQueue
  471. config.NatsUser = *natsUser
  472. config.NatsPassword = *natsPassword
  473. config.NatsCA = *natsCA
  474. config.SleepFor.Duration = *sleepFor
  475. config.RequestsFile = *requestsFile
  476. config.UseXForwardedAsSource = *useXForwardedAsSource
  477. config.Protocol = *protocol
  478. config.ApacheLog = *apacheLog
  479. config.Quiet = *beQuiet
  480. config.Trace = *trace
  481. if *configFile == "" {
  482. return
  483. }
  484. _, err := os.Stat(*configFile)
  485. if err != nil {
  486. log.Printf("%s: %s\n", *configFile, err)
  487. return
  488. }
  489. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  490. log.Printf("%s: %s\n", *configFile, err)
  491. }
  492. if !config.Quiet {
  493. config.print()
  494. }
  495. }
  496. // version outputs build information
  497. func version() {
  498. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  499. }