main.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837
  1. package main
import (
	"bufio"
	"bytes"
	"crypto/sha1"
	"encoding/csv"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
)
  28. var (
  29. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  30. iface = flag.String("interface", "eth0", "Interface to get packets from")
  31. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  32. filter = flag.String("filter", "tcp", "PCAP filter expression")
  33. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  34. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  35. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  36. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  37. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  38. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  39. reconnectToNatsAfter = flag.Duration("reconnect-to-nats-after", 0, "reconnect to nats after this long periodically")
  40. resetLiveCapAfter = flag.Duration("rest-live-cap-after", 0, "reset the live capture setup after this amount of time")
  41. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  42. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  43. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  44. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  45. trace = flag.Bool("trace", false, "Trace the packet capturing")
  46. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  47. apacheReplay = flag.String("apache-replay", "", "Apache log file to replay into the system")
  48. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  49. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  50. nginxReplay = flag.String("nginx-replay", "", "Replay this nginx logfile")
  51. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  52. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  53. configFile = flag.String("config", "", "The location of the TOML config file")
  54. rpcAddr = flag.String("rpc-address", "", "The address where the RPC server is listening")
  55. beQuiet = flag.Bool("quiet", true, "Be quiet")
  56. doVersion = flag.Bool("version", false, "Show version information")
  57. natsEC *nats.EncodedConn
  58. natsJSONEC *nats.EncodedConn
  59. rpcClient *ScwRPC
  60. natsErrorChan chan error
  61. natsIsAvailable bool
  62. count uint64
  63. timeout = -1 * time.Second
  64. ipPriv *ip.IP
  65. config Config
  66. // Version contains the program Version, e.g. 1.0.1
  67. Version string
  68. // BuildDate contains the date and time at which the program was compiled
  69. BuildDate string
  70. )
  71. // Config contains the program configuration
  72. type Config struct {
  73. Live bool
  74. Interface string
  75. SnapshotLen int
  76. Filter string
  77. Promiscuous bool
  78. NatsURL string
  79. NatsQueue string
  80. NatsUser string
  81. NatsPassword string
  82. NatsCA string
  83. SleepFor duration
  84. RequestsFile string
  85. UseXForwardedAsSource bool
  86. Quiet bool
  87. Protocol string
  88. Trace bool
  89. ApacheLog string
  90. ApacheReplay string
  91. NginxLog string
  92. NginxLogFormat string
  93. NginxReplay string
  94. HostName string
  95. AccessWatchKey string
  96. RPCAddress string
  97. ReconnectToNatsAfter duration
  98. ResetLiveCaptureAfter duration
  99. }
  100. type duration struct {
  101. time.Duration
  102. }
  103. func (d *duration) UnmarshalText(text []byte) error {
  104. var err error
  105. d.Duration, err = time.ParseDuration(string(text))
  106. return err
  107. }
  108. func (c Config) print() {
  109. fmt.Printf("Live: %t\n", c.Live)
  110. fmt.Printf("Interface: %s\n", c.Interface)
  111. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  112. fmt.Printf("Filter: %s\n", c.Filter)
  113. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  114. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  115. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  116. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  117. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  118. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  119. fmt.Printf("ReconnectToNatsAfter: %s\n", c.ReconnectToNatsAfter.String())
  120. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  121. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  122. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  123. fmt.Printf("Apache Replay: %s\n", c.ApacheReplay)
  124. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  125. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  126. fmt.Printf("NginxReplay: %s\n", c.NginxReplay)
  127. fmt.Printf("HostName: %s\n", c.HostName)
  128. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  129. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  130. fmt.Printf("Protocol: %s\n", c.Protocol)
  131. fmt.Printf("Reset Live Cap After: %s\n", c.ResetLiveCaptureAfter.String())
  132. fmt.Printf("RPCAddress: %s\n", c.RPCAddress)
  133. fmt.Printf("Quiet: %t\n", c.Quiet)
  134. fmt.Printf("Trace: %t\n", c.Trace)
  135. }
  // init registers the protobuf encoder with the NATS client library and
