main.go 22 KB


  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/sha1"
  6. "encoding/csv"
  7. "encoding/json"
  8. "flag"
  9. "fmt"
  10. "io"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "net/http"
  15. "os"
  16. "regexp"
  17. "strings"
  18. "time"
  19. "github.com/BurntSushi/toml"
  20. "github.com/Songmu/axslogparser"
  21. "github.com/google/gopacket"
  22. "github.com/google/gopacket/layers"
  23. "github.com/google/gopacket/pcap"
  24. "github.com/hpcloud/tail"
  25. "github.com/kr/pretty"
  26. "github.com/nats-io/nats"
  27. "github.com/nats-io/nats/encoders/protobuf"
  28. "github.com/satyrius/gonx"
  29. "git.scraperwall.com/scw/ajp13"
  30. "git.scraperwall.com/scw/data"
  31. "git.scraperwall.com/scw/ip"
  32. )
  33. var (
  34. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  35. iface = flag.String("interface", "eth0", "Interface to get packets from")
  36. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  37. filter = flag.String("filter", "tcp", "PCAP filter expression")
  38. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  39. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  40. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  41. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  42. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  43. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  44. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  45. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  46. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  47. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  48. trace = flag.Bool("trace", false, "Trace the packet capturing")
  49. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  50. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  51. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  52. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  53. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  54. configFile = flag.String("config", "", "The location of the TOML config file")
  55. beQuiet = flag.Bool("quiet", true, "Be quiet")
  56. doVersion = flag.Bool("version", false, "Show version information")
  57. natsEC *nats.EncodedConn
  58. natsJSONEC *nats.EncodedConn
  59. natsErrorChan chan error
  60. natsIsAvailable bool
  61. count uint64
  62. timeout = -1 * time.Second
  63. ipPriv *ip.IP
  64. config Config
  65. // Version contains the program Version, e.g. 1.0.1
  66. Version string
  67. // BuildDate contains the date and time at which the program was compiled
  68. BuildDate string
  69. )
  70. // Config contains the program configuration
  71. type Config struct {
  72. Live bool
  73. Interface string
  74. SnapshotLen int
  75. Filter string
  76. Promiscuous bool
  77. NatsURL string
  78. NatsQueue string
  79. NatsUser string
  80. NatsPassword string
  81. NatsCA string
  82. SleepFor duration
  83. RequestsFile string
  84. UseXForwardedAsSource bool
  85. Quiet bool
  86. Protocol string
  87. Trace bool
  88. ApacheLog string
  89. NginxLog string
  90. NginxLogFormat string
  91. HostName string
  92. AccessWatchKey string
  93. }
  94. type duration struct {
  95. time.Duration
  96. }
  97. func (d *duration) UnmarshalText(text []byte) error {
  98. var err error
  99. d.Duration, err = time.ParseDuration(string(text))
  100. return err
  101. }
  102. func (c Config) print() {
  103. fmt.Printf("Live: %t\n", c.Live)
  104. fmt.Printf("Interface: %s\n", c.Interface)
  105. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  106. fmt.Printf("Filter: %s\n", c.Filter)
  107. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  108. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  109. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  110. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  111. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  112. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  113. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  114. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  115. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  116. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  117. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  118. fmt.Printf("HostName: %s\n", c.HostName)
  119. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  120. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  121. fmt.Printf("Protocol: %s\n", c.Protocol)
  122. fmt.Printf("Quiet: %t\n", c.Quiet)
  123. fmt.Printf("Trace: %t\n", c.Trace)
  124. }
  125. func init() {
  126. flag.Parse()
  127. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  128. }
  129. func main() {
  130. if *doVersion {
  131. version()
  132. os.Exit(0)
  133. }
  134. loadConfig()
  135. // Output how many requests per second were sent
  136. if !config.Quiet {
  137. go func(c *uint64) {
  138. for {
  139. fmt.Printf("%d requests per second\n", *c)
  140. *c = 0
  141. time.Sleep(time.Second)
  142. }
  143. }(&count)
  144. }
  145. // NATS
  146. //
  147. if config.NatsURL == "" && config.AccessWatchKey == "" {
  148. log.Fatal("No NATS URL specified (-nats-url)!")
  149. }
  150. natsIsAvailable = false
  151. natsErrorChan = make(chan error, 1)
  152. err := connectToNATS()
  153. if err != nil && config.AccessWatchKey == "" {
  154. log.Fatal(err)
  155. }
  156. go natsWatchdog(natsErrorChan)
  157. // What should I do?
  158. if config.RequestsFile != "" {
  159. replayFile()
  160. } else if config.ApacheLog != "" {
  161. apacheLogCapture(config.ApacheLog)
  162. } else if config.Live {
  163. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  164. liveCapture()
  165. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  166. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  167. }
  168. }
  169. func natsWatchdog(closedChan chan error) {
  170. var lastError error
  171. for err := range closedChan {
  172. if lastError != err {
  173. lastError = err
  174. log.Println(err)
  175. }
  176. if err != nats.ErrConnectionClosed {
  177. continue
  178. }
  179. RECONNECT:
  180. for {
  181. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  182. err := connectToNATS()
  183. if err == nil {
  184. break RECONNECT
  185. }
  186. time.Sleep(1 * time.Second)
  187. }
  188. }
  189. }
  190. func connectToNATS() error {
  191. var natsConn *nats.Conn
  192. var err error
  193. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  194. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  195. } else {
  196. if config.NatsPassword != "" && config.NatsUser != "" {
  197. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  198. } else {
  199. natsConn, err = nats.Connect(config.NatsURL)
  200. }
  201. }
  202. if err != nil {
  203. return err
  204. }
  205. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  206. if err != nil {
  207. return fmt.Errorf("Encoded Connection: %v", err)
  208. }
  209. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  210. if err != nil {
  211. return fmt.Errorf("Encoded Connection: %v", err)
  212. }
  213. natsIsAvailable = true
  214. return nil
  215. }
  216. func nginxLogCapture(logfile, format string) {
  217. if _, err := os.Stat(logfile); err != nil {
  218. log.Fatalf("%s: %s", logfile, err)
  219. }
  220. t, err := tail.TailFile(logfile, tail.Config{
  221. Follow: true, // follow the file
  222. ReOpen: true, // reopen log file when it gets closed/rotated
  223. Logger: tail.DiscardingLogger, // don't log anything
  224. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  225. })
  226. if err != nil {
  227. log.Fatalf("%s: %s", logfile, err)
  228. }
  229. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  230. p := gonx.NewParser(format)
  231. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  232. for line := range t.Lines {
  233. var remote string
  234. var err error
  235. l := line.Text
  236. logEntry, err := p.ParseString(l)
  237. if err != nil {
  238. log.Println(err)
  239. continue
  240. }
  241. if config.Trace {
  242. pretty.Println(logEntry)
  243. }
  244. remote, err = logEntry.Field("remote_addr")
  245. if err != nil {
  246. log.Println(err)
  247. continue
  248. }
  249. xff, err := logEntry.Field("http_x_forwarded_for")
  250. if err != nil && xff != "" {
  251. remote = xff
  252. }
  253. if remote == "" {
  254. log.Println("remote is empty: ignoring request.")
  255. continue
  256. }
  257. // only use the first host in case there are multiple hosts in the log
  258. if cidx := strings.Index(remote, ","); cidx >= 0 {
  259. remote = remote[0:cidx]
  260. }
  261. timestampStr, err := logEntry.Field("time_local")
  262. if err != nil {
  263. log.Println(err)
  264. continue
  265. }
  266. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  267. if err != nil {
  268. log.Println(err)
  269. continue
  270. }
  271. httpRequest, err := logEntry.Field("request")
  272. if err != nil {
  273. log.Println(err)
  274. continue
  275. }
  276. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  277. if len(reqData) < 4 {
  278. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  279. continue
  280. }
  281. host := config.HostName
  282. if host == "" {
  283. host = "[not available]"
  284. }
  285. request := data.Request{
  286. IpSrc: remote,
  287. Origin: remote,
  288. Source: remote,
  289. IpDst: "127.0.0.1",
  290. PortSrc: 0,
  291. PortDst: 0,
  292. TcpSeq: 0,
  293. CreatedAt: timeStamp.Unix(),
  294. Url: reqData[2],
  295. Method: reqData[1],
  296. Host: host,
  297. Protocol: reqData[3],
  298. }
  299. request.Referer, _ = logEntry.Field("http_referer")
  300. request.UserAgent, _ = logEntry.Field("http_user_agent")
  301. if config.Trace {
  302. log.Printf("[%s] %s\n", request.Source, request.Url)
  303. }
  304. count++
  305. publishRequest(config.NatsQueue, &request)
  306. }
  307. }
  308. func apacheLogCapture(logfile string) {
  309. if _, err := os.Stat(logfile); err != nil {
  310. log.Fatalf("%s: %s", logfile, err)
  311. }
  312. t, err := tail.TailFile(logfile, tail.Config{
  313. Follow: true, // follow the file
  314. ReOpen: true, // reopen log file when it gets closed/rotated
  315. Logger: tail.DiscardingLogger, // don't log anything
  316. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  317. })
  318. if err != nil {
  319. log.Fatalf("%s: %s", logfile, err)
  320. }
  321. var p axslogparser.Parser
  322. parserSet := false
  323. for line := range t.Lines {
  324. l := line.Text
  325. if !parserSet {
  326. p, _, err = axslogparser.GuessParser(l)
  327. if err != nil {
  328. log.Println(err)
  329. continue
  330. }
  331. parserSet = true
  332. }
  333. logEntry, err := p.Parse(l)
  334. if err != nil {
  335. log.Println(err)
  336. continue
  337. }
  338. remote := logEntry.Host
  339. if config.UseXForwardedAsSource && logEntry.ForwardedFor != "" {
  340. remote = logEntry.ForwardedFor
  341. }
  342. // only use the first host in case there are multiple hosts in the log
  343. if cidx := strings.Index(remote, ","); cidx >= 0 {
  344. remote = remote[0:cidx]
  345. }
  346. // extract the virtual host
  347. var virtualHost string
  348. vhost := logEntry.VirtualHost
  349. if vhost != "" {
  350. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  351. virtualHost = vhostAndPort[0]
  352. } else {
  353. if config.HostName != "" {
  354. vhost = config.HostName
  355. } else {
  356. vhost = "[not available]"
  357. }
  358. }
  359. request := data.Request{
  360. IpSrc: remote,
  361. IpDst: "127.0.0.1",
  362. PortSrc: 0,
  363. PortDst: 0,
  364. TcpSeq: 0,
  365. CreatedAt: logEntry.Time.UnixNano(),
  366. Url: logEntry.RequestURI,
  367. Method: logEntry.Method,
  368. Host: virtualHost,
  369. Protocol: logEntry.Protocol,
  370. Origin: remote,
  371. Source: remote,
  372. Referer: logEntry.Referer,
  373. XForwardedFor: logEntry.ForwardedFor,
  374. UserAgent: logEntry.UserAgent,
  375. }
  376. if config.Trace {
  377. log.Printf("[%s] %s\n", request.Source, request.Url)
  378. }
  379. count++
  380. publishRequest(config.NatsQueue, &request)
  381. }
  382. }
  383. func liveCapture() {
  384. ipPriv = ip.NewIP()
  385. // PCAP setup
  386. //
  387. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  388. if err != nil {
  389. log.Fatal(err)
  390. }
  391. defer handle.Close()
  392. err = handle.SetBPFFilter(config.Filter)
  393. if err != nil {
  394. log.Fatal(err)
  395. }
  396. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  397. for packet := range packetSource.Packets() {
  398. go processPacket(packet)
  399. }
  400. }
  401. func writeLogToWatch(r *data.Request) {
  402. h := map[string]string{}
  403. if r.AcceptEncoding != "" {
  404. h["Accept-Encoding"] = r.AcceptEncoding
  405. }
  406. if r.Accept != "" {
  407. h["Accept"] = r.Accept
  408. }
  409. if r.AcceptLanguage != "" {
  410. h["Accept-Language"] = r.AcceptLanguage
  411. }
  412. if r.Cookie != "" {
  413. h["Cookie"] = r.Cookie
  414. }
  415. if r.Host != "" {
  416. h["Host"] = r.Host
  417. }
  418. if r.Referer != "" {
  419. h["Referer"] = r.Referer
  420. }
  421. if r.UserAgent != "" {
  422. h["User-Agent"] = r.UserAgent
  423. }
  424. if r.Via != "" {
  425. h["Via"] = r.Via
  426. }
  427. if r.XForwardedFor != "" {
  428. h["X-Forwarded-For"] = r.XForwardedFor
  429. }
  430. if r.XRequestedWith != "" {
  431. h["X-Requested-With"] = r.XRequestedWith
  432. }
  433. if r.XRequestedWith != "" {
  434. h["X-Requested-With"] = r.XRequestedWith
  435. }
  436. data := map[string]interface{}{
  437. "request": map[string]interface{}{
  438. "time": time.Unix(0, r.CreatedAt),
  439. "address": r.Source,
  440. // "scheme": r.Protocol,
  441. "method": r.Method,
  442. "url": r.Url,
  443. "headers": h,
  444. },
  445. "response": map[string]interface{}{"status": 200},
  446. }
  447. jdata, err := json.Marshal(data)
  448. client := &http.Client{}
  449. buf := bytes.NewBuffer(jdata)
  450. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  451. req.Header.Add("Api-Key", *accessWatchKey)
  452. resp, err := client.Do(req)
  453. if err != nil {
  454. log.Println(err)
  455. }
  456. resp.Body.Close()
  457. }
  458. func publishRequest(queue string, request *data.Request) {
  459. if config.AccessWatchKey != "" {
  460. writeLogToWatch(request)
  461. return
  462. }
  463. if !natsIsAvailable {
  464. return
  465. }
  466. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  467. natsErrorChan <- err
  468. if err == nats.ErrConnectionClosed {
  469. natsIsAvailable = false
  470. }
  471. }
  472. }
  473. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  474. func processPacket(packet gopacket.Packet) {
  475. hasIPv4 := false
  476. var ipSrc, ipDst string
  477. // IPv4
  478. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  479. ip := ipLayer.(*layers.IPv4)
  480. ipSrc = ip.SrcIP.String()
  481. ipDst = ip.DstIP.String()
  482. hasIPv4 = true
  483. }
  484. // IPv6
  485. if !hasIPv4 {
  486. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  487. ip := ipLayer.(*layers.IPv6)
  488. ipSrc = ip.SrcIP.String()
  489. ipDst = ip.DstIP.String()
  490. }
  491. }
  492. // TCP
  493. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  494. if tcpLayer == nil {
  495. return
  496. }
  497. tcp, _ := tcpLayer.(*layers.TCP)
  498. portSrc := tcp.SrcPort
  499. portDst := tcp.DstPort
  500. sequence := tcp.Seq
  501. applicationLayer := packet.ApplicationLayer()
  502. if applicationLayer == nil {
  503. return
  504. }
  505. count++
  506. if len(applicationLayer.Payload()) < 50 {
  507. log.Println("application layer too small!")
  508. return
  509. }
  510. request := data.Request{
  511. IpSrc: ipSrc,
  512. IpDst: ipDst,
  513. PortSrc: uint32(portSrc),
  514. PortDst: uint32(portDst),
  515. TcpSeq: uint32(sequence),
  516. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  517. }
  518. switch config.Protocol {
  519. case "http":
  520. err := processHTTP(&request, applicationLayer.Payload())
  521. if err != nil {
  522. log.Println(err)
  523. return
  524. }
  525. case "ajp13":
  526. err := processAJP13(&request, applicationLayer.Payload())
  527. if err != nil {
  528. log.Println(err)
  529. return
  530. }
  531. }
  532. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  533. if strings.Contains(request.XForwardedFor, ",") {
  534. ips := strings.Split(request.XForwardedFor, ",")
  535. for i := len(ips) - 1; i >= 0; i-- {
  536. ipRaw := strings.TrimSpace(ips[i])
  537. ipAddr := net.ParseIP(ipRaw)
  538. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  539. request.Source = ipRaw
  540. break
  541. }
  542. }
  543. } else {
  544. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  545. if !ipPriv.IsPrivate(ipAddr) {
  546. request.Source = request.XForwardedFor
  547. }
  548. }
  549. }
  550. if request.Source == request.IpSrc && request.XRealIP != "" {
  551. request.Source = request.XRealIP
  552. }
  553. if config.Trace {
  554. log.Printf("[%s] %s\n", request.Source, request.Url)
  555. }
  556. publishRequest(config.NatsQueue, &request)
  557. }
  558. func processAJP13(request *data.Request, appData []byte) error {
  559. a, err := ajp13.Parse(appData)
  560. if err != nil {
  561. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  562. }
  563. request.Url = a.URI
  564. request.Method = a.Method()
  565. request.Host = a.Server
  566. request.Protocol = a.Version
  567. request.Origin = a.RemoteAddr.String()
  568. request.Source = a.RemoteAddr.String()
  569. if v, ok := a.Header("Referer"); ok {
  570. request.Referer = v
  571. }
  572. if v, ok := a.Header("Connection"); ok {
  573. request.Connection = v
  574. }
  575. if v, ok := a.Header("X-Forwarded-For"); ok {
  576. request.XForwardedFor = v
  577. }
  578. if v, ok := a.Header("X-Real-IP"); ok {
  579. request.XRealIP = v
  580. }
  581. if v, ok := a.Header("X-Requested-With"); ok {
  582. request.XRequestedWith = v
  583. }
  584. if v, ok := a.Header("Accept-Encoding"); ok {
  585. request.AcceptEncoding = v
  586. }
  587. if v, ok := a.Header("Accept-Language"); ok {
  588. request.AcceptLanguage = v
  589. }
  590. if v, ok := a.Header("User-Agent"); ok {
  591. request.UserAgent = v
  592. }
  593. if v, ok := a.Header("Accept"); ok {
  594. request.Accept = v
  595. }
  596. if v, ok := a.Header("Cookie"); ok {
  597. request.Cookie = v
  598. }
  599. if v, ok := a.Header("X-Forwarded-Host"); ok {
  600. if v != request.Host {
  601. request.Host = v
  602. }
  603. }
  604. return nil
  605. }
  606. func processHTTP(request *data.Request, appData []byte) error {
  607. reader := bufio.NewReader(strings.NewReader(string(appData)))
  608. req, err := http.ReadRequest(reader)
  609. if err != nil {
  610. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  611. }
  612. request.Url = req.URL.String()
  613. request.Method = req.Method
  614. request.Referer = req.Referer()
  615. request.Host = req.Host
  616. request.Protocol = req.Proto
  617. request.Origin = request.Host
  618. if _, ok := req.Header["Connection"]; ok {
  619. request.Connection = req.Header["Connection"][0]
  620. }
  621. if _, ok := req.Header["X-Forwarded-For"]; ok {
  622. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  623. }
  624. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  625. if _, ok := req.Header["True-Client-Ip"]; ok {
  626. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  627. }
  628. if _, ok := req.Header["X-Real-Ip"]; ok {
  629. request.XRealIP = req.Header["X-Real-Ip"][0]
  630. }
  631. if _, ok := req.Header["X-Requested-With"]; ok {
  632. request.XRequestedWith = req.Header["X-Requested-With"][0]
  633. }
  634. if _, ok := req.Header["Accept-Encoding"]; ok {
  635. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  636. }
  637. if _, ok := req.Header["Accept-Language"]; ok {
  638. request.AcceptLanguage = req.Header["Accept-Language"][0]
  639. }
  640. if _, ok := req.Header["User-Agent"]; ok {
  641. request.UserAgent = req.Header["User-Agent"][0]
  642. }
  643. if _, ok := req.Header["Accept"]; ok {
  644. request.Accept = req.Header["Accept"][0]
  645. }
  646. if _, ok := req.Header["Cookie"]; ok {
  647. request.Cookie = req.Header["Cookie"][0]
  648. }
  649. request.Source = request.IpSrc
  650. return nil
  651. }
  652. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  653. // e.g.
  654. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  655. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  656. func replayFile() {
  657. var req data.Request
  658. var startTs time.Time
  659. var endTs time.Time
  660. rand.Seed(time.Now().UnixNano())
  661. for {
  662. fh, err := os.Open(config.RequestsFile)
  663. if err != nil {
  664. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  665. }
  666. c := csv.NewReader(fh)
  667. c.Comma = ' '
  668. for {
  669. if config.SleepFor.Duration > time.Nanosecond {
  670. startTs = time.Now()
  671. }
  672. r, err := c.Read()
  673. if err == io.EOF {
  674. break
  675. }
  676. if err != nil {
  677. log.Println(err)
  678. continue
  679. }
  680. req.IpSrc = r[0]
  681. req.Source = r[0]
  682. req.Url = r[1]
  683. req.UserAgent = "Munch/1.0"
  684. req.Host = "demo.scraperwall.com"
  685. req.CreatedAt = time.Now().UnixNano()
  686. publishRequest(config.NatsQueue, &req)
  687. if strings.Index(r[1], ".") < 0 {
  688. hash := sha1.New()
  689. io.WriteString(hash, r[0])
  690. fp := data.Fingerprint{
  691. ClientID: "scw",
  692. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  693. Remote: r[0],
  694. Url: r[1],
  695. Source: r[0],
  696. CreatedAt: time.Now(),
  697. }
  698. if strings.HasPrefix(r[0], "50.31.") {
  699. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  700. natsJSONEC.Publish("fingerprints_scw", fp)
  701. } else if rand.Intn(10) < 5 {
  702. natsJSONEC.Publish("fingerprints_scw", fp)
  703. }
  704. }
  705. count++
  706. if config.SleepFor.Duration >= time.Nanosecond {
  707. endTs = time.Now()
  708. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  709. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  710. }
  711. }
  712. }
  713. }
  714. }
  715. func loadConfig() {
  716. // initialize with values from the command line / environment
  717. config.Live = *doLiveCapture
  718. config.Interface = *iface
  719. config.SnapshotLen = *snapshotLen
  720. config.Filter = *filter
  721. config.Promiscuous = *promiscuous
  722. config.NatsURL = *natsURL
  723. config.NatsQueue = *natsQueue
  724. config.NatsUser = *natsUser
  725. config.NatsPassword = *natsPassword
  726. config.NatsCA = *natsCA
  727. config.SleepFor.Duration = *sleepFor
  728. config.RequestsFile = *requestsFile
  729. config.UseXForwardedAsSource = *useXForwardedAsSource
  730. config.Protocol = *protocol
  731. config.ApacheLog = *apacheLog
  732. config.NginxLog = *nginxLog
  733. config.NginxLogFormat = *nginxFormat
  734. config.HostName = *hostName
  735. config.Quiet = *beQuiet
  736. config.Trace = *trace
  737. config.AccessWatchKey = *accessWatchKey
  738. if *configFile == "" {
  739. return
  740. }
  741. _, err := os.Stat(*configFile)
  742. if err != nil {
  743. log.Printf("%s: %s\n", *configFile, err)
  744. return
  745. }
  746. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  747. log.Printf("%s: %s\n", *configFile, err)
  748. }
  749. if !config.Quiet {
  750. config.print()
  751. }
  752. }
  753. // version outputs build information...
  754. func version() {
  755. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  756. }