main.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684
  1. package main
  2. import (
  3. "bufio"
  4. "crypto/sha1"
  5. "encoding/csv"
  6. "flag"
  7. "fmt"
  8. "io"
  9. "log"
  10. "math/rand"
  11. "net"
  12. "net/http"
  13. "os"
  14. "strings"
  15. "time"
  16. "github.com/BurntSushi/toml"
  17. "github.com/Songmu/axslogparser"
  18. "github.com/google/gopacket"
  19. "github.com/google/gopacket/layers"
  20. "github.com/google/gopacket/pcap"
  21. "github.com/hpcloud/tail"
  22. "github.com/kr/pretty"
  23. "github.com/nats-io/nats"
  24. "github.com/nats-io/nats/encoders/protobuf"
  25. "git.scraperwall.com/scw/ajp13"
  26. "git.scraperwall.com/scw/data"
  27. "git.scraperwall.com/scw/ip"
  28. )
  29. var (
  30. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  31. iface = flag.String("interface", "eth0", "Interface to get packets from")
  32. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  33. filter = flag.String("filter", "tcp", "PCAP filter expression")
  34. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  35. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  36. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  37. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  38. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  39. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  40. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  41. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  42. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  43. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  44. trace = flag.Bool("trace", false, "Trace the packet capturing")
  45. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  46. configFile = flag.String("config", "", "The location of the TOML config file")
  47. beQuiet = flag.Bool("quiet", true, "Be quiet")
  48. doVersion = flag.Bool("version", false, "Show version information")
  49. natsEC *nats.EncodedConn
  50. natsJsonEC *nats.EncodedConn
  51. natsClosedChan chan bool
  52. natsIsUnavailable bool
  53. count uint64
  54. timeout = -1 * time.Second
  55. ipPriv *ip.IP
  56. config Config
  57. // Version contains the program Version, e.g. 1.0.1
  58. Version string
  59. // BuildDate contains the date and time at which the program was compiled
  60. BuildDate string
  61. )
  62. // Config contains the program configuration
  63. type Config struct {
  64. Live bool
  65. Interface string
  66. SnapshotLen int
  67. Filter string
  68. Promiscuous bool
  69. NatsURL string
  70. NatsQueue string
  71. NatsUser string
  72. NatsPassword string
  73. NatsCA string
  74. SleepFor duration
  75. RequestsFile string
  76. UseXForwardedAsSource bool
  77. Quiet bool
  78. Protocol string
  79. Trace bool
  80. ApacheLog string
  81. }
  // duration wraps time.Duration so that it can be decoded from a TOML string
// such as "250ms" or "1m30s". The toml decoder uses UnmarshalText for types
// that implement encoding.TextUnmarshaler.
type duration struct {
	time.Duration
}