// parses the command-line flags, so flag values are populated before main
// reads them.
//
// NOTE(review): calling flag.Parse from init interferes with `go test`
// flag handling; consider moving it into main.
func init() {
	nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
	flag.Parse()
}
  140. func main() {
  141. if *doVersion {
  142. version()
  143. os.Exit(0)
  144. }
  145. loadConfig()
  146. // Output how many requests per second were sent
  147. if !config.Quiet {
  148. go func(c *uint64) {
  149. for {
  150. fmt.Printf("%d requests per second\n", *c)
  151. *c = 0
  152. time.Sleep(time.Second)
  153. }
  154. }(&count)
  155. }
  156. // NATS
  157. //
  158. if config.NatsURL == "" && config.AccessWatchKey == "" {
  159. log.Fatal("No NATS URL specified (-nats-url)!")
  160. }
  161. natsIsAvailable = false
  162. natsErrorChan = make(chan error, 1)
  163. err := connectToNATS()
  164. if err != nil && config.AccessWatchKey == "" {
  165. log.Fatal(err)
  166. }
  167. // reconnect to nats periodically
  168. //
  169. if *reconnectToNatsAfter > 0 {
  170. go func(interval time.Duration) {
  171. for range time.Tick(interval) {
  172. natsEC.Conn.Close()
  173. natsJSONEC.Conn.Close()
  174. }
  175. }(*reconnectToNatsAfter)
  176. }
  177. go natsWatchdog(natsErrorChan)
  178. // RPC
  179. //
  180. if config.RPCAddress != "" {
  181. rpcClient, err = NewScwRPC(config.RPCAddress)
  182. if err != nil {
  183. log.Fatal(err)
  184. }
  185. }
  186. // What should I do?
  187. if config.RequestsFile != "" {
  188. replayFile()
  189. } else if config.ApacheReplay != "" {
  190. apacheLogReplay(config.ApacheReplay)
  191. } else if config.NginxReplay != "" {
  192. nginxLogReplay(config.NginxReplay, config.NginxLogFormat)
  193. } else if config.ApacheLog != "" {
  194. apacheLogCapture(config.ApacheLog)
  195. } else if config.Live {
  196. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  197. liveCapture()
  198. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  199. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  200. }
  201. }
  // natsWatchdog consumes the NATS publish errors that publishRequest reports
