main.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790
  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/sha1"
  6. "encoding/csv"
  7. "encoding/json"
  8. "flag"
  9. "fmt"
  10. "io"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "net/http"
  15. "os"
  16. "strings"
  17. "time"
  18. "github.com/BurntSushi/toml"
  19. "github.com/google/gopacket"
  20. "github.com/google/gopacket/layers"
  21. "github.com/google/gopacket/pcap"
  22. "github.com/nats-io/nats"
  23. "github.com/nats-io/nats/encoders/protobuf"
  24. "git.scraperwall.com/scw/ajp13"
  25. "git.scraperwall.com/scw/data"
  26. "git.scraperwall.com/scw/ip"
  27. )
  28. var (
  29. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  30. iface = flag.String("interface", "eth0", "Interface to get packets from")
  31. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  32. filter = flag.String("filter", "tcp", "PCAP filter expression")
  33. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  34. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  35. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  36. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  37. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  38. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  39. reconnectToNatsAfter = flag.Duration("reconnect-to-nats-after", 0, "reconnect to nats after this time periodically")
  40. resetLiveCapAfter = flag.Duration("reset-live-cap-after", 613*time.Second, "reset the live capture setup after this amount of time")
  41. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  42. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  43. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  44. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  45. useVhostAsSource = flag.Bool("use-vhost-as-source", false, "Use the Vhost as source")
  46. trace = flag.Bool("trace", false, "Trace the packet capturing")
  47. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  48. apacheReplay = flag.String("apache-replay", "", "Apache log file to replay into the system")
  49. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  50. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  51. nginxReplay = flag.String("nginx-replay", "", "Replay this nginx logfile")
  52. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  53. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  54. configFile = flag.String("config", "", "The location of the TOML config file")
  55. beQuiet = flag.Bool("quiet", true, "Be quiet")
  56. doVersion = flag.Bool("version", false, "Show version information")
  57. natsEC *nats.EncodedConn
  58. natsJSONEC *nats.EncodedConn
  59. natsErrorChan chan error
  60. natsIsAvailable bool
  61. count uint64
  62. timeout = -1 * time.Second
  63. ipPriv *ip.IP
  64. config Config
  65. // Version contains the program Version, e.g. 1.0.1
  66. Version string
  67. // BuildDate contains the date and time at which the program was compiled
  68. BuildDate string
  69. )
  70. // Config contains the program configuration
  71. type Config struct {
  72. Live bool
  73. Interface string
  74. SnapshotLen int
  75. Filter string
  76. Promiscuous bool
  77. NatsURL string
  78. NatsQueue string
  79. NatsUser string
  80. NatsPassword string
  81. NatsCA string
  82. SleepFor duration
  83. RequestsFile string
  84. UseXForwardedAsSource bool
  85. UseVhostAsSource bool
  86. Quiet bool
  87. Protocol string
  88. Trace bool
  89. ApacheLog string
  90. ApacheReplay string
  91. NginxLog string
  92. NginxLogFormat string
  93. NginxReplay string
  94. HostName string
  95. AccessWatchKey string
  96. ReconnectToNatsAfter duration
  97. ResetLiveCaptureAfter duration
  98. }
  99. type duration struct {
  100. time.Duration
  101. }
  102. func (d *duration) UnmarshalText(text []byte) error {
  103. var err error
  104. d.Duration, err = time.ParseDuration(string(text))
  105. return err
  106. }
  // print writes the effective configuration to stdout, one "Name: value" line
