main.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793
  1. package main
  2. import (
  3. "bufio"
  4. "crypto/sha1"
  5. "encoding/csv"
  6. "flag"
  7. "fmt"
  8. "io"
  9. "log"
  10. "math/rand"
  11. "net"
  12. "net/http"
  13. "os"
  14. "regexp"
  15. "strings"
  16. "time"
  17. "github.com/BurntSushi/toml"
  18. "github.com/Songmu/axslogparser"
  19. "github.com/google/gopacket"
  20. "github.com/google/gopacket/layers"
  21. "github.com/google/gopacket/pcap"
  22. "github.com/hpcloud/tail"
  23. "github.com/nats-io/nats"
  24. "github.com/nats-io/nats/encoders/protobuf"
  25. "github.com/satyrius/gonx"
  26. "git.scraperwall.com/scw/ajp13"
  27. "git.scraperwall.com/scw/data"
  28. "git.scraperwall.com/scw/ip"
  29. )
  30. var (
  31. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  32. iface = flag.String("interface", "eth0", "Interface to get packets from")
  33. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  34. filter = flag.String("filter", "tcp", "PCAP filter expression")
  35. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  36. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  37. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  38. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  39. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  40. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  41. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  42. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  43. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  44. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  45. trace = flag.Bool("trace", false, "Trace the packet capturing")
  46. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  47. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  48. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  49. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  50. configFile = flag.String("config", "", "The location of the TOML config file")
  51. beQuiet = flag.Bool("quiet", true, "Be quiet")
  52. doVersion = flag.Bool("version", false, "Show version information")
  53. natsEC *nats.EncodedConn
  54. natsJSONEC *nats.EncodedConn
  55. natsErrorChan chan error
  56. natsIsAvailable bool
  57. count uint64
  58. timeout = -1 * time.Second
  59. ipPriv *ip.IP
  60. config Config
  61. // Version contains the program Version, e.g. 1.0.1
  62. Version string
  63. // BuildDate contains the date and time at which the program was compiled
  64. BuildDate string
  65. )
  66. // Config contains the program configuration
  67. type Config struct {
  68. Live bool
  69. Interface string
  70. SnapshotLen int
  71. Filter string
  72. Promiscuous bool
  73. NatsURL string
  74. NatsQueue string
  75. NatsUser string
  76. NatsPassword string
  77. NatsCA string
  78. SleepFor duration
  79. RequestsFile string
  80. UseXForwardedAsSource bool
  81. Quiet bool
  82. Protocol string
  83. Trace bool
  84. ApacheLog string
  85. NginxLog string
  86. NginxLogFormat string
  87. HostName string
  88. }
  // duration wraps time.Duration so that configuration values written as strings
// such as "250ms" or "1m30s" can be decoded into it via UnmarshalText.
type duration struct {
	time.Duration
}