// on closedChan. Consecutive duplicate errors are logged only once. On
// nats.ErrConnectionClosed it reconnects, retrying every second until
// connectToNATS succeeds. It returns only when closedChan is closed.
func natsWatchdog(closedChan chan error) {
	var lastError error
	for err := range closedChan {
		if err != lastError {
			lastError = err
			log.Println(err)
		}
		if err != nats.ErrConnectionClosed {
			continue
		}

		// Several publishers can report the same closed connection. If an
		// earlier report already reconnected, the current connection is
		// open; reconnecting again would replace (and leak) a healthy one.
		if natsEC != nil && !natsEC.Conn.IsClosed() {
			continue
		}

		for {
			log.Printf("Reconnecting to NATS at %s\n", config.NatsURL)
			if connErr := connectToNATS(); connErr == nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		// Make sure the next disconnect is logged again.
		lastError = nil
	}
}
  // connectToNATS dials config.NatsURL. On success it replaces natsEC (protobuf
// encoder) and natsJSONEC (JSON encoder), both sharing the new connection,
// and marks NATS as available. On failure the globals are left untouched and
// any partially created connection is closed.
//
// Credentials are sent only when both NatsUser and NatsPassword are set.
// NatsCA, when set, is used as the TLS root CA whether or not credentials
// are configured.
func connectToNATS() error {
	var opts []nats.Option
	if config.NatsUser != "" && config.NatsPassword != "" {
		opts = append(opts, nats.UserInfo(config.NatsUser, config.NatsPassword))
	}
	if config.NatsCA != "" {
		opts = append(opts, nats.RootCAs(config.NatsCA))
	}

	natsConn, err := nats.Connect(config.NatsURL, opts...)
	if err != nil {
		return err
	}

	protoEC, err := nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
	if err != nil {
		natsConn.Close()
		return fmt.Errorf("Encoded Connection: %v", err)
	}
	jsonEC, err := nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
	if err != nil {
		natsConn.Close()
		return fmt.Errorf("Encoded Connection: %v", err)
	}

	natsEC = protoEC
	natsJSONEC = jsonEC
	natsIsAvailable = true
	return nil
}
  249. type liveCap struct {
  250. filter string
  251. device string
  252. promisc bool
  253. snapshotLen int
  254. handle *pcap.Handle
  255. packetChan chan gopacket.Packet
  256. }
  257. func newLiveCap(device string, filter string, snapshotLen int, promisc bool) (*liveCap, error) {
  258. lc := liveCap{
  259. filter: filter,
  260. device: device,
  261. promisc: promisc,
  262. snapshotLen: snapshotLen,
  263. }
  264. err := lc.Setup()
  265. if err != nil {
  266. return nil, err
  267. }
  268. return &lc, nil
  269. }
  270. func (lc *liveCap) Setup() error {
  271. /*
  272. if lc.packetChan != nil {
  273. close(lc.packetChan)
  274. }
  275. */
  276. if lc.handle != nil {
  277. lc.handle.Close()
  278. }
  279. // PCAP setup
  280. //
  281. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  282. if err != nil {
  283. return err
  284. }
  285. // defer handle.Close()
  286. lc.handle = handle
  287. err = lc.handle.SetBPFFilter(config.Filter)
  288. if err != nil {
  289. return err
  290. }
  291. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  292. lc.packetChan = packetSource.Packets()
  293. return nil
  294. }
  // liveCapture captures packets on config.Interface and hands each one to
// processPacket in its own goroutine. It re-creates the capture every
// config.ResetLiveCaptureAfter (one minute when unset), and also whenever
// the packet channel is closed. It never returns.
func liveCapture() {
	ipPriv = ip.NewIP()

	livecap, err := newLiveCap(config.Interface, config.Filter, config.SnapshotLen, config.Promiscuous)
	if err != nil {
		log.Fatal(err)
	}

	resetEvery := config.ResetLiveCaptureAfter.Duration
	if resetEvery <= 0 {
		resetEvery = time.Minute // the interval used before this was configurable
	}
	ticker := time.NewTicker(resetEvery)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := livecap.Setup(); err != nil {
				log.Printf("resetting live capture: %v", err)
			}
		case packet, ok := <-livecap.packetChan:
			if !ok {
				// The packet source closed its channel. Every further receive
				// would yield a nil packet immediately (a hot loop that hands
				// nil to processPacket), so re-open the capture instead.
				if err := livecap.Setup(); err != nil {
					log.Printf("restarting live capture: %v", err)
					time.Sleep(time.Second)
				}
				continue
			}
			go processPacket(packet)
		}
	}
}
  // setupLiveCapture opens config.Interface with config.Filter applied and
// returns the channel of captured packets. The underlying pcap handle is
// not returned, so the caller cannot close it.
//
// NOTE(review): liveCapture uses newLiveCap instead, and nothing in this
// file calls this function — confirm whether it is still needed elsewhere.
func setupLiveCapture() (chan gopacket.Packet, error) {
	if !config.Quiet {
		log.Printf("reading incoming HTTP requests on %s %s\n", config.Interface, config.Filter)
	}

	handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
	if err != nil {
		return nil, err
	}
	if err = handle.SetBPFFilter(config.Filter); err != nil {
		// The caller never sees this handle, so release the device here.
		handle.Close()
		return nil, err
	}

	return gopacket.NewPacketSource(handle, handle.LinkType()).Packets(), nil
}
  // accessWatchEndpoint is the access.watch log ingestion URL.
const accessWatchEndpoint = "https://log.access.watch/1.1/log"

// accessWatchClient is shared by all writeLogToWatch calls so connections can
// be reused. The timeout keeps a slow endpoint from piling up goroutines.
var accessWatchClient = &http.Client{Timeout: 10 * time.Second}