// per field. The NATS password and the access.watch API key are masked so that
// credentials do not end up in terminal scrollback or collected logs; an empty
// secret is printed as empty so "not configured" remains visible.
func (c Config) print() {
	mask := func(secret string) string {
		if secret == "" {
			return ""
		}
		return "********"
	}

	fmt.Printf("Live: %t\n", c.Live)
	fmt.Printf("Interface: %s\n", c.Interface)
	fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
	fmt.Printf("Filter: %s\n", c.Filter)
	fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
	fmt.Printf("NatsURL: %s\n", c.NatsURL)
	fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
	fmt.Printf("NatsUser: %s\n", c.NatsUser)
	fmt.Printf("NatsPassword: %s\n", mask(c.NatsPassword))
	fmt.Printf("NatsCA: %s\n", c.NatsCA)
	fmt.Printf("ReconnectToNatsAfter: %s\n", c.ReconnectToNatsAfter.String())
	fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
	fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
	fmt.Printf("Apache Log: %s\n", c.ApacheLog)
	fmt.Printf("Apache Replay: %s\n", c.ApacheReplay)
	fmt.Printf("Nginx Log: %s\n", c.NginxLog)
	fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
	fmt.Printf("NginxReplay: %s\n", c.NginxReplay)
	fmt.Printf("HostName: %s\n", c.HostName)
	fmt.Printf("AccessWatchKey: %s\n", mask(c.AccessWatchKey))
	fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
	fmt.Printf("UseVhostAsSource: %t\n", c.UseVhostAsSource)
	fmt.Printf("Protocol: %s\n", c.Protocol)
	fmt.Printf("Reset Live Cap After: %s\n", c.ResetLiveCaptureAfter.String())
	fmt.Printf("Quiet: %t\n", c.Quiet)
	fmt.Printf("Trace: %t\n", c.Trace)
}
  // init registers the protobuf encoder with the NATS client library and parses
// the command-line flags, so both are ready before main runs.
// NOTE(review): parsing flags in init interferes with `go test` flag handling;
// consider moving flag.Parse into main.
func init() {
	nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
	flag.Parse()
}
  // main loads the configuration, sets up the NATS connection (mandatory unless
// an access.watch key is configured) and then runs exactly one input mode,
// chosen in the priority order of the switch at the end.
func main() {
	if *doVersion {
		version()
		os.Exit(0)
	}

	loadConfig()

	// Output how many requests per second were sent.
	// NOTE(review): count is incremented from packet goroutines without
	// synchronisation, so this figure is approximate.
	if !config.Quiet {
		go func(c *uint64) {
			for {
				fmt.Printf("%d requests per second\n", *c)
				*c = 0
				time.Sleep(time.Second)
			}
		}(&count)
	}

	// NATS
	if config.NatsURL == "" && config.AccessWatchKey == "" {
		log.Fatal("No NATS URL specified (-nats-url)!")
	}
	natsIsAvailable = false
	natsErrorChan = make(chan error, 1)
	if err := connectToNATS(); err != nil {
		if config.AccessWatchKey == "" {
			log.Fatal(err)
		}
		// access.watch mode can run without NATS, but report why it is missing.
		log.Println(err)
	}

	// Periodically close the NATS connection. Subsequent publishes then fail
	// with nats.ErrConnectionClosed, which natsWatchdog answers by reconnecting.
	// The interval comes from config (not the flag) so a TOML value is honoured;
	// intervals of a minute or less are ignored.
	if interval := config.ReconnectToNatsAfter.Duration; interval > time.Minute {
		go func() {
			for range time.Tick(interval) {
				if config.Trace {
					log.Printf("reconnecting to NATS")
				}
				// Either connection is nil if the initial connect failed in
				// access.watch mode.
				if natsEC != nil {
					natsEC.Conn.Close()
				}
				if natsJSONEC != nil {
					natsJSONEC.Conn.Close()
				}
			}
		}()
	}
	go natsWatchdog(natsErrorChan)

	// What should I do?
	switch {
	case config.RequestsFile != "":
		replayFile()
	case config.ApacheReplay != "":
		apacheLogReplay(config.ApacheReplay)
	case config.NginxReplay != "":
		nginxLogReplay(config.NginxReplay, config.NginxLogFormat)
	case config.ApacheLog != "":
		apacheLogCapture(config.ApacheLog)
	case config.Live:
		fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
		liveCapture()
	case config.NginxLog != "" && config.NginxLogFormat != "":
		nginxLogCapture(config.NginxLog, config.NginxLogFormat)
	}
}
  // natsWatchdog consumes publish errors from closedChan. It logs each error