// UnmarshalText parses text with time.ParseDuration. The parsed value is stored
// even when parsing fails, in which case time.ParseDuration yields 0 and its
// error is returned to the caller.
func (d *duration) UnmarshalText(text []byte) error {
	parsed, err := time.ParseDuration(string(text))
	d.Duration = parsed
	return err
}
  97. func (c Config) print() {
  98. fmt.Printf("Live: %t\n", c.Live)
  99. fmt.Printf("Interface: %s\n", c.Interface)
  100. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  101. fmt.Printf("Filter: %s\n", c.Filter)
  102. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  103. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  104. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  105. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  106. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  107. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  108. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  109. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  110. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  111. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  112. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  113. fmt.Printf("HostName: %s\n", c.HostName)
  114. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  115. fmt.Printf("Protocol: %s\n", c.Protocol)
  116. fmt.Printf("Quiet: %t\n", c.Quiet)
  117. fmt.Printf("Trace: %t\n", c.Trace)
  118. }
  119. func init() {
  120. flag.Parse()
  121. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  122. }
  123. func main() {
  124. if *doVersion {
  125. version()
  126. os.Exit(0)
  127. }
  128. loadConfig()
  129. // Output how many requests per second were sent
  130. if !config.Quiet {
  131. go func(c *uint64) {
  132. for {
  133. fmt.Printf("%d requests per second\n", *c)
  134. *c = 0
  135. time.Sleep(time.Second)
  136. }
  137. }(&count)
  138. }
  139. // NATS
  140. //
  141. if config.NatsURL == "" {
  142. log.Fatal("No NATS URL specified (-nats-url)!")
  143. }
  144. natsIsAvailable = false
  145. natsErrorChan = make(chan error, 1)
  146. err := connectToNATS()
  147. if err != nil {
  148. log.Fatal(err)
  149. }
  150. go natsWatchdog(natsErrorChan)
  151. // What should I do?
  152. if config.RequestsFile != "" {
  153. replayFile()
  154. } else if config.ApacheLog != "" {
  155. apacheLogCapture(config.ApacheLog)
  156. } else if config.Live {
  157. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  158. liveCapture()
  159. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  160. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  161. }
  162. }
  163. func natsWatchdog(closedChan chan error) {
  164. var lastError error
  165. for err := range closedChan {
  166. if lastError != err {
  167. lastError = err
  168. log.Println(err)
  169. }
  170. if err != nats.ErrConnectionClosed {
  171. continue
  172. }
  173. RECONNECT:
  174. for {
  175. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  176. err := connectToNATS()
  177. if err == nil {
  178. break RECONNECT
  179. }
  180. time.Sleep(1 * time.Second)
  181. }
  182. }
  183. }
  184. func connectToNATS() error {
  185. var natsConn *nats.Conn
  186. var err error
  187. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  188. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  189. } else {
  190. if config.NatsPassword != "" && config.NatsUser != "" {
  191. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  192. } else {
  193. natsConn, err = nats.Connect(config.NatsURL)
  194. }
  195. }
  196. if err != nil {
  197. return err
  198. }
  199. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  200. if err != nil {
  201. return fmt.Errorf("Encoded Connection: %v", err)
  202. }
  203. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  204. if err != nil {
  205. return fmt.Errorf("Encoded Connection: %v", err)
  206. }
  207. natsIsAvailable = true
  208. return nil
  209. }
  210. func nginxLogCapture(logfile, format string) {
  211. if _, err := os.Stat(logfile); err != nil {
  212. log.Fatalf("%s: %s", logfile, err)
  213. }
  214. t, err := tail.TailFile(logfile, tail.Config{
  215. Follow: true, // follow the file
  216. ReOpen: true, // reopen log file when it gets closed/rotated
  217. Logger: tail.DiscardingLogger, // don't log anything
  218. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  219. })
  220. if err != nil {
  221. log.Fatalf("%s: %s", logfile, err)
  222. }
  223. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  224. p := gonx.NewParser(format)
  225. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  226. for line := range t.Lines {
  227. l := line.Text
  228. logEntry, err := p.ParseString(l)
  229. if err != nil {
  230. log.Println(err)
  231. continue
  232. }
  233. remote, err := logEntry.Field("remote_addr")
  234. if err != nil {
  235. log.Println(err)
  236. continue
  237. }
  238. // only use the first host in case there are multiple hosts in the log
  239. if cidx := strings.Index(remote, ","); cidx >= 0 {
  240. remote = remote[0:cidx]
  241. }
  242. timestampStr, err := logEntry.Field("time_local")
  243. if err != nil {
  244. log.Println(err)
  245. continue
  246. }
  247. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  248. if err != nil {
  249. log.Println(err)
  250. continue
  251. }
  252. httpRequest, err := logEntry.Field("request")
  253. if err != nil {
  254. log.Println(err)
  255. continue
  256. }
  257. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  258. if len(reqData) < 4 {
  259. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  260. continue
  261. }
  262. request := data.Request{
  263. IpSrc: remote,
  264. Origin: remote,
  265. Source: remote,
  266. IpDst: "127.0.0.1",
  267. PortSrc: 0,
  268. PortDst: 0,
  269. TcpSeq: 0,
  270. CreatedAt: timeStamp.Unix(),
  271. Url: reqData[2],
  272. Method: reqData[1],
  273. Host: host,
  274. Protocol: reqData[3],
  275. }
  276. request.Referer, _ = logEntry.Field("http_referer")
  277. request.UserAgent, _ = logEntry.Field("http_user_agent")
  278. if config.Trace {
  279. log.Printf("[%s] %s\n", request.Source, request.Url)
  280. }
  281. count++
  282. publishRequest(config.NatsQueue, &request)
  283. }
  284. }
  285. func apacheLogCapture(logfile string) {
  286. if _, err := os.Stat(logfile); err != nil {
  287. log.Fatalf("%s: %s", logfile, err)
  288. }
  289. t, err := tail.TailFile(logfile, tail.Config{
  290. Follow: true, // follow the file
  291. ReOpen: true, // reopen log file when it gets closed/rotated
  292. Logger: tail.DiscardingLogger, // don't log anything
  293. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  294. })
  295. if err != nil {
  296. log.Fatalf("%s: %s", logfile, err)
  297. }
  298. var p axslogparser.Parser
  299. parserSet := false
  300. for line := range t.Lines {
  301. l := line.Text
  302. if !parserSet {
  303. p, _, err = axslogparser.GuessParser(l)
  304. if err != nil {
  305. log.Println(err)
  306. continue
  307. }
  308. parserSet = true
  309. }
  310. logEntry, err := p.Parse(l)
  311. if err != nil {
  312. log.Println(err)
  313. continue
  314. }
  315. remote := logEntry.Host
  316. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  317. remote = logEntry.ForwardedFor
  318. }
  319. // only use the first host in case there are multiple hosts in the log
  320. if cidx := strings.Index(remote, ","); cidx >= 0 {
  321. remote = remote[0:cidx]
  322. }
  323. // extract the virtual host
  324. var virtualHost string
  325. vhost := logEntry.VirtualHost
  326. if vhost != "" {
  327. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  328. virtualHost = vhostAndPort[0]
  329. } else {
  330. if config.HostName != "" {
  331. vhost = config.HostName
  332. } else {
  333. vhost = "[not available]"
  334. }
  335. }
  336. request := data.Request{
  337. IpSrc: remote,
  338. IpDst: "127.0.0.1",
  339. PortSrc: 0,
  340. PortDst: 0,
  341. TcpSeq: 0,
  342. CreatedAt: logEntry.Time.UnixNano(),
  343. Url: logEntry.RequestURI,
  344. Method: logEntry.Method,
  345. Host: virtualHost,
  346. Protocol: logEntry.Protocol,
  347. Origin: remote,
  348. Source: remote,
  349. Referer: logEntry.Referer,
  350. XForwardedFor: logEntry.ForwardedFor,
  351. UserAgent: logEntry.UserAgent,
  352. }
  353. if config.Trace {
  354. log.Printf("[%s] %s\n", request.Source, request.Url)
  355. }
  356. count++
  357. publishRequest(config.NatsQueue, &request)
  358. }
  359. }
  360. func liveCapture() {
  361. ipPriv = ip.NewIP()
  362. // PCAP setup
  363. //
  364. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  365. if err != nil {
  366. log.Fatal(err)
  367. }
  368. defer handle.Close()
  369. err = handle.SetBPFFilter(config.Filter)
  370. if err != nil {
  371. log.Fatal(err)
  372. }
  373. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  374. for packet := range packetSource.Packets() {
  375. go processPacket(packet)
  376. }
  377. }
  378. func publishRequest(queue string, request *data.Request) {
  379. if !natsIsAvailable {
  380. return
  381. }
  382. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  383. natsErrorChan <- err
  384. if err == nats.ErrConnectionClosed {
  385. natsIsAvailable = false
  386. }
  387. }
  388. }
  389. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  390. func processPacket(packet gopacket.Packet) {
  391. hasIPv4 := false
  392. var ipSrc, ipDst string
  393. // IPv4
  394. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  395. ip := ipLayer.(*layers.IPv4)
  396. ipSrc = ip.SrcIP.String()
  397. ipDst = ip.DstIP.String()
  398. hasIPv4 = true
  399. }
  400. // IPv6
  401. if !hasIPv4 {
  402. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  403. ip := ipLayer.(*layers.IPv6)
  404. ipSrc = ip.SrcIP.String()
  405. ipDst = ip.DstIP.String()
  406. }
  407. }
  408. // TCP
  409. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  410. if tcpLayer == nil {
  411. return
  412. }
  413. tcp, _ := tcpLayer.(*layers.TCP)
  414. portSrc := tcp.SrcPort
  415. portDst := tcp.DstPort
  416. sequence := tcp.Seq
  417. applicationLayer := packet.ApplicationLayer()
  418. if applicationLayer == nil {
  419. return
  420. }
  421. count++
  422. if len(applicationLayer.Payload()) < 50 {
  423. log.Println("application layer too small!")
  424. return
  425. }
  426. request := data.Request{
  427. IpSrc: ipSrc,
  428. IpDst: ipDst,
  429. PortSrc: uint32(portSrc),
  430. PortDst: uint32(portDst),
  431. TcpSeq: uint32(sequence),
  432. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  433. }
  434. switch config.Protocol {
  435. case "http":
  436. err := processHTTP(&request, applicationLayer.Payload())
  437. if err != nil {
  438. log.Println(err)
  439. return
  440. }
  441. case "ajp13":
  442. err := processAJP13(&request, applicationLayer.Payload())
  443. if err != nil {
  444. log.Println(err)
  445. return
  446. }
  447. }
  448. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  449. if strings.Contains(request.XForwardedFor, ",") {
  450. ips := strings.Split(request.XForwardedFor, ",")
  451. for i := len(ips) - 1; i >= 0; i-- {
  452. ipRaw := strings.TrimSpace(ips[i])
  453. ipAddr := net.ParseIP(ipRaw)
  454. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  455. request.Source = ipRaw
  456. break
  457. }
  458. }
  459. } else {
  460. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  461. if !ipPriv.IsPrivate(ipAddr) {
  462. request.Source = request.XForwardedFor
  463. }
  464. }
  465. }
  466. if request.Source == request.IpSrc && request.XRealIP != "" {
  467. request.Source = request.XRealIP
  468. }
  469. if config.Trace {
  470. log.Printf("[%s] %s\n", request.Source, request.Url)
  471. }
  472. publishRequest(config.NatsQueue, &request)
  473. }
  474. func processAJP13(request *data.Request, appData []byte) error {
  475. a, err := ajp13.Parse(appData)
  476. if err != nil {
  477. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  478. }
  479. request.Url = a.URI
  480. request.Method = a.Method()
  481. request.Host = a.Server
  482. request.Protocol = a.Version
  483. request.Origin = a.RemoteAddr.String()
  484. request.Source = a.RemoteAddr.String()
  485. if v, ok := a.Header("Referer"); ok {
  486. request.Referer = v
  487. }
  488. if v, ok := a.Header("Connection"); ok {
  489. request.Connection = v
  490. }
  491. if v, ok := a.Header("X-Forwarded-For"); ok {
  492. request.XForwardedFor = v
  493. }
  494. if v, ok := a.Header("X-Real-IP"); ok {
  495. request.XRealIP = v
  496. }
  497. if v, ok := a.Header("X-Requested-With"); ok {
  498. request.XRequestedWith = v
  499. }
  500. if v, ok := a.Header("Accept-Encoding"); ok {
  501. request.AcceptEncoding = v
  502. }
  503. if v, ok := a.Header("Accept-Language"); ok {
  504. request.AcceptLanguage = v
  505. }
  506. if v, ok := a.Header("User-Agent"); ok {
  507. request.UserAgent = v
  508. }
  509. if v, ok := a.Header("Accept"); ok {
  510. request.Accept = v
  511. }
  512. if v, ok := a.Header("Cookie"); ok {
  513. request.Cookie = v
  514. }
  515. if v, ok := a.Header("X-Forwarded-Host"); ok {
  516. if v != request.Host {
  517. request.Host = v
  518. }
  519. }
  520. return nil
  521. }
  522. func processHTTP(request *data.Request, appData []byte) error {
  523. reader := bufio.NewReader(strings.NewReader(string(appData)))
  524. req, err := http.ReadRequest(reader)
  525. if err != nil {
  526. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  527. }
  528. request.Url = req.URL.String()
  529. request.Method = req.Method
  530. request.Referer = req.Referer()
  531. request.Host = req.Host
  532. request.Protocol = req.Proto
  533. request.Origin = request.Host
  534. if _, ok := req.Header["Connection"]; ok {
  535. request.Connection = req.Header["Connection"][0]
  536. }
  537. if _, ok := req.Header["X-Forwarded-For"]; ok {
  538. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  539. }
  540. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  541. if _, ok := req.Header["True-Client-Ip"]; ok {
  542. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  543. }
  544. if _, ok := req.Header["X-Real-Ip"]; ok {
  545. request.XRealIP = req.Header["X-Real-Ip"][0]
  546. }
  547. if _, ok := req.Header["X-Requested-With"]; ok {
  548. request.XRequestedWith = req.Header["X-Requested-With"][0]
  549. }
  550. if _, ok := req.Header["Accept-Encoding"]; ok {
  551. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  552. }
  553. if _, ok := req.Header["Accept-Language"]; ok {
  554. request.AcceptLanguage = req.Header["Accept-Language"][0]
  555. }
  556. if _, ok := req.Header["User-Agent"]; ok {
  557. request.UserAgent = req.Header["User-Agent"][0]
  558. }
  559. if _, ok := req.Header["Accept"]; ok {
  560. request.Accept = req.Header["Accept"][0]
  561. }
  562. if _, ok := req.Header["Cookie"]; ok {
  563. request.Cookie = req.Header["Cookie"][0]
  564. }
  565. request.Source = request.IpSrc
  566. return nil
  567. }
  568. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  569. // e.g.
  570. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  571. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  572. func replayFile() {
  573. var req data.Request
  574. var startTs time.Time
  575. var endTs time.Time
  576. rand.Seed(time.Now().UnixNano())
  577. for {
  578. fh, err := os.Open(config.RequestsFile)
  579. if err != nil {
  580. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  581. }
  582. c := csv.NewReader(fh)
  583. c.Comma = ' '
  584. for {
  585. if config.SleepFor.Duration > time.Nanosecond {
  586. startTs = time.Now()
  587. }
  588. r, err := c.Read()
  589. if err == io.EOF {
  590. break
  591. }
  592. if err != nil {
  593. log.Println(err)
  594. continue
  595. }
  596. req.IpSrc = r[0]
  597. req.Source = r[0]
  598. req.Url = r[1]
  599. req.UserAgent = "Munch/1.0"
  600. req.Host = "demo.scraperwall.com"
  601. req.CreatedAt = time.Now().UnixNano()
  602. publishRequest(config.NatsQueue, &req)
  603. if strings.Index(r[1], ".") < 0 {
  604. hash := sha1.New()
  605. io.WriteString(hash, r[0])
  606. fp := data.Fingerprint{
  607. ClientID: "scw",
  608. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  609. Remote: r[0],
  610. Url: r[1],
  611. Source: r[0],
  612. CreatedAt: time.Now(),
  613. }
  614. if strings.HasPrefix(r[0], "50.31.") {
  615. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  616. natsJSONEC.Publish("fingerprints_scw", fp)
  617. } else if rand.Intn(10) < 5 {
  618. natsJSONEC.Publish("fingerprints_scw", fp)
  619. }
  620. }
  621. count++
  622. if config.SleepFor.Duration >= time.Nanosecond {
  623. endTs = time.Now()
  624. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  625. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  626. }
  627. }
  628. }
  629. }
  630. }
  631. func loadConfig() {
  632. // initialize with values from the command line / environment
  633. config.Live = *doLiveCapture
  634. config.Interface = *iface
  635. config.SnapshotLen = *snapshotLen
  636. config.Filter = *filter
  637. config.Promiscuous = *promiscuous
  638. config.NatsURL = *natsURL
  639. config.NatsQueue = *natsQueue
  640. config.NatsUser = *natsUser
  641. config.NatsPassword = *natsPassword
  642. config.NatsCA = *natsCA
  643. config.SleepFor.Duration = *sleepFor
  644. config.RequestsFile = *requestsFile
  645. config.UseXForwardedAsSource = *useXForwardedAsSource
  646. config.Protocol = *protocol
  647. config.ApacheLog = *apacheLog
  648. config.NginxLog = *nginxLog
  649. config.NginxLogFormat = *nginxFormat
  650. config.HostName = *hostName
  651. config.Quiet = *beQuiet
  652. config.Trace = *trace
  653. if *configFile == "" {
  654. return
  655. }
  656. _, err := os.Stat(*configFile)
  657. if err != nil {
  658. log.Printf("%s: %s\n", *configFile, err)
  659. return
  660. }
  661. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  662. log.Printf("%s: %s\n", *configFile, err)
  663. }
  664. if !config.Quiet {
  665. config.print()
  666. }
  667. }
  668. // version outputs build information
  669. func version() {
  670. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  671. }