// writeLogToWatch posts r to access.watch as a JSON log entry, authenticated
// with config.AccessWatchKey. Only non-empty headers are included. Failures
// are logged, never returned. The encoded payload is echoed to stdout only
// when config.Trace is set.
func writeLogToWatch(r *data.Request) {
	headers := map[string]string{
		"Accept-Encoding":  r.AcceptEncoding,
		"Accept":           r.Accept,
		"Accept-Language":  r.AcceptLanguage,
		"Cookie":           r.Cookie,
		"Host":             r.Host,
		"Referer":          r.Referer,
		"User-Agent":       r.UserAgent,
		"Via":              r.Via,
		"X-Forwarded-For":  r.XForwardedFor,
		"X-Requested-With": r.XRequestedWith,
	}
	for name, value := range headers {
		if value == "" {
			delete(headers, name) // deleting during range is safe in Go
		}
	}

	payload := map[string]interface{}{
		"request": map[string]interface{}{
			"time":     time.Unix(0, r.CreatedAt),
			"address":  r.Source,
			"protocol": r.Protocol,
			"scheme":   "https",
			"method":   r.Method,
			"url":      r.Url,
			"headers":  headers,
		},
		"response": map[string]interface{}{"status": "200"},
	}

	body, err := json.Marshal(payload)
	if err != nil {
		log.Printf("access.watch: encoding request: %v", err)
		return
	}
	if config.Trace {
		fmt.Println(string(body))
	}

	req, err := http.NewRequest("POST", accessWatchEndpoint, bytes.NewReader(body))
	if err != nil {
		log.Printf("access.watch: building request: %v", err)
		return
	}
	req.Header.Add("Api-Key", config.AccessWatchKey)
	req.Header.Add("Accept", "application/json")
	req.Header.Add("Content-Type", "application/json")

	resp, err := accessWatchClient.Do(req)
	if err != nil {
		// resp is nil here; touching resp.Body would panic.
		log.Println(err)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		log.Printf("access.watch: unexpected response status %s", resp.Status)
	}
}
  // publishRequest forwards request to exactly one sink, in this order of
// precedence:
//   - access.watch, when config.AccessWatchKey is set;
//   - the RPC client, when rpcClient is set (the request is dropped if its
//     channel is full);
//   - NATS, publishing on subject queue with the protobuf encoder.
//
// NATS publish errors go to natsWatchdog via natsErrorChan.
// nats.ErrConnectionClosed is always delivered, because the watchdog
// reconnects on it. Other errors are only logged there, so they are dropped
// when an earlier error is still pending rather than blocking the publisher.
func publishRequest(queue string, request *data.Request) {
	if config.AccessWatchKey != "" {
		writeLogToWatch(request)
		return
	}

	if rpcClient != nil {
		select {
		case rpcClient.RChan <- request:
		default:
		}
		return
	}

	if !natsIsAvailable {
		// Complain on roughly 1% of calls instead of flooding the log.
		if rand.Intn(100) == 0 {
			log.Println("nats connection is not available")
		}
		return
	}

	err := natsEC.Publish(queue, request)
	if err == nil {
		return
	}
	if err == nats.ErrConnectionClosed {
		// Stop further publish attempts before (possibly) blocking on the report.
		natsIsAvailable = false
		natsErrorChan <- err
		return
	}
	select {
	case natsErrorChan <- err:
	default:
	}
}
  // processPacket receives a raw packet from pcap and builds a Request from
// its TCP payload, parsed as HTTP or AJP13 according to config.Protocol. It
// picks the client source address and sends the result to the queue via
// publishRequest. Packets without TCP or application data are ignored.
func processPacket(packet gopacket.Packet) {
	var ipSrc, ipDst string
	if l := packet.Layer(layers.LayerTypeIPv4); l != nil {
		v4 := l.(*layers.IPv4)
		ipSrc, ipDst = v4.SrcIP.String(), v4.DstIP.String()
	} else if l := packet.Layer(layers.LayerTypeIPv6); l != nil {
		v6 := l.(*layers.IPv6)
		ipSrc, ipDst = v6.SrcIP.String(), v6.DstIP.String()
	}

	tcp, ok := packet.Layer(layers.LayerTypeTCP).(*layers.TCP)
	if !ok {
		return
	}

	app := packet.ApplicationLayer()
	if app == nil {
		return
	}
	// processPacket runs in many goroutines at once; increment atomically.
	atomic.AddUint64(&count, 1)

	payload := app.Payload()
	if len(payload) < 50 {
		log.Println("application layer too small!")
		return
	}

	request := data.Request{
		IpSrc:     ipSrc,
		IpDst:     ipDst,
		PortSrc:   uint32(tcp.SrcPort),
		PortDst:   uint32(tcp.DstPort),
		TcpSeq:    uint32(tcp.Seq),
		CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
	}

	var err error
	switch config.Protocol {
	case "http":
		err = processHTTP(&request, payload)
	case "ajp13":
		err = processAJP13(&request, payload)
	}
	if err != nil {
		log.Println(err)
		return
	}

	if config.UseXForwardedAsSource && request.XForwardedFor != "" {
		if src, found := publicSourceFromXFF(request.XForwardedFor); found {
			request.Source = src
		}
	}
	if request.Source == request.IpSrc && request.XRealIP != "" {
		request.Source = request.XRealIP
	}

	if config.Trace {
		log.Printf("[%s] %s\n", request.Source, request.Url)
	}
	publishRequest(config.NatsQueue, &request)
}