// unless it is identical to the previously logged one. On nats.ErrConnectionClosed
// it reconnects, retrying once per second until connectToNATS succeeds.
func natsWatchdog(closedChan chan error) {
	var lastError error
	for err := range closedChan {
		if err != lastError {
			lastError = err
			log.Println(err)
		}
		if err != nats.ErrConnectionClosed {
			continue
		}

		// Several publishers can report the same closed connection. If the
		// current connection is open, the report is stale: reconnecting again
		// would orphan the live connection. Re-mark NATS as available in case a
		// stale publisher cleared the flag after the last reconnect.
		if natsEC != nil && !natsEC.Conn.IsClosed() {
			natsIsAvailable = true
			continue
		}

		for {
			log.Printf("Reconnecting to NATS at %s\n", config.NatsURL)
			if connErr := connectToNATS(); connErr == nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		// Log the next connection loss again even though it is the same error.
		lastError = nil
	}
}
  // connectToNATS dials config.NatsURL and, only if everything succeeds,
// installs protobuf (natsEC) and JSON (natsJSONEC) encoded connections over the
// same underlying connection and sets natsIsAvailable.
//
// User credentials are sent when both NatsUser and NatsPassword are set. The
// NatsCA root certificates are applied whenever NatsCA is set, independently of
// the credentials. On failure the globals are left untouched and any
// half-opened connection is closed.
func connectToNATS() error {
	var opts []nats.Option
	if config.NatsUser != "" && config.NatsPassword != "" {
		opts = append(opts, nats.UserInfo(config.NatsUser, config.NatsPassword))
	}
	if config.NatsCA != "" {
		opts = append(opts, nats.RootCAs(config.NatsCA))
	}

	natsConn, err := nats.Connect(config.NatsURL, opts...)
	if err != nil {
		return err
	}

	protoEC, err := nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
	if err != nil {
		natsConn.Close()
		return fmt.Errorf("Encoded Connection: %v", err)
	}
	jsonEC, err := nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
	if err != nil {
		natsConn.Close()
		return fmt.Errorf("Encoded Connection: %v", err)
	}

	natsEC, natsJSONEC = protoEC, jsonEC
	natsIsAvailable = true
	return nil
}
  243. type liveCap struct {
  244. filter string
  245. device string
  246. promisc bool
  247. snapshotLen int
  248. handle *pcap.Handle
  249. packetChan chan gopacket.Packet
  250. }
  251. func newLiveCap(device string, filter string, snapshotLen int, promisc bool) (*liveCap, error) {
  252. lc := &liveCap{
  253. filter: filter,
  254. device: device,
  255. promisc: promisc,
  256. snapshotLen: snapshotLen,
  257. }
  258. err := lc.SetupCap()
  259. if err != nil {
  260. return nil, err
  261. }
  262. return lc, nil
  263. }
  // SetupCap (re)opens the live capture described by lc's own fields (device,
// filter, snapshotLen, promisc), closing any previous handle first. On
// success lc.handle and lc.packetChan refer to the new capture.
//
// On failure lc.handle is nil and lc.packetChan still refers to the previous
// (now closed) source channel. Callers must therefore be prepared for that
// channel to be closed.
func (lc *liveCap) SetupCap() error {
	if !config.Quiet {
		log.Printf("reading incoming HTTP requests on %s %s\n", lc.device, lc.filter)
	}
	if lc.handle != nil {
		lc.handle.Close()
		lc.handle = nil // never keep a reference to a closed handle
	}

	// PCAP setup
	handle, err := pcap.OpenLive(lc.device, int32(lc.snapshotLen), lc.promisc, timeout)
	if err != nil {
		return err
	}
	if err = handle.SetBPFFilter(lc.filter); err != nil {
		handle.Close() // don't leak the freshly opened handle
		return err
	}

	lc.handle = handle
	lc.packetChan = gopacket.NewPacketSource(handle, handle.LinkType()).Packets()
	return nil
}
  // liveCapture captures packets on config.Interface forever, handing each one
// to processPacket in its own goroutine. The capture is re-established every
// config.ResetLiveCaptureAfter. time.Tick returns nil for a non-positive
// duration, and a nil channel never fires, so a zero value disables resets.
// Startup failure is fatal.
func liveCapture() {
	ipPriv = ip.NewIP()
	livecap, err := newLiveCap(config.Interface, config.Filter, config.SnapshotLen, config.Promiscuous)
	if err != nil {
		log.Fatal(err)
	}

	resetChan := time.Tick(config.ResetLiveCaptureAfter.Duration)
	for {
		select {
		case <-resetChan:
			if err := livecap.SetupCap(); err != nil {
				log.Println(err)
			}
		case p, ok := <-livecap.packetChan:
			if !ok {
				// The packet source closed its channel (its handle is gone, e.g.
				// after a failed reset). Receiving from it again would only yield
				// nil packets, so try to re-establish the capture, backing off on
				// failure.
				if err := livecap.SetupCap(); err != nil {
					log.Println(err)
					time.Sleep(time.Second)
				}
				continue
			}
			go processPacket(p)
		}
	}
}
  // writeLogToWatch sends one request as JSON to the access.watch log API
// (https://log.access.watch/1.1/log), authenticated with config.AccessWatchKey.
// Only non-empty request headers are included. The response part is always
// reported as status "200" and the scheme as "https". The JSON is also echoed
// to stdout. Failures are logged, never returned.
func writeLogToWatch(r *data.Request) {
	headers := map[string]string{}
	for _, hv := range []struct{ name, value string }{
		{"Accept-Encoding", r.AcceptEncoding},
		{"Accept", r.Accept},
		{"Accept-Language", r.AcceptLanguage},
		{"Cookie", r.Cookie},
		{"Host", r.Host},
		{"Referer", r.Referer},
		{"User-Agent", r.UserAgent},
		{"Via", r.Via},
		{"X-Forwarded-For", r.XForwardedFor},
		{"X-Requested-With", r.XRequestedWith},
	} {
		if hv.value != "" {
			headers[hv.name] = hv.value
		}
	}

	// Named payload rather than data so the data package is not shadowed.
	payload := map[string]interface{}{
		"request": map[string]interface{}{
			"time":     time.Unix(0, r.CreatedAt),
			"address":  r.Source,
			"protocol": r.Protocol,
			"scheme":   "https",
			"method":   r.Method,
			"url":      r.Url,
			"headers":  headers,
		},
		"response": map[string]interface{}{"status": "200"},
	}
	jdata, err := json.Marshal(payload)
	if err != nil {
		log.Println(err)
		return
	}
	fmt.Println(string(jdata))

	req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", bytes.NewReader(jdata))
	if err != nil {
		log.Println(err)
		return
	}
	req.Header.Add("Api-Key", config.AccessWatchKey)
	req.Header.Add("Accept", "application/json")
	req.Header.Add("Content-Type", "application/json")

	// Without a timeout a stalled endpoint would pin the calling goroutine forever.
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		// resp is nil when Do fails, so there is no body to close.
		log.Println(err)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		log.Printf("access.watch: unexpected response status %s", resp.Status)
	}
}
  // publishRequest hands request to its destination:
