main.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808
  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/sha1"
  6. "encoding/csv"
  7. "encoding/json"
  8. "flag"
  9. "fmt"
  10. "io"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "net/http"
  15. "os"
  16. "strings"
  17. "time"
  18. "github.com/BurntSushi/toml"
  19. "github.com/google/gopacket"
  20. "github.com/google/gopacket/layers"
  21. "github.com/google/gopacket/pcap"
  22. "github.com/nats-io/nats"
  23. "github.com/nats-io/nats/encoders/protobuf"
  24. "git.scraperwall.com/scw/ajp13"
  25. "git.scraperwall.com/scw/data"
  26. "git.scraperwall.com/scw/ip"
  27. )
  28. var (
  29. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  30. iface = flag.String("interface", "eth0", "Interface to get packets from")
  31. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  32. filter = flag.String("filter", "tcp", "PCAP filter expression")
  33. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  34. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  35. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  36. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  37. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  38. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  39. reconnectToNatsAfter = flag.Duration("reconnect-to-nats-after", 0, "reconnect to nats after this time periodically")
  40. resetLiveCapAfter = flag.Duration("reset-live-cap-after", 613*time.Second, "reset the live capture setup after this amount of time")
  41. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  42. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  43. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  44. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  45. trace = flag.Bool("trace", false, "Trace the packet capturing")
  46. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  47. apacheReplay = flag.String("apache-replay", "", "Apache log file to replay into the system")
  48. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  49. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  50. nginxReplay = flag.String("nginx-replay", "", "Replay this nginx logfile")
  51. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  52. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  53. configFile = flag.String("config", "", "The location of the TOML config file")
  54. rpcAddr = flag.String("rpc-address", "", "The address where the RPC server is listening")
  55. beQuiet = flag.Bool("quiet", true, "Be quiet")
  56. doVersion = flag.Bool("version", false, "Show version information")
  57. natsEC *nats.EncodedConn
  58. natsJSONEC *nats.EncodedConn
  59. rpcClient *ScwRPC
  60. natsErrorChan chan error
  61. natsIsAvailable bool
  62. count uint64
  63. timeout = -1 * time.Second
  64. ipPriv *ip.IP
  65. config Config
  66. // Version contains the program Version, e.g. 1.0.1
  67. Version string
  68. // BuildDate contains the date and time at which the program was compiled
  69. BuildDate string
  70. )
  71. // Config contains the program configuration
  72. type Config struct {
  73. Live bool
  74. Interface string
  75. SnapshotLen int
  76. Filter string
  77. Promiscuous bool
  78. NatsURL string
  79. NatsQueue string
  80. NatsUser string
  81. NatsPassword string
  82. NatsCA string
  83. SleepFor duration
  84. RequestsFile string
  85. UseXForwardedAsSource bool
  86. Quiet bool
  87. Protocol string
  88. Trace bool
  89. ApacheLog string
  90. ApacheReplay string
  91. NginxLog string
  92. NginxLogFormat string
  93. NginxReplay string
  94. HostName string
  95. AccessWatchKey string
  96. RPCAddress string
  97. ReconnectToNatsAfter duration
  98. ResetLiveCaptureAfter duration
  99. }
  100. type duration struct {
  101. time.Duration
  102. }
  103. func (d *duration) UnmarshalText(text []byte) error {
  104. var err error
  105. d.Duration, err = time.ParseDuration(string(text))
  106. return err
  107. }
  108. func (c Config) print() {
  109. fmt.Printf("Live: %t\n", c.Live)
  110. fmt.Printf("Interface: %s\n", c.Interface)
  111. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  112. fmt.Printf("Filter: %s\n", c.Filter)
  113. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  114. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  115. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  116. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  117. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  118. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  119. fmt.Printf("ReconnectToNatsAfter: %s\n", c.ReconnectToNatsAfter.String())
  120. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  121. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  122. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  123. fmt.Printf("Apache Replay: %s\n", c.ApacheReplay)
  124. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  125. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  126. fmt.Printf("NginxReplay: %s\n", c.NginxReplay)
  127. fmt.Printf("HostName: %s\n", c.HostName)
  128. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  129. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  130. fmt.Printf("Protocol: %s\n", c.Protocol)
  131. fmt.Printf("Reset Live Cap After: %s\n", c.ResetLiveCaptureAfter.String())
  132. fmt.Printf("RPCAddress: %s\n", c.RPCAddress)
  133. fmt.Printf("Quiet: %t\n", c.Quiet)
  134. fmt.Printf("Trace: %t\n", c.Trace)
  135. }
  136. func init() {
  137. flag.Parse()
  138. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  139. }
  140. func main() {
  141. if *doVersion {
  142. version()
  143. os.Exit(0)
  144. }
  145. loadConfig()
  146. // Output how many requests per second were sent
  147. if !config.Quiet {
  148. go func(c *uint64) {
  149. for {
  150. fmt.Printf("%d requests per second\n", *c)
  151. *c = 0
  152. time.Sleep(time.Second)
  153. }
  154. }(&count)
  155. }
  156. // NATS
  157. //
  158. if config.NatsURL == "" && config.AccessWatchKey == "" {
  159. log.Fatal("No NATS URL specified (-nats-url)!")
  160. }
  161. natsIsAvailable = false
  162. natsErrorChan = make(chan error, 1)
  163. err := connectToNATS()
  164. if err != nil && config.AccessWatchKey == "" {
  165. log.Fatal(err)
  166. }
  167. // reconnect to nats periodically
  168. //
  169. if *reconnectToNatsAfter > time.Minute {
  170. go func(interval time.Duration) {
  171. for range time.Tick(interval) {
  172. if config.Trace {
  173. log.Printf("reconnecting to NATS")
  174. }
  175. natsEC.Conn.Close()
  176. natsJSONEC.Conn.Close()
  177. }
  178. }(*reconnectToNatsAfter)
  179. }
  180. go natsWatchdog(natsErrorChan)
  181. // RPC
  182. //
  183. if config.RPCAddress != "" {
  184. rpcClient, err = NewScwRPC(config.RPCAddress)
  185. if err != nil {
  186. log.Fatal(err)
  187. }
  188. }
  189. // What should I do?
  190. if config.RequestsFile != "" {
  191. replayFile()
  192. } else if config.ApacheReplay != "" {
  193. apacheLogReplay(config.ApacheReplay)
  194. } else if config.NginxReplay != "" {
  195. nginxLogReplay(config.NginxReplay, config.NginxLogFormat)
  196. } else if config.ApacheLog != "" {
  197. apacheLogCapture(config.ApacheLog)
  198. } else if config.Live {
  199. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  200. liveCapture()
  201. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  202. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  203. }
  204. }
  205. func natsWatchdog(closedChan chan error) {
  206. var lastError error
  207. for err := range closedChan {
  208. if lastError != err {
  209. lastError = err
  210. log.Println(err)
  211. }
  212. if err != nats.ErrConnectionClosed {
  213. continue
  214. }
  215. RECONNECT:
  216. for {
  217. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  218. err := connectToNATS()
  219. if err == nil {
  220. break RECONNECT
  221. }
  222. time.Sleep(1 * time.Second)
  223. }
  224. }
  225. }
  226. func connectToNATS() error {
  227. var natsConn *nats.Conn
  228. var err error
  229. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  230. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  231. } else {
  232. if config.NatsPassword != "" && config.NatsUser != "" {
  233. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  234. } else {
  235. natsConn, err = nats.Connect(config.NatsURL)
  236. }
  237. }
  238. if err != nil {
  239. return err
  240. }
  241. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  242. if err != nil {
  243. return fmt.Errorf("Encoded Connection: %v", err)
  244. }
  245. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  246. if err != nil {
  247. return fmt.Errorf("Encoded Connection: %v", err)
  248. }
  249. natsIsAvailable = true
  250. return nil
  251. }
  252. type liveCap struct {
  253. filter string
  254. device string
  255. promisc bool
  256. snapshotLen int
  257. handle *pcap.Handle
  258. packetChan chan gopacket.Packet
  259. }
  260. func newLiveCap(device string, filter string, snapshotLen int, promisc bool) (*liveCap, error) {
  261. lc := &liveCap{
  262. filter: filter,
  263. device: device,
  264. promisc: promisc,
  265. snapshotLen: snapshotLen,
  266. }
  267. err := lc.SetupCap()
  268. if err != nil {
  269. return nil, err
  270. }
  271. return lc, nil
  272. }
  273. func (lc *liveCap) SetupCap() error {
  274. if !config.Quiet {
  275. log.Printf("reading incoming HTTP requests on %s %s\n", config.Interface, config.Filter)
  276. }
  277. if lc.handle != nil {
  278. lc.handle.Close()
  279. }
  280. // PCAP setup
  281. //
  282. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  283. if err != nil {
  284. return err
  285. }
  286. // defer handle.Close()
  287. lc.handle = handle
  288. err = lc.handle.SetBPFFilter(config.Filter)
  289. if err != nil {
  290. return err
  291. }
  292. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  293. lc.packetChan = packetSource.Packets()
  294. return nil
  295. }
  296. func liveCapture() {
  297. ipPriv = ip.NewIP()
  298. livecap, err := newLiveCap(config.Interface, config.Filter, config.SnapshotLen, config.Promiscuous)
  299. if err != nil {
  300. log.Fatal(err)
  301. }
  302. closeChan := time.Tick(config.ResetLiveCaptureAfter.Duration)
  303. for {
  304. select {
  305. case <-closeChan:
  306. livecap.SetupCap()
  307. case p := <-livecap.packetChan:
  308. go processPacket(p)
  309. }
  310. }
  311. }
  312. func writeLogToWatch(r *data.Request) {
  313. h := map[string]string{}
  314. if r.AcceptEncoding != "" {
  315. h["Accept-Encoding"] = r.AcceptEncoding
  316. }
  317. if r.Accept != "" {
  318. h["Accept"] = r.Accept
  319. }
  320. if r.AcceptLanguage != "" {
  321. h["Accept-Language"] = r.AcceptLanguage
  322. }
  323. if r.Cookie != "" {
  324. h["Cookie"] = r.Cookie
  325. }
  326. if r.Host != "" {
  327. h["Host"] = r.Host
  328. }
  329. if r.Referer != "" {
  330. h["Referer"] = r.Referer
  331. }
  332. if r.UserAgent != "" {
  333. h["User-Agent"] = r.UserAgent
  334. }
  335. if r.Via != "" {
  336. h["Via"] = r.Via
  337. }
  338. if r.XForwardedFor != "" {
  339. h["X-Forwarded-For"] = r.XForwardedFor
  340. }
  341. if r.XRequestedWith != "" {
  342. h["X-Requested-With"] = r.XRequestedWith
  343. }
  344. data := map[string]interface{}{
  345. "request": map[string]interface{}{
  346. "time": time.Unix(0, r.CreatedAt),
  347. "address": r.Source,
  348. "protocol": r.Protocol,
  349. "scheme": "https",
  350. "method": r.Method,
  351. "url": r.Url,
  352. "headers": h,
  353. },
  354. "response": map[string]interface{}{"status": "200"},
  355. }
  356. jdata, err := json.Marshal(data)
  357. client := &http.Client{}
  358. fmt.Println(string(jdata))
  359. buf := bytes.NewBuffer(jdata)
  360. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  361. req.Header.Add("Api-Key", config.AccessWatchKey)
  362. req.Header.Add("Accept", "application/json")
  363. req.Header.Add("Content-Type", "application/json")
  364. resp, err := client.Do(req)
  365. if err != nil {
  366. log.Println(err)
  367. }
  368. resp.Body.Close()
  369. }
  370. func publishRequest(queue string, request *data.Request) {
  371. if config.AccessWatchKey != "" {
  372. writeLogToWatch(request)
  373. return
  374. }
  375. if rpcClient != nil {
  376. select {
  377. case rpcClient.RChan <- request:
  378. default:
  379. }
  380. return
  381. }
  382. if !natsIsAvailable {
  383. if rand.Intn(100) == 0 {
  384. log.Println("nats connection is not available")
  385. }
  386. return
  387. }
  388. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  389. natsErrorChan <- err
  390. if err == nats.ErrConnectionClosed {
  391. natsIsAvailable = false
  392. }
  393. }
  394. }
  395. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  396. func processPacket(packet gopacket.Packet) {
  397. hasIPv4 := false
  398. var ipSrc, ipDst string
  399. // IPv4
  400. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  401. ip := ipLayer.(*layers.IPv4)
  402. ipSrc = ip.SrcIP.String()
  403. ipDst = ip.DstIP.String()
  404. hasIPv4 = true
  405. }
  406. // IPv6
  407. if !hasIPv4 {
  408. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  409. ip := ipLayer.(*layers.IPv6)
  410. ipSrc = ip.SrcIP.String()
  411. ipDst = ip.DstIP.String()
  412. }
  413. }
  414. // TCP
  415. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  416. if tcpLayer == nil {
  417. return
  418. }
  419. tcp, _ := tcpLayer.(*layers.TCP)
  420. portSrc := tcp.SrcPort
  421. portDst := tcp.DstPort
  422. sequence := tcp.Seq
  423. applicationLayer := packet.ApplicationLayer()
  424. if applicationLayer == nil {
  425. return
  426. }
  427. count++
  428. if len(applicationLayer.Payload()) < 50 {
  429. log.Println("application layer too small!")
  430. return
  431. }
  432. request := data.Request{
  433. IpSrc: ipSrc,
  434. IpDst: ipDst,
  435. PortSrc: uint32(portSrc),
  436. PortDst: uint32(portDst),
  437. TcpSeq: uint32(sequence),
  438. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  439. }
  440. switch config.Protocol {
  441. case "http":
  442. err := processHTTP(&request, applicationLayer.Payload())
  443. if err != nil {
  444. log.Println(err)
  445. return
  446. }
  447. case "ajp13":
  448. err := processAJP13(&request, applicationLayer.Payload())
  449. if err != nil {
  450. log.Println(err)
  451. return
  452. }
  453. }
  454. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  455. if strings.Contains(request.XForwardedFor, ",") {
  456. ips := strings.Split(request.XForwardedFor, ",")
  457. for i := len(ips) - 1; i >= 0; i-- {
  458. ipRaw := strings.TrimSpace(ips[i])
  459. ipAddr := net.ParseIP(ipRaw)
  460. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  461. request.Source = ipRaw
  462. break
  463. }
  464. }
  465. } else {
  466. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  467. if !ipPriv.IsPrivate(ipAddr) {
  468. request.Source = request.XForwardedFor
  469. }
  470. }
  471. }
  472. if request.Source == request.IpSrc && request.XRealIP != "" {
  473. request.Source = request.XRealIP
  474. }
  475. if config.Trace {
  476. log.Printf("[%s] %s\n", request.Source, request.Url)
  477. }
  478. publishRequest(config.NatsQueue, &request)
  479. }
  480. func processAJP13(request *data.Request, appData []byte) error {
  481. a, err := ajp13.Parse(appData)
  482. if err != nil {
  483. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  484. }
  485. request.Url = a.URI
  486. request.Method = a.Method()
  487. request.Host = a.Server
  488. request.Protocol = a.Version
  489. request.Origin = a.RemoteAddr.String()
  490. request.Source = a.RemoteAddr.String()
  491. if v, ok := a.Header("Referer"); ok {
  492. request.Referer = v
  493. }
  494. if v, ok := a.Header("Connection"); ok {
  495. request.Connection = v
  496. }
  497. if v, ok := a.Header("X-Forwarded-For"); ok {
  498. request.XForwardedFor = v
  499. }
  500. if v, ok := a.Header("X-Real-IP"); ok {
  501. request.XRealIP = v
  502. }
  503. if v, ok := a.Header("X-Requested-With"); ok {
  504. request.XRequestedWith = v
  505. }
  506. if v, ok := a.Header("Accept-Encoding"); ok {
  507. request.AcceptEncoding = v
  508. }
  509. if v, ok := a.Header("Accept-Language"); ok {
  510. request.AcceptLanguage = v
  511. }
  512. if v, ok := a.Header("User-Agent"); ok {
  513. request.UserAgent = v
  514. }
  515. if v, ok := a.Header("Accept"); ok {
  516. request.Accept = v
  517. }
  518. if v, ok := a.Header("Cookie"); ok {
  519. request.Cookie = v
  520. }
  521. if v, ok := a.Header("X-Forwarded-Host"); ok {
  522. if v != request.Host {
  523. request.Host = v
  524. }
  525. }
  526. return nil
  527. }
  528. func processHTTP(request *data.Request, appData []byte) error {
  529. reader := bufio.NewReader(strings.NewReader(string(appData)))
  530. req, err := http.ReadRequest(reader)
  531. if err != nil {
  532. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  533. }
  534. request.Url = req.URL.String()
  535. request.Method = req.Method
  536. request.Referer = req.Referer()
  537. request.Host = req.Host
  538. request.Protocol = req.Proto
  539. request.Origin = request.Host
  540. if _, ok := req.Header["Connection"]; ok {
  541. request.Connection = req.Header["Connection"][0]
  542. }
  543. if _, ok := req.Header["X-Forwarded-For"]; ok {
  544. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  545. }
  546. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  547. if _, ok := req.Header["True-Client-Ip"]; ok {
  548. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  549. }
  550. if _, ok := req.Header["X-Real-Ip"]; ok {
  551. request.XRealIP = req.Header["X-Real-Ip"][0]
  552. }
  553. if _, ok := req.Header["X-Requested-With"]; ok {
  554. request.XRequestedWith = req.Header["X-Requested-With"][0]
  555. }
  556. if _, ok := req.Header["Accept-Encoding"]; ok {
  557. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  558. }
  559. if _, ok := req.Header["Accept-Language"]; ok {
  560. request.AcceptLanguage = req.Header["Accept-Language"][0]
  561. }
  562. if _, ok := req.Header["User-Agent"]; ok {
  563. request.UserAgent = req.Header["User-Agent"][0]
  564. }
  565. if _, ok := req.Header["Accept"]; ok {
  566. request.Accept = req.Header["Accept"][0]
  567. }
  568. if _, ok := req.Header["Cookie"]; ok {
  569. request.Cookie = req.Header["Cookie"][0]
  570. }
  571. request.Source = request.IpSrc
  572. return nil
  573. }
  574. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  575. // e.g.
  576. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  577. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  578. func replayFile() {
  579. var req data.Request
  580. var startTs time.Time
  581. var endTs time.Time
  582. rand.Seed(time.Now().UnixNano())
  583. for {
  584. fh, err := os.Open(config.RequestsFile)
  585. if err != nil {
  586. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  587. }
  588. c := csv.NewReader(fh)
  589. c.Comma = ' '
  590. for {
  591. if config.SleepFor.Duration > time.Nanosecond {
  592. startTs = time.Now()
  593. }
  594. r, err := c.Read()
  595. if err == io.EOF {
  596. break
  597. }
  598. if err != nil {
  599. log.Println(err)
  600. continue
  601. }
  602. req.IpSrc = r[0]
  603. req.Source = r[0]
  604. req.Url = r[1]
  605. req.UserAgent = "Munch/1.0"
  606. req.Host = "demo.scraperwall.com"
  607. req.CreatedAt = time.Now().UnixNano()
  608. publishRequest(config.NatsQueue, &req)
  609. if strings.Index(r[1], ".") < 0 {
  610. hash := sha1.New()
  611. io.WriteString(hash, r[0])
  612. fp := data.Fingerprint{
  613. ClientID: "scw",
  614. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  615. Remote: r[0],
  616. Url: r[1],
  617. Source: r[0],
  618. CreatedAt: time.Now(),
  619. }
  620. if strings.HasPrefix(r[0], "50.31.") {
  621. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  622. natsJSONEC.Publish("fingerprints_scw", fp)
  623. } else if rand.Intn(10) < 5 {
  624. natsJSONEC.Publish("fingerprints_scw", fp)
  625. }
  626. }
  627. count++
  628. if config.SleepFor.Duration >= time.Nanosecond {
  629. endTs = time.Now()
  630. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  631. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  632. }
  633. }
  634. }
  635. }
  636. }
  637. func loadConfig() {
  638. // initialize with values from the command line / environment
  639. config.Live = *doLiveCapture
  640. config.Interface = *iface
  641. config.SnapshotLen = *snapshotLen
  642. config.Filter = *filter
  643. config.Promiscuous = *promiscuous
  644. config.NatsURL = *natsURL
  645. config.NatsQueue = *natsQueue
  646. config.NatsUser = *natsUser
  647. config.NatsPassword = *natsPassword
  648. config.NatsCA = *natsCA
  649. config.SleepFor.Duration = *sleepFor
  650. config.RequestsFile = *requestsFile
  651. config.UseXForwardedAsSource = *useXForwardedAsSource
  652. config.Protocol = *protocol
  653. config.ApacheLog = *apacheLog
  654. config.ApacheReplay = *apacheReplay
  655. config.NginxLog = *nginxLog
  656. config.NginxLogFormat = *nginxFormat
  657. config.NginxReplay = *nginxReplay
  658. config.HostName = *hostName
  659. config.Quiet = *beQuiet
  660. config.Trace = *trace
  661. config.AccessWatchKey = *accessWatchKey
  662. config.RPCAddress = *rpcAddr
  663. config.ReconnectToNatsAfter.Duration = *reconnectToNatsAfter
  664. config.ResetLiveCaptureAfter.Duration = *resetLiveCapAfter
  665. if *configFile == "" {
  666. return
  667. }
  668. _, err := os.Stat(*configFile)
  669. if err != nil {
  670. log.Printf("%s: %s\n", *configFile, err)
  671. return
  672. }
  673. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  674. log.Printf("%s: %s\n", *configFile, err)
  675. }
  676. if !config.Quiet {
  677. config.print()
  678. }
  679. }
  680. // version outputs build information...
  681. func version() {
  682. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  683. }