main.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812
  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/sha1"
  6. "encoding/csv"
  7. "encoding/json"
  8. "flag"
  9. "fmt"
  10. "io"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "net/http"
  15. "os"
  16. "strings"
  17. "time"
  18. "github.com/BurntSushi/toml"
  19. "github.com/google/gopacket"
  20. "github.com/google/gopacket/layers"
  21. "github.com/google/gopacket/pcap"
  22. "github.com/nats-io/nats"
  23. "github.com/nats-io/nats/encoders/protobuf"
  24. "git.scraperwall.com/scw/ajp13"
  25. "git.scraperwall.com/scw/data"
  26. "git.scraperwall.com/scw/ip"
  27. )
  28. var (
  29. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  30. iface = flag.String("interface", "eth0", "Interface to get packets from")
  31. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  32. filter = flag.String("filter", "tcp", "PCAP filter expression")
  33. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  34. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  35. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  36. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  37. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  38. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  39. reconnectToNatsAfter = flag.Duration("reconnect-to-nats-after", 0, "reconnect to nats after this time periodically")
  40. resetLiveCapAfter = flag.Duration("reset-live-cap-after", 613*time.Second, "reset the live capture setup after this amount of time")
  41. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  42. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  43. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  44. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  45. useVhostAsSource = flag.Bool("use-vhost-as-source", false, "Use the Vhost as source")
  46. trace = flag.Bool("trace", false, "Trace the packet capturing")
  47. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  48. apacheReplay = flag.String("apache-replay", "", "Apache log file to replay into the system")
  49. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  50. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  51. nginxReplay = flag.String("nginx-replay", "", "Replay this nginx logfile")
  52. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  53. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  54. configFile = flag.String("config", "", "The location of the TOML config file")
  55. rpcAddr = flag.String("rpc-address", "", "The address where the RPC server is listening")
  56. beQuiet = flag.Bool("quiet", true, "Be quiet")
  57. doVersion = flag.Bool("version", false, "Show version information")
  58. natsEC *nats.EncodedConn
  59. natsJSONEC *nats.EncodedConn
  60. rpcClient *ScwRPC
  61. natsErrorChan chan error
  62. natsIsAvailable bool
  63. count uint64
  64. timeout = -1 * time.Second
  65. ipPriv *ip.IP
  66. config Config
  67. // Version contains the program Version, e.g. 1.0.1
  68. Version string
  69. // BuildDate contains the date and time at which the program was compiled
  70. BuildDate string
  71. )
  72. // Config contains the program configuration
  73. type Config struct {
  74. Live bool
  75. Interface string
  76. SnapshotLen int
  77. Filter string
  78. Promiscuous bool
  79. NatsURL string
  80. NatsQueue string
  81. NatsUser string
  82. NatsPassword string
  83. NatsCA string
  84. SleepFor duration
  85. RequestsFile string
  86. UseXForwardedAsSource bool
  87. UseVhostAsSource bool
  88. Quiet bool
  89. Protocol string
  90. Trace bool
  91. ApacheLog string
  92. ApacheReplay string
  93. NginxLog string
  94. NginxLogFormat string
  95. NginxReplay string
  96. HostName string
  97. AccessWatchKey string
  98. RPCAddress string
  99. ReconnectToNatsAfter duration
  100. ResetLiveCaptureAfter duration
  101. }
  // duration wraps time.Duration so that Config fields can be decoded from
// TOML strings such as "90s" or "5m" through UnmarshalText.
type duration struct {
	time.Duration
}