//   - with an access.watch key configured, it is sent via writeLogToWatch;
//   - otherwise it is published on the NATS subject queue, falling back to
//     config.NatsQueue when queue is empty.
//
// While NATS is unavailable, requests are dropped and roughly 1 in 100 drops
// is logged. Publish errors are forwarded to natsWatchdog via natsErrorChan.
func publishRequest(queue string, request *data.Request) {
	if config.AccessWatchKey != "" {
		writeLogToWatch(request)
		return
	}

	if !natsIsAvailable {
		if rand.Intn(100) == 0 {
			log.Println("nats connection is not available")
		}
		return
	}

	if queue == "" {
		queue = config.NatsQueue
	}
	if err := natsEC.Publish(queue, request); err != nil {
		// Mark NATS unavailable before notifying the watchdog. The watchdog sets
		// the flag back to true after reconnecting; clearing it afterwards could
		// overwrite that and stop publishing for good.
		if err == nats.ErrConnectionClosed {
			natsIsAvailable = false
		}
		natsErrorChan <- err
	}
}
  // processPacket receives a raw packet from pcap, builds a Request item from it
// and sends it to the queue.
//
// Packets without a TCP layer or an application payload are ignored. Payloads
// shorter than 50 bytes are logged and dropped. The payload is parsed according
// to config.Protocol ("http" or "ajp13"); parse errors are logged and the packet
// is dropped.
//
// With config.UseXForwardedAsSource, the source becomes the right-most entry of
// X-Forwarded-For that parses as an IP and is not private. Afterwards, if the
// source still equals the packet's source IP, a non-empty X-Real-IP replaces it.
func processPacket(packet gopacket.Packet) {
	// Network layer: prefer IPv4, fall back to IPv6.
	var ipSrc, ipDst string
	if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
		v4 := ipLayer.(*layers.IPv4)
		ipSrc, ipDst = v4.SrcIP.String(), v4.DstIP.String()
	} else if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
		v6 := ipLayer.(*layers.IPv6)
		ipSrc, ipDst = v6.SrcIP.String(), v6.DstIP.String()
	}

	// TCP
	tcpLayer := packet.Layer(layers.LayerTypeTCP)
	if tcpLayer == nil {
		return
	}
	tcp, ok := tcpLayer.(*layers.TCP)
	if !ok {
		return
	}

	applicationLayer := packet.ApplicationLayer()
	if applicationLayer == nil {
		return
	}
	// NOTE(review): count is shared with main's reporter goroutine without
	// synchronisation.
	count++
	payload := applicationLayer.Payload()
	if len(payload) < 50 {
		log.Println("application layer too small!")
		return
	}

	request := data.Request{
		IpSrc:     ipSrc,
		IpDst:     ipDst,
		PortSrc:   uint32(tcp.SrcPort),
		PortDst:   uint32(tcp.DstPort),
		TcpSeq:    uint32(tcp.Seq),
		CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
	}

	switch config.Protocol {
	case "http":
		if err := processHTTP(&request, payload); err != nil {
			log.Println(err)
			return
		}
	case "ajp13":
		if err := processAJP13(&request, payload); err != nil {
			log.Println(err)
			return
		}
	}

	if config.UseXForwardedAsSource && request.XForwardedFor != "" {
		// Walk the chain from the right (the hop closest to us) and take the
		// first entry that is a valid, public IP. A single-entry header goes
		// through the same path, so garbage values are never used as the source
		// and the stored value is always trimmed.
		hops := strings.Split(request.XForwardedFor, ",")
		for i := len(hops) - 1; i >= 0; i-- {
			candidate := strings.TrimSpace(hops[i])
			if addr := net.ParseIP(candidate); addr != nil && !ipPriv.IsPrivate(addr) {
				request.Source = candidate
				break
			}
		}
	}

	if request.Source == request.IpSrc && request.XRealIP != "" {
		request.Source = request.XRealIP
	}

	if config.Trace {
		log.Printf("[%s] %s\n", request.Source, request.Url)
	}

	publishRequest(config.NatsQueue, &request)
}
  // processAJP13 parses an AJP13 payload and fills request with the URL, method,
