main.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789
  1. package main
  2. import (
  3. "bufio"
  4. "crypto/sha1"
  5. "encoding/csv"
  6. "flag"
  7. "fmt"
  8. "io"
  9. "log"
  10. "math/rand"
  11. "net"
  12. "net/http"
  13. "os"
  14. "regexp"
  15. "strings"
  16. "time"
  17. "github.com/BurntSushi/toml"
  18. "github.com/Songmu/axslogparser"
  19. "github.com/google/gopacket"
  20. "github.com/google/gopacket/layers"
  21. "github.com/google/gopacket/pcap"
  22. "github.com/hpcloud/tail"
  23. "github.com/nats-io/nats"
  24. "github.com/nats-io/nats/encoders/protobuf"
  25. "github.com/satyrius/gonx"
  26. "git.scraperwall.com/scw/ajp13"
  27. "git.scraperwall.com/scw/data"
  28. "git.scraperwall.com/scw/ip"
  29. )
  30. var (
  31. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  32. iface = flag.String("interface", "eth0", "Interface to get packets from")
  33. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  34. filter = flag.String("filter", "tcp", "PCAP filter expression")
  35. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  36. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  37. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  38. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  39. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  40. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  41. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  42. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  43. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  44. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  45. trace = flag.Bool("trace", false, "Trace the packet capturing")
  46. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  47. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  48. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  49. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  50. configFile = flag.String("config", "", "The location of the TOML config file")
  51. beQuiet = flag.Bool("quiet", true, "Be quiet")
  52. doVersion = flag.Bool("version", false, "Show version information")
  53. natsEC *nats.EncodedConn
  54. natsJSONEC *nats.EncodedConn
  55. natsErrorChan chan error
  56. natsIsAvailable bool
  57. count uint64
  58. timeout = -1 * time.Second
  59. ipPriv *ip.IP
  60. config Config
  61. // Version contains the program Version, e.g. 1.0.1
  62. Version string
  63. // BuildDate contains the date and time at which the program was compiled
  64. BuildDate string
  65. )
  66. // Config contains the program configuration
  67. type Config struct {
  68. Live bool
  69. Interface string
  70. SnapshotLen int
  71. Filter string
  72. Promiscuous bool
  73. NatsURL string
  74. NatsQueue string
  75. NatsUser string
  76. NatsPassword string
  77. NatsCA string
  78. SleepFor duration
  79. RequestsFile string
  80. UseXForwardedAsSource bool
  81. Quiet bool
  82. Protocol string
  83. Trace bool
  84. ApacheLog string
  85. NginxLog string
  86. NginxLogFormat string
  87. HostName string
  88. }
  89. type duration struct {
  90. time.Duration
  91. }
  92. func (d *duration) UnmarshalText(text []byte) error {
  93. var err error
  94. d.Duration, err = time.ParseDuration(string(text))
  95. return err
  96. }
  97. func (c Config) print() {
  98. fmt.Printf("Live: %t\n", c.Live)
  99. fmt.Printf("Interface: %s\n", c.Interface)
  100. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  101. fmt.Printf("Filter: %s\n", c.Filter)
  102. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  103. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  104. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  105. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  106. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  107. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  108. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  109. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  110. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  111. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  112. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  113. fmt.Printf("HostName: %s\n", c.HostName)
  114. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  115. fmt.Printf("Protocol: %s\n", c.Protocol)
  116. fmt.Printf("Quiet: %t\n", c.Quiet)
  117. fmt.Printf("Trace: %t\n", c.Trace)
  118. }
  119. func init() {
  120. flag.Parse()
  121. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  122. }
  123. func main() {
  124. if *doVersion {
  125. version()
  126. os.Exit(0)
  127. }
  128. loadConfig()
  129. // Output how many requests per second were sent
  130. if !config.Quiet {
  131. go func(c *uint64) {
  132. for {
  133. fmt.Printf("%d requests per second\n", *c)
  134. *c = 0
  135. time.Sleep(time.Second)
  136. }
  137. }(&count)
  138. }
  139. // NATS
  140. //
  141. if config.NatsURL == "" {
  142. log.Fatal("No NATS URL specified (-nats-url)!")
  143. }
  144. natsIsAvailable = false
  145. natsErrorChan = make(chan error, 1)
  146. err := connectToNATS()
  147. if err != nil {
  148. log.Fatal(err)
  149. }
  150. go natsWatchdog(natsErrorChan)
  151. // What should I do?
  152. if config.RequestsFile != "" {
  153. replayFile()
  154. } else if config.ApacheLog != "" {
  155. apacheLogCapture(config.ApacheLog)
  156. } else if config.Live {
  157. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  158. liveCapture()
  159. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  160. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  161. }
  162. }
  163. func natsWatchdog(closedChan chan error) {
  164. var lastError error
  165. for err := range closedChan {
  166. if lastError != err {
  167. lastError = err
  168. log.Println(err)
  169. }
  170. if err != nats.ErrConnectionClosed {
  171. continue
  172. }
  173. RECONNECT:
  174. for {
  175. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  176. err := connectToNATS()
  177. if err == nil {
  178. break RECONNECT
  179. }
  180. time.Sleep(1 * time.Second)
  181. }
  182. }
  183. }
  184. func connectToNATS() error {
  185. var natsConn *nats.Conn
  186. var err error
  187. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  188. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  189. } else {
  190. if config.NatsPassword != "" && config.NatsUser != "" {
  191. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  192. } else {
  193. natsConn, err = nats.Connect(config.NatsURL)
  194. }
  195. }
  196. if err != nil {
  197. return err
  198. }
  199. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  200. if err != nil {
  201. return fmt.Errorf("Encoded Connection: %v", err)
  202. }
  203. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  204. if err != nil {
  205. return fmt.Errorf("Encoded Connection: %v", err)
  206. }
  207. natsIsAvailable = true
  208. return nil
  209. }
  210. func nginxLogCapture(logfile, format string) {
  211. if _, err := os.Stat(logfile); err != nil {
  212. log.Fatalf("%s: %s", logfile, err)
  213. }
  214. t, err := tail.TailFile(logfile, tail.Config{
  215. Follow: true, // follow the file
  216. ReOpen: true, // reopen log file when it gets closed/rotated
  217. Logger: tail.DiscardingLogger, // don't log anything
  218. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  219. })
  220. if err != nil {
  221. log.Fatalf("%s: %s", logfile, err)
  222. }
  223. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  224. p := gonx.NewParser(format)
  225. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  226. for line := range t.Lines {
  227. l := line.Text
  228. logEntry, err := p.ParseString(l)
  229. if err != nil {
  230. log.Println(err)
  231. continue
  232. }
  233. remote, err := logEntry.Field("remote_addr")
  234. if err != nil {
  235. log.Println(err)
  236. continue
  237. }
  238. // only use the first host in case there are multiple hosts in the log
  239. if cidx := strings.Index(remote, ","); cidx >= 0 {
  240. remote = remote[0:cidx]
  241. }
  242. timestampStr, err := logEntry.Field("time_local")
  243. if err != nil {
  244. log.Println(err)
  245. continue
  246. }
  247. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  248. if err != nil {
  249. log.Println(err)
  250. continue
  251. }
  252. httpRequest, err := logEntry.Field("request")
  253. if err != nil {
  254. log.Println(err)
  255. continue
  256. }
  257. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  258. if len(reqData) < 4 {
  259. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  260. continue
  261. }
  262. request := data.Request{
  263. IpSrc: remote,
  264. Origin: remote,
  265. Source: remote,
  266. IpDst: "127.0.0.1",
  267. PortSrc: 0,
  268. PortDst: 0,
  269. TcpSeq: 0,
  270. CreatedAt: timeStamp.Unix(),
  271. Url: reqData[2],
  272. Method: reqData[1],
  273. Host: "[not available]",
  274. Protocol: reqData[3],
  275. }
  276. request.Referer, _ = logEntry.Field("http_referer")
  277. request.UserAgent, _ = logEntry.Field("http_user_agent")
  278. if config.Trace {
  279. log.Printf("[%s] %s\n", request.Source, request.Url)
  280. }
  281. count++
  282. publishRequest(config.NatsQueue, &request)
  283. }
  284. }
  285. func apacheLogCapture(logfile string) {
  286. if _, err := os.Stat(logfile); err != nil {
  287. log.Fatalf("%s: %s", logfile, err)
  288. }
  289. t, err := tail.TailFile(logfile, tail.Config{
  290. Follow: true, // follow the file
  291. ReOpen: true, // reopen log file when it gets closed/rotated
  292. Logger: tail.DiscardingLogger, // don't log anything
  293. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  294. })
  295. if err != nil {
  296. log.Fatalf("%s: %s", logfile, err)
  297. }
  298. var p axslogparser.Parser
  299. parserSet := false
  300. for line := range t.Lines {
  301. l := line.Text
  302. if !parserSet {
  303. p, _, err = axslogparser.GuessParser(l)
  304. if err != nil {
  305. log.Println(err)
  306. continue
  307. }
  308. parserSet = true
  309. }
  310. logEntry, err := p.Parse(l)
  311. if err != nil {
  312. log.Println(err)
  313. continue
  314. }
  315. remote := logEntry.Host
  316. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  317. remote = logEntry.ForwardedFor
  318. }
  319. // only use the first host in case there are multiple hosts in the log
  320. if cidx := strings.Index(remote, ","); cidx >= 0 {
  321. remote = remote[0:cidx]
  322. }
  323. // extract the virtual host
  324. var virtualHost string
  325. vhost := logEntry.VirtualHost
  326. if vhost != "" {
  327. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  328. virtualHost = vhostAndPort[0]
  329. } else {
  330. vhost = "[not available]"
  331. }
  332. request := data.Request{
  333. IpSrc: remote,
  334. IpDst: "127.0.0.1",
  335. PortSrc: 0,
  336. PortDst: 0,
  337. TcpSeq: 0,
  338. CreatedAt: logEntry.Time.UnixNano(),
  339. Url: logEntry.RequestURI,
  340. Method: logEntry.Method,
  341. Host: virtualHost,
  342. Protocol: logEntry.Protocol,
  343. Origin: remote,
  344. Source: remote,
  345. Referer: logEntry.Referer,
  346. XForwardedFor: logEntry.ForwardedFor,
  347. UserAgent: logEntry.UserAgent,
  348. }
  349. if config.Trace {
  350. log.Printf("[%s] %s\n", request.Source, request.Url)
  351. }
  352. count++
  353. publishRequest(config.NatsQueue, &request)
  354. }
  355. }
  356. func liveCapture() {
  357. ipPriv = ip.NewIP()
  358. // PCAP setup
  359. //
  360. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  361. if err != nil {
  362. log.Fatal(err)
  363. }
  364. defer handle.Close()
  365. err = handle.SetBPFFilter(config.Filter)
  366. if err != nil {
  367. log.Fatal(err)
  368. }
  369. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  370. for packet := range packetSource.Packets() {
  371. go processPacket(packet)
  372. }
  373. }
  374. func publishRequest(queue string, request *data.Request) {
  375. if !natsIsAvailable {
  376. return
  377. }
  378. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  379. natsErrorChan <- err
  380. if err == nats.ErrConnectionClosed {
  381. natsIsAvailable = false
  382. }
  383. }
  384. }
  385. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  386. func processPacket(packet gopacket.Packet) {
  387. hasIPv4 := false
  388. var ipSrc, ipDst string
  389. // IPv4
  390. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  391. ip := ipLayer.(*layers.IPv4)
  392. ipSrc = ip.SrcIP.String()
  393. ipDst = ip.DstIP.String()
  394. hasIPv4 = true
  395. }
  396. // IPv6
  397. if !hasIPv4 {
  398. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  399. ip := ipLayer.(*layers.IPv6)
  400. ipSrc = ip.SrcIP.String()
  401. ipDst = ip.DstIP.String()
  402. }
  403. }
  404. // TCP
  405. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  406. if tcpLayer == nil {
  407. return
  408. }
  409. tcp, _ := tcpLayer.(*layers.TCP)
  410. portSrc := tcp.SrcPort
  411. portDst := tcp.DstPort
  412. sequence := tcp.Seq
  413. applicationLayer := packet.ApplicationLayer()
  414. if applicationLayer == nil {
  415. return
  416. }
  417. count++
  418. if len(applicationLayer.Payload()) < 50 {
  419. log.Println("application layer too small!")
  420. return
  421. }
  422. request := data.Request{
  423. IpSrc: ipSrc,
  424. IpDst: ipDst,
  425. PortSrc: uint32(portSrc),
  426. PortDst: uint32(portDst),
  427. TcpSeq: uint32(sequence),
  428. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  429. }
  430. switch config.Protocol {
  431. case "http":
  432. err := processHTTP(&request, applicationLayer.Payload())
  433. if err != nil {
  434. log.Println(err)
  435. return
  436. }
  437. case "ajp13":
  438. err := processAJP13(&request, applicationLayer.Payload())
  439. if err != nil {
  440. log.Println(err)
  441. return
  442. }
  443. }
  444. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  445. if strings.Contains(request.XForwardedFor, ",") {
  446. ips := strings.Split(request.XForwardedFor, ",")
  447. for i := len(ips) - 1; i >= 0; i-- {
  448. ipRaw := strings.TrimSpace(ips[i])
  449. ipAddr := net.ParseIP(ipRaw)
  450. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  451. request.Source = ipRaw
  452. break
  453. }
  454. }
  455. } else {
  456. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  457. if !ipPriv.IsPrivate(ipAddr) {
  458. request.Source = request.XForwardedFor
  459. }
  460. }
  461. }
  462. if request.Source == request.IpSrc && request.XRealIP != "" {
  463. request.Source = request.XRealIP
  464. }
  465. if config.Trace {
  466. log.Printf("[%s] %s\n", request.Source, request.Url)
  467. }
  468. publishRequest(config.NatsQueue, &request)
  469. }
  470. func processAJP13(request *data.Request, appData []byte) error {
  471. a, err := ajp13.Parse(appData)
  472. if err != nil {
  473. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  474. }
  475. request.Url = a.URI
  476. request.Method = a.Method()
  477. request.Host = a.Server
  478. request.Protocol = a.Version
  479. request.Origin = a.RemoteAddr.String()
  480. request.Source = a.RemoteAddr.String()
  481. if v, ok := a.Header("Referer"); ok {
  482. request.Referer = v
  483. }
  484. if v, ok := a.Header("Connection"); ok {
  485. request.Connection = v
  486. }
  487. if v, ok := a.Header("X-Forwarded-For"); ok {
  488. request.XForwardedFor = v
  489. }
  490. if v, ok := a.Header("X-Real-IP"); ok {
  491. request.XRealIP = v
  492. }
  493. if v, ok := a.Header("X-Requested-With"); ok {
  494. request.XRequestedWith = v
  495. }
  496. if v, ok := a.Header("Accept-Encoding"); ok {
  497. request.AcceptEncoding = v
  498. }
  499. if v, ok := a.Header("Accept-Language"); ok {
  500. request.AcceptLanguage = v
  501. }
  502. if v, ok := a.Header("User-Agent"); ok {
  503. request.UserAgent = v
  504. }
  505. if v, ok := a.Header("Accept"); ok {
  506. request.Accept = v
  507. }
  508. if v, ok := a.Header("Cookie"); ok {
  509. request.Cookie = v
  510. }
  511. if v, ok := a.Header("X-Forwarded-Host"); ok {
  512. if v != request.Host {
  513. request.Host = v
  514. }
  515. }
  516. return nil
  517. }
  518. func processHTTP(request *data.Request, appData []byte) error {
  519. reader := bufio.NewReader(strings.NewReader(string(appData)))
  520. req, err := http.ReadRequest(reader)
  521. if err != nil {
  522. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  523. }
  524. request.Url = req.URL.String()
  525. request.Method = req.Method
  526. request.Referer = req.Referer()
  527. request.Host = req.Host
  528. request.Protocol = req.Proto
  529. request.Origin = request.Host
  530. if _, ok := req.Header["Connection"]; ok {
  531. request.Connection = req.Header["Connection"][0]
  532. }
  533. if _, ok := req.Header["X-Forwarded-For"]; ok {
  534. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  535. }
  536. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  537. if _, ok := req.Header["True-Client-Ip"]; ok {
  538. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  539. }
  540. if _, ok := req.Header["X-Real-Ip"]; ok {
  541. request.XRealIP = req.Header["X-Real-Ip"][0]
  542. }
  543. if _, ok := req.Header["X-Requested-With"]; ok {
  544. request.XRequestedWith = req.Header["X-Requested-With"][0]
  545. }
  546. if _, ok := req.Header["Accept-Encoding"]; ok {
  547. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  548. }
  549. if _, ok := req.Header["Accept-Language"]; ok {
  550. request.AcceptLanguage = req.Header["Accept-Language"][0]
  551. }
  552. if _, ok := req.Header["User-Agent"]; ok {
  553. request.UserAgent = req.Header["User-Agent"][0]
  554. }
  555. if _, ok := req.Header["Accept"]; ok {
  556. request.Accept = req.Header["Accept"][0]
  557. }
  558. if _, ok := req.Header["Cookie"]; ok {
  559. request.Cookie = req.Header["Cookie"][0]
  560. }
  561. request.Source = request.IpSrc
  562. return nil
  563. }
  564. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  565. // e.g.
  566. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  567. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  568. func replayFile() {
  569. var req data.Request
  570. var startTs time.Time
  571. var endTs time.Time
  572. rand.Seed(time.Now().UnixNano())
  573. for {
  574. fh, err := os.Open(config.RequestsFile)
  575. if err != nil {
  576. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  577. }
  578. c := csv.NewReader(fh)
  579. c.Comma = ' '
  580. for {
  581. if config.SleepFor.Duration > time.Nanosecond {
  582. startTs = time.Now()
  583. }
  584. r, err := c.Read()
  585. if err == io.EOF {
  586. break
  587. }
  588. if err != nil {
  589. log.Println(err)
  590. continue
  591. }
  592. req.IpSrc = r[0]
  593. req.Source = r[0]
  594. req.Url = r[1]
  595. req.UserAgent = "Munch/1.0"
  596. req.Host = "demo.scraperwall.com"
  597. req.CreatedAt = time.Now().UnixNano()
  598. publishRequest(config.NatsQueue, &req)
  599. if strings.Index(r[1], ".") < 0 {
  600. hash := sha1.New()
  601. io.WriteString(hash, r[0])
  602. fp := data.Fingerprint{
  603. ClientID: "scw",
  604. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  605. Remote: r[0],
  606. Url: r[1],
  607. Source: r[0],
  608. CreatedAt: time.Now(),
  609. }
  610. if strings.HasPrefix(r[0], "50.31.") {
  611. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  612. natsJSONEC.Publish("fingerprints_scw", fp)
  613. } else if rand.Intn(10) < 5 {
  614. natsJSONEC.Publish("fingerprints_scw", fp)
  615. }
  616. }
  617. count++
  618. if config.SleepFor.Duration >= time.Nanosecond {
  619. endTs = time.Now()
  620. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  621. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  622. }
  623. }
  624. }
  625. }
  626. }
  627. func loadConfig() {
  628. // initialize with values from the command line / environment
  629. config.Live = *doLiveCapture
  630. config.Interface = *iface
  631. config.SnapshotLen = *snapshotLen
  632. config.Filter = *filter
  633. config.Promiscuous = *promiscuous
  634. config.NatsURL = *natsURL
  635. config.NatsQueue = *natsQueue
  636. config.NatsUser = *natsUser
  637. config.NatsPassword = *natsPassword
  638. config.NatsCA = *natsCA
  639. config.SleepFor.Duration = *sleepFor
  640. config.RequestsFile = *requestsFile
  641. config.UseXForwardedAsSource = *useXForwardedAsSource
  642. config.Protocol = *protocol
  643. config.ApacheLog = *apacheLog
  644. config.NginxLog = *nginxLog
  645. config.NginxLogFormat = *nginxFormat
  646. config.HostName = *hostName
  647. config.Quiet = *beQuiet
  648. config.Trace = *trace
  649. if *configFile == "" {
  650. return
  651. }
  652. _, err := os.Stat(*configFile)
  653. if err != nil {
  654. log.Printf("%s: %s\n", *configFile, err)
  655. return
  656. }
  657. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  658. log.Printf("%s: %s\n", *configFile, err)
  659. }
  660. if !config.Quiet {
  661. config.print()
  662. }
  663. }
  664. // version outputs build information
  665. func version() {
  666. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  667. }