// UnmarshalText parses text with time.ParseDuration. If parsing fails, the
// value already held (for example one copied from the -sleep flag) is kept
// unchanged and an error naming the offending text is returned.
func (d *duration) UnmarshalText(text []byte) error {
	parsed, err := time.ParseDuration(string(text))
	if err != nil {
		return fmt.Errorf("invalid duration %q: %v", text, err)
	}
	d.Duration = parsed
	return nil
}
  90. func (c Config) print() {
  91. fmt.Printf("Live: %t\n", c.Live)
  92. fmt.Printf("Interface: %s\n", c.Interface)
  93. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  94. fmt.Printf("Filter: %s\n", c.Filter)
  95. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  96. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  97. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  98. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  99. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  100. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  101. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  102. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  103. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  104. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  105. fmt.Printf("Protocol: %s\n", c.Protocol)
  106. fmt.Printf("Quiet: %t\n", c.Quiet)
  107. fmt.Printf("Trace: %t\n", c.Trace)
  108. }
  109. func init() {
  110. flag.Parse()
  111. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  112. }
  113. func main() {
  114. if *doVersion {
  115. version()
  116. os.Exit(0)
  117. }
  118. loadConfig()
  119. // Output how many requests per second were sent
  120. if !config.Quiet {
  121. go func(c *uint64) {
  122. for {
  123. fmt.Printf("%d requests per second\n", *c)
  124. *c = 0
  125. time.Sleep(time.Second)
  126. }
  127. }(&count)
  128. }
  129. // NATS
  130. //
  131. if config.NatsURL == "" {
  132. log.Fatal("No NATS URL specified (-nats-url)!")
  133. }
  134. natsIsUnavailable = true
  135. natsClosedChan = make(chan bool, 10000)
  136. err := connectToNATS()
  137. if err != nil {
  138. log.Fatal(err)
  139. }
  140. go natsWatchdog(natsClosedChan)
  141. // What should I do?
  142. if config.RequestsFile != "" {
  143. replayFile()
  144. } else if config.ApacheLog != "" {
  145. apacheLogCapture(config.ApacheLog)
  146. } else if config.Live {
  147. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  148. liveCapture()
  149. }
  150. }
  151. func natsWatchdog(closedChan chan bool) {
  152. for range closedChan {
  153. RECONNECT:
  154. for {
  155. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  156. err := connectToNATS()
  157. if err == nil {
  158. break RECONNECT
  159. }
  160. time.Sleep(1 * time.Second)
  161. }
  162. }
  163. }
  164. func connectToNATS() error {
  165. var natsConn *nats.Conn
  166. var err error
  167. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  168. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  169. } else {
  170. if config.NatsPassword != "" && config.NatsUser != "" {
  171. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  172. } else {
  173. natsConn, err = nats.Connect(config.NatsURL)
  174. }
  175. }
  176. if err != nil {
  177. return err
  178. }
  179. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  180. if err != nil {
  181. return fmt.Errorf("Encoded Connection: %v", err)
  182. }
  183. natsJsonEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  184. if err != nil {
  185. return fmt.Errorf("Encoded Connection: %v", err)
  186. }
  187. natsIsUnavailable = false
  188. return nil
  189. }
  190. func apacheLogCapture(logfile string) {
  191. if _, err := os.Stat(logfile); err != nil {
  192. log.Fatalf("%s: %s", logfile, err)
  193. }
  194. t, err := tail.TailFile(logfile, tail.Config{
  195. Follow: true, // follow the file
  196. ReOpen: true, // reopen log file when it gets closed/rotated
  197. Logger: tail.DiscardingLogger, // don't log anything
  198. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  199. })
  200. if err != nil {
  201. log.Fatalf("%s: %s", logfile, err)
  202. }
  203. var p axslogparser.Parser
  204. parserSet := false
  205. for line := range t.Lines {
  206. l := line.Text
  207. if !parserSet {
  208. p, _, err = axslogparser.GuessParser(l)
  209. if err != nil {
  210. log.Println(err)
  211. continue
  212. }
  213. parserSet = true
  214. }
  215. logEntry, err := p.Parse(l)
  216. if err != nil {
  217. log.Println(err)
  218. continue
  219. }
  220. remote := logEntry.Host
  221. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  222. remote = logEntry.ForwardedFor
  223. }
  224. // only use the first host in case there are multiple hosts in the log
  225. if cidx := strings.Index(remote, ","); cidx >= 0 {
  226. remote = remote[0:cidx]
  227. }
  228. // extract the virtual host
  229. var virtualHost string
  230. vhost := logEntry.VirtualHost
  231. if vhost != "" {
  232. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  233. virtualHost = vhostAndPort[0]
  234. }
  235. request := data.Request{
  236. IpSrc: remote,
  237. IpDst: "127.0.0.1",
  238. PortSrc: 0,
  239. PortDst: 0,
  240. TcpSeq: 0,
  241. CreatedAt: logEntry.Time.UnixNano(),
  242. Url: logEntry.RequestURI,
  243. Method: logEntry.Method,
  244. Host: virtualHost,
  245. Protocol: logEntry.Protocol,
  246. Origin: remote,
  247. Source: remote,
  248. Referer: logEntry.Referer,
  249. XForwardedFor: logEntry.ForwardedFor,
  250. UserAgent: logEntry.UserAgent,
  251. }
  252. if config.Trace {
  253. log.Printf("[%s] %s\n", request.Source, request.Url)
  254. }
  255. if natsIsUnavailable != true {
  256. if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
  257. if err == nats.ErrConnectionClosed {
  258. natsIsUnavailable = true
  259. natsClosedChan <- true
  260. }
  261. }
  262. }
  263. }
  264. }
  265. func liveCapture() {
  266. ipPriv = ip.NewIP()
  267. // PCAP setup
  268. //
  269. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  270. if err != nil {
  271. log.Fatal(err)
  272. }
  273. defer handle.Close()
  274. err = handle.SetBPFFilter(config.Filter)
  275. if err != nil {
  276. log.Fatal(err)
  277. }
  278. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  279. for packet := range packetSource.Packets() {
  280. go processPacket(packet)
  281. }
  282. }
  283. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  284. func processPacket(packet gopacket.Packet) {
  285. hasIPv4 := false
  286. var ipSrc, ipDst string
  287. // IPv4
  288. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  289. ip := ipLayer.(*layers.IPv4)
  290. ipSrc = ip.SrcIP.String()
  291. ipDst = ip.DstIP.String()
  292. hasIPv4 = true
  293. }
  294. // IPv6
  295. if !hasIPv4 {
  296. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  297. ip := ipLayer.(*layers.IPv6)
  298. ipSrc = ip.SrcIP.String()
  299. ipDst = ip.DstIP.String()
  300. }
  301. }
  302. // TCP
  303. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  304. if tcpLayer == nil {
  305. return
  306. }
  307. tcp, _ := tcpLayer.(*layers.TCP)
  308. portSrc := tcp.SrcPort
  309. portDst := tcp.DstPort
  310. sequence := tcp.Seq
  311. applicationLayer := packet.ApplicationLayer()
  312. if applicationLayer == nil {
  313. return
  314. }
  315. count++
  316. if len(applicationLayer.Payload()) < 50 {
  317. log.Println("application layer too small!")
  318. return
  319. }
  320. request := data.Request{
  321. IpSrc: ipSrc,
  322. IpDst: ipDst,
  323. PortSrc: uint32(portSrc),
  324. PortDst: uint32(portDst),
  325. TcpSeq: uint32(sequence),
  326. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  327. }
  328. switch config.Protocol {
  329. case "http":
  330. err := processHTTP(&request, applicationLayer.Payload())
  331. if err != nil {
  332. log.Println(err)
  333. return
  334. }
  335. case "ajp13":
  336. err := processAJP13(&request, applicationLayer.Payload())
  337. if err != nil {
  338. log.Println(err)
  339. return
  340. }
  341. }
  342. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  343. if strings.Contains(request.XForwardedFor, ",") {
  344. ips := strings.Split(request.XForwardedFor, ",")
  345. for i := len(ips) - 1; i >= 0; i-- {
  346. ipRaw := strings.TrimSpace(ips[i])
  347. ipAddr := net.ParseIP(ipRaw)
  348. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  349. request.Source = ipRaw
  350. break
  351. }
  352. }
  353. } else {
  354. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  355. if !ipPriv.IsPrivate(ipAddr) {
  356. request.Source = request.XForwardedFor
  357. }
  358. }
  359. }
  360. if request.Source == request.IpSrc && request.XRealIP != "" {
  361. request.Source = request.XRealIP
  362. }
  363. if config.Trace {
  364. log.Printf("[%s] %s\n", request.Source, request.Url)
  365. }
  366. if natsIsUnavailable != true {
  367. if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
  368. if err == nats.ErrConnectionClosed {
  369. natsIsUnavailable = true
  370. natsClosedChan <- true
  371. }
  372. }
  373. }
  374. }
  375. func processAJP13(request *data.Request, appData []byte) error {
  376. a, err := ajp13.Parse(appData)
  377. if err != nil {
  378. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  379. }
  380. request.Url = a.URI
  381. request.Method = a.Method()
  382. request.Host = a.Server
  383. request.Protocol = a.Version
  384. request.Origin = a.RemoteAddr.String()
  385. request.Source = a.RemoteAddr.String()
  386. if v, ok := a.Header("Referer"); ok {
  387. request.Referer = v
  388. }
  389. if v, ok := a.Header("Connection"); ok {
  390. request.Connection = v
  391. }
  392. if v, ok := a.Header("X-Forwarded-For"); ok {
  393. request.XForwardedFor = v
  394. }
  395. if v, ok := a.Header("X-Real-IP"); ok {
  396. request.XRealIP = v
  397. }
  398. if v, ok := a.Header("X-Requested-With"); ok {
  399. request.XRequestedWith = v
  400. }
  401. if v, ok := a.Header("Accept-Encoding"); ok {
  402. request.AcceptEncoding = v
  403. }
  404. if v, ok := a.Header("Accept-Language"); ok {
  405. request.AcceptLanguage = v
  406. }
  407. if v, ok := a.Header("User-Agent"); ok {
  408. request.UserAgent = v
  409. }
  410. if v, ok := a.Header("Accept"); ok {
  411. request.Accept = v
  412. }
  413. if v, ok := a.Header("Cookie"); ok {
  414. request.Cookie = v
  415. }
  416. if v, ok := a.Header("X-Forwarded-Host"); ok {
  417. if v != request.Host {
  418. request.Host = v
  419. }
  420. }
  421. if false && config.Trace {
  422. log.Println("Request")
  423. pretty.Println(request)
  424. }
  425. return nil
  426. }
  427. func processHTTP(request *data.Request, appData []byte) error {
  428. reader := bufio.NewReader(strings.NewReader(string(appData)))
  429. req, err := http.ReadRequest(reader)
  430. if err != nil {
  431. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  432. }
  433. request.Url = req.URL.String()
  434. request.Method = req.Method
  435. request.Referer = req.Referer()
  436. request.Host = req.Host
  437. request.Protocol = req.Proto
  438. request.Origin = request.Host
  439. if _, ok := req.Header["Connection"]; ok {
  440. request.Connection = req.Header["Connection"][0]
  441. }
  442. if _, ok := req.Header["X-Forwarded-For"]; ok {
  443. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  444. }
  445. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  446. if _, ok := req.Header["True-Client-Ip"]; ok {
  447. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  448. }
  449. if _, ok := req.Header["X-Real-Ip"]; ok {
  450. request.XRealIP = req.Header["X-Real-Ip"][0]
  451. }
  452. if _, ok := req.Header["X-Requested-With"]; ok {
  453. request.XRequestedWith = req.Header["X-Requested-With"][0]
  454. }
  455. if _, ok := req.Header["Accept-Encoding"]; ok {
  456. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  457. }
  458. if _, ok := req.Header["Accept-Language"]; ok {
  459. request.AcceptLanguage = req.Header["Accept-Language"][0]
  460. }
  461. if _, ok := req.Header["User-Agent"]; ok {
  462. request.UserAgent = req.Header["User-Agent"][0]
  463. }
  464. if _, ok := req.Header["Accept"]; ok {
  465. request.Accept = req.Header["Accept"][0]
  466. }
  467. if _, ok := req.Header["Cookie"]; ok {
  468. request.Cookie = req.Header["Cookie"][0]
  469. }
  470. request.Source = request.IpSrc
  471. return nil
  472. }
  473. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  474. // e.g.
  475. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  476. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  477. func replayFile() {
  478. var req data.Request
  479. var startTs time.Time
  480. var endTs time.Time
  481. rand.Seed(time.Now().UnixNano())
  482. for {
  483. fh, err := os.Open(config.RequestsFile)
  484. if err != nil {
  485. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  486. }
  487. c := csv.NewReader(fh)
  488. c.Comma = ' '
  489. for {
  490. if config.SleepFor.Duration > time.Nanosecond {
  491. startTs = time.Now()
  492. }
  493. r, err := c.Read()
  494. if err == io.EOF {
  495. break
  496. }
  497. if err != nil {
  498. log.Println(err)
  499. continue
  500. }
  501. req.IpSrc = r[0]
  502. req.Source = r[0]
  503. req.Url = r[1]
  504. req.UserAgent = "Munch/1.0"
  505. req.Host = "demo.scraperwall.com"
  506. req.CreatedAt = time.Now().UnixNano()
  507. if natsIsUnavailable != true {
  508. if err := natsEC.Publish(config.NatsQueue, &req); err != nil {
  509. if err == nats.ErrConnectionClosed {
  510. natsIsUnavailable = true
  511. natsClosedChan <- true
  512. }
  513. }
  514. }
  515. if strings.Index(r[1], ".") < 0 {
  516. hash := sha1.New()
  517. io.WriteString(hash, r[0])
  518. fp := data.Fingerprint{
  519. ClientID: "scw",
  520. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  521. Remote: r[0],
  522. Url: r[1],
  523. Source: r[0],
  524. CreatedAt: time.Now(),
  525. }
  526. if strings.HasPrefix(r[0], "50.31.") {
  527. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  528. natsJsonEC.Publish("fingerprints_scw", fp)
  529. } else if rand.Intn(10) < 5 {
  530. natsJsonEC.Publish("fingerprints_scw", fp)
  531. }
  532. }
  533. count++
  534. if config.SleepFor.Duration >= time.Nanosecond {
  535. endTs = time.Now()
  536. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  537. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  538. }
  539. }
  540. }
  541. }
  542. }
  543. func loadConfig() {
  544. // initialize with values from the command line / environment
  545. config.Live = *doLiveCapture
  546. config.Interface = *iface
  547. config.SnapshotLen = *snapshotLen
  548. config.Filter = *filter
  549. config.Promiscuous = *promiscuous
  550. config.NatsURL = *natsURL
  551. config.NatsQueue = *natsQueue
  552. config.NatsUser = *natsUser
  553. config.NatsPassword = *natsPassword
  554. config.NatsCA = *natsCA
  555. config.SleepFor.Duration = *sleepFor
  556. config.RequestsFile = *requestsFile
  557. config.UseXForwardedAsSource = *useXForwardedAsSource
  558. config.Protocol = *protocol
  559. config.ApacheLog = *apacheLog
  560. config.Quiet = *beQuiet
  561. config.Trace = *trace
  562. if *configFile == "" {
  563. return
  564. }
  565. _, err := os.Stat(*configFile)
  566. if err != nil {
  567. log.Printf("%s: %s\n", *configFile, err)
  568. return
  569. }
  570. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  571. log.Printf("%s: %s\n", *configFile, err)
  572. }
  573. if !config.Quiet {
  574. config.print()
  575. }
  576. }
  577. // version outputs build information
  578. func version() {
  579. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  580. }