main.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. package main
  2. import (
  3. "bufio"
  4. "crypto/sha1"
  5. "encoding/csv"
  6. "flag"
  7. "fmt"
  8. "io"
  9. "log"
  10. "math/rand"
  11. "net"
  12. "net/http"
  13. "os"
  14. "strings"
  15. "time"
  16. "github.com/BurntSushi/toml"
  17. "github.com/Songmu/axslogparser"
  18. "github.com/google/gopacket"
  19. "github.com/google/gopacket/layers"
  20. "github.com/google/gopacket/pcap"
  21. "github.com/hpcloud/tail"
  22. "github.com/nats-io/nats"
  23. "github.com/nats-io/nats/encoders/protobuf"
  24. "git.scraperwall.com/scw/ajp13"
  25. "git.scraperwall.com/scw/data"
  26. "git.scraperwall.com/scw/ip"
  27. )
  28. var (
  29. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  30. iface = flag.String("interface", "eth0", "Interface to get packets from")
  31. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  32. filter = flag.String("filter", "tcp", "PCAP filter expression")
  33. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  34. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  35. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  36. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  37. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  38. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  39. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  40. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  41. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  42. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  43. trace = flag.Bool("trace", false, "Trace the packet capturing")
  44. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  45. configFile = flag.String("config", "", "The location of the TOML config file")
  46. beQuiet = flag.Bool("quiet", true, "Be quiet")
  47. doVersion = flag.Bool("version", false, "Show version information")
  48. natsEC *nats.EncodedConn
  49. natsJsonEC *nats.EncodedConn
  50. natsErrorChan chan error
  51. natsIsAvailable bool
  52. count uint64
  53. timeout = -1 * time.Second
  54. ipPriv *ip.IP
  55. config Config
  56. // Version contains the program Version, e.g. 1.0.1
  57. Version string
  58. // BuildDate contains the date and time at which the program was compiled
  59. BuildDate string
  60. )
  61. // Config contains the program configuration
  62. type Config struct {
  63. Live bool
  64. Interface string
  65. SnapshotLen int
  66. Filter string
  67. Promiscuous bool
  68. NatsURL string
  69. NatsQueue string
  70. NatsUser string
  71. NatsPassword string
  72. NatsCA string
  73. SleepFor duration
  74. RequestsFile string
  75. UseXForwardedAsSource bool
  76. Quiet bool
  77. Protocol string
  78. Trace bool
  79. ApacheLog string
  80. }
  // duration embeds time.Duration and adds UnmarshalText so that a textual
  // value such as "250ms" can be decoded into it (Config.SleepFor uses this
  // for the TOML config file).
  type duration struct {
  	time.Duration
  }

  // UnmarshalText parses text with time.ParseDuration and stores the result in
  // d. It returns the parse error unchanged. On error the value already held
  // by d is kept, so an invalid entry does not silently reset a duration that
  // was set earlier to zero.
  func (d *duration) UnmarshalText(text []byte) error {
  	parsed, err := time.ParseDuration(string(text))
  	if err != nil {
  		return err
  	}
  	d.Duration = parsed
  	return nil
  }
  89. func (c Config) print() {
  90. fmt.Printf("Live: %t\n", c.Live)
  91. fmt.Printf("Interface: %s\n", c.Interface)
  92. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  93. fmt.Printf("Filter: %s\n", c.Filter)
  94. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  95. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  96. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  97. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  98. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  99. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  100. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  101. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  102. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  103. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  104. fmt.Printf("Protocol: %s\n", c.Protocol)
  105. fmt.Printf("Quiet: %t\n", c.Quiet)
  106. fmt.Printf("Trace: %t\n", c.Trace)
  107. }
  108. func init() {
  109. flag.Parse()
  110. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  111. }
  112. func main() {
  113. if *doVersion {
  114. version()
  115. os.Exit(0)
  116. }
  117. loadConfig()
  118. // Output how many requests per second were sent
  119. if !config.Quiet {
  120. go func(c *uint64) {
  121. for {
  122. fmt.Printf("%d requests per second\n", *c)
  123. *c = 0
  124. time.Sleep(time.Second)
  125. }
  126. }(&count)
  127. }
  128. // NATS
  129. //
  130. if config.NatsURL == "" {
  131. log.Fatal("No NATS URL specified (-nats-url)!")
  132. }
  133. natsIsAvailable = false
  134. natsErrorChan = make(chan error, 1)
  135. err := connectToNATS()
  136. if err != nil {
  137. log.Fatal(err)
  138. }
  139. go natsWatchdog(natsErrorChan)
  140. // What should I do?
  141. if config.RequestsFile != "" {
  142. replayFile()
  143. } else if config.ApacheLog != "" {
  144. apacheLogCapture(config.ApacheLog)
  145. } else if config.Live {
  146. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  147. liveCapture()
  148. }
  149. }
  150. func natsWatchdog(closedChan chan error) {
  151. var lastError error
  152. for err := range closedChan {
  153. if lastError != err {
  154. lastError = err
  155. log.Println(err)
  156. }
  157. if err != nats.ErrConnectionClosed {
  158. continue
  159. }
  160. RECONNECT:
  161. for {
  162. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  163. err := connectToNATS()
  164. if err == nil {
  165. break RECONNECT
  166. }
  167. time.Sleep(1 * time.Second)
  168. }
  169. }
  170. }
  171. func connectToNATS() error {
  172. var natsConn *nats.Conn
  173. var err error
  174. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  175. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  176. } else {
  177. if config.NatsPassword != "" && config.NatsUser != "" {
  178. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  179. } else {
  180. natsConn, err = nats.Connect(config.NatsURL)
  181. }
  182. }
  183. if err != nil {
  184. return err
  185. }
  186. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  187. if err != nil {
  188. return fmt.Errorf("Encoded Connection: %v", err)
  189. }
  190. natsJsonEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  191. if err != nil {
  192. return fmt.Errorf("Encoded Connection: %v", err)
  193. }
  194. natsIsAvailable = true
  195. return nil
  196. }
  197. func apacheLogCapture(logfile string) {
  198. if _, err := os.Stat(logfile); err != nil {
  199. log.Fatalf("%s: %s", logfile, err)
  200. }
  201. t, err := tail.TailFile(logfile, tail.Config{
  202. Follow: true, // follow the file
  203. ReOpen: true, // reopen log file when it gets closed/rotated
  204. Logger: tail.DiscardingLogger, // don't log anything
  205. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  206. })
  207. if err != nil {
  208. log.Fatalf("%s: %s", logfile, err)
  209. }
  210. var p axslogparser.Parser
  211. parserSet := false
  212. for line := range t.Lines {
  213. l := line.Text
  214. if !parserSet {
  215. p, _, err = axslogparser.GuessParser(l)
  216. if err != nil {
  217. log.Println(err)
  218. continue
  219. }
  220. parserSet = true
  221. }
  222. logEntry, err := p.Parse(l)
  223. if err != nil {
  224. log.Println(err)
  225. continue
  226. }
  227. remote := logEntry.Host
  228. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  229. remote = logEntry.ForwardedFor
  230. }
  231. // only use the first host in case there are multiple hosts in the log
  232. if cidx := strings.Index(remote, ","); cidx >= 0 {
  233. remote = remote[0:cidx]
  234. }
  235. // extract the virtual host
  236. var virtualHost string
  237. vhost := logEntry.VirtualHost
  238. if vhost != "" {
  239. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  240. virtualHost = vhostAndPort[0]
  241. }
  242. request := data.Request{
  243. IpSrc: remote,
  244. IpDst: "127.0.0.1",
  245. PortSrc: 0,
  246. PortDst: 0,
  247. TcpSeq: 0,
  248. CreatedAt: logEntry.Time.UnixNano(),
  249. Url: logEntry.RequestURI,
  250. Method: logEntry.Method,
  251. Host: virtualHost,
  252. Protocol: logEntry.Protocol,
  253. Origin: remote,
  254. Source: remote,
  255. Referer: logEntry.Referer,
  256. XForwardedFor: logEntry.ForwardedFor,
  257. UserAgent: logEntry.UserAgent,
  258. }
  259. if config.Trace {
  260. log.Printf("[%s] %s\n", request.Source, request.Url)
  261. }
  262. publishRequest(config.NatsQueue, &request)
  263. }
  264. }
  265. func liveCapture() {
  266. ipPriv = ip.NewIP()
  267. // PCAP setup
  268. //
  269. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  270. if err != nil {
  271. log.Fatal(err)
  272. }
  273. defer handle.Close()
  274. err = handle.SetBPFFilter(config.Filter)
  275. if err != nil {
  276. log.Fatal(err)
  277. }
  278. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  279. for packet := range packetSource.Packets() {
  280. go processPacket(packet)
  281. }
  282. }
  283. func publishRequest(queue string, request *data.Request) {
  284. if !natsIsAvailable {
  285. return
  286. }
  287. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  288. natsErrorChan <- err
  289. if err == nats.ErrConnectionClosed {
  290. natsIsAvailable = false
  291. }
  292. }
  293. }
  294. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  295. func processPacket(packet gopacket.Packet) {
  296. hasIPv4 := false
  297. var ipSrc, ipDst string
  298. // IPv4
  299. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  300. ip := ipLayer.(*layers.IPv4)
  301. ipSrc = ip.SrcIP.String()
  302. ipDst = ip.DstIP.String()
  303. hasIPv4 = true
  304. }
  305. // IPv6
  306. if !hasIPv4 {
  307. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  308. ip := ipLayer.(*layers.IPv6)
  309. ipSrc = ip.SrcIP.String()
  310. ipDst = ip.DstIP.String()
  311. }
  312. }
  313. // TCP
  314. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  315. if tcpLayer == nil {
  316. return
  317. }
  318. tcp, _ := tcpLayer.(*layers.TCP)
  319. portSrc := tcp.SrcPort
  320. portDst := tcp.DstPort
  321. sequence := tcp.Seq
  322. applicationLayer := packet.ApplicationLayer()
  323. if applicationLayer == nil {
  324. return
  325. }
  326. count++
  327. if len(applicationLayer.Payload()) < 50 {
  328. log.Println("application layer too small!")
  329. return
  330. }
  331. request := data.Request{
  332. IpSrc: ipSrc,
  333. IpDst: ipDst,
  334. PortSrc: uint32(portSrc),
  335. PortDst: uint32(portDst),
  336. TcpSeq: uint32(sequence),
  337. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  338. }
  339. switch config.Protocol {
  340. case "http":
  341. err := processHTTP(&request, applicationLayer.Payload())
  342. if err != nil {
  343. log.Println(err)
  344. return
  345. }
  346. case "ajp13":
  347. err := processAJP13(&request, applicationLayer.Payload())
  348. if err != nil {
  349. log.Println(err)
  350. return
  351. }
  352. }
  353. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  354. if strings.Contains(request.XForwardedFor, ",") {
  355. ips := strings.Split(request.XForwardedFor, ",")
  356. for i := len(ips) - 1; i >= 0; i-- {
  357. ipRaw := strings.TrimSpace(ips[i])
  358. ipAddr := net.ParseIP(ipRaw)
  359. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  360. request.Source = ipRaw
  361. break
  362. }
  363. }
  364. } else {
  365. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  366. if !ipPriv.IsPrivate(ipAddr) {
  367. request.Source = request.XForwardedFor
  368. }
  369. }
  370. }
  371. if request.Source == request.IpSrc && request.XRealIP != "" {
  372. request.Source = request.XRealIP
  373. }
  374. if config.Trace {
  375. log.Printf("[%s] %s\n", request.Source, request.Url)
  376. }
  377. publishRequest(config.NatsQueue, &request)
  378. }
  379. func processAJP13(request *data.Request, appData []byte) error {
  380. a, err := ajp13.Parse(appData)
  381. if err != nil {
  382. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  383. }
  384. request.Url = a.URI
  385. request.Method = a.Method()
  386. request.Host = a.Server
  387. request.Protocol = a.Version
  388. request.Origin = a.RemoteAddr.String()
  389. request.Source = a.RemoteAddr.String()
  390. if v, ok := a.Header("Referer"); ok {
  391. request.Referer = v
  392. }
  393. if v, ok := a.Header("Connection"); ok {
  394. request.Connection = v
  395. }
  396. if v, ok := a.Header("X-Forwarded-For"); ok {
  397. request.XForwardedFor = v
  398. }
  399. if v, ok := a.Header("X-Real-IP"); ok {
  400. request.XRealIP = v
  401. }
  402. if v, ok := a.Header("X-Requested-With"); ok {
  403. request.XRequestedWith = v
  404. }
  405. if v, ok := a.Header("Accept-Encoding"); ok {
  406. request.AcceptEncoding = v
  407. }
  408. if v, ok := a.Header("Accept-Language"); ok {
  409. request.AcceptLanguage = v
  410. }
  411. if v, ok := a.Header("User-Agent"); ok {
  412. request.UserAgent = v
  413. }
  414. if v, ok := a.Header("Accept"); ok {
  415. request.Accept = v
  416. }
  417. if v, ok := a.Header("Cookie"); ok {
  418. request.Cookie = v
  419. }
  420. if v, ok := a.Header("X-Forwarded-Host"); ok {
  421. if v != request.Host {
  422. request.Host = v
  423. }
  424. }
  425. return nil
  426. }
  427. func processHTTP(request *data.Request, appData []byte) error {
  428. reader := bufio.NewReader(strings.NewReader(string(appData)))
  429. req, err := http.ReadRequest(reader)
  430. if err != nil {
  431. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  432. }
  433. request.Url = req.URL.String()
  434. request.Method = req.Method
  435. request.Referer = req.Referer()
  436. request.Host = req.Host
  437. request.Protocol = req.Proto
  438. request.Origin = request.Host
  439. if _, ok := req.Header["Connection"]; ok {
  440. request.Connection = req.Header["Connection"][0]
  441. }
  442. if _, ok := req.Header["X-Forwarded-For"]; ok {
  443. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  444. }
  445. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  446. if _, ok := req.Header["True-Client-Ip"]; ok {
  447. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  448. }
  449. if _, ok := req.Header["X-Real-Ip"]; ok {
  450. request.XRealIP = req.Header["X-Real-Ip"][0]
  451. }
  452. if _, ok := req.Header["X-Requested-With"]; ok {
  453. request.XRequestedWith = req.Header["X-Requested-With"][0]
  454. }
  455. if _, ok := req.Header["Accept-Encoding"]; ok {
  456. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  457. }
  458. if _, ok := req.Header["Accept-Language"]; ok {
  459. request.AcceptLanguage = req.Header["Accept-Language"][0]
  460. }
  461. if _, ok := req.Header["User-Agent"]; ok {
  462. request.UserAgent = req.Header["User-Agent"][0]
  463. }
  464. if _, ok := req.Header["Accept"]; ok {
  465. request.Accept = req.Header["Accept"][0]
  466. }
  467. if _, ok := req.Header["Cookie"]; ok {
  468. request.Cookie = req.Header["Cookie"][0]
  469. }
  470. request.Source = request.IpSrc
  471. return nil
  472. }
  473. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  474. // e.g.
  475. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  476. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  477. func replayFile() {
  478. var req data.Request
  479. var startTs time.Time
  480. var endTs time.Time
  481. rand.Seed(time.Now().UnixNano())
  482. for {
  483. fh, err := os.Open(config.RequestsFile)
  484. if err != nil {
  485. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  486. }
  487. c := csv.NewReader(fh)
  488. c.Comma = ' '
  489. for {
  490. if config.SleepFor.Duration > time.Nanosecond {
  491. startTs = time.Now()
  492. }
  493. r, err := c.Read()
  494. if err == io.EOF {
  495. break
  496. }
  497. if err != nil {
  498. log.Println(err)
  499. continue
  500. }
  501. req.IpSrc = r[0]
  502. req.Source = r[0]
  503. req.Url = r[1]
  504. req.UserAgent = "Munch/1.0"
  505. req.Host = "demo.scraperwall.com"
  506. req.CreatedAt = time.Now().UnixNano()
  507. publishRequest(config.NatsQueue, &req)
  508. if strings.Index(r[1], ".") < 0 {
  509. hash := sha1.New()
  510. io.WriteString(hash, r[0])
  511. fp := data.Fingerprint{
  512. ClientID: "scw",
  513. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  514. Remote: r[0],
  515. Url: r[1],
  516. Source: r[0],
  517. CreatedAt: time.Now(),
  518. }
  519. if strings.HasPrefix(r[0], "50.31.") {
  520. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  521. natsJsonEC.Publish("fingerprints_scw", fp)
  522. } else if rand.Intn(10) < 5 {
  523. natsJsonEC.Publish("fingerprints_scw", fp)
  524. }
  525. }
  526. count++
  527. if config.SleepFor.Duration >= time.Nanosecond {
  528. endTs = time.Now()
  529. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  530. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  531. }
  532. }
  533. }
  534. }
  535. }
  536. func loadConfig() {
  537. // initialize with values from the command line / environment
  538. config.Live = *doLiveCapture
  539. config.Interface = *iface
  540. config.SnapshotLen = *snapshotLen
  541. config.Filter = *filter
  542. config.Promiscuous = *promiscuous
  543. config.NatsURL = *natsURL
  544. config.NatsQueue = *natsQueue
  545. config.NatsUser = *natsUser
  546. config.NatsPassword = *natsPassword
  547. config.NatsCA = *natsCA
  548. config.SleepFor.Duration = *sleepFor
  549. config.RequestsFile = *requestsFile
  550. config.UseXForwardedAsSource = *useXForwardedAsSource
  551. config.Protocol = *protocol
  552. config.ApacheLog = *apacheLog
  553. config.Quiet = *beQuiet
  554. config.Trace = *trace
  555. if *configFile == "" {
  556. return
  557. }
  558. _, err := os.Stat(*configFile)
  559. if err != nil {
  560. log.Printf("%s: %s\n", *configFile, err)
  561. return
  562. }
  563. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  564. log.Printf("%s: %s\n", *configFile, err)
  565. }
  566. if !config.Quiet {
  567. config.print()
  568. }
  569. }
  570. // version outputs build information
  571. func version() {
  572. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  573. }