main.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794
  1. package main
  import (
  	"bufio"
  	"bytes"
  	"crypto/sha1"
  	"encoding/csv"
  	"encoding/json"
  	"flag"
  	"fmt"
  	"io"
  	"log"
  	"math/rand"
  	"net"
  	"net/http"
  	"os"
  	"strings"
  	"sync/atomic"
  	"time"

  	"github.com/BurntSushi/toml"
  	"github.com/google/gopacket"
  	"github.com/google/gopacket/layers"
  	"github.com/google/gopacket/pcap"
  	"github.com/nats-io/nats"
  	"github.com/nats-io/nats/encoders/protobuf"

  	"git.scraperwall.com/scw/ajp13"
  	"git.scraperwall.com/scw/data"
  	"git.scraperwall.com/scw/ip"
  )
  28. var (
  29. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  30. iface = flag.String("interface", "eth0", "Interface to get packets from")
  31. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  32. filter = flag.String("filter", "tcp", "PCAP filter expression")
  33. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  34. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  35. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  36. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  37. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  38. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  39. reconnectToNatsAfter = flag.Duration("reconnect-to-nats-after", 0, "reconnect to nats after this time periodically")
  40. resetLiveCapAfter = flag.Duration("reset-live-cap-after", 613*time.Second, "reset the live capture setup after this amount of time")
  41. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  42. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  43. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  44. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  45. useVhostAsSource = flag.Bool("use-vhost-as-source", false, "Use the Vhost as source")
  46. trace = flag.Bool("trace", false, "Trace the packet capturing")
  47. tailPoll = flag.Bool("tail-poll", false, "use file polling to detect file changes when tailing logrfiles")
  48. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  49. apacheReplay = flag.String("apache-replay", "", "Apache log file to replay into the system")
  50. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  51. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  52. nginxReplay = flag.String("nginx-replay", "", "Replay this nginx logfile")
  53. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  54. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  55. configFile = flag.String("config", "", "The location of the TOML config file")
  56. beQuiet = flag.Bool("quiet", true, "Be quiet")
  57. doVersion = flag.Bool("version", false, "Show version information")
  58. natsEC *nats.EncodedConn
  59. natsJSONEC *nats.EncodedConn
  60. natsErrorChan chan error
  61. natsIsAvailable bool
  62. count uint64
  63. timeout = -1 * time.Second
  64. ipPriv *ip.IP
  65. config Config
  66. // Version contains the program Version, e.g. 1.0.1
  67. Version string
  68. // BuildDate contains the date and time at which the program was compiled
  69. BuildDate string
  70. )
  71. // Config contains the program configuration
  72. type Config struct {
  73. Live bool
  74. Interface string
  75. SnapshotLen int
  76. Filter string
  77. Promiscuous bool
  78. NatsURL string
  79. NatsQueue string
  80. NatsUser string
  81. NatsPassword string
  82. NatsCA string
  83. SleepFor duration
  84. RequestsFile string
  85. UseXForwardedAsSource bool
  86. UseVhostAsSource bool
  87. Quiet bool
  88. Protocol string
  89. Trace bool
  90. TailPoll bool
  91. ApacheLog string
  92. ApacheReplay string
  93. NginxLog string
  94. NginxLogFormat string
  95. NginxReplay string
  96. HostName string
  97. AccessWatchKey string
  98. ReconnectToNatsAfter duration
  99. ResetLiveCaptureAfter duration
  100. }
  // duration wraps time.Duration so that textual values such as "90s" or
  // "5m" can be decoded through the UnmarshalText method below.
  type duration struct {
  	time.Duration
  }

  // UnmarshalText parses text with time.ParseDuration and stores the result
  // in d. When text is not a valid duration an error is returned and d keeps
  // its previous value, so a default that was set beforehand is not replaced
  // by zero.
  func (d *duration) UnmarshalText(text []byte) error {
  	parsed, err := time.ParseDuration(string(text))
  	if err != nil {
  		return fmt.Errorf("invalid duration %q: %v", text, err)
  	}
  	d.Duration = parsed
  	return nil
  }
  109. func (c Config) print() {
  110. fmt.Printf("Live: %t\n", c.Live)
  111. fmt.Printf("Interface: %s\n", c.Interface)
  112. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  113. fmt.Printf("Filter: %s\n", c.Filter)
  114. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  115. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  116. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  117. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  118. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  119. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  120. fmt.Printf("ReconnectToNatsAfter: %s\n", c.ReconnectToNatsAfter.String())
  121. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  122. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  123. fmt.Printf("TailPoll: %t\n", c.TailPoll)
  124. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  125. fmt.Printf("Apache Replay: %s\n", c.ApacheReplay)
  126. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  127. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  128. fmt.Printf("NginxReplay: %s\n", c.NginxReplay)
  129. fmt.Printf("HostName: %s\n", c.HostName)
  130. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  131. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  132. fmt.Printf("UseVhostAsSource: %t\n", c.UseVhostAsSource)
  133. fmt.Printf("Protocol: %s\n", c.Protocol)
  134. fmt.Printf("Reset Live Cap After: %s\n", c.ResetLiveCaptureAfter.String())
  135. fmt.Printf("Quiet: %t\n", c.Quiet)
  136. fmt.Printf("Trace: %t\n", c.Trace)
  137. }
  138. func init() {
  139. flag.Parse()
  140. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  141. }
  142. func main() {
  143. if *doVersion {
  144. version()
  145. os.Exit(0)
  146. }
  147. loadConfig()
  148. // Output how many requests per second were sent
  149. if !config.Quiet {
  150. go func(c *uint64) {
  151. for {
  152. fmt.Printf("%d requests per second\n", *c)
  153. *c = 0
  154. time.Sleep(time.Second)
  155. }
  156. }(&count)
  157. }
  158. // NATS
  159. //
  160. if config.NatsURL == "" && config.AccessWatchKey == "" {
  161. log.Fatal("No NATS URL specified (-nats-url)!")
  162. }
  163. natsIsAvailable = false
  164. natsErrorChan = make(chan error, 1)
  165. err := connectToNATS()
  166. if err != nil && config.AccessWatchKey == "" {
  167. log.Fatal(err)
  168. }
  169. // reconnect to nats periodically
  170. //
  171. if *reconnectToNatsAfter > time.Minute {
  172. go func(interval time.Duration) {
  173. for range time.Tick(interval) {
  174. if config.Trace {
  175. log.Printf("reconnecting to NATS")
  176. }
  177. natsEC.Conn.Close()
  178. natsJSONEC.Conn.Close()
  179. }
  180. }(*reconnectToNatsAfter)
  181. }
  182. go natsWatchdog(natsErrorChan)
  183. // What should I do?
  184. if config.RequestsFile != "" {
  185. replayFile()
  186. } else if config.ApacheReplay != "" {
  187. apacheLogReplay(config.ApacheReplay)
  188. } else if config.NginxReplay != "" {
  189. nginxLogReplay(config.NginxReplay, config.NginxLogFormat)
  190. } else if config.ApacheLog != "" {
  191. apacheLogCapture(config.ApacheLog)
  192. } else if config.Live {
  193. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  194. liveCapture()
  195. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  196. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  197. }
  198. }
  199. func natsWatchdog(closedChan chan error) {
  200. var lastError error
  201. for err := range closedChan {
  202. if lastError != err {
  203. lastError = err
  204. log.Println(err)
  205. }
  206. if err != nats.ErrConnectionClosed {
  207. continue
  208. }
  209. RECONNECT:
  210. for {
  211. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  212. err := connectToNATS()
  213. if err == nil {
  214. break RECONNECT
  215. }
  216. time.Sleep(1 * time.Second)
  217. }
  218. }
  219. }
  220. func connectToNATS() error {
  221. var natsConn *nats.Conn
  222. var err error
  223. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  224. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  225. } else {
  226. if config.NatsPassword != "" && config.NatsUser != "" {
  227. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  228. } else {
  229. natsConn, err = nats.Connect(config.NatsURL)
  230. }
  231. }
  232. if err != nil {
  233. return err
  234. }
  235. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  236. if err != nil {
  237. return fmt.Errorf("Encoded Connection: %v", err)
  238. }
  239. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  240. if err != nil {
  241. return fmt.Errorf("Encoded Connection: %v", err)
  242. }
  243. natsIsAvailable = true
  244. return nil
  245. }
  246. type liveCap struct {
  247. filter string
  248. device string
  249. promisc bool
  250. snapshotLen int
  251. handle *pcap.Handle
  252. packetChan chan gopacket.Packet
  253. }
  254. func newLiveCap(device string, filter string, snapshotLen int, promisc bool) (*liveCap, error) {
  255. lc := &liveCap{
  256. filter: filter,
  257. device: device,
  258. promisc: promisc,
  259. snapshotLen: snapshotLen,
  260. }
  261. err := lc.SetupCap()
  262. if err != nil {
  263. return nil, err
  264. }
  265. return lc, nil
  266. }
  267. func (lc *liveCap) SetupCap() error {
  268. if !config.Quiet {
  269. log.Printf("reading incoming HTTP requests on %s %s\n", config.Interface, config.Filter)
  270. }
  271. if lc.handle != nil {
  272. lc.handle.Close()
  273. }
  274. // PCAP setup
  275. //
  276. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  277. if err != nil {
  278. return err
  279. }
  280. // defer handle.Close()
  281. lc.handle = handle
  282. err = lc.handle.SetBPFFilter(config.Filter)
  283. if err != nil {
  284. return err
  285. }
  286. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  287. lc.packetChan = packetSource.Packets()
  288. return nil
  289. }
  290. func liveCapture() {
  291. ipPriv = ip.NewIP()
  292. livecap, err := newLiveCap(config.Interface, config.Filter, config.SnapshotLen, config.Promiscuous)
  293. if err != nil {
  294. log.Fatal(err)
  295. }
  296. closeChan := time.Tick(config.ResetLiveCaptureAfter.Duration)
  297. for {
  298. select {
  299. case <-closeChan:
  300. livecap.SetupCap()
  301. case p := <-livecap.packetChan:
  302. go processPacket(p)
  303. }
  304. }
  305. }
  306. func writeLogToWatch(r *data.Request) {
  307. h := map[string]string{}
  308. if r.AcceptEncoding != "" {
  309. h["Accept-Encoding"] = r.AcceptEncoding
  310. }
  311. if r.Accept != "" {
  312. h["Accept"] = r.Accept
  313. }
  314. if r.AcceptLanguage != "" {
  315. h["Accept-Language"] = r.AcceptLanguage
  316. }
  317. if r.Cookie != "" {
  318. h["Cookie"] = r.Cookie
  319. }
  320. if r.Host != "" {
  321. h["Host"] = r.Host
  322. }
  323. if r.Referer != "" {
  324. h["Referer"] = r.Referer
  325. }
  326. if r.UserAgent != "" {
  327. h["User-Agent"] = r.UserAgent
  328. }
  329. if r.Via != "" {
  330. h["Via"] = r.Via
  331. }
  332. if r.XForwardedFor != "" {
  333. h["X-Forwarded-For"] = r.XForwardedFor
  334. }
  335. if r.XRequestedWith != "" {
  336. h["X-Requested-With"] = r.XRequestedWith
  337. }
  338. data := map[string]interface{}{
  339. "request": map[string]interface{}{
  340. "time": time.Unix(0, r.CreatedAt),
  341. "address": r.Source,
  342. "protocol": r.Protocol,
  343. "scheme": "https",
  344. "method": r.Method,
  345. "url": r.Url,
  346. "headers": h,
  347. },
  348. "response": map[string]interface{}{"status": "200"},
  349. }
  350. jdata, err := json.Marshal(data)
  351. client := &http.Client{}
  352. fmt.Println(string(jdata))
  353. buf := bytes.NewBuffer(jdata)
  354. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  355. req.Header.Add("Api-Key", config.AccessWatchKey)
  356. req.Header.Add("Accept", "application/json")
  357. req.Header.Add("Content-Type", "application/json")
  358. resp, err := client.Do(req)
  359. if err != nil {
  360. log.Println(err)
  361. }
  362. resp.Body.Close()
  363. }
  364. func publishRequest(queue string, request *data.Request) {
  365. if config.AccessWatchKey != "" {
  366. writeLogToWatch(request)
  367. return
  368. }
  369. if !natsIsAvailable {
  370. if rand.Intn(100) == 0 {
  371. log.Println("nats connection is not available")
  372. }
  373. return
  374. }
  375. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  376. natsErrorChan <- err
  377. if err == nats.ErrConnectionClosed {
  378. natsIsAvailable = false
  379. }
  380. }
  381. }
  382. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  383. func processPacket(packet gopacket.Packet) {
  384. hasIPv4 := false
  385. var ipSrc, ipDst string
  386. // IPv4
  387. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  388. ip := ipLayer.(*layers.IPv4)
  389. ipSrc = ip.SrcIP.String()
  390. ipDst = ip.DstIP.String()
  391. hasIPv4 = true
  392. }
  393. // IPv6
  394. if !hasIPv4 {
  395. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  396. ip := ipLayer.(*layers.IPv6)
  397. ipSrc = ip.SrcIP.String()
  398. ipDst = ip.DstIP.String()
  399. }
  400. }
  401. // TCP
  402. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  403. if tcpLayer == nil {
  404. return
  405. }
  406. tcp, _ := tcpLayer.(*layers.TCP)
  407. portSrc := tcp.SrcPort
  408. portDst := tcp.DstPort
  409. sequence := tcp.Seq
  410. applicationLayer := packet.ApplicationLayer()
  411. if applicationLayer == nil {
  412. return
  413. }
  414. count++
  415. if len(applicationLayer.Payload()) < 50 {
  416. log.Println("application layer too small!")
  417. return
  418. }
  419. request := data.Request{
  420. IpSrc: ipSrc,
  421. IpDst: ipDst,
  422. PortSrc: uint32(portSrc),
  423. PortDst: uint32(portDst),
  424. TcpSeq: uint32(sequence),
  425. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  426. }
  427. switch config.Protocol {
  428. case "http":
  429. err := processHTTP(&request, applicationLayer.Payload())
  430. if err != nil {
  431. log.Println(err)
  432. return
  433. }
  434. case "ajp13":
  435. err := processAJP13(&request, applicationLayer.Payload())
  436. if err != nil {
  437. log.Println(err)
  438. return
  439. }
  440. }
  441. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  442. if strings.Contains(request.XForwardedFor, ",") {
  443. ips := strings.Split(request.XForwardedFor, ",")
  444. for i := len(ips) - 1; i >= 0; i-- {
  445. ipRaw := strings.TrimSpace(ips[i])
  446. ipAddr := net.ParseIP(ipRaw)
  447. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  448. request.Source = ipRaw
  449. break
  450. }
  451. }
  452. } else {
  453. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  454. if !ipPriv.IsPrivate(ipAddr) {
  455. request.Source = request.XForwardedFor
  456. }
  457. }
  458. }
  459. if request.Source == request.IpSrc && request.XRealIP != "" {
  460. request.Source = request.XRealIP
  461. }
  462. if config.Trace {
  463. log.Printf("[%s] %s\n", request.Source, request.Url)
  464. }
  465. publishRequest(config.NatsQueue, &request)
  466. }
  467. func processAJP13(request *data.Request, appData []byte) error {
  468. a, err := ajp13.Parse(appData)
  469. if err != nil {
  470. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  471. }
  472. request.Url = a.URI
  473. request.Method = a.Method()
  474. request.Host = a.Server
  475. request.Protocol = a.Version
  476. request.Origin = a.RemoteAddr.String()
  477. request.Source = a.RemoteAddr.String()
  478. if v, ok := a.Header("Referer"); ok {
  479. request.Referer = v
  480. }
  481. if v, ok := a.Header("Connection"); ok {
  482. request.Connection = v
  483. }
  484. if v, ok := a.Header("X-Forwarded-For"); ok {
  485. request.XForwardedFor = v
  486. }
  487. if v, ok := a.Header("X-Real-IP"); ok {
  488. request.XRealIP = v
  489. }
  490. if v, ok := a.Header("X-Requested-With"); ok {
  491. request.XRequestedWith = v
  492. }
  493. if v, ok := a.Header("Accept-Encoding"); ok {
  494. request.AcceptEncoding = v
  495. }
  496. if v, ok := a.Header("Accept-Language"); ok {
  497. request.AcceptLanguage = v
  498. }
  499. if v, ok := a.Header("User-Agent"); ok {
  500. request.UserAgent = v
  501. }
  502. if v, ok := a.Header("Accept"); ok {
  503. request.Accept = v
  504. }
  505. if v, ok := a.Header("Cookie"); ok {
  506. request.Cookie = v
  507. }
  508. if v, ok := a.Header("X-Forwarded-Host"); ok {
  509. if v != request.Host {
  510. request.Host = v
  511. }
  512. }
  513. return nil
  514. }
  515. func processHTTP(request *data.Request, appData []byte) error {
  516. reader := bufio.NewReader(strings.NewReader(string(appData)))
  517. req, err := http.ReadRequest(reader)
  518. if err != nil {
  519. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  520. }
  521. request.Url = req.URL.String()
  522. request.Method = req.Method
  523. request.Referer = req.Referer()
  524. request.Host = req.Host
  525. request.Protocol = req.Proto
  526. request.Origin = request.Host
  527. if _, ok := req.Header["Connection"]; ok {
  528. request.Connection = req.Header["Connection"][0]
  529. }
  530. if _, ok := req.Header["X-Forwarded-For"]; ok {
  531. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  532. }
  533. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  534. if _, ok := req.Header["True-Client-Ip"]; ok {
  535. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  536. }
  537. if _, ok := req.Header["X-Real-Ip"]; ok {
  538. request.XRealIP = req.Header["X-Real-Ip"][0]
  539. }
  540. if _, ok := req.Header["X-Requested-With"]; ok {
  541. request.XRequestedWith = req.Header["X-Requested-With"][0]
  542. }
  543. if _, ok := req.Header["Accept-Encoding"]; ok {
  544. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  545. }
  546. if _, ok := req.Header["Accept-Language"]; ok {
  547. request.AcceptLanguage = req.Header["Accept-Language"][0]
  548. }
  549. if _, ok := req.Header["User-Agent"]; ok {
  550. request.UserAgent = req.Header["User-Agent"][0]
  551. }
  552. if _, ok := req.Header["Accept"]; ok {
  553. request.Accept = req.Header["Accept"][0]
  554. }
  555. if _, ok := req.Header["Cookie"]; ok {
  556. request.Cookie = req.Header["Cookie"][0]
  557. }
  558. request.Source = request.IpSrc
  559. return nil
  560. }
  561. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  562. // e.g.
  563. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  564. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  565. func replayFile() {
  566. var req data.Request
  567. var startTs time.Time
  568. var endTs time.Time
  569. rand.Seed(time.Now().UnixNano())
  570. for {
  571. fh, err := os.Open(config.RequestsFile)
  572. if err != nil {
  573. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  574. }
  575. c := csv.NewReader(fh)
  576. c.Comma = ' '
  577. for {
  578. if config.SleepFor.Duration > time.Nanosecond {
  579. startTs = time.Now()
  580. }
  581. r, err := c.Read()
  582. if err == io.EOF {
  583. break
  584. }
  585. if err != nil {
  586. log.Println(err)
  587. continue
  588. }
  589. req.IpSrc = r[0]
  590. req.Source = r[0]
  591. req.Url = r[1]
  592. req.UserAgent = "Munch/1.0"
  593. req.Host = "demo.scraperwall.com"
  594. req.CreatedAt = time.Now().UnixNano()
  595. publishRequest(config.NatsQueue, &req)
  596. if strings.Index(r[1], ".") < 0 {
  597. hash := sha1.New()
  598. io.WriteString(hash, r[0])
  599. fp := data.Fingerprint{
  600. ClientID: "scw",
  601. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  602. Remote: r[0],
  603. Url: r[1],
  604. Source: r[0],
  605. CreatedAt: time.Now(),
  606. }
  607. if strings.HasPrefix(r[0], "50.31.") {
  608. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  609. natsJSONEC.Publish("fingerprints_scw", fp)
  610. } else if rand.Intn(10) < 5 {
  611. natsJSONEC.Publish("fingerprints_scw", fp)
  612. }
  613. }
  614. count++
  615. if config.SleepFor.Duration >= time.Nanosecond {
  616. endTs = time.Now()
  617. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  618. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  619. }
  620. }
  621. }
  622. }
  623. }
  624. func loadConfig() {
  625. // initialize with values from the command line / environment
  626. config.Live = *doLiveCapture
  627. config.Interface = *iface
  628. config.SnapshotLen = *snapshotLen
  629. config.Filter = *filter
  630. config.Promiscuous = *promiscuous
  631. config.NatsURL = *natsURL
  632. config.NatsQueue = *natsQueue
  633. config.NatsUser = *natsUser
  634. config.NatsPassword = *natsPassword
  635. config.NatsCA = *natsCA
  636. config.SleepFor.Duration = *sleepFor
  637. config.RequestsFile = *requestsFile
  638. config.UseXForwardedAsSource = *useXForwardedAsSource
  639. config.UseVhostAsSource = *useVhostAsSource
  640. config.Protocol = *protocol
  641. config.TailPoll = *tailPoll
  642. config.ApacheLog = *apacheLog
  643. config.ApacheReplay = *apacheReplay
  644. config.NginxLog = *nginxLog
  645. config.NginxLogFormat = *nginxFormat
  646. config.NginxReplay = *nginxReplay
  647. config.HostName = *hostName
  648. config.Quiet = *beQuiet
  649. config.Trace = *trace
  650. config.AccessWatchKey = *accessWatchKey
  651. config.ReconnectToNatsAfter.Duration = *reconnectToNatsAfter
  652. config.ResetLiveCaptureAfter.Duration = *resetLiveCapAfter
  653. if *configFile == "" {
  654. return
  655. }
  656. _, err := os.Stat(*configFile)
  657. if err != nil {
  658. log.Printf("%s: %s\n", *configFile, err)
  659. return
  660. }
  661. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  662. log.Printf("%s: %s\n", *configFile, err)
  663. }
  664. if !config.Quiet {
  665. config.print()
  666. }
  667. }
  668. // version outputs build information...
  669. func version() {
  670. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  671. }