main.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835
  1. package main
import (
	"bufio"
	"bytes"
	"crypto/sha1"
	"encoding/csv"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
)
// Command-line flags and package-level state. The flags are parsed in init
// and copied into config by loadConfig; a TOML config file, if given, is
// decoded on top of those values.
var (
	// Live capture (pcap) settings.
	doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
	iface         = flag.String("interface", "eth0", "Interface to get packets from")
	snapshotLen   = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
	filter        = flag.String("filter", "tcp", "PCAP filter expression")
	promiscuous   = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")

	// NATS connection settings.
	natsURL              = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
	natsUser             = flag.String("nats-user", "", "The user for NATS authentication")
	natsPassword         = flag.String("nats-password", "", "The password for NATS authentication")
	natsQueue            = flag.String("nats-queue", "requests", "The NATS queue name")
	natsCA               = flag.String("nats-ca", "", "CA chain for NATS TLS")
	reconnectToNatsAfter = flag.Duration("reconnect-to-nats-after", 0, "reconnect to nats after this long periodically")

	// NOTE(review): the flag is registered as "rest-live-cap-after", which
	// looks like a typo for "reset-live-cap-after". Renaming it would change
	// the command-line interface, so confirm with users before fixing.
	resetLiveCapAfter = flag.Duration("rest-live-cap-after", 0, "reset the live capture setup after this amount of time")

	// Replay, protocol and tracing settings.
	sleepFor              = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
	requestsFile          = flag.String("requests", "", "CSV file containing requests (IP and URL)")
	protocol              = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
	useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
	trace                 = flag.Bool("trace", false, "Trace the packet capturing")

	// Web server log sources.
	apacheLog    = flag.String("apache-log", "", "Parse an Apache Log file")
	apacheReplay = flag.String("apache-replay", "", "Apache log file to replay into the system")
	nginxLog     = flag.String("nginx-log", "", "Nginx log file to tail")
	nginxFormat  = flag.String("nginx-format", "", "The nginx log file format")
	nginxReplay  = flag.String("nginx-replay", "", "Replay this nginx logfile")

	// Miscellaneous settings. Note that -quiet defaults to true.
	hostName       = flag.String("hostname", "", "Override the captured hostname with this one")
	accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
	configFile     = flag.String("config", "", "The location of the TOML config file")
	rpcAddr        = flag.String("rpc-address", "", "The address where the RPC server is listening")
	beQuiet        = flag.Bool("quiet", true, "Be quiet")
	doVersion      = flag.Bool("version", false, "Show version information")

	// Runtime state shared between goroutines. natsEC (protobuf) and
	// natsJSONEC (JSON) are set by connectToNATS; natsIsAvailable and count
	// are plain variables touched from several goroutines.
	natsEC          *nats.EncodedConn
	natsJSONEC      *nats.EncodedConn
	rpcClient       *ScwRPC
	natsErrorChan   chan error
	natsIsAvailable bool
	count           uint64

	// timeout is handed to pcap.OpenLive as the read timeout.
	// NOTE(review): a negative value presumably means "block forever" in
	// gopacket/pcap — confirm against the library documentation.
	timeout = -1 * time.Second

	// ipPriv is created in liveCapture and used by processPacket to skip
	// private addresses found in X-Forwarded-For.
	ipPriv *ip.IP
	config Config

	// Version contains the program Version, e.g. 1.0.1
	Version string

	// BuildDate contains the date and time at which the program was compiled
	BuildDate string
)
// Config contains the program configuration. loadConfig fills it from the
// command-line flags and then, if a config file is given, lets the TOML
// decoder overwrite the fields present in that file.
type Config struct {
	// Live capture (pcap) settings.
	Live        bool
	Interface   string
	SnapshotLen int
	Filter      string
	Promiscuous bool

	// NATS connection settings.
	NatsURL      string
	NatsQueue    string
	NatsUser     string
	NatsPassword string
	NatsCA       string

	// SleepFor is the pause between requests when replaying a requests file.
	SleepFor              duration
	RequestsFile          string
	UseXForwardedAsSource bool
	Quiet                 bool

	// Protocol selects the payload parser in processPacket ("http" or "ajp13").
	Protocol string
	Trace    bool

	// Web server log sources.
	ApacheLog      string
	ApacheReplay   string
	NginxLog       string
	NginxLogFormat string
	NginxReplay    string

	HostName string

	// AccessWatchKey, when non-empty, makes publishRequest send requests to
	// access.watch instead of RPC/NATS.
	AccessWatchKey string

	// RPCAddress, when non-empty, makes main create an RPC client that
	// publishRequest prefers over NATS.
	RPCAddress string

	ReconnectToNatsAfter  duration
	ResetLiveCaptureAfter duration
}
  100. type duration struct {
  101. time.Duration
  102. }
  103. func (d *duration) UnmarshalText(text []byte) error {
  104. var err error
  105. d.Duration, err = time.ParseDuration(string(text))
  106. return err
  107. }
  108. func (c Config) print() {
  109. fmt.Printf("Live: %t\n", c.Live)
  110. fmt.Printf("Interface: %s\n", c.Interface)
  111. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  112. fmt.Printf("Filter: %s\n", c.Filter)
  113. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  114. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  115. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  116. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  117. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  118. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  119. fmt.Printf("ReconnectToNatsAfter: %s\n", c.ReconnectToNatsAfter.String())
  120. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  121. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  122. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  123. fmt.Printf("Apache Replay: %s\n", c.ApacheReplay)
  124. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  125. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  126. fmt.Printf("NginxReplay: %s\n", c.NginxReplay)
  127. fmt.Printf("HostName: %s\n", c.HostName)
  128. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  129. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  130. fmt.Printf("Protocol: %s\n", c.Protocol)
  131. fmt.Printf("Reset Live Cap After: %s\n", c.ResetLiveCaptureAfter.String())
  132. fmt.Printf("RPCAddress: %s\n", c.RPCAddress)
  133. fmt.Printf("Quiet: %t\n", c.Quiet)
  134. fmt.Printf("Trace: %t\n", c.Trace)
  135. }
  136. func init() {
  137. flag.Parse()
  138. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  139. }
  140. func main() {
  141. if *doVersion {
  142. version()
  143. os.Exit(0)
  144. }
  145. loadConfig()
  146. // Output how many requests per second were sent
  147. if !config.Quiet {
  148. go func(c *uint64) {
  149. for {
  150. fmt.Printf("%d requests per second\n", *c)
  151. *c = 0
  152. time.Sleep(time.Second)
  153. }
  154. }(&count)
  155. }
  156. // NATS
  157. //
  158. if config.NatsURL == "" && config.AccessWatchKey == "" {
  159. log.Fatal("No NATS URL specified (-nats-url)!")
  160. }
  161. natsIsAvailable = false
  162. natsErrorChan = make(chan error, 1)
  163. err := connectToNATS()
  164. if err != nil && config.AccessWatchKey == "" {
  165. log.Fatal(err)
  166. }
  167. // reconnect to nats periodically
  168. //
  169. if *reconnectToNatsAfter > 0 {
  170. go func(interval time.Duration) {
  171. for range time.Tick(interval) {
  172. natsEC.Conn.Close()
  173. natsJSONEC.Conn.Close()
  174. }
  175. }(*reconnectToNatsAfter)
  176. }
  177. go natsWatchdog(natsErrorChan)
  178. // RPC
  179. //
  180. if config.RPCAddress != "" {
  181. rpcClient, err = NewScwRPC(config.RPCAddress)
  182. if err != nil {
  183. log.Fatal(err)
  184. }
  185. }
  186. // What should I do?
  187. if config.RequestsFile != "" {
  188. replayFile()
  189. } else if config.ApacheReplay != "" {
  190. apacheLogReplay(config.ApacheReplay)
  191. } else if config.NginxReplay != "" {
  192. nginxLogReplay(config.NginxReplay, config.NginxLogFormat)
  193. } else if config.ApacheLog != "" {
  194. apacheLogCapture(config.ApacheLog)
  195. } else if config.Live {
  196. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  197. liveCapture()
  198. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  199. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  200. }
  201. }
  202. func natsWatchdog(closedChan chan error) {
  203. var lastError error
  204. for err := range closedChan {
  205. if lastError != err {
  206. lastError = err
  207. log.Println(err)
  208. }
  209. if err != nats.ErrConnectionClosed {
  210. continue
  211. }
  212. RECONNECT:
  213. for {
  214. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  215. err := connectToNATS()
  216. if err == nil {
  217. break RECONNECT
  218. }
  219. time.Sleep(1 * time.Second)
  220. }
  221. }
  222. }
  223. func connectToNATS() error {
  224. var natsConn *nats.Conn
  225. var err error
  226. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  227. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  228. } else {
  229. if config.NatsPassword != "" && config.NatsUser != "" {
  230. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  231. } else {
  232. natsConn, err = nats.Connect(config.NatsURL)
  233. }
  234. }
  235. if err != nil {
  236. return err
  237. }
  238. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  239. if err != nil {
  240. return fmt.Errorf("Encoded Connection: %v", err)
  241. }
  242. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  243. if err != nil {
  244. return fmt.Errorf("Encoded Connection: %v", err)
  245. }
  246. natsIsAvailable = true
  247. return nil
  248. }