// host, protocol and remote address (used as both Origin and Source), plus a
// fixed set of headers. Headers absent from the message leave their field
// untouched. X-Forwarded-Host, when present, replaces the AJP server name as
// Host.
func processAJP13(request *data.Request, appData []byte) error {
	msg, err := ajp13.Parse(appData)
	if err != nil {
		return fmt.Errorf("Failed to parse AJP13 request: %s", err)
	}

	remote := msg.RemoteAddr.String()
	request.Url = msg.URI
	request.Method = msg.Method()
	request.Host = msg.Server
	request.Protocol = msg.Version
	request.Origin = remote
	request.Source = remote

	headerTargets := []struct {
		name string
		dst  *string
	}{
		{"Referer", &request.Referer},
		{"Connection", &request.Connection},
		{"X-Forwarded-For", &request.XForwardedFor},
		{"X-Real-IP", &request.XRealIP},
		{"X-Requested-With", &request.XRequestedWith},
		{"Accept-Encoding", &request.AcceptEncoding},
		{"Accept-Language", &request.AcceptLanguage},
		{"User-Agent", &request.UserAgent},
		{"Accept", &request.Accept},
		{"Cookie", &request.Cookie},
		// Applied after Server above, so a proxy-supplied host wins.
		{"X-Forwarded-Host", &request.Host},
	}
	for _, target := range headerTargets {
		if value, found := msg.Header(target.name); found {
			*target.dst = value
		}
	}
	return nil
}
  // processHTTP parses an HTTP request head from appData and fills request with
