main.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798
  1. package main
  2. import (
  3. "bufio"
  4. "crypto/sha1"
  5. "encoding/csv"
  6. "flag"
  7. "fmt"
  8. "io"
  9. "log"
  10. "math/rand"
  11. "net"
  12. "net/http"
  13. "os"
  14. "regexp"
  15. "strings"
  16. "time"
  17. "github.com/BurntSushi/toml"
  18. "github.com/Songmu/axslogparser"
  19. "github.com/google/gopacket"
  20. "github.com/google/gopacket/layers"
  21. "github.com/google/gopacket/pcap"
  22. "github.com/hpcloud/tail"
  23. "github.com/nats-io/nats"
  24. "github.com/nats-io/nats/encoders/protobuf"
  25. "github.com/satyrius/gonx"
  26. "git.scraperwall.com/scw/ajp13"
  27. "git.scraperwall.com/scw/data"
  28. "git.scraperwall.com/scw/ip"
  29. )
  30. var (
  31. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  32. iface = flag.String("interface", "eth0", "Interface to get packets from")
  33. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  34. filter = flag.String("filter", "tcp", "PCAP filter expression")
  35. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  36. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  37. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  38. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  39. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  40. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  41. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  42. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  43. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  44. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  45. trace = flag.Bool("trace", false, "Trace the packet capturing")
  46. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  47. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  48. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  49. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  50. configFile = flag.String("config", "", "The location of the TOML config file")
  51. beQuiet = flag.Bool("quiet", true, "Be quiet")
  52. doVersion = flag.Bool("version", false, "Show version information")
  53. natsEC *nats.EncodedConn
  54. natsJSONEC *nats.EncodedConn
  55. natsErrorChan chan error
  56. natsIsAvailable bool
  57. count uint64
  58. timeout = -1 * time.Second
  59. ipPriv *ip.IP
  60. config Config
  61. // Version contains the program Version, e.g. 1.0.1
  62. Version string
  63. // BuildDate contains the date and time at which the program was compiled
  64. BuildDate string
  65. )
  66. // Config contains the program configuration
  67. type Config struct {
  68. Live bool
  69. Interface string
  70. SnapshotLen int
  71. Filter string
  72. Promiscuous bool
  73. NatsURL string
  74. NatsQueue string
  75. NatsUser string
  76. NatsPassword string
  77. NatsCA string
  78. SleepFor duration
  79. RequestsFile string
  80. UseXForwardedAsSource bool
  81. Quiet bool
  82. Protocol string
  83. Trace bool
  84. ApacheLog string
  85. NginxLog string
  86. NginxLogFormat string
  87. HostName string
  88. }
  // duration embeds time.Duration and adds text decoding on top of it.
  type duration struct {
  	time.Duration
  }

  // UnmarshalText parses text with time.ParseDuration and stores the result in
  // d. The parse error, if any, is returned unchanged; the stored value is
  // whatever ParseDuration returned alongside it.
  // NOTE(review): this has the encoding.TextUnmarshaler signature, presumably
  // so the TOML decoder can read values such as "1.5s" — confirm.
  func (d *duration) UnmarshalText(text []byte) error {
  	parsed, err := time.ParseDuration(string(text))
  	d.Duration = parsed
  	return err
  }
  97. func (c Config) print() {
  98. fmt.Printf("Live: %t\n", c.Live)
  99. fmt.Printf("Interface: %s\n", c.Interface)
  100. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  101. fmt.Printf("Filter: %s\n", c.Filter)
  102. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  103. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  104. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  105. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  106. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  107. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  108. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  109. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  110. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  111. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  112. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  113. fmt.Printf("HostName: %s\n", c.HostName)
  114. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  115. fmt.Printf("Protocol: %s\n", c.Protocol)
  116. fmt.Printf("Quiet: %t\n", c.Quiet)
  117. fmt.Printf("Trace: %t\n", c.Trace)
  118. }
  119. func init() {
  120. flag.Parse()
  121. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  122. }
  123. func main() {
  124. if *doVersion {
  125. version()
  126. os.Exit(0)
  127. }
  128. loadConfig()
  129. // Output how many requests per second were sent
  130. if !config.Quiet {
  131. go func(c *uint64) {
  132. for {
  133. fmt.Printf("%d requests per second\n", *c)
  134. *c = 0
  135. time.Sleep(time.Second)
  136. }
  137. }(&count)
  138. }
  139. // NATS
  140. //
  141. if config.NatsURL == "" {
  142. log.Fatal("No NATS URL specified (-nats-url)!")
  143. }
  144. natsIsAvailable = false
  145. natsErrorChan = make(chan error, 1)
  146. err := connectToNATS()
  147. if err != nil {
  148. log.Fatal(err)
  149. }
  150. go natsWatchdog(natsErrorChan)
  151. // What should I do?
  152. if config.RequestsFile != "" {
  153. replayFile()
  154. } else if config.ApacheLog != "" {
  155. apacheLogCapture(config.ApacheLog)
  156. } else if config.Live {
  157. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  158. liveCapture()
  159. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  160. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  161. }
  162. }
  163. func natsWatchdog(closedChan chan error) {
  164. var lastError error
  165. for err := range closedChan {
  166. if lastError != err {
  167. lastError = err
  168. log.Println(err)
  169. }
  170. if err != nats.ErrConnectionClosed {
  171. continue
  172. }
  173. RECONNECT:
  174. for {
  175. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  176. err := connectToNATS()
  177. if err == nil {
  178. break RECONNECT
  179. }
  180. time.Sleep(1 * time.Second)
  181. }
  182. }
  183. }
  184. func connectToNATS() error {
  185. var natsConn *nats.Conn
  186. var err error
  187. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  188. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  189. } else {
  190. if config.NatsPassword != "" && config.NatsUser != "" {
  191. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  192. } else {
  193. natsConn, err = nats.Connect(config.NatsURL)
  194. }
  195. }
  196. if err != nil {
  197. return err
  198. }
  199. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  200. if err != nil {
  201. return fmt.Errorf("Encoded Connection: %v", err)
  202. }
  203. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  204. if err != nil {
  205. return fmt.Errorf("Encoded Connection: %v", err)
  206. }
  207. natsIsAvailable = true
  208. return nil
  209. }
  210. func nginxLogCapture(logfile, format string) {
  211. if _, err := os.Stat(logfile); err != nil {
  212. log.Fatalf("%s: %s", logfile, err)
  213. }
  214. t, err := tail.TailFile(logfile, tail.Config{
  215. Follow: true, // follow the file
  216. ReOpen: true, // reopen log file when it gets closed/rotated
  217. Logger: tail.DiscardingLogger, // don't log anything
  218. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  219. })
  220. if err != nil {
  221. log.Fatalf("%s: %s", logfile, err)
  222. }
  223. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  224. p := gonx.NewParser(format)
  225. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  226. for line := range t.Lines {
  227. l := line.Text
  228. logEntry, err := p.ParseString(l)
  229. if err != nil {
  230. log.Println(err)
  231. continue
  232. }
  233. remote, err := logEntry.Field("remote_addr")
  234. if err != nil {
  235. log.Println(err)
  236. continue
  237. }
  238. // only use the first host in case there are multiple hosts in the log
  239. if cidx := strings.Index(remote, ","); cidx >= 0 {
  240. remote = remote[0:cidx]
  241. }
  242. timestampStr, err := logEntry.Field("time_local")
  243. if err != nil {
  244. log.Println(err)
  245. continue
  246. }
  247. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  248. if err != nil {
  249. log.Println(err)
  250. continue
  251. }
  252. httpRequest, err := logEntry.Field("request")
  253. if err != nil {
  254. log.Println(err)
  255. continue
  256. }
  257. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  258. if len(reqData) < 4 {
  259. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  260. continue
  261. }
  262. host := config.HostName
  263. if host == "" {
  264. host = "[not available]"
  265. }
  266. request := data.Request{
  267. IpSrc: remote,
  268. Origin: remote,
  269. Source: remote,
  270. IpDst: "127.0.0.1",
  271. PortSrc: 0,
  272. PortDst: 0,
  273. TcpSeq: 0,
  274. CreatedAt: timeStamp.Unix(),
  275. Url: reqData[2],
  276. Method: reqData[1],
  277. Host: host,
  278. Protocol: reqData[3],
  279. }
  280. request.Referer, _ = logEntry.Field("http_referer")
  281. request.UserAgent, _ = logEntry.Field("http_user_agent")
  282. if config.Trace {
  283. log.Printf("[%s] %s\n", request.Source, request.Url)
  284. }
  285. count++
  286. publishRequest(config.NatsQueue, &request)
  287. }
  288. }
  289. func apacheLogCapture(logfile string) {
  290. if _, err := os.Stat(logfile); err != nil {
  291. log.Fatalf("%s: %s", logfile, err)
  292. }
  293. t, err := tail.TailFile(logfile, tail.Config{
  294. Follow: true, // follow the file
  295. ReOpen: true, // reopen log file when it gets closed/rotated
  296. Logger: tail.DiscardingLogger, // don't log anything
  297. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  298. })
  299. if err != nil {
  300. log.Fatalf("%s: %s", logfile, err)
  301. }
  302. var p axslogparser.Parser
  303. parserSet := false
  304. for line := range t.Lines {
  305. l := line.Text
  306. if !parserSet {
  307. p, _, err = axslogparser.GuessParser(l)
  308. if err != nil {
  309. log.Println(err)
  310. continue
  311. }
  312. parserSet = true
  313. }
  314. logEntry, err := p.Parse(l)
  315. if err != nil {
  316. log.Println(err)
  317. continue
  318. }
  319. remote := logEntry.Host
  320. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  321. remote = logEntry.ForwardedFor
  322. }
  323. // only use the first host in case there are multiple hosts in the log
  324. if cidx := strings.Index(remote, ","); cidx >= 0 {
  325. remote = remote[0:cidx]
  326. }
  327. // extract the virtual host
  328. var virtualHost string
  329. vhost := logEntry.VirtualHost
  330. if vhost != "" {
  331. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  332. virtualHost = vhostAndPort[0]
  333. } else {
  334. if config.HostName != "" {
  335. vhost = config.HostName
  336. } else {
  337. vhost = "[not available]"
  338. }
  339. }
  340. request := data.Request{
  341. IpSrc: remote,
  342. IpDst: "127.0.0.1",
  343. PortSrc: 0,
  344. PortDst: 0,
  345. TcpSeq: 0,
  346. CreatedAt: logEntry.Time.UnixNano(),
  347. Url: logEntry.RequestURI,
  348. Method: logEntry.Method,
  349. Host: virtualHost,
  350. Protocol: logEntry.Protocol,
  351. Origin: remote,
  352. Source: remote,
  353. Referer: logEntry.Referer,
  354. XForwardedFor: logEntry.ForwardedFor,
  355. UserAgent: logEntry.UserAgent,
  356. }
  357. if config.Trace {
  358. log.Printf("[%s] %s\n", request.Source, request.Url)
  359. }
  360. count++
  361. publishRequest(config.NatsQueue, &request)
  362. }
  363. }
  364. func liveCapture() {
  365. ipPriv = ip.NewIP()
  366. // PCAP setup
  367. //
  368. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  369. if err != nil {
  370. log.Fatal(err)
  371. }
  372. defer handle.Close()
  373. err = handle.SetBPFFilter(config.Filter)
  374. if err != nil {
  375. log.Fatal(err)
  376. }
  377. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  378. for packet := range packetSource.Packets() {
  379. go processPacket(packet)
  380. }
  381. }
  382. func publishRequest(queue string, request *data.Request) {
  383. if !natsIsAvailable {
  384. return
  385. }
  386. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  387. natsErrorChan <- err
  388. if err == nats.ErrConnectionClosed {
  389. natsIsAvailable = false
  390. }
  391. }
  392. }
  393. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  394. func processPacket(packet gopacket.Packet) {
  395. hasIPv4 := false
  396. var ipSrc, ipDst string
  397. // IPv4
  398. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  399. ip := ipLayer.(*layers.IPv4)
  400. ipSrc = ip.SrcIP.String()
  401. ipDst = ip.DstIP.String()
  402. hasIPv4 = true
  403. }
  404. // IPv6
  405. if !hasIPv4 {
  406. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  407. ip := ipLayer.(*layers.IPv6)
  408. ipSrc = ip.SrcIP.String()
  409. ipDst = ip.DstIP.String()
  410. }
  411. }
  412. // TCP
  413. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  414. if tcpLayer == nil {
  415. return
  416. }
  417. tcp, _ := tcpLayer.(*layers.TCP)
  418. portSrc := tcp.SrcPort
  419. portDst := tcp.DstPort
  420. sequence := tcp.Seq
  421. applicationLayer := packet.ApplicationLayer()
  422. if applicationLayer == nil {
  423. return
  424. }
  425. count++
  426. if len(applicationLayer.Payload()) < 50 {
  427. log.Println("application layer too small!")
  428. return
  429. }
  430. request := data.Request{
  431. IpSrc: ipSrc,
  432. IpDst: ipDst,
  433. PortSrc: uint32(portSrc),
  434. PortDst: uint32(portDst),
  435. TcpSeq: uint32(sequence),
  436. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  437. }
  438. switch config.Protocol {
  439. case "http":
  440. err := processHTTP(&request, applicationLayer.Payload())
  441. if err != nil {
  442. log.Println(err)
  443. return
  444. }
  445. case "ajp13":
  446. err := processAJP13(&request, applicationLayer.Payload())
  447. if err != nil {
  448. log.Println(err)
  449. return
  450. }
  451. }
  452. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  453. if strings.Contains(request.XForwardedFor, ",") {
  454. ips := strings.Split(request.XForwardedFor, ",")
  455. for i := len(ips) - 1; i >= 0; i-- {
  456. ipRaw := strings.TrimSpace(ips[i])
  457. ipAddr := net.ParseIP(ipRaw)
  458. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  459. request.Source = ipRaw
  460. break
  461. }
  462. }
  463. } else {
  464. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  465. if !ipPriv.IsPrivate(ipAddr) {
  466. request.Source = request.XForwardedFor
  467. }
  468. }
  469. }
  470. if request.Source == request.IpSrc && request.XRealIP != "" {
  471. request.Source = request.XRealIP
  472. }
  473. if config.Trace {
  474. log.Printf("[%s] %s\n", request.Source, request.Url)
  475. }
  476. publishRequest(config.NatsQueue, &request)
  477. }
  478. func processAJP13(request *data.Request, appData []byte) error {
  479. a, err := ajp13.Parse(appData)
  480. if err != nil {
  481. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  482. }
  483. request.Url = a.URI
  484. request.Method = a.Method()
  485. request.Host = a.Server
  486. request.Protocol = a.Version
  487. request.Origin = a.RemoteAddr.String()
  488. request.Source = a.RemoteAddr.String()
  489. if v, ok := a.Header("Referer"); ok {
  490. request.Referer = v
  491. }
  492. if v, ok := a.Header("Connection"); ok {
  493. request.Connection = v
  494. }
  495. if v, ok := a.Header("X-Forwarded-For"); ok {
  496. request.XForwardedFor = v
  497. }
  498. if v, ok := a.Header("X-Real-IP"); ok {
  499. request.XRealIP = v
  500. }
  501. if v, ok := a.Header("X-Requested-With"); ok {
  502. request.XRequestedWith = v
  503. }
  504. if v, ok := a.Header("Accept-Encoding"); ok {
  505. request.AcceptEncoding = v
  506. }
  507. if v, ok := a.Header("Accept-Language"); ok {
  508. request.AcceptLanguage = v
  509. }
  510. if v, ok := a.Header("User-Agent"); ok {
  511. request.UserAgent = v
  512. }
  513. if v, ok := a.Header("Accept"); ok {
  514. request.Accept = v
  515. }
  516. if v, ok := a.Header("Cookie"); ok {
  517. request.Cookie = v
  518. }
  519. if v, ok := a.Header("X-Forwarded-Host"); ok {
  520. if v != request.Host {
  521. request.Host = v
  522. }
  523. }
  524. return nil
  525. }
  526. func processHTTP(request *data.Request, appData []byte) error {
  527. reader := bufio.NewReader(strings.NewReader(string(appData)))
  528. req, err := http.ReadRequest(reader)
  529. if err != nil {
  530. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  531. }
  532. request.Url = req.URL.String()
  533. request.Method = req.Method
  534. request.Referer = req.Referer()
  535. request.Host = req.Host
  536. request.Protocol = req.Proto
  537. request.Origin = request.Host
  538. if _, ok := req.Header["Connection"]; ok {
  539. request.Connection = req.Header["Connection"][0]
  540. }
  541. if _, ok := req.Header["X-Forwarded-For"]; ok {
  542. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  543. }
  544. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  545. if _, ok := req.Header["True-Client-Ip"]; ok {
  546. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  547. }
  548. if _, ok := req.Header["X-Real-Ip"]; ok {
  549. request.XRealIP = req.Header["X-Real-Ip"][0]
  550. }
  551. if _, ok := req.Header["X-Requested-With"]; ok {
  552. request.XRequestedWith = req.Header["X-Requested-With"][0]
  553. }
  554. if _, ok := req.Header["Accept-Encoding"]; ok {
  555. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  556. }
  557. if _, ok := req.Header["Accept-Language"]; ok {
  558. request.AcceptLanguage = req.Header["Accept-Language"][0]
  559. }
  560. if _, ok := req.Header["User-Agent"]; ok {
  561. request.UserAgent = req.Header["User-Agent"][0]
  562. }
  563. if _, ok := req.Header["Accept"]; ok {
  564. request.Accept = req.Header["Accept"][0]
  565. }
  566. if _, ok := req.Header["Cookie"]; ok {
  567. request.Cookie = req.Header["Cookie"][0]
  568. }
  569. request.Source = request.IpSrc
  570. return nil
  571. }
  572. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  573. // e.g.
  574. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  575. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  576. func replayFile() {
  577. var req data.Request
  578. var startTs time.Time
  579. var endTs time.Time
  580. rand.Seed(time.Now().UnixNano())
  581. for {
  582. fh, err := os.Open(config.RequestsFile)
  583. if err != nil {
  584. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  585. }
  586. c := csv.NewReader(fh)
  587. c.Comma = ' '
  588. for {
  589. if config.SleepFor.Duration > time.Nanosecond {
  590. startTs = time.Now()
  591. }
  592. r, err := c.Read()
  593. if err == io.EOF {
  594. break
  595. }
  596. if err != nil {
  597. log.Println(err)
  598. continue
  599. }
  600. req.IpSrc = r[0]
  601. req.Source = r[0]
  602. req.Url = r[1]
  603. req.UserAgent = "Munch/1.0"
  604. req.Host = "demo.scraperwall.com"
  605. req.CreatedAt = time.Now().UnixNano()
  606. publishRequest(config.NatsQueue, &req)
  607. if strings.Index(r[1], ".") < 0 {
  608. hash := sha1.New()
  609. io.WriteString(hash, r[0])
  610. fp := data.Fingerprint{
  611. ClientID: "scw",
  612. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  613. Remote: r[0],
  614. Url: r[1],
  615. Source: r[0],
  616. CreatedAt: time.Now(),
  617. }
  618. if strings.HasPrefix(r[0], "50.31.") {
  619. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  620. natsJSONEC.Publish("fingerprints_scw", fp)
  621. } else if rand.Intn(10) < 5 {
  622. natsJSONEC.Publish("fingerprints_scw", fp)
  623. }
  624. }
  625. count++
  626. if config.SleepFor.Duration >= time.Nanosecond {
  627. endTs = time.Now()
  628. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  629. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  630. }
  631. }
  632. }
  633. }
  634. }
  635. func loadConfig() {
  636. // initialize with values from the command line / environment
  637. config.Live = *doLiveCapture
  638. config.Interface = *iface
  639. config.SnapshotLen = *snapshotLen
  640. config.Filter = *filter
  641. config.Promiscuous = *promiscuous
  642. config.NatsURL = *natsURL
  643. config.NatsQueue = *natsQueue
  644. config.NatsUser = *natsUser
  645. config.NatsPassword = *natsPassword
  646. config.NatsCA = *natsCA
  647. config.SleepFor.Duration = *sleepFor
  648. config.RequestsFile = *requestsFile
  649. config.UseXForwardedAsSource = *useXForwardedAsSource
  650. config.Protocol = *protocol
  651. config.ApacheLog = *apacheLog
  652. config.NginxLog = *nginxLog
  653. config.NginxLogFormat = *nginxFormat
  654. config.HostName = *hostName
  655. config.Quiet = *beQuiet
  656. config.Trace = *trace
  657. if *configFile == "" {
  658. return
  659. }
  660. _, err := os.Stat(*configFile)
  661. if err != nil {
  662. log.Printf("%s: %s\n", *configFile, err)
  663. return
  664. }
  665. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  666. log.Printf("%s: %s\n", *configFile, err)
  667. }
  668. if !config.Quiet {
  669. config.print()
  670. }
  671. }
  672. // version outputs build information
  673. func version() {
  674. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  675. }