// liveCap bundles the parameters of a live packet capture with the pcap
// handle and packet channel that Setup creates.
type liveCap struct {
	// Capture parameters as passed to newLiveCap.
	filter      string
	device      string
	promisc     bool
	snapshotLen int

	// handle and packetChan are replaced on every call to Setup.
	handle     *pcap.Handle
	packetChan chan gopacket.Packet
}
  257. func newLiveCap(device string, filter string, snapshotLen int, promisc bool) (*liveCap, error) {
  258. lc := liveCap{
  259. filter: filter,
  260. device: device,
  261. promisc: promisc,
  262. snapshotLen: snapshotLen,
  263. }
  264. err := lc.Setup()
  265. if err != nil {
  266. return nil, err
  267. }
  268. return &lc, nil
  269. }
  270. func (lc *liveCap) Setup() error {
  271. if lc.packetChan != nil {
  272. close(lc.packetChan)
  273. }
  274. if lc.handle != nil {
  275. lc.handle.Close()
  276. }
  277. // PCAP setup
  278. //
  279. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  280. if err != nil {
  281. return err
  282. }
  283. // defer handle.Close()
  284. lc.handle = handle
  285. err = lc.handle.SetBPFFilter(config.Filter)
  286. if err != nil {
  287. return err
  288. }
  289. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  290. lc.packetChan = packetSource.Packets()
  291. return nil
  292. }
  293. func liveCapture() {
  294. ipPriv = ip.NewIP()
  295. livecap, err := newLiveCap(config.Interface, config.Filter, config.SnapshotLen, config.Promiscuous)
  296. if err != nil {
  297. log.Fatal(err)
  298. }
  299. closeChan := make(chan bool, 1)
  300. go func(cc chan bool) {
  301. for range time.Tick(1 * time.Minute) {
  302. cc <- true
  303. }
  304. }(closeChan)
  305. for {
  306. if !config.Quiet {
  307. log.Printf("reading incoming HTTP requests on %s %s\n", config.Interface, config.Filter)
  308. }
  309. select {
  310. case <-closeChan:
  311. livecap.Setup()
  312. case packet := <-livecap.packetChan:
  313. go processPacket(packet)
  314. }
  315. }
  316. }
  317. func setupLiveCapture() (chan gopacket.Packet, error) {
  318. // PCAP setup
  319. //
  320. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  321. if err != nil {
  322. return nil, err
  323. // log.Fatal(err)
  324. }
  325. defer handle.Close()
  326. err = handle.SetBPFFilter(config.Filter)
  327. if err != nil {
  328. return nil, err
  329. // log.Fatal(err)
  330. }
  331. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  332. return packetSource.Packets(), nil
  333. }
  334. func writeLogToWatch(r *data.Request) {
  335. h := map[string]string{}
  336. if r.AcceptEncoding != "" {
  337. h["Accept-Encoding"] = r.AcceptEncoding
  338. }
  339. if r.Accept != "" {
  340. h["Accept"] = r.Accept
  341. }
  342. if r.AcceptLanguage != "" {
  343. h["Accept-Language"] = r.AcceptLanguage
  344. }
  345. if r.Cookie != "" {
  346. h["Cookie"] = r.Cookie
  347. }
  348. if r.Host != "" {
  349. h["Host"] = r.Host
  350. }
  351. if r.Referer != "" {
  352. h["Referer"] = r.Referer
  353. }
  354. if r.UserAgent != "" {
  355. h["User-Agent"] = r.UserAgent
  356. }
  357. if r.Via != "" {
  358. h["Via"] = r.Via
  359. }
  360. if r.XForwardedFor != "" {
  361. h["X-Forwarded-For"] = r.XForwardedFor
  362. }
  363. if r.XRequestedWith != "" {
  364. h["X-Requested-With"] = r.XRequestedWith
  365. }
  366. data := map[string]interface{}{
  367. "request": map[string]interface{}{
  368. "time": time.Unix(0, r.CreatedAt),
  369. "address": r.Source,
  370. "protocol": r.Protocol,
  371. "scheme": "https",
  372. "method": r.Method,
  373. "url": r.Url,
  374. "headers": h,
  375. },
  376. "response": map[string]interface{}{"status": "200"},
  377. }
  378. jdata, err := json.Marshal(data)
  379. client := &http.Client{}
  380. fmt.Println(string(jdata))
  381. buf := bytes.NewBuffer(jdata)
  382. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  383. req.Header.Add("Api-Key", config.AccessWatchKey)
  384. req.Header.Add("Accept", "application/json")
  385. req.Header.Add("Content-Type", "application/json")
  386. resp, err := client.Do(req)
  387. if err != nil {
  388. log.Println(err)
  389. }
  390. resp.Body.Close()
  391. }
  392. func publishRequest(queue string, request *data.Request) {
  393. if config.AccessWatchKey != "" {
  394. writeLogToWatch(request)
  395. return
  396. }
  397. if rpcClient != nil {
  398. select {
  399. case rpcClient.RChan <- request:
  400. default:
  401. }
  402. return
  403. }
  404. if !natsIsAvailable {
  405. if rand.Intn(100) == 0 {
  406. log.Println("nats connection is not available")
  407. }
  408. return
  409. }
  410. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  411. natsErrorChan <- err
  412. if err == nats.ErrConnectionClosed {
  413. natsIsAvailable = false
  414. }
  415. }
  416. }
  417. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  418. func processPacket(packet gopacket.Packet) {
  419. hasIPv4 := false
  420. var ipSrc, ipDst string
  421. // IPv4
  422. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  423. ip := ipLayer.(*layers.IPv4)
  424. ipSrc = ip.SrcIP.String()
  425. ipDst = ip.DstIP.String()
  426. hasIPv4 = true
  427. }
  428. // IPv6
  429. if !hasIPv4 {
  430. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  431. ip := ipLayer.(*layers.IPv6)
  432. ipSrc = ip.SrcIP.String()
  433. ipDst = ip.DstIP.String()
  434. }
  435. }
  436. // TCP
  437. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  438. if tcpLayer == nil {
  439. return
  440. }
  441. tcp, _ := tcpLayer.(*layers.TCP)
  442. portSrc := tcp.SrcPort
  443. portDst := tcp.DstPort
  444. sequence := tcp.Seq
  445. applicationLayer := packet.ApplicationLayer()
  446. if applicationLayer == nil {
  447. return
  448. }
  449. count++
  450. if len(applicationLayer.Payload()) < 50 {
  451. log.Println("application layer too small!")
  452. return
  453. }
  454. request := data.Request{
  455. IpSrc: ipSrc,
  456. IpDst: ipDst,
  457. PortSrc: uint32(portSrc),
  458. PortDst: uint32(portDst),
  459. TcpSeq: uint32(sequence),
  460. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  461. }
  462. switch config.Protocol {
  463. case "http":
  464. err := processHTTP(&request, applicationLayer.Payload())
  465. if err != nil {
  466. log.Println(err)
  467. return
  468. }
  469. case "ajp13":
  470. err := processAJP13(&request, applicationLayer.Payload())
  471. if err != nil {
  472. log.Println(err)
  473. return
  474. }
  475. }
  476. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  477. if strings.Contains(request.XForwardedFor, ",") {
  478. ips := strings.Split(request.XForwardedFor, ",")
  479. for i := len(ips) - 1; i >= 0; i-- {
  480. ipRaw := strings.TrimSpace(ips[i])
  481. ipAddr := net.ParseIP(ipRaw)
  482. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  483. request.Source = ipRaw
  484. break
  485. }
  486. }
  487. } else {
  488. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  489. if !ipPriv.IsPrivate(ipAddr) {
  490. request.Source = request.XForwardedFor
  491. }
  492. }
  493. }
  494. if request.Source == request.IpSrc && request.XRealIP != "" {
  495. request.Source = request.XRealIP
  496. }
  497. if config.Trace {
  498. log.Printf("[%s] %s\n", request.Source, request.Url)
  499. }
  500. publishRequest(config.NatsQueue, &request)
  501. }
  502. func processAJP13(request *data.Request, appData []byte) error {
  503. a, err := ajp13.Parse(appData)
  504. if err != nil {
  505. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  506. }
  507. request.Url = a.URI
  508. request.Method = a.Method()
  509. request.Host = a.Server
  510. request.Protocol = a.Version
  511. request.Origin = a.RemoteAddr.String()
  512. request.Source = a.RemoteAddr.String()
  513. if v, ok := a.Header("Referer"); ok {
  514. request.Referer = v
  515. }
  516. if v, ok := a.Header("Connection"); ok {
  517. request.Connection = v
  518. }
  519. if v, ok := a.Header("X-Forwarded-For"); ok {
  520. request.XForwardedFor = v
  521. }
  522. if v, ok := a.Header("X-Real-IP"); ok {
  523. request.XRealIP = v
  524. }
  525. if v, ok := a.Header("X-Requested-With"); ok {
  526. request.XRequestedWith = v
  527. }
  528. if v, ok := a.Header("Accept-Encoding"); ok {
  529. request.AcceptEncoding = v
  530. }
  531. if v, ok := a.Header("Accept-Language"); ok {
  532. request.AcceptLanguage = v
  533. }
  534. if v, ok := a.Header("User-Agent"); ok {
  535. request.UserAgent = v
  536. }
  537. if v, ok := a.Header("Accept"); ok {
  538. request.Accept = v
  539. }
  540. if v, ok := a.Header("Cookie"); ok {
  541. request.Cookie = v
  542. }
  543. if v, ok := a.Header("X-Forwarded-Host"); ok {
  544. if v != request.Host {
  545. request.Host = v
  546. }
  547. }
  548. return nil
  549. }
  550. func processHTTP(request *data.Request, appData []byte) error {
  551. reader := bufio.NewReader(strings.NewReader(string(appData)))
  552. req, err := http.ReadRequest(reader)
  553. if err != nil {
  554. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  555. }
  556. request.Url = req.URL.String()
  557. request.Method = req.Method
  558. request.Referer = req.Referer()
  559. request.Host = req.Host
  560. request.Protocol = req.Proto
  561. request.Origin = request.Host
  562. if _, ok := req.Header["Connection"]; ok {
  563. request.Connection = req.Header["Connection"][0]
  564. }
  565. if _, ok := req.Header["X-Forwarded-For"]; ok {
  566. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  567. }
  568. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  569. if _, ok := req.Header["True-Client-Ip"]; ok {
  570. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  571. }
  572. if _, ok := req.Header["X-Real-Ip"]; ok {
  573. request.XRealIP = req.Header["X-Real-Ip"][0]
  574. }
  575. if _, ok := req.Header["X-Requested-With"]; ok {
  576. request.XRequestedWith = req.Header["X-Requested-With"][0]
  577. }
  578. if _, ok := req.Header["Accept-Encoding"]; ok {
  579. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  580. }
  581. if _, ok := req.Header["Accept-Language"]; ok {
  582. request.AcceptLanguage = req.Header["Accept-Language"][0]
  583. }
  584. if _, ok := req.Header["User-Agent"]; ok {
  585. request.UserAgent = req.Header["User-Agent"][0]
  586. }
  587. if _, ok := req.Header["Accept"]; ok {
  588. request.Accept = req.Header["Accept"][0]
  589. }
  590. if _, ok := req.Header["Cookie"]; ok {
  591. request.Cookie = req.Header["Cookie"][0]
  592. }
  593. request.Source = request.IpSrc
  594. return nil
  595. }
  596. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  597. // e.g.
  598. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  599. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  600. func replayFile() {
  601. var req data.Request
  602. var startTs time.Time
  603. var endTs time.Time
  604. rand.Seed(time.Now().UnixNano())
  605. for {
  606. fh, err := os.Open(config.RequestsFile)
  607. if err != nil {
  608. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  609. }
  610. c := csv.NewReader(fh)
  611. c.Comma = ' '
  612. for {
  613. if config.SleepFor.Duration > time.Nanosecond {
  614. startTs = time.Now()
  615. }
  616. r, err := c.Read()
  617. if err == io.EOF {
  618. break
  619. }
  620. if err != nil {
  621. log.Println(err)
  622. continue
  623. }
  624. req.IpSrc = r[0]
  625. req.Source = r[0]
  626. req.Url = r[1]
  627. req.UserAgent = "Munch/1.0"
  628. req.Host = "demo.scraperwall.com"
  629. req.CreatedAt = time.Now().UnixNano()
  630. publishRequest(config.NatsQueue, &req)
  631. if strings.Index(r[1], ".") < 0 {
  632. hash := sha1.New()
  633. io.WriteString(hash, r[0])
  634. fp := data.Fingerprint{
  635. ClientID: "scw",
  636. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  637. Remote: r[0],
  638. Url: r[1],
  639. Source: r[0],
  640. CreatedAt: time.Now(),
  641. }
  642. if strings.HasPrefix(r[0], "50.31.") {
  643. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  644. natsJSONEC.Publish("fingerprints_scw", fp)
  645. } else if rand.Intn(10) < 5 {
  646. natsJSONEC.Publish("fingerprints_scw", fp)
  647. }
  648. }
  649. count++
  650. if config.SleepFor.Duration >= time.Nanosecond {
  651. endTs = time.Now()
  652. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  653. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  654. }
  655. }
  656. }
  657. }
  658. }
  659. func loadConfig() {
  660. // initialize with values from the command line / environment
  661. config.Live = *doLiveCapture
  662. config.Interface = *iface
  663. config.SnapshotLen = *snapshotLen
  664. config.Filter = *filter
  665. config.Promiscuous = *promiscuous
  666. config.NatsURL = *natsURL
  667. config.NatsQueue = *natsQueue
  668. config.NatsUser = *natsUser
  669. config.NatsPassword = *natsPassword
  670. config.NatsCA = *natsCA
  671. config.SleepFor.Duration = *sleepFor
  672. config.RequestsFile = *requestsFile
  673. config.UseXForwardedAsSource = *useXForwardedAsSource
  674. config.Protocol = *protocol
  675. config.ApacheLog = *apacheLog
  676. config.ApacheReplay = *apacheReplay
  677. config.NginxLog = *nginxLog
  678. config.NginxLogFormat = *nginxFormat
  679. config.NginxReplay = *nginxReplay
  680. config.HostName = *hostName
  681. config.Quiet = *beQuiet
  682. config.Trace = *trace
  683. config.AccessWatchKey = *accessWatchKey
  684. config.RPCAddress = *rpcAddr
  685. config.ReconnectToNatsAfter.Duration = *reconnectToNatsAfter
  686. config.ResetLiveCaptureAfter.Duration = *resetLiveCapAfter
  687. if *configFile == "" {
  688. return
  689. }
  690. _, err := os.Stat(*configFile)
  691. if err != nil {
  692. log.Printf("%s: %s\n", *configFile, err)
  693. return
  694. }
  695. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  696. log.Printf("%s: %s\n", *configFile, err)
  697. }
  698. if !config.Quiet {
  699. config.print()
  700. }
  701. }
  702. // version outputs build information...
  703. func version() {
  704. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  705. }