// publicSourceFromXFF scans an X-Forwarded-For value from right to left and
// returns the first whitespace-trimmed entry that parses as an IP address
// and for which ipPriv.IsPrivate reports false. found is false when no entry
// qualifies. A single-entry header goes through the same checks.
func publicSourceFromXFF(xff string) (src string, found bool) {
	entries := strings.Split(xff, ",")
	for i := len(entries) - 1; i >= 0; i-- {
		candidate := strings.TrimSpace(entries[i])
		if addr := net.ParseIP(candidate); addr != nil && !ipPriv.IsPrivate(addr) {
			return candidate, true
		}
	}
	return "", false
}
  // processAJP13 decodes an AJP13 packet and copies into request the URI,
// method, server name, protocol version, remote address (as both Origin and
// Source) and a fixed set of headers. An X-Forwarded-Host header, when
// present, replaces the server name as Host.
func processAJP13(request *data.Request, appData []byte) error {
	msg, err := ajp13.Parse(appData)
	if err != nil {
		return fmt.Errorf("Failed to parse AJP13 request: %s", err)
	}

	remote := msg.RemoteAddr.String()
	request.Url = msg.URI
	request.Method = msg.Method()
	request.Host = msg.Server
	request.Protocol = msg.Version
	request.Origin = remote
	request.Source = remote

	// Each header present in the message is copied into its field. The
	// lookup order is fixed; X-Forwarded-Host comes last so it overrides
	// the server name assigned above.
	headerTargets := []struct {
		name string
		dst  *string
	}{
		{"Referer", &request.Referer},
		{"Connection", &request.Connection},
		{"X-Forwarded-For", &request.XForwardedFor},
		{"X-Real-IP", &request.XRealIP},
		{"X-Requested-With", &request.XRequestedWith},
		{"Accept-Encoding", &request.AcceptEncoding},
		{"Accept-Language", &request.AcceptLanguage},
		{"User-Agent", &request.UserAgent},
		{"Accept", &request.Accept},
		{"Cookie", &request.Cookie},
		{"X-Forwarded-Host", &request.Host},
	}
	for _, target := range headerTargets {
		if value, present := msg.Header(target.name); present {
			*target.dst = value
		}
	}
	return nil
}
  // processHTTP parses appData as an HTTP request head. It copies the URL,
