main.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819
  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/sha1"
  6. "encoding/csv"
  7. "encoding/json"
  8. "flag"
  9. "fmt"
  10. "io"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "net/http"
  15. "os"
  16. "strings"
  17. "time"
  18. "github.com/BurntSushi/toml"
  19. _ "github.com/goinggo/tracelog"
  20. "github.com/google/gopacket"
  21. "github.com/google/gopacket/layers"
  22. "github.com/google/gopacket/pcap"
  23. "github.com/nats-io/nats.go"
  24. "github.com/nats-io/nats/encoders/protobuf"
  25. "git.scraperwall.com/scw/ajp13"
  26. "git.scraperwall.com/scw/data"
  27. "git.scraperwall.com/scw/ip"
  28. )
  29. var (
  30. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  31. iface = flag.String("interface", "eth0", "Interface to get packets from")
  32. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  33. filter = flag.String("filter", "tcp", "PCAP filter expression")
  34. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  35. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  36. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  37. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  38. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  39. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  40. reconnectToNatsAfter = flag.Duration("reconnect-to-nats-after", 0, "reconnect to nats after this time periodically")
  41. resetLiveCapAfter = flag.Duration("reset-live-cap-after", 613*time.Second, "reset the live capture setup after this amount of time")
  42. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  43. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  44. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  45. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  46. useVhostAsSource = flag.Bool("use-vhost-as-source", false, "Use the Vhost as source")
  47. trace = flag.Bool("trace", false, "Trace the packet capturing")
  48. tailPoll = flag.Bool("tail-poll", false, "use file polling to detect file changes when tailing logrfiles")
  49. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  50. apacheReplay = flag.String("apache-replay", "", "Apache log file to replay into the system")
  51. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  52. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  53. nginxReplay = flag.String("nginx-replay", "", "Replay this nginx logfile")
  54. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  55. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  56. configFile = flag.String("config", "", "The location of the TOML config file")
  57. beQuiet = flag.Bool("quiet", true, "Be quiet")
  58. doVersion = flag.Bool("version", false, "Show version information")
  59. natsEC *nats.EncodedConn
  60. natsJSONEC *nats.EncodedConn
  61. natsErrorChan chan error
  62. natsIsAvailable bool
  63. count uint64
  64. timeout = -1 * time.Second
  65. ipPriv *ip.IP
  66. config Config
  67. // Version contains the program Version, e.g. 1.0.1
  68. Version string
  69. // BuildDate contains the date and time at which the program was compiled
  70. BuildDate string
  71. )
  72. type NginxLogFile struct {
  73. File string
  74. Host string
  75. Format string
  76. }
  77. // Config contains the program configuration
  78. type Config struct {
  79. Live bool
  80. Interface string
  81. SnapshotLen int
  82. Filter string
  83. Promiscuous bool
  84. NatsURL string
  85. NatsQueue string
  86. NatsUser string
  87. NatsPassword string
  88. NatsCA string
  89. SleepFor duration
  90. RequestsFile string
  91. UseXForwardedAsSource bool
  92. UseVhostAsSource bool
  93. Quiet bool
  94. Protocol string
  95. Trace bool
  96. TailPoll bool
  97. ApacheLog string
  98. ApacheReplay string
  99. NginxLog string
  100. NginxLogFormat string
  101. NginxLogFiles []NginxLogFile
  102. NginxReplay string
  103. HostName string
  104. AccessWatchKey string
  105. ReconnectToNatsAfter duration
  106. ResetLiveCaptureAfter duration
  107. }
  108. type duration struct {
  109. time.Duration
  110. }
  111. func (d *duration) UnmarshalText(text []byte) error {
  112. var err error
  113. d.Duration, err = time.ParseDuration(string(text))
  114. return err
  115. }
  116. func (c Config) print() {
  117. fmt.Printf("Live: %t\n", c.Live)
  118. fmt.Printf("Interface: %s\n", c.Interface)
  119. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  120. fmt.Printf("Filter: %s\n", c.Filter)
  121. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  122. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  123. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  124. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  125. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  126. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  127. fmt.Printf("ReconnectToNatsAfter: %s\n", c.ReconnectToNatsAfter.String())
  128. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  129. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  130. fmt.Printf("TailPoll: %t\n", c.TailPoll)
  131. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  132. fmt.Printf("Apache Replay: %s\n", c.ApacheReplay)
  133. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  134. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  135. fmt.Printf("Nginx Log Files:\n")
  136. for _, logfile := range c.NginxLogFiles {
  137. fmt.Printf(" %s (%s) (%s)\n", logfile.File, logfile.Host, logfile.Format)
  138. }
  139. fmt.Printf("NginxReplay: %s\n", c.NginxReplay)
  140. fmt.Printf("HostName: %s\n", c.HostName)
  141. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  142. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  143. fmt.Printf("UseVhostAsSource: %t\n", c.UseVhostAsSource)
  144. fmt.Printf("Protocol: %s\n", c.Protocol)
  145. fmt.Printf("Reset Live Cap After: %s\n", c.ResetLiveCaptureAfter.String())
  146. fmt.Printf("Quiet: %t\n", c.Quiet)
  147. fmt.Printf("Trace: %t\n", c.Trace)
  148. }
  149. func init() {
  150. flag.Parse()
  151. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  152. }
  153. func main() {
  154. if *doVersion {
  155. version()
  156. os.Exit(0)
  157. }
  158. loadConfig()
  159. // Output how many requests per second were sent
  160. if !config.Quiet {
  161. go func(c *uint64) {
  162. for {
  163. fmt.Printf("%d requests per second\n", *c)
  164. *c = 0
  165. time.Sleep(time.Second)
  166. }
  167. }(&count)
  168. }
  169. // NATS
  170. //
  171. if config.NatsURL == "" && config.AccessWatchKey == "" {
  172. log.Fatal("No NATS URL specified (-nats-url)!")
  173. }
  174. natsIsAvailable = false
  175. natsErrorChan = make(chan error, 1)
  176. err := connectToNATS()
  177. if err != nil && config.AccessWatchKey == "" {
  178. log.Fatal(err)
  179. }
  180. // reconnect to nats periodically
  181. //
  182. if *reconnectToNatsAfter > time.Minute {
  183. go func(interval time.Duration) {
  184. for range time.Tick(interval) {
  185. if config.Trace {
  186. log.Printf("reconnecting to NATS")
  187. }
  188. natsEC.Conn.Close()
  189. natsJSONEC.Conn.Close()
  190. }
  191. }(*reconnectToNatsAfter)
  192. }
  193. go natsWatchdog(natsErrorChan)
  194. // What should I do?
  195. if config.RequestsFile != "" {
  196. replayFile()
  197. } else if config.ApacheReplay != "" {
  198. apacheLogReplay(config.ApacheReplay)
  199. } else if config.NginxReplay != "" {
  200. nginxLogReplay(config.NginxReplay, config.NginxLogFormat)
  201. } else if config.ApacheLog != "" {
  202. apacheLogCapture(config.ApacheLog)
  203. } else if config.Live {
  204. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  205. liveCapture()
  206. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  207. nginxLogCapture(config.NginxLog, config.NginxLogFormat, config.HostName)
  208. } else if len(config.NginxLogFiles) > 0 {
  209. nginxLogCaptureMulti(config.NginxLogFiles)
  210. }
  211. }
  212. func nginxLogCaptureMulti(logs []NginxLogFile) {
  213. for _, logfile := range logs {
  214. if !config.Quiet {
  215. log.Printf("tailing nginx logfile %s (Host: %s)\n", logfile.File, logfile.Host)
  216. }
  217. go nginxLogCapture(logfile.File, logfile.Format, logfile.Host)
  218. }
  219. <-make(chan bool)
  220. }
  221. func natsWatchdog(closedChan chan error) {
  222. var lastError error
  223. for err := range closedChan {
  224. if lastError != err {
  225. lastError = err
  226. log.Println(err)
  227. }
  228. if err != nats.ErrConnectionClosed {
  229. continue
  230. }
  231. RECONNECT:
  232. for {
  233. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  234. err := connectToNATS()
  235. if err == nil {
  236. break RECONNECT
  237. }
  238. time.Sleep(1 * time.Second)
  239. }
  240. }
  241. }
  242. func connectToNATS() error {
  243. var natsConn *nats.Conn
  244. var err error
  245. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  246. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  247. } else {
  248. if config.NatsPassword != "" && config.NatsUser != "" {
  249. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  250. } else {
  251. natsConn, err = nats.Connect(config.NatsURL)
  252. }
  253. }
  254. if err != nil {
  255. return err
  256. }
  257. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  258. if err != nil {
  259. return fmt.Errorf("Encoded Connection: %v", err)
  260. }
  261. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  262. if err != nil {
  263. return fmt.Errorf("Encoded Connection: %v", err)
  264. }
  265. natsIsAvailable = true
  266. return nil
  267. }
  268. type liveCap struct {
  269. filter string
  270. device string
  271. promisc bool
  272. snapshotLen int
  273. handle *pcap.Handle
  274. packetChan chan gopacket.Packet
  275. }
  276. func newLiveCap(device string, filter string, snapshotLen int, promisc bool) (*liveCap, error) {
  277. lc := &liveCap{
  278. filter: filter,
  279. device: device,
  280. promisc: promisc,
  281. snapshotLen: snapshotLen,
  282. }
  283. err := lc.SetupCap()
  284. if err != nil {
  285. return nil, err
  286. }
  287. return lc, nil
  288. }
  289. func (lc *liveCap) SetupCap() error {
  290. if !config.Quiet {
  291. log.Printf("reading incoming HTTP requests on %s %s\n", config.Interface, config.Filter)
  292. }
  293. if lc.handle != nil {
  294. lc.handle.Close()
  295. }
  296. // PCAP setup
  297. //
  298. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  299. if err != nil {
  300. return err
  301. }
  302. // defer handle.Close()
  303. lc.handle = handle
  304. err = lc.handle.SetBPFFilter(config.Filter)
  305. if err != nil {
  306. return err
  307. }
  308. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  309. lc.packetChan = packetSource.Packets()
  310. return nil
  311. }
  312. func liveCapture() {
  313. ipPriv = ip.NewIP()
  314. livecap, err := newLiveCap(config.Interface, config.Filter, config.SnapshotLen, config.Promiscuous)
  315. if err != nil {
  316. log.Fatal(err)
  317. }
  318. closeChan := time.Tick(config.ResetLiveCaptureAfter.Duration)
  319. for {
  320. select {
  321. case <-closeChan:
  322. livecap.SetupCap()
  323. case p := <-livecap.packetChan:
  324. go processPacket(p)
  325. }
  326. }
  327. }
  328. func writeLogToWatch(r *data.Request) {
  329. h := map[string]string{}
  330. if r.AcceptEncoding != "" {
  331. h["Accept-Encoding"] = r.AcceptEncoding
  332. }
  333. if r.Accept != "" {
  334. h["Accept"] = r.Accept
  335. }
  336. if r.AcceptLanguage != "" {
  337. h["Accept-Language"] = r.AcceptLanguage
  338. }
  339. if r.Cookie != "" {
  340. h["Cookie"] = r.Cookie
  341. }
  342. if r.Host != "" {
  343. h["Host"] = r.Host
  344. }
  345. if r.Referer != "" {
  346. h["Referer"] = r.Referer
  347. }
  348. if r.UserAgent != "" {
  349. h["User-Agent"] = r.UserAgent
  350. }
  351. if r.Via != "" {
  352. h["Via"] = r.Via
  353. }
  354. if r.XForwardedFor != "" {
  355. h["X-Forwarded-For"] = r.XForwardedFor
  356. }
  357. if r.XRequestedWith != "" {
  358. h["X-Requested-With"] = r.XRequestedWith
  359. }
  360. data := map[string]interface{}{
  361. "request": map[string]interface{}{
  362. "time": time.Unix(0, r.CreatedAt),
  363. "address": r.Source,
  364. "protocol": r.Protocol,
  365. "scheme": "https",
  366. "method": r.Method,
  367. "url": r.Url,
  368. "headers": h,
  369. },
  370. "response": map[string]interface{}{"status": "200"},
  371. }
  372. jdata, err := json.Marshal(data)
  373. client := &http.Client{}
  374. fmt.Println(string(jdata))
  375. buf := bytes.NewBuffer(jdata)
  376. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  377. req.Header.Add("Api-Key", config.AccessWatchKey)
  378. req.Header.Add("Accept", "application/json")
  379. req.Header.Add("Content-Type", "application/json")
  380. resp, err := client.Do(req)
  381. if err != nil {
  382. log.Println(err)
  383. }
  384. resp.Body.Close()
  385. }
  386. func publishRequest(queue string, request *data.Request) {
  387. if config.AccessWatchKey != "" {
  388. writeLogToWatch(request)
  389. return
  390. }
  391. if !natsIsAvailable {
  392. if rand.Intn(100) == 0 {
  393. log.Println("nats connection is not available")
  394. }
  395. return
  396. }
  397. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  398. natsErrorChan <- err
  399. if err == nats.ErrConnectionClosed {
  400. natsIsAvailable = false
  401. }
  402. }
  403. }
  404. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  405. func processPacket(packet gopacket.Packet) {
  406. hasIPv4 := false
  407. var ipSrc, ipDst string
  408. // IPv4
  409. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  410. ip := ipLayer.(*layers.IPv4)
  411. ipSrc = ip.SrcIP.String()
  412. ipDst = ip.DstIP.String()
  413. hasIPv4 = true
  414. }
  415. // IPv6
  416. if !hasIPv4 {
  417. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  418. ip := ipLayer.(*layers.IPv6)
  419. ipSrc = ip.SrcIP.String()
  420. ipDst = ip.DstIP.String()
  421. }
  422. }
  423. // TCP
  424. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  425. if tcpLayer == nil {
  426. return
  427. }
  428. tcp, _ := tcpLayer.(*layers.TCP)
  429. portSrc := tcp.SrcPort
  430. portDst := tcp.DstPort
  431. sequence := tcp.Seq
  432. applicationLayer := packet.ApplicationLayer()
  433. if applicationLayer == nil {
  434. return
  435. }
  436. count++
  437. if len(applicationLayer.Payload()) < 50 {
  438. // log.Println("application layer too small!")
  439. return
  440. }
  441. request := data.Request{
  442. IpSrc: ipSrc,
  443. IpDst: ipDst,
  444. PortSrc: uint32(portSrc),
  445. PortDst: uint32(portDst),
  446. TcpSeq: uint32(sequence),
  447. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  448. }
  449. switch config.Protocol {
  450. case "http":
  451. err := processHTTP(&request, applicationLayer.Payload())
  452. if err != nil {
  453. log.Println(err)
  454. return
  455. }
  456. case "ajp13":
  457. err := processAJP13(&request, applicationLayer.Payload())
  458. if err != nil {
  459. log.Println(err)
  460. return
  461. }
  462. }
  463. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  464. if strings.Contains(request.XForwardedFor, ",") {
  465. ips := strings.Split(request.XForwardedFor, ",")
  466. for i := len(ips) - 1; i >= 0; i-- {
  467. ipRaw := strings.TrimSpace(ips[i])
  468. ipAddr := net.ParseIP(ipRaw)
  469. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  470. request.Source = ipRaw
  471. break
  472. }
  473. }
  474. } else {
  475. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  476. if !ipPriv.IsPrivate(ipAddr) {
  477. request.Source = request.XForwardedFor
  478. }
  479. }
  480. }
  481. if request.Source == request.IpSrc && request.XRealIP != "" {
  482. request.Source = request.XRealIP
  483. }
  484. if config.Trace {
  485. log.Printf("[%s] %s\n", request.Source, request.Url)
  486. }
  487. publishRequest(config.NatsQueue, &request)
  488. }
  489. func processAJP13(request *data.Request, appData []byte) error {
  490. a, err := ajp13.Parse(appData)
  491. if err != nil {
  492. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  493. }
  494. request.Url = a.URI
  495. request.Method = a.Method()
  496. request.Host = a.Server
  497. request.Protocol = a.Version
  498. request.Origin = a.RemoteAddr.String()
  499. request.Source = a.RemoteAddr.String()
  500. if v, ok := a.Header("Referer"); ok {
  501. request.Referer = v
  502. }
  503. if v, ok := a.Header("Connection"); ok {
  504. request.Connection = v
  505. }
  506. if v, ok := a.Header("X-Forwarded-For"); ok {
  507. request.XForwardedFor = v
  508. }
  509. if v, ok := a.Header("X-Real-IP"); ok {
  510. request.XRealIP = v
  511. }
  512. if v, ok := a.Header("X-Requested-With"); ok {
  513. request.XRequestedWith = v
  514. }
  515. if v, ok := a.Header("Accept-Encoding"); ok {
  516. request.AcceptEncoding = v
  517. }
  518. if v, ok := a.Header("Accept-Language"); ok {
  519. request.AcceptLanguage = v
  520. }
  521. if v, ok := a.Header("User-Agent"); ok {
  522. request.UserAgent = v
  523. }
  524. if v, ok := a.Header("Accept"); ok {
  525. request.Accept = v
  526. }
  527. if v, ok := a.Header("Cookie"); ok {
  528. request.Cookie = v
  529. }
  530. if v, ok := a.Header("X-Forwarded-Host"); ok {
  531. if v != request.Host {
  532. request.Host = v
  533. }
  534. }
  535. return nil
  536. }
  537. func processHTTP(request *data.Request, appData []byte) error {
  538. reader := bufio.NewReader(strings.NewReader(string(appData)))
  539. req, err := http.ReadRequest(reader)
  540. if err != nil {
  541. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  542. }
  543. request.Url = req.URL.String()
  544. request.Method = req.Method
  545. request.Referer = req.Referer()
  546. request.Host = req.Host
  547. request.Protocol = req.Proto
  548. request.Origin = request.Host
  549. if _, ok := req.Header["Connection"]; ok {
  550. request.Connection = req.Header["Connection"][0]
  551. }
  552. if _, ok := req.Header["X-Forwarded-For"]; ok {
  553. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  554. }
  555. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  556. if _, ok := req.Header["True-Client-Ip"]; ok {
  557. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  558. }
  559. if _, ok := req.Header["X-Real-Ip"]; ok {
  560. request.XRealIP = req.Header["X-Real-Ip"][0]
  561. }
  562. if _, ok := req.Header["X-Requested-With"]; ok {
  563. request.XRequestedWith = req.Header["X-Requested-With"][0]
  564. }
  565. if _, ok := req.Header["Accept-Encoding"]; ok {
  566. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  567. }
  568. if _, ok := req.Header["Accept-Language"]; ok {
  569. request.AcceptLanguage = req.Header["Accept-Language"][0]
  570. }
  571. if _, ok := req.Header["User-Agent"]; ok {
  572. request.UserAgent = req.Header["User-Agent"][0]
  573. }
  574. if _, ok := req.Header["Accept"]; ok {
  575. request.Accept = req.Header["Accept"][0]
  576. }
  577. if _, ok := req.Header["Cookie"]; ok {
  578. request.Cookie = req.Header["Cookie"][0]
  579. }
  580. request.Source = request.IpSrc
  581. return nil
  582. }
  583. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  584. // e.g.
  585. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  586. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  587. func replayFile() {
  588. var req data.Request
  589. var startTs time.Time
  590. var endTs time.Time
  591. rand.Seed(time.Now().UnixNano())
  592. for {
  593. fh, err := os.Open(config.RequestsFile)
  594. if err != nil {
  595. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  596. }
  597. c := csv.NewReader(fh)
  598. c.Comma = ' '
  599. for {
  600. if config.SleepFor.Duration > time.Nanosecond {
  601. startTs = time.Now()
  602. }
  603. r, err := c.Read()
  604. if err == io.EOF {
  605. break
  606. }
  607. if err != nil {
  608. log.Println(err)
  609. continue
  610. }
  611. req.IpSrc = r[0]
  612. req.Source = r[0]
  613. req.Url = r[1]
  614. req.UserAgent = "Munch/1.0"
  615. req.Host = "demo.scraperwall.com"
  616. req.CreatedAt = time.Now().UnixNano()
  617. publishRequest(config.NatsQueue, &req)
  618. if strings.Index(r[1], ".") < 0 {
  619. hash := sha1.New()
  620. io.WriteString(hash, r[0])
  621. fp := data.Fingerprint{
  622. ClientID: "scw",
  623. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  624. Remote: r[0],
  625. Url: r[1],
  626. Source: r[0],
  627. CreatedAt: time.Now(),
  628. }
  629. if strings.HasPrefix(r[0], "50.31.") {
  630. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  631. natsJSONEC.Publish("fingerprints_scw", fp)
  632. } else if rand.Intn(10) < 5 {
  633. natsJSONEC.Publish("fingerprints_scw", fp)
  634. }
  635. }
  636. count++
  637. if config.SleepFor.Duration >= time.Nanosecond {
  638. endTs = time.Now()
  639. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  640. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  641. }
  642. }
  643. }
  644. }
  645. }
  646. func loadConfig() {
  647. // initialize with values from the command line / environment
  648. config.Live = *doLiveCapture
  649. config.Interface = *iface
  650. config.SnapshotLen = *snapshotLen
  651. config.Filter = *filter
  652. config.Promiscuous = *promiscuous
  653. config.NatsURL = *natsURL
  654. config.NatsQueue = *natsQueue
  655. config.NatsUser = *natsUser
  656. config.NatsPassword = *natsPassword
  657. config.NatsCA = *natsCA
  658. config.SleepFor.Duration = *sleepFor
  659. config.RequestsFile = *requestsFile
  660. config.UseXForwardedAsSource = *useXForwardedAsSource
  661. config.UseVhostAsSource = *useVhostAsSource
  662. config.Protocol = *protocol
  663. config.TailPoll = *tailPoll
  664. config.ApacheLog = *apacheLog
  665. config.ApacheReplay = *apacheReplay
  666. config.NginxLog = *nginxLog
  667. config.NginxLogFormat = *nginxFormat
  668. config.NginxReplay = *nginxReplay
  669. config.HostName = *hostName
  670. config.Quiet = *beQuiet
  671. config.Trace = *trace
  672. config.AccessWatchKey = *accessWatchKey
  673. config.ReconnectToNatsAfter.Duration = *reconnectToNatsAfter
  674. config.ResetLiveCaptureAfter.Duration = *resetLiveCapAfter
  675. if *configFile == "" {
  676. return
  677. }
  678. _, err := os.Stat(*configFile)
  679. if err != nil {
  680. log.Printf("%s: %s\n", *configFile, err)
  681. return
  682. }
  683. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  684. log.Printf("%s: %s\n", *configFile, err)
  685. }
  686. if !config.Quiet {
  687. config.print()
  688. }
  689. }
  690. // version outputs build information...
  691. func version() {
  692. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  693. }