// UnmarshalText parses text with time.ParseDuration. On failure the current
// value is left untouched, so a default taken from the command line survives
// a malformed config entry, and the parse error is returned.
func (d *duration) UnmarshalText(text []byte) error {
	parsed, err := time.ParseDuration(string(text))
	if err != nil {
		return err
	}
	d.Duration = parsed
	return nil
}
  110. func (c Config) print() {
  111. fmt.Printf("Live: %t\n", c.Live)
  112. fmt.Printf("Interface: %s\n", c.Interface)
  113. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  114. fmt.Printf("Filter: %s\n", c.Filter)
  115. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  116. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  117. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  118. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  119. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  120. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  121. fmt.Printf("ReconnectToNatsAfter: %s\n", c.ReconnectToNatsAfter.String())
  122. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  123. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  124. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  125. fmt.Printf("Apache Replay: %s\n", c.ApacheReplay)
  126. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  127. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  128. fmt.Printf("NginxReplay: %s\n", c.NginxReplay)
  129. fmt.Printf("HostName: %s\n", c.HostName)
  130. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  131. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  132. fmt.Printf("UseVhostAsSource: %t\n", c.UseVhostAsSource)
  133. fmt.Printf("Protocol: %s\n", c.Protocol)
  134. fmt.Printf("Reset Live Cap After: %s\n", c.ResetLiveCaptureAfter.String())
  135. fmt.Printf("RPCAddress: %s\n", c.RPCAddress)
  136. fmt.Printf("Quiet: %t\n", c.Quiet)
  137. fmt.Printf("Trace: %t\n", c.Trace)
  138. }
  139. func init() {
  140. flag.Parse()
  141. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  142. }
  143. func main() {
  144. if *doVersion {
  145. version()
  146. os.Exit(0)
  147. }
  148. loadConfig()
  149. // Output how many requests per second were sent
  150. if !config.Quiet {
  151. go func(c *uint64) {
  152. for {
  153. fmt.Printf("%d requests per second\n", *c)
  154. *c = 0
  155. time.Sleep(time.Second)
  156. }
  157. }(&count)
  158. }
  159. // NATS
  160. //
  161. if config.NatsURL == "" && config.AccessWatchKey == "" {
  162. log.Fatal("No NATS URL specified (-nats-url)!")
  163. }
  164. natsIsAvailable = false
  165. natsErrorChan = make(chan error, 1)
  166. err := connectToNATS()
  167. if err != nil && config.AccessWatchKey == "" {
  168. log.Fatal(err)
  169. }
  170. // reconnect to nats periodically
  171. //
  172. if *reconnectToNatsAfter > time.Minute {
  173. go func(interval time.Duration) {
  174. for range time.Tick(interval) {
  175. if config.Trace {
  176. log.Printf("reconnecting to NATS")
  177. }
  178. natsEC.Conn.Close()
  179. natsJSONEC.Conn.Close()
  180. }
  181. }(*reconnectToNatsAfter)
  182. }
  183. go natsWatchdog(natsErrorChan)
  184. // RPC
  185. //
  186. if config.RPCAddress != "" {
  187. rpcClient, err = NewScwRPC(config.RPCAddress)
  188. if err != nil {
  189. log.Fatal(err)
  190. }
  191. }
  192. // What should I do?
  193. if config.RequestsFile != "" {
  194. replayFile()
  195. } else if config.ApacheReplay != "" {
  196. apacheLogReplay(config.ApacheReplay)
  197. } else if config.NginxReplay != "" {
  198. nginxLogReplay(config.NginxReplay, config.NginxLogFormat)
  199. } else if config.ApacheLog != "" {
  200. apacheLogCapture(config.ApacheLog)
  201. } else if config.Live {
  202. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  203. liveCapture()
  204. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  205. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  206. }
  207. }
  208. func natsWatchdog(closedChan chan error) {
  209. var lastError error
  210. for err := range closedChan {
  211. if lastError != err {
  212. lastError = err
  213. log.Println(err)
  214. }
  215. if err != nats.ErrConnectionClosed {
  216. continue
  217. }
  218. RECONNECT:
  219. for {
  220. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  221. err := connectToNATS()
  222. if err == nil {
  223. break RECONNECT
  224. }
  225. time.Sleep(1 * time.Second)
  226. }
  227. }
  228. }
  229. func connectToNATS() error {
  230. var natsConn *nats.Conn
  231. var err error
  232. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  233. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  234. } else {
  235. if config.NatsPassword != "" && config.NatsUser != "" {
  236. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  237. } else {
  238. natsConn, err = nats.Connect(config.NatsURL)
  239. }
  240. }
  241. if err != nil {
  242. return err
  243. }
  244. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  245. if err != nil {
  246. return fmt.Errorf("Encoded Connection: %v", err)
  247. }
  248. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  249. if err != nil {
  250. return fmt.Errorf("Encoded Connection: %v", err)
  251. }
  252. natsIsAvailable = true
  253. return nil
  254. }
  255. type liveCap struct {
  256. filter string
  257. device string
  258. promisc bool
  259. snapshotLen int
  260. handle *pcap.Handle
  261. packetChan chan gopacket.Packet
  262. }
  263. func newLiveCap(device string, filter string, snapshotLen int, promisc bool) (*liveCap, error) {
  264. lc := &liveCap{
  265. filter: filter,
  266. device: device,
  267. promisc: promisc,
  268. snapshotLen: snapshotLen,
  269. }
  270. err := lc.SetupCap()
  271. if err != nil {
  272. return nil, err
  273. }
  274. return lc, nil
  275. }
  276. func (lc *liveCap) SetupCap() error {
  277. if !config.Quiet {
  278. log.Printf("reading incoming HTTP requests on %s %s\n", config.Interface, config.Filter)
  279. }
  280. if lc.handle != nil {
  281. lc.handle.Close()
  282. }
  283. // PCAP setup
  284. //
  285. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  286. if err != nil {
  287. return err
  288. }
  289. // defer handle.Close()
  290. lc.handle = handle
  291. err = lc.handle.SetBPFFilter(config.Filter)
  292. if err != nil {
  293. return err
  294. }
  295. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  296. lc.packetChan = packetSource.Packets()
  297. return nil
  298. }
  299. func liveCapture() {
  300. ipPriv = ip.NewIP()
  301. livecap, err := newLiveCap(config.Interface, config.Filter, config.SnapshotLen, config.Promiscuous)
  302. if err != nil {
  303. log.Fatal(err)
  304. }
  305. closeChan := time.Tick(config.ResetLiveCaptureAfter.Duration)
  306. for {
  307. select {
  308. case <-closeChan:
  309. livecap.SetupCap()
  310. case p := <-livecap.packetChan:
  311. go processPacket(p)
  312. }
  313. }
  314. }
  315. func writeLogToWatch(r *data.Request) {
  316. h := map[string]string{}
  317. if r.AcceptEncoding != "" {
  318. h["Accept-Encoding"] = r.AcceptEncoding
  319. }
  320. if r.Accept != "" {
  321. h["Accept"] = r.Accept
  322. }
  323. if r.AcceptLanguage != "" {
  324. h["Accept-Language"] = r.AcceptLanguage
  325. }
  326. if r.Cookie != "" {
  327. h["Cookie"] = r.Cookie
  328. }
  329. if r.Host != "" {
  330. h["Host"] = r.Host
  331. }
  332. if r.Referer != "" {
  333. h["Referer"] = r.Referer
  334. }
  335. if r.UserAgent != "" {
  336. h["User-Agent"] = r.UserAgent
  337. }
  338. if r.Via != "" {
  339. h["Via"] = r.Via
  340. }
  341. if r.XForwardedFor != "" {
  342. h["X-Forwarded-For"] = r.XForwardedFor
  343. }
  344. if r.XRequestedWith != "" {
  345. h["X-Requested-With"] = r.XRequestedWith
  346. }
  347. data := map[string]interface{}{
  348. "request": map[string]interface{}{
  349. "time": time.Unix(0, r.CreatedAt),
  350. "address": r.Source,
  351. "protocol": r.Protocol,
  352. "scheme": "https",
  353. "method": r.Method,
  354. "url": r.Url,
  355. "headers": h,
  356. },
  357. "response": map[string]interface{}{"status": "200"},
  358. }
  359. jdata, err := json.Marshal(data)
  360. client := &http.Client{}
  361. fmt.Println(string(jdata))
  362. buf := bytes.NewBuffer(jdata)
  363. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  364. req.Header.Add("Api-Key", config.AccessWatchKey)
  365. req.Header.Add("Accept", "application/json")
  366. req.Header.Add("Content-Type", "application/json")
  367. resp, err := client.Do(req)
  368. if err != nil {
  369. log.Println(err)
  370. }
  371. resp.Body.Close()
  372. }
  373. func publishRequest(queue string, request *data.Request) {
  374. if config.AccessWatchKey != "" {
  375. writeLogToWatch(request)
  376. return
  377. }
  378. if rpcClient != nil {
  379. select {
  380. case rpcClient.RChan <- request:
  381. default:
  382. }
  383. return
  384. }
  385. if !natsIsAvailable {
  386. if rand.Intn(100) == 0 {
  387. log.Println("nats connection is not available")
  388. }
  389. return
  390. }
  391. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  392. natsErrorChan <- err
  393. if err == nats.ErrConnectionClosed {
  394. natsIsAvailable = false
  395. }
  396. }
  397. }
  398. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  399. func processPacket(packet gopacket.Packet) {
  400. hasIPv4 := false
  401. var ipSrc, ipDst string
  402. // IPv4
  403. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  404. ip := ipLayer.(*layers.IPv4)
  405. ipSrc = ip.SrcIP.String()
  406. ipDst = ip.DstIP.String()
  407. hasIPv4 = true
  408. }
  409. // IPv6
  410. if !hasIPv4 {
  411. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  412. ip := ipLayer.(*layers.IPv6)
  413. ipSrc = ip.SrcIP.String()
  414. ipDst = ip.DstIP.String()
  415. }
  416. }
  417. // TCP
  418. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  419. if tcpLayer == nil {
  420. return
  421. }
  422. tcp, _ := tcpLayer.(*layers.TCP)
  423. portSrc := tcp.SrcPort
  424. portDst := tcp.DstPort
  425. sequence := tcp.Seq
  426. applicationLayer := packet.ApplicationLayer()
  427. if applicationLayer == nil {
  428. return
  429. }
  430. count++
  431. if len(applicationLayer.Payload()) < 50 {
  432. log.Println("application layer too small!")
  433. return
  434. }
  435. request := data.Request{
  436. IpSrc: ipSrc,
  437. IpDst: ipDst,
  438. PortSrc: uint32(portSrc),
  439. PortDst: uint32(portDst),
  440. TcpSeq: uint32(sequence),
  441. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  442. }
  443. switch config.Protocol {
  444. case "http":
  445. err := processHTTP(&request, applicationLayer.Payload())
  446. if err != nil {
  447. log.Println(err)
  448. return
  449. }
  450. case "ajp13":
  451. err := processAJP13(&request, applicationLayer.Payload())
  452. if err != nil {
  453. log.Println(err)
  454. return
  455. }
  456. }
  457. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  458. if strings.Contains(request.XForwardedFor, ",") {
  459. ips := strings.Split(request.XForwardedFor, ",")
  460. for i := len(ips) - 1; i >= 0; i-- {
  461. ipRaw := strings.TrimSpace(ips[i])
  462. ipAddr := net.ParseIP(ipRaw)
  463. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  464. request.Source = ipRaw
  465. break
  466. }
  467. }
  468. } else {
  469. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  470. if !ipPriv.IsPrivate(ipAddr) {
  471. request.Source = request.XForwardedFor
  472. }
  473. }
  474. }
  475. if request.Source == request.IpSrc && request.XRealIP != "" {
  476. request.Source = request.XRealIP
  477. }
  478. if config.Trace {
  479. log.Printf("[%s] %s\n", request.Source, request.Url)
  480. }
  481. publishRequest(config.NatsQueue, &request)
  482. }
  483. func processAJP13(request *data.Request, appData []byte) error {
  484. a, err := ajp13.Parse(appData)
  485. if err != nil {
  486. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  487. }
  488. request.Url = a.URI
  489. request.Method = a.Method()
  490. request.Host = a.Server
  491. request.Protocol = a.Version
  492. request.Origin = a.RemoteAddr.String()
  493. request.Source = a.RemoteAddr.String()
  494. if v, ok := a.Header("Referer"); ok {
  495. request.Referer = v
  496. }
  497. if v, ok := a.Header("Connection"); ok {
  498. request.Connection = v
  499. }
  500. if v, ok := a.Header("X-Forwarded-For"); ok {
  501. request.XForwardedFor = v
  502. }
  503. if v, ok := a.Header("X-Real-IP"); ok {
  504. request.XRealIP = v
  505. }
  506. if v, ok := a.Header("X-Requested-With"); ok {
  507. request.XRequestedWith = v
  508. }
  509. if v, ok := a.Header("Accept-Encoding"); ok {
  510. request.AcceptEncoding = v
  511. }
  512. if v, ok := a.Header("Accept-Language"); ok {
  513. request.AcceptLanguage = v
  514. }
  515. if v, ok := a.Header("User-Agent"); ok {
  516. request.UserAgent = v
  517. }
  518. if v, ok := a.Header("Accept"); ok {
  519. request.Accept = v
  520. }
  521. if v, ok := a.Header("Cookie"); ok {
  522. request.Cookie = v
  523. }
  524. if v, ok := a.Header("X-Forwarded-Host"); ok {
  525. if v != request.Host {
  526. request.Host = v
  527. }
  528. }
  529. return nil
  530. }
  531. func processHTTP(request *data.Request, appData []byte) error {
  532. reader := bufio.NewReader(strings.NewReader(string(appData)))
  533. req, err := http.ReadRequest(reader)
  534. if err != nil {
  535. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  536. }
  537. request.Url = req.URL.String()
  538. request.Method = req.Method
  539. request.Referer = req.Referer()
  540. request.Host = req.Host
  541. request.Protocol = req.Proto
  542. request.Origin = request.Host
  543. if _, ok := req.Header["Connection"]; ok {
  544. request.Connection = req.Header["Connection"][0]
  545. }
  546. if _, ok := req.Header["X-Forwarded-For"]; ok {
  547. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  548. }
  549. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  550. if _, ok := req.Header["True-Client-Ip"]; ok {
  551. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  552. }
  553. if _, ok := req.Header["X-Real-Ip"]; ok {
  554. request.XRealIP = req.Header["X-Real-Ip"][0]
  555. }
  556. if _, ok := req.Header["X-Requested-With"]; ok {
  557. request.XRequestedWith = req.Header["X-Requested-With"][0]
  558. }
  559. if _, ok := req.Header["Accept-Encoding"]; ok {
  560. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  561. }
  562. if _, ok := req.Header["Accept-Language"]; ok {
  563. request.AcceptLanguage = req.Header["Accept-Language"][0]
  564. }
  565. if _, ok := req.Header["User-Agent"]; ok {
  566. request.UserAgent = req.Header["User-Agent"][0]
  567. }
  568. if _, ok := req.Header["Accept"]; ok {
  569. request.Accept = req.Header["Accept"][0]
  570. }
  571. if _, ok := req.Header["Cookie"]; ok {
  572. request.Cookie = req.Header["Cookie"][0]
  573. }
  574. request.Source = request.IpSrc
  575. return nil
  576. }
  577. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  578. // e.g.
  579. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  580. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  581. func replayFile() {
  582. var req data.Request
  583. var startTs time.Time
  584. var endTs time.Time
  585. rand.Seed(time.Now().UnixNano())
  586. for {
  587. fh, err := os.Open(config.RequestsFile)
  588. if err != nil {
  589. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  590. }
  591. c := csv.NewReader(fh)
  592. c.Comma = ' '
  593. for {
  594. if config.SleepFor.Duration > time.Nanosecond {
  595. startTs = time.Now()
  596. }
  597. r, err := c.Read()
  598. if err == io.EOF {
  599. break
  600. }
  601. if err != nil {
  602. log.Println(err)
  603. continue
  604. }
  605. req.IpSrc = r[0]
  606. req.Source = r[0]
  607. req.Url = r[1]
  608. req.UserAgent = "Munch/1.0"
  609. req.Host = "demo.scraperwall.com"
  610. req.CreatedAt = time.Now().UnixNano()
  611. publishRequest(config.NatsQueue, &req)
  612. if strings.Index(r[1], ".") < 0 {
  613. hash := sha1.New()
  614. io.WriteString(hash, r[0])
  615. fp := data.Fingerprint{
  616. ClientID: "scw",
  617. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  618. Remote: r[0],
  619. Url: r[1],
  620. Source: r[0],
  621. CreatedAt: time.Now(),
  622. }
  623. if strings.HasPrefix(r[0], "50.31.") {
  624. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  625. natsJSONEC.Publish("fingerprints_scw", fp)
  626. } else if rand.Intn(10) < 5 {
  627. natsJSONEC.Publish("fingerprints_scw", fp)
  628. }
  629. }
  630. count++
  631. if config.SleepFor.Duration >= time.Nanosecond {
  632. endTs = time.Now()
  633. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  634. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  635. }
  636. }
  637. }
  638. }
  639. }
  640. func loadConfig() {
  641. // initialize with values from the command line / environment
  642. config.Live = *doLiveCapture
  643. config.Interface = *iface
  644. config.SnapshotLen = *snapshotLen
  645. config.Filter = *filter
  646. config.Promiscuous = *promiscuous
  647. config.NatsURL = *natsURL
  648. config.NatsQueue = *natsQueue
  649. config.NatsUser = *natsUser
  650. config.NatsPassword = *natsPassword
  651. config.NatsCA = *natsCA
  652. config.SleepFor.Duration = *sleepFor
  653. config.RequestsFile = *requestsFile
  654. config.UseXForwardedAsSource = *useXForwardedAsSource
  655. config.UseVhostAsSource = *useVhostAsSource
  656. config.Protocol = *protocol
  657. config.ApacheLog = *apacheLog
  658. config.ApacheReplay = *apacheReplay
  659. config.NginxLog = *nginxLog
  660. config.NginxLogFormat = *nginxFormat
  661. config.NginxReplay = *nginxReplay
  662. config.HostName = *hostName
  663. config.Quiet = *beQuiet
  664. config.Trace = *trace
  665. config.AccessWatchKey = *accessWatchKey
  666. config.RPCAddress = *rpcAddr
  667. config.ReconnectToNatsAfter.Duration = *reconnectToNatsAfter
  668. config.ResetLiveCaptureAfter.Duration = *resetLiveCapAfter
  669. if *configFile == "" {
  670. return
  671. }
  672. _, err := os.Stat(*configFile)
  673. if err != nil {
  674. log.Printf("%s: %s\n", *configFile, err)
  675. return
  676. }
  677. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  678. log.Printf("%s: %s\n", *configFile, err)
  679. }
  680. if !config.Quiet {
  681. config.print()
  682. }
  683. }
  684. // version outputs build information...
  685. func version() {
  686. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  687. }