// method, Referer, Host (also used as Origin) and protocol, plus the first
// value of a fixed set of headers, into request. Source is set to
// request.IpSrc; processPacket may override it afterwards.
func processHTTP(request *data.Request, appData []byte) error {
	// Read straight from the byte slice; converting to a string first would
	// copy every payload.
	req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(appData)))
	if err != nil {
		return fmt.Errorf("Failed to parse HTTP header: %s", err)
	}

	request.Url = req.URL.String()
	request.Method = req.Method
	request.Referer = req.Referer()
	request.Host = req.Host
	request.Protocol = req.Proto
	request.Origin = request.Host

	// Keys use the canonical MIME form in which http.ReadRequest stores
	// headers. True-Client-Ip follows X-Forwarded-For so that it wins when
	// both are present: CloudFlare taints X-Forwarded-For.
	for _, h := range []struct {
		key string
		dst *string
	}{
		{"Connection", &request.Connection},
		{"X-Forwarded-For", &request.XForwardedFor},
		{"True-Client-Ip", &request.XForwardedFor},
		{"X-Real-Ip", &request.XRealIP},
		{"X-Requested-With", &request.XRequestedWith},
		{"Accept-Encoding", &request.AcceptEncoding},
		{"Accept-Language", &request.AcceptLanguage},
		{"User-Agent", &request.UserAgent},
		{"Accept", &request.Accept},
		{"Cookie", &request.Cookie},
	} {
		if values := req.Header[h.key]; len(values) > 0 {
			*h.dst = values[0]
		}
	}

	request.Source = request.IpSrc
	return nil
}
  598. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  599. // e.g.
  600. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  601. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  602. func replayFile() {
  603. var req data.Request
  604. var startTs time.Time
  605. var endTs time.Time
  606. rand.Seed(time.Now().UnixNano())
  607. for {
  608. fh, err := os.Open(config.RequestsFile)
  609. if err != nil {
  610. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  611. }
  612. c := csv.NewReader(fh)
  613. c.Comma = ' '
  614. for {
  615. if config.SleepFor.Duration > time.Nanosecond {
  616. startTs = time.Now()
  617. }
  618. r, err := c.Read()
  619. if err == io.EOF {
  620. break
  621. }
  622. if err != nil {
  623. log.Println(err)
  624. continue
  625. }
  626. req.IpSrc = r[0]
  627. req.Source = r[0]
  628. req.Url = r[1]
  629. req.UserAgent = "Munch/1.0"
  630. req.Host = "demo.scraperwall.com"
  631. req.CreatedAt = time.Now().UnixNano()
  632. publishRequest(config.NatsQueue, &req)
  633. if strings.Index(r[1], ".") < 0 {
  634. hash := sha1.New()
  635. io.WriteString(hash, r[0])
  636. fp := data.Fingerprint{
  637. ClientID: "scw",
  638. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  639. Remote: r[0],
  640. Url: r[1],
  641. Source: r[0],
  642. CreatedAt: time.Now(),
  643. }
  644. if strings.HasPrefix(r[0], "50.31.") {
  645. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  646. natsJSONEC.Publish("fingerprints_scw", fp)
  647. } else if rand.Intn(10) < 5 {
  648. natsJSONEC.Publish("fingerprints_scw", fp)
  649. }
  650. }
  651. count++
  652. if config.SleepFor.Duration >= time.Nanosecond {
  653. endTs = time.Now()
  654. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  655. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  656. }
  657. }
  658. }
  659. }
  660. }
  // loadConfig sets the global config from the command-line flags. When
// -config names an existing file, it then decodes that TOML file over it, so
// keys present in the file override the flag values. The effective
// configuration is printed (unless Quiet) only on that config-file path.
// Stat and decode errors are logged, not fatal.
func loadConfig() {
	config = Config{
		Live:                  *doLiveCapture,
		Interface:             *iface,
		SnapshotLen:           *snapshotLen,
		Filter:                *filter,
		Promiscuous:           *promiscuous,
		NatsURL:               *natsURL,
		NatsQueue:             *natsQueue,
		NatsUser:              *natsUser,
		NatsPassword:          *natsPassword,
		NatsCA:                *natsCA,
		SleepFor:              duration{Duration: *sleepFor},
		RequestsFile:          *requestsFile,
		UseXForwardedAsSource: *useXForwardedAsSource,
		Protocol:              *protocol,
		ApacheLog:             *apacheLog,
		ApacheReplay:          *apacheReplay,
		NginxLog:              *nginxLog,
		NginxLogFormat:        *nginxFormat,
		NginxReplay:           *nginxReplay,
		HostName:              *hostName,
		Quiet:                 *beQuiet,
		Trace:                 *trace,
		AccessWatchKey:        *accessWatchKey,
		RPCAddress:            *rpcAddr,
		ReconnectToNatsAfter:  duration{Duration: *reconnectToNatsAfter},
		ResetLiveCaptureAfter: duration{Duration: *resetLiveCapAfter},
	}

	path := *configFile
	if path == "" {
		return
	}
	if _, statErr := os.Stat(path); statErr != nil {
		log.Printf("%s: %s\n", path, statErr)
		return
	}
	if _, decodeErr := toml.DecodeFile(path, &config); decodeErr != nil {
		log.Printf("%s: %s\n", path, decodeErr)
	}
	if !config.Quiet {
		config.print()
	}
}
  // version prints the program name with the build metadata held in Version
// and BuildDate on a single line.
func version() {
	fmt.Println("munchclient " + Version + ", built on " + BuildDate)
}