main.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. package main
import (
	"bufio"
	"crypto/sha1"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/Songmu/axslogparser"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/hpcloud/tail"
	"github.com/kr/pretty"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
)
  29. var (
  30. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  31. iface = flag.String("interface", "eth0", "Interface to get packets from")
  32. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  33. filter = flag.String("filter", "tcp", "PCAP filter expression")
  34. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  35. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  36. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  37. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  38. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  39. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  40. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  41. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  42. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  43. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  44. trace = flag.Bool("trace", false, "Trace the packet capturing")
  45. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  46. configFile = flag.String("config", "", "The location of the TOML config file")
  47. beQuiet = flag.Bool("quiet", true, "Be quiet")
  48. doVersion = flag.Bool("version", false, "Show version information")
  49. natsEC *nats.EncodedConn
  50. natsJsonEC *nats.EncodedConn
  51. natsClosedChan chan bool
  52. natsIsUnavailable bool
  53. count uint64
  54. timeout = -1 * time.Second
  55. ipPriv *ip.IP
  56. config Config
  57. // Version contains the program Version, e.g. 1.0.1
  58. Version string
  59. // BuildDate contains the date and time at which the program was compiled
  60. BuildDate string
  61. )
  62. // Config contains the program configuration
  63. type Config struct {
  64. Live bool
  65. Interface string
  66. SnapshotLen int
  67. Filter string
  68. Promiscuous bool
  69. NatsURL string
  70. NatsQueue string
  71. NatsUser string
  72. NatsPassword string
  73. NatsCA string
  74. SleepFor duration
  75. RequestsFile string
  76. UseXForwardedAsSource bool
  77. Quiet bool
  78. Protocol string
  79. Trace bool
  80. ApacheLog string
  81. }
  82. type duration struct {
  83. time.Duration
  84. }
  85. func (d *duration) UnmarshalText(text []byte) error {
  86. var err error
  87. d.Duration, err = time.ParseDuration(string(text))
  88. return err
  89. }
  90. func (c Config) print() {
  91. fmt.Printf("Live: %t\n", c.Live)
  92. fmt.Printf("Interface: %s\n", c.Interface)
  93. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  94. fmt.Printf("Filter: %s\n", c.Filter)
  95. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  96. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  97. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  98. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  99. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  100. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  101. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  102. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  103. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  104. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  105. fmt.Printf("Protocol: %s\n", c.Protocol)
  106. fmt.Printf("Quiet: %t\n", c.Quiet)
  107. fmt.Printf("Trace: %t\n", c.Trace)
  108. }
  109. func init() {
  110. flag.Parse()
  111. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  112. }
  113. func main() {
  114. if *doVersion {
  115. version()
  116. os.Exit(0)
  117. }
  118. loadConfig()
  119. // Output how many requests per second were sent
  120. if !config.Quiet {
  121. go func(c *uint64) {
  122. for {
  123. fmt.Printf("%d requests per second\n", *c)
  124. *c = 0
  125. time.Sleep(time.Second)
  126. }
  127. }(&count)
  128. }
  129. // NATS
  130. //
  131. if config.NatsURL == "" {
  132. log.Fatal("No NATS URL specified (-nats-url)!")
  133. }
  134. natsIsUnavailable = true
  135. natsClosedChan = make(chan bool, 10000)
  136. err := connectToNATS()
  137. if err != nil {
  138. log.Fatal(err)
  139. }
  140. // What should I do?
  141. if config.RequestsFile != "" {
  142. replayFile()
  143. } else if config.ApacheLog != "" {
  144. apacheLogCapture(config.ApacheLog)
  145. } else if config.Live {
  146. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  147. liveCapture()
  148. }
  149. }
  150. func natsWatchdog(closedChan chan bool) {
  151. for range closedChan {
  152. RECONNECT:
  153. for {
  154. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  155. err := connectToNATS()
  156. if err == nil {
  157. break RECONNECT
  158. }
  159. time.Sleep(1 * time.Second)
  160. }
  161. }
  162. }
  163. func connectToNATS() error {
  164. var natsConn *nats.Conn
  165. var err error
  166. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  167. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  168. } else {
  169. if config.NatsPassword != "" && config.NatsUser != "" {
  170. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  171. } else {
  172. natsConn, err = nats.Connect(config.NatsURL)
  173. }
  174. }
  175. if err != nil {
  176. return err
  177. }
  178. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  179. if err != nil {
  180. return fmt.Errorf("Encoded Connection: %v", err)
  181. }
  182. natsJsonEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  183. if err != nil {
  184. return fmt.Errorf("Encoded Connection: %v", err)
  185. }
  186. natsIsUnavailable = false
  187. return nil
  188. }
  189. func apacheLogCapture(logfile string) {
  190. if _, err := os.Stat(logfile); err != nil {
  191. log.Fatalf("%s: %s", logfile, err)
  192. }
  193. t, err := tail.TailFile(logfile, tail.Config{
  194. Follow: true, // follow the file
  195. ReOpen: true, // reopen log file when it gets closed/rotated
  196. Logger: tail.DiscardingLogger, // don't log anything
  197. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  198. })
  199. if err != nil {
  200. log.Fatalf("%s: %s", logfile, err)
  201. }
  202. var p axslogparser.Parser
  203. parserSet := false
  204. for line := range t.Lines {
  205. l := line.Text
  206. if !parserSet {
  207. p, _, err = axslogparser.GuessParser(l)
  208. if err != nil {
  209. log.Println(err)
  210. continue
  211. }
  212. parserSet = true
  213. }
  214. logEntry, err := p.Parse(l)
  215. if err != nil {
  216. log.Println(err)
  217. continue
  218. }
  219. remote := logEntry.Host
  220. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  221. remote = logEntry.ForwardedFor
  222. }
  223. // only use the first host in case there are multiple hosts in the log
  224. if cidx := strings.Index(remote, ","); cidx >= 0 {
  225. remote = remote[0:cidx]
  226. }
  227. // extract the virtual host
  228. var virtualHost string
  229. vhost := logEntry.VirtualHost
  230. if vhost != "" {
  231. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  232. virtualHost = vhostAndPort[0]
  233. }
  234. request := data.Request{
  235. IpSrc: remote,
  236. IpDst: "127.0.0.1",
  237. PortSrc: 0,
  238. PortDst: 0,
  239. TcpSeq: 0,
  240. CreatedAt: logEntry.Time.UnixNano(),
  241. Url: logEntry.RequestURI,
  242. Method: logEntry.Method,
  243. Host: virtualHost,
  244. Protocol: logEntry.Protocol,
  245. Origin: remote,
  246. Source: remote,
  247. Referer: logEntry.Referer,
  248. XForwardedFor: logEntry.ForwardedFor,
  249. UserAgent: logEntry.UserAgent,
  250. }
  251. if config.Trace {
  252. log.Printf("[%s] %s\n", request.Source, request.Url)
  253. }
  254. if natsIsUnavailable != true {
  255. if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
  256. if err == nats.ErrConnectionClosed {
  257. natsIsUnavailable = true
  258. natsClosedChan <- true
  259. }
  260. }
  261. }
  262. }
  263. }
  264. func liveCapture() {
  265. ipPriv = ip.NewIP()
  266. // PCAP setup
  267. //
  268. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  269. if err != nil {
  270. log.Fatal(err)
  271. }
  272. defer handle.Close()
  273. err = handle.SetBPFFilter(config.Filter)
  274. if err != nil {
  275. log.Fatal(err)
  276. }
  277. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  278. for packet := range packetSource.Packets() {
  279. go processPacket(packet)
  280. }
  281. }
  282. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  283. func processPacket(packet gopacket.Packet) {
  284. hasIPv4 := false
  285. var ipSrc, ipDst string
  286. // IPv4
  287. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  288. ip := ipLayer.(*layers.IPv4)
  289. ipSrc = ip.SrcIP.String()
  290. ipDst = ip.DstIP.String()
  291. hasIPv4 = true
  292. }
  293. // IPv6
  294. if !hasIPv4 {
  295. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  296. ip := ipLayer.(*layers.IPv6)
  297. ipSrc = ip.SrcIP.String()
  298. ipDst = ip.DstIP.String()
  299. }
  300. }
  301. // TCP
  302. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  303. if tcpLayer == nil {
  304. return
  305. }
  306. tcp, _ := tcpLayer.(*layers.TCP)
  307. portSrc := tcp.SrcPort
  308. portDst := tcp.DstPort
  309. sequence := tcp.Seq
  310. applicationLayer := packet.ApplicationLayer()
  311. if applicationLayer == nil {
  312. return
  313. }
  314. count++
  315. if len(applicationLayer.Payload()) < 50 {
  316. log.Println("application layer too small!")
  317. return
  318. }
  319. request := data.Request{
  320. IpSrc: ipSrc,
  321. IpDst: ipDst,
  322. PortSrc: uint32(portSrc),
  323. PortDst: uint32(portDst),
  324. TcpSeq: uint32(sequence),
  325. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  326. }
  327. switch config.Protocol {
  328. case "http":
  329. err := processHTTP(&request, applicationLayer.Payload())
  330. if err != nil {
  331. log.Println(err)
  332. return
  333. }
  334. case "ajp13":
  335. err := processAJP13(&request, applicationLayer.Payload())
  336. if err != nil {
  337. log.Println(err)
  338. return
  339. }
  340. }
  341. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  342. if strings.Contains(request.XForwardedFor, ",") {
  343. ips := strings.Split(request.XForwardedFor, ",")
  344. for i := len(ips) - 1; i >= 0; i-- {
  345. ipRaw := strings.TrimSpace(ips[i])
  346. ipAddr := net.ParseIP(ipRaw)
  347. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  348. request.Source = ipRaw
  349. break
  350. }
  351. }
  352. } else {
  353. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  354. if !ipPriv.IsPrivate(ipAddr) {
  355. request.Source = request.XForwardedFor
  356. }
  357. }
  358. }
  359. if request.Source == request.IpSrc && request.XRealIP != "" {
  360. request.Source = request.XRealIP
  361. }
  362. if config.Trace {
  363. log.Printf("[%s] %s\n", request.Source, request.Url)
  364. }
  365. if natsIsUnavailable != true {
  366. if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
  367. if err == nats.ErrConnectionClosed {
  368. natsIsUnavailable = true
  369. natsClosedChan <- true
  370. }
  371. }
  372. }
  373. }
  374. func processAJP13(request *data.Request, appData []byte) error {
  375. a, err := ajp13.Parse(appData)
  376. if err != nil {
  377. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  378. }
  379. request.Url = a.URI
  380. request.Method = a.Method()
  381. request.Host = a.Server
  382. request.Protocol = a.Version
  383. request.Origin = a.RemoteAddr.String()
  384. request.Source = a.RemoteAddr.String()
  385. if v, ok := a.Header("Referer"); ok {
  386. request.Referer = v
  387. }
  388. if v, ok := a.Header("Connection"); ok {
  389. request.Connection = v
  390. }
  391. if v, ok := a.Header("X-Forwarded-For"); ok {
  392. request.XForwardedFor = v
  393. }
  394. if v, ok := a.Header("X-Real-IP"); ok {
  395. request.XRealIP = v
  396. }
  397. if v, ok := a.Header("X-Requested-With"); ok {
  398. request.XRequestedWith = v
  399. }
  400. if v, ok := a.Header("Accept-Encoding"); ok {
  401. request.AcceptEncoding = v
  402. }
  403. if v, ok := a.Header("Accept-Language"); ok {
  404. request.AcceptLanguage = v
  405. }
  406. if v, ok := a.Header("User-Agent"); ok {
  407. request.UserAgent = v
  408. }
  409. if v, ok := a.Header("Accept"); ok {
  410. request.Accept = v
  411. }
  412. if v, ok := a.Header("Cookie"); ok {
  413. request.Cookie = v
  414. }
  415. if v, ok := a.Header("X-Forwarded-Host"); ok {
  416. if v != request.Host {
  417. request.Host = v
  418. }
  419. }
  420. if false && config.Trace {
  421. log.Println("Request")
  422. pretty.Println(request)
  423. }
  424. return nil
  425. }
  426. func processHTTP(request *data.Request, appData []byte) error {
  427. reader := bufio.NewReader(strings.NewReader(string(appData)))
  428. req, err := http.ReadRequest(reader)
  429. if err != nil {
  430. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  431. }
  432. request.Url = req.URL.String()
  433. request.Method = req.Method
  434. request.Referer = req.Referer()
  435. request.Host = req.Host
  436. request.Protocol = req.Proto
  437. request.Origin = request.Host
  438. if _, ok := req.Header["Connection"]; ok {
  439. request.Connection = req.Header["Connection"][0]
  440. }
  441. if _, ok := req.Header["X-Forwarded-For"]; ok {
  442. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  443. }
  444. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  445. if _, ok := req.Header["True-Client-Ip"]; ok {
  446. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  447. }
  448. if _, ok := req.Header["X-Real-Ip"]; ok {
  449. request.XRealIP = req.Header["X-Real-Ip"][0]
  450. }
  451. if _, ok := req.Header["X-Requested-With"]; ok {
  452. request.XRequestedWith = req.Header["X-Requested-With"][0]
  453. }
  454. if _, ok := req.Header["Accept-Encoding"]; ok {
  455. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  456. }
  457. if _, ok := req.Header["Accept-Language"]; ok {
  458. request.AcceptLanguage = req.Header["Accept-Language"][0]
  459. }
  460. if _, ok := req.Header["User-Agent"]; ok {
  461. request.UserAgent = req.Header["User-Agent"][0]
  462. }
  463. if _, ok := req.Header["Accept"]; ok {
  464. request.Accept = req.Header["Accept"][0]
  465. }
  466. if _, ok := req.Header["Cookie"]; ok {
  467. request.Cookie = req.Header["Cookie"][0]
  468. }
  469. request.Source = request.IpSrc
  470. return nil
  471. }
  472. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  473. // e.g.
  474. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  475. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  476. func replayFile() {
  477. var req data.Request
  478. var startTs time.Time
  479. var endTs time.Time
  480. rand.Seed(time.Now().UnixNano())
  481. for {
  482. fh, err := os.Open(config.RequestsFile)
  483. if err != nil {
  484. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  485. }
  486. c := csv.NewReader(fh)
  487. c.Comma = ' '
  488. for {
  489. if config.SleepFor.Duration > time.Nanosecond {
  490. startTs = time.Now()
  491. }
  492. r, err := c.Read()
  493. if err == io.EOF {
  494. break
  495. }
  496. if err != nil {
  497. log.Println(err)
  498. continue
  499. }
  500. req.IpSrc = r[0]
  501. req.Source = r[0]
  502. req.Url = r[1]
  503. req.UserAgent = "Munch/1.0"
  504. req.Host = "demo.scraperwall.com"
  505. req.CreatedAt = time.Now().UnixNano()
  506. if natsIsUnavailable != true {
  507. if err := natsEC.Publish(config.NatsQueue, &req); err != nil {
  508. if err == nats.ErrConnectionClosed {
  509. natsIsUnavailable = true
  510. natsClosedChan <- true
  511. }
  512. }
  513. }
  514. if strings.Index(r[1], ".") < 0 {
  515. hash := sha1.New()
  516. io.WriteString(hash, r[0])
  517. fp := data.Fingerprint{
  518. ClientID: "scw",
  519. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  520. Remote: r[0],
  521. Url: r[1],
  522. Source: r[0],
  523. CreatedAt: time.Now(),
  524. }
  525. if strings.HasPrefix(r[0], "50.31.") {
  526. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  527. natsJsonEC.Publish("fingerprints_scw", fp)
  528. } else if rand.Intn(10) < 5 {
  529. natsJsonEC.Publish("fingerprints_scw", fp)
  530. }
  531. }
  532. count++
  533. if config.SleepFor.Duration >= time.Nanosecond {
  534. endTs = time.Now()
  535. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  536. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  537. }
  538. }
  539. }
  540. }
  541. }
  542. func loadConfig() {
  543. // initialize with values from the command line / environment
  544. config.Live = *doLiveCapture
  545. config.Interface = *iface
  546. config.SnapshotLen = *snapshotLen
  547. config.Filter = *filter
  548. config.Promiscuous = *promiscuous
  549. config.NatsURL = *natsURL
  550. config.NatsQueue = *natsQueue
  551. config.NatsUser = *natsUser
  552. config.NatsPassword = *natsPassword
  553. config.NatsCA = *natsCA
  554. config.SleepFor.Duration = *sleepFor
  555. config.RequestsFile = *requestsFile
  556. config.UseXForwardedAsSource = *useXForwardedAsSource
  557. config.Protocol = *protocol
  558. config.ApacheLog = *apacheLog
  559. config.Quiet = *beQuiet
  560. config.Trace = *trace
  561. if *configFile == "" {
  562. return
  563. }
  564. _, err := os.Stat(*configFile)
  565. if err != nil {
  566. log.Printf("%s: %s\n", *configFile, err)
  567. return
  568. }
  569. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  570. log.Printf("%s: %s\n", *configFile, err)
  571. }
  572. if !config.Quiet {
  573. config.print()
  574. }
  575. }
  576. // version outputs build information
  577. func version() {
  578. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  579. }