// the URL, method, referer, host (also used as Origin), protocol and a fixed set
// of headers. Source is set to the packet's source IP (request.IpSrc). Absent
// headers leave their field untouched.
func processHTTP(request *data.Request, appData []byte) error {
	// Read straight from the payload instead of from a string copy of it, which
	// would duplicate every captured payload.
	req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(appData)))
	if err != nil {
		return fmt.Errorf("Failed to parse HTTP header: %s", err)
	}

	request.Url = req.URL.String()
	request.Method = req.Method
	request.Referer = req.Referer()
	request.Host = req.Host
	request.Protocol = req.Proto
	request.Origin = request.Host

	// Keys are in the canonical form produced by http.ReadRequest. Order
	// matters: the CloudFlare True-Client-Ip header comes after X-Forwarded-For
	// so it overrides it, because CloudFlare taints X-Forwarded-For.
	for _, target := range []struct {
		key string
		dst *string
	}{
		{"Connection", &request.Connection},
		{"X-Forwarded-For", &request.XForwardedFor},
		{"True-Client-Ip", &request.XForwardedFor},
		{"X-Real-Ip", &request.XRealIP},
		{"X-Requested-With", &request.XRequestedWith},
		{"Accept-Encoding", &request.AcceptEncoding},
		{"Accept-Language", &request.AcceptLanguage},
		{"User-Agent", &request.UserAgent},
		{"Accept", &request.Accept},
		{"Cookie", &request.Cookie},
	} {
		// Single map lookup; len check guards against an empty value slice.
		if values := req.Header[target.key]; len(values) > 0 {
			*target.dst = values[0]
		}
	}

	request.Source = request.IpSrc
	return nil
}
  // replayFile takes a file containing a list of requests (SourceIP Url) and
// queues the requests, starting over from the top whenever the end of the file
// is reached (it never returns). Fields are separated by a single space, e.g.
//
//	157.55.39.229 /gross-gerau/12012260-beate-anstatt
//	103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
//
// Every record is published as a request. For URLs without a "." (presumably
// page URLs rather than static files), a fingerprint is also published on
// "fingerprints_scw": always for 50.31.* sources, otherwise for about half of
// the records. When config.SleepFor is positive, each record takes at least
// that long.
func replayFile() {
	var req data.Request
	pace := config.SleepFor.Duration > 0
	rand.Seed(time.Now().UnixNano())
	for {
		fh, err := os.Open(config.RequestsFile)
		if err != nil {
			log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
		}
		c := csv.NewReader(fh)
		c.Comma = ' '
		for {
			var startTs time.Time
			if pace {
				startTs = time.Now()
			}
			r, err := c.Read()
			if err == io.EOF {
				break
			}
			if err != nil {
				log.Println(err)
				// A malformed record is skipped. Any other error (e.g. an I/O
				// failure) would repeat forever, so back off and start a new pass.
				if _, isParseErr := err.(*csv.ParseError); isParseErr {
					continue
				}
				time.Sleep(time.Second)
				break
			}
			if len(r) < 2 {
				log.Printf("%s: skipping record without URL: %q", config.RequestsFile, r)
				continue
			}

			req.IpSrc = r[0]
			req.Source = r[0]
			req.Url = r[1]
			req.UserAgent = "Munch/1.0"
			req.Host = "demo.scraperwall.com"
			req.CreatedAt = time.Now().UnixNano()
			publishRequest(config.NatsQueue, &req)

			if !strings.Contains(r[1], ".") {
				hash := sha1.New()
				io.WriteString(hash, r[0])
				fp := data.Fingerprint{
					ClientID:    "scw",
					Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
					Remote:      r[0],
					Url:         r[1],
					Source:      r[0],
					CreatedAt:   time.Now(),
				}
				publish := false
				if strings.HasPrefix(r[0], "50.31.") {
					fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
					publish = true
				} else {
					publish = rand.Intn(10) < 5
				}
				// natsJSONEC is nil when NATS could not be reached (access.watch mode).
				if publish && natsJSONEC != nil {
					if err := natsJSONEC.Publish("fingerprints_scw", fp); err != nil {
						log.Println(err)
					}
				}
			}

			count++

			if pace {
				if remaining := config.SleepFor.Duration - time.Since(startTs); remaining > 0 {
					time.Sleep(remaining)
				}
			}
		}
		// Close before reopening on the next pass so descriptors don't accumulate.
		fh.Close()
	}
}
  // loadConfig populates the global config from the command-line flags. When
// -config names an existing file, that TOML file is then decoded on top so its
// values override the flags. A decode error is logged but not fatal. The
// effective configuration is printed unless Quiet is set; note this happens
// only when a config file was found.
func loadConfig() {
	config = Config{
		Live:                  *doLiveCapture,
		Interface:             *iface,
		SnapshotLen:           *snapshotLen,
		Filter:                *filter,
		Promiscuous:           *promiscuous,
		NatsURL:               *natsURL,
		NatsQueue:             *natsQueue,
		NatsUser:              *natsUser,
		NatsPassword:          *natsPassword,
		NatsCA:                *natsCA,
		SleepFor:              duration{Duration: *sleepFor},
		RequestsFile:          *requestsFile,
		UseXForwardedAsSource: *useXForwardedAsSource,
		UseVhostAsSource:      *useVhostAsSource,
		Protocol:              *protocol,
		ApacheLog:             *apacheLog,
		ApacheReplay:          *apacheReplay,
		NginxLog:              *nginxLog,
		NginxLogFormat:        *nginxFormat,
		NginxReplay:           *nginxReplay,
		HostName:              *hostName,
		Quiet:                 *beQuiet,
		Trace:                 *trace,
		AccessWatchKey:        *accessWatchKey,
		ReconnectToNatsAfter:  duration{Duration: *reconnectToNatsAfter},
		ResetLiveCaptureAfter: duration{Duration: *resetLiveCapAfter},
	}

	path := *configFile
	if path == "" {
		return
	}
	if _, statErr := os.Stat(path); statErr != nil {
		log.Printf("%s: %s\n", path, statErr)
		return
	}
	if _, decodeErr := toml.DecodeFile(path, &config); decodeErr != nil {
		log.Printf("%s: %s\n", path, decodeErr)
	}
	if !config.Quiet {
		config.print()
	}
}
  // version prints the program name followed by the build version and build
// date (presumably set at link time; empty otherwise).
func version() {
	fmt.Println("munchclient " + Version + ", built on " + BuildDate)
}