main.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785
  1. package main
  2. import (
  3. "bufio"
  4. "crypto/sha1"
  5. "encoding/csv"
  6. "flag"
  7. "fmt"
  8. "io"
  9. "log"
  10. "math/rand"
  11. "net"
  12. "net/http"
  13. "os"
  14. "regexp"
  15. "strings"
  16. "time"
  17. "github.com/BurntSushi/toml"
  18. "github.com/Songmu/axslogparser"
  19. "github.com/google/gopacket"
  20. "github.com/google/gopacket/layers"
  21. "github.com/google/gopacket/pcap"
  22. "github.com/hpcloud/tail"
  23. "github.com/nats-io/nats"
  24. "github.com/nats-io/nats/encoders/protobuf"
  25. "github.com/satyrius/gonx"
  26. "git.scraperwall.com/scw/ajp13"
  27. "git.scraperwall.com/scw/data"
  28. "git.scraperwall.com/scw/ip"
  29. )
  30. var (
  31. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  32. iface = flag.String("interface", "eth0", "Interface to get packets from")
  33. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  34. filter = flag.String("filter", "tcp", "PCAP filter expression")
  35. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  36. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  37. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  38. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  39. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  40. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  41. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  42. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  43. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  44. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  45. trace = flag.Bool("trace", false, "Trace the packet capturing")
  46. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  47. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  48. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  49. configFile = flag.String("config", "", "The location of the TOML config file")
  50. beQuiet = flag.Bool("quiet", true, "Be quiet")
  51. doVersion = flag.Bool("version", false, "Show version information")
  52. natsEC *nats.EncodedConn
  53. natsJSONEC *nats.EncodedConn
  54. natsErrorChan chan error
  55. natsIsAvailable bool
  56. count uint64
  57. timeout = -1 * time.Second
  58. ipPriv *ip.IP
  59. config Config
  60. // Version contains the program Version, e.g. 1.0.1
  61. Version string
  62. // BuildDate contains the date and time at which the program was compiled
  63. BuildDate string
  64. )
  65. // Config contains the program configuration
  66. type Config struct {
  67. Live bool
  68. Interface string
  69. SnapshotLen int
  70. Filter string
  71. Promiscuous bool
  72. NatsURL string
  73. NatsQueue string
  74. NatsUser string
  75. NatsPassword string
  76. NatsCA string
  77. SleepFor duration
  78. RequestsFile string
  79. UseXForwardedAsSource bool
  80. Quiet bool
  81. Protocol string
  82. Trace bool
  83. ApacheLog string
  84. NginxLog string
  85. NginxLogFormat string
  86. }
  // duration wraps time.Duration so that a textual value such as "250ms" can
  // be decoded into it through the encoding.TextUnmarshaler interface.
  type duration struct {
  	time.Duration
  }

  // UnmarshalText parses text with time.ParseDuration and stores the result.
  // On a parse error the error is returned and the stored duration is whatever
  // time.ParseDuration returned alongside it.
  func (d *duration) UnmarshalText(text []byte) error {
  	parsed, err := time.ParseDuration(string(text))
  	d.Duration = parsed
  	return err
  }
  95. func (c Config) print() {
  96. fmt.Printf("Live: %t\n", c.Live)
  97. fmt.Printf("Interface: %s\n", c.Interface)
  98. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  99. fmt.Printf("Filter: %s\n", c.Filter)
  100. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  101. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  102. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  103. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  104. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  105. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  106. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  107. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  108. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  109. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  110. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  111. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  112. fmt.Printf("Protocol: %s\n", c.Protocol)
  113. fmt.Printf("Quiet: %t\n", c.Quiet)
  114. fmt.Printf("Trace: %t\n", c.Trace)
  115. }
  116. func init() {
  117. flag.Parse()
  118. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  119. }
  120. func main() {
  121. if *doVersion {
  122. version()
  123. os.Exit(0)
  124. }
  125. loadConfig()
  126. // Output how many requests per second were sent
  127. if !config.Quiet {
  128. go func(c *uint64) {
  129. for {
  130. fmt.Printf("%d requests per second\n", *c)
  131. *c = 0
  132. time.Sleep(time.Second)
  133. }
  134. }(&count)
  135. }
  136. // NATS
  137. //
  138. if config.NatsURL == "" {
  139. log.Fatal("No NATS URL specified (-nats-url)!")
  140. }
  141. natsIsAvailable = false
  142. natsErrorChan = make(chan error, 1)
  143. err := connectToNATS()
  144. if err != nil {
  145. log.Fatal(err)
  146. }
  147. go natsWatchdog(natsErrorChan)
  148. // What should I do?
  149. if config.RequestsFile != "" {
  150. replayFile()
  151. } else if config.ApacheLog != "" {
  152. apacheLogCapture(config.ApacheLog)
  153. } else if config.Live {
  154. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  155. liveCapture()
  156. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  157. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  158. }
  159. }
  160. func natsWatchdog(closedChan chan error) {
  161. var lastError error
  162. for err := range closedChan {
  163. if lastError != err {
  164. lastError = err
  165. log.Println(err)
  166. }
  167. if err != nats.ErrConnectionClosed {
  168. continue
  169. }
  170. RECONNECT:
  171. for {
  172. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  173. err := connectToNATS()
  174. if err == nil {
  175. break RECONNECT
  176. }
  177. time.Sleep(1 * time.Second)
  178. }
  179. }
  180. }
  181. func connectToNATS() error {
  182. var natsConn *nats.Conn
  183. var err error
  184. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  185. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  186. } else {
  187. if config.NatsPassword != "" && config.NatsUser != "" {
  188. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  189. } else {
  190. natsConn, err = nats.Connect(config.NatsURL)
  191. }
  192. }
  193. if err != nil {
  194. return err
  195. }
  196. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  197. if err != nil {
  198. return fmt.Errorf("Encoded Connection: %v", err)
  199. }
  200. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  201. if err != nil {
  202. return fmt.Errorf("Encoded Connection: %v", err)
  203. }
  204. natsIsAvailable = true
  205. return nil
  206. }
  207. func nginxLogCapture(logfile, format string) {
  208. if _, err := os.Stat(logfile); err != nil {
  209. log.Fatalf("%s: %s", logfile, err)
  210. }
  211. t, err := tail.TailFile(logfile, tail.Config{
  212. Follow: true, // follow the file
  213. ReOpen: true, // reopen log file when it gets closed/rotated
  214. Logger: tail.DiscardingLogger, // don't log anything
  215. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  216. })
  217. if err != nil {
  218. log.Fatalf("%s: %s", logfile, err)
  219. }
  220. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  221. p := gonx.NewParser(format)
  222. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  223. for line := range t.Lines {
  224. l := line.Text
  225. logEntry, err := p.ParseString(l)
  226. if err != nil {
  227. log.Println(err)
  228. continue
  229. }
  230. remote, err := logEntry.Field("remote_addr")
  231. if err != nil {
  232. log.Println(err)
  233. continue
  234. }
  235. // only use the first host in case there are multiple hosts in the log
  236. if cidx := strings.Index(remote, ","); cidx >= 0 {
  237. remote = remote[0:cidx]
  238. }
  239. timestampStr, err := logEntry.Field("time_local")
  240. if err != nil {
  241. log.Println(err)
  242. continue
  243. }
  244. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  245. if err != nil {
  246. log.Println(err)
  247. continue
  248. }
  249. httpRequest, err := logEntry.Field("request")
  250. if err != nil {
  251. log.Println(err)
  252. continue
  253. }
  254. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  255. if len(reqData) < 4 {
  256. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  257. continue
  258. }
  259. request := data.Request{
  260. IpSrc: remote,
  261. Origin: remote,
  262. Source: remote,
  263. IpDst: "127.0.0.1",
  264. PortSrc: 0,
  265. PortDst: 0,
  266. TcpSeq: 0,
  267. CreatedAt: timeStamp.Unix(),
  268. Url: reqData[2],
  269. Method: reqData[1],
  270. Host: "[not available]",
  271. Protocol: reqData[3],
  272. }
  273. request.Referer, _ = logEntry.Field("http_referer")
  274. request.UserAgent, _ = logEntry.Field("http_user_agent")
  275. if config.Trace {
  276. log.Printf("[%s] %s\n", request.Source, request.Url)
  277. }
  278. count++
  279. publishRequest(config.NatsQueue, &request)
  280. }
  281. }
  282. func apacheLogCapture(logfile string) {
  283. if _, err := os.Stat(logfile); err != nil {
  284. log.Fatalf("%s: %s", logfile, err)
  285. }
  286. t, err := tail.TailFile(logfile, tail.Config{
  287. Follow: true, // follow the file
  288. ReOpen: true, // reopen log file when it gets closed/rotated
  289. Logger: tail.DiscardingLogger, // don't log anything
  290. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  291. })
  292. if err != nil {
  293. log.Fatalf("%s: %s", logfile, err)
  294. }
  295. var p axslogparser.Parser
  296. parserSet := false
  297. for line := range t.Lines {
  298. l := line.Text
  299. if !parserSet {
  300. p, _, err = axslogparser.GuessParser(l)
  301. if err != nil {
  302. log.Println(err)
  303. continue
  304. }
  305. parserSet = true
  306. }
  307. logEntry, err := p.Parse(l)
  308. if err != nil {
  309. log.Println(err)
  310. continue
  311. }
  312. remote := logEntry.Host
  313. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  314. remote = logEntry.ForwardedFor
  315. }
  316. // only use the first host in case there are multiple hosts in the log
  317. if cidx := strings.Index(remote, ","); cidx >= 0 {
  318. remote = remote[0:cidx]
  319. }
  320. // extract the virtual host
  321. var virtualHost string
  322. vhost := logEntry.VirtualHost
  323. if vhost != "" {
  324. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  325. virtualHost = vhostAndPort[0]
  326. } else {
  327. vhost = "[not available]"
  328. }
  329. request := data.Request{
  330. IpSrc: remote,
  331. IpDst: "127.0.0.1",
  332. PortSrc: 0,
  333. PortDst: 0,
  334. TcpSeq: 0,
  335. CreatedAt: logEntry.Time.UnixNano(),
  336. Url: logEntry.RequestURI,
  337. Method: logEntry.Method,
  338. Host: virtualHost,
  339. Protocol: logEntry.Protocol,
  340. Origin: remote,
  341. Source: remote,
  342. Referer: logEntry.Referer,
  343. XForwardedFor: logEntry.ForwardedFor,
  344. UserAgent: logEntry.UserAgent,
  345. }
  346. if config.Trace {
  347. log.Printf("[%s] %s\n", request.Source, request.Url)
  348. }
  349. count++
  350. publishRequest(config.NatsQueue, &request)
  351. }
  352. }
  353. func liveCapture() {
  354. ipPriv = ip.NewIP()
  355. // PCAP setup
  356. //
  357. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  358. if err != nil {
  359. log.Fatal(err)
  360. }
  361. defer handle.Close()
  362. err = handle.SetBPFFilter(config.Filter)
  363. if err != nil {
  364. log.Fatal(err)
  365. }
  366. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  367. for packet := range packetSource.Packets() {
  368. go processPacket(packet)
  369. }
  370. }
  371. func publishRequest(queue string, request *data.Request) {
  372. if !natsIsAvailable {
  373. return
  374. }
  375. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  376. natsErrorChan <- err
  377. if err == nats.ErrConnectionClosed {
  378. natsIsAvailable = false
  379. }
  380. }
  381. }
  382. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  383. func processPacket(packet gopacket.Packet) {
  384. hasIPv4 := false
  385. var ipSrc, ipDst string
  386. // IPv4
  387. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  388. ip := ipLayer.(*layers.IPv4)
  389. ipSrc = ip.SrcIP.String()
  390. ipDst = ip.DstIP.String()
  391. hasIPv4 = true
  392. }
  393. // IPv6
  394. if !hasIPv4 {
  395. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  396. ip := ipLayer.(*layers.IPv6)
  397. ipSrc = ip.SrcIP.String()
  398. ipDst = ip.DstIP.String()
  399. }
  400. }
  401. // TCP
  402. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  403. if tcpLayer == nil {
  404. return
  405. }
  406. tcp, _ := tcpLayer.(*layers.TCP)
  407. portSrc := tcp.SrcPort
  408. portDst := tcp.DstPort
  409. sequence := tcp.Seq
  410. applicationLayer := packet.ApplicationLayer()
  411. if applicationLayer == nil {
  412. return
  413. }
  414. count++
  415. if len(applicationLayer.Payload()) < 50 {
  416. log.Println("application layer too small!")
  417. return
  418. }
  419. request := data.Request{
  420. IpSrc: ipSrc,
  421. IpDst: ipDst,
  422. PortSrc: uint32(portSrc),
  423. PortDst: uint32(portDst),
  424. TcpSeq: uint32(sequence),
  425. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  426. }
  427. switch config.Protocol {
  428. case "http":
  429. err := processHTTP(&request, applicationLayer.Payload())
  430. if err != nil {
  431. log.Println(err)
  432. return
  433. }
  434. case "ajp13":
  435. err := processAJP13(&request, applicationLayer.Payload())
  436. if err != nil {
  437. log.Println(err)
  438. return
  439. }
  440. }
  441. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  442. if strings.Contains(request.XForwardedFor, ",") {
  443. ips := strings.Split(request.XForwardedFor, ",")
  444. for i := len(ips) - 1; i >= 0; i-- {
  445. ipRaw := strings.TrimSpace(ips[i])
  446. ipAddr := net.ParseIP(ipRaw)
  447. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  448. request.Source = ipRaw
  449. break
  450. }
  451. }
  452. } else {
  453. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  454. if !ipPriv.IsPrivate(ipAddr) {
  455. request.Source = request.XForwardedFor
  456. }
  457. }
  458. }
  459. if request.Source == request.IpSrc && request.XRealIP != "" {
  460. request.Source = request.XRealIP
  461. }
  462. if config.Trace {
  463. log.Printf("[%s] %s\n", request.Source, request.Url)
  464. }
  465. publishRequest(config.NatsQueue, &request)
  466. }
  467. func processAJP13(request *data.Request, appData []byte) error {
  468. a, err := ajp13.Parse(appData)
  469. if err != nil {
  470. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  471. }
  472. request.Url = a.URI
  473. request.Method = a.Method()
  474. request.Host = a.Server
  475. request.Protocol = a.Version
  476. request.Origin = a.RemoteAddr.String()
  477. request.Source = a.RemoteAddr.String()
  478. if v, ok := a.Header("Referer"); ok {
  479. request.Referer = v
  480. }
  481. if v, ok := a.Header("Connection"); ok {
  482. request.Connection = v
  483. }
  484. if v, ok := a.Header("X-Forwarded-For"); ok {
  485. request.XForwardedFor = v
  486. }
  487. if v, ok := a.Header("X-Real-IP"); ok {
  488. request.XRealIP = v
  489. }
  490. if v, ok := a.Header("X-Requested-With"); ok {
  491. request.XRequestedWith = v
  492. }
  493. if v, ok := a.Header("Accept-Encoding"); ok {
  494. request.AcceptEncoding = v
  495. }
  496. if v, ok := a.Header("Accept-Language"); ok {
  497. request.AcceptLanguage = v
  498. }
  499. if v, ok := a.Header("User-Agent"); ok {
  500. request.UserAgent = v
  501. }
  502. if v, ok := a.Header("Accept"); ok {
  503. request.Accept = v
  504. }
  505. if v, ok := a.Header("Cookie"); ok {
  506. request.Cookie = v
  507. }
  508. if v, ok := a.Header("X-Forwarded-Host"); ok {
  509. if v != request.Host {
  510. request.Host = v
  511. }
  512. }
  513. return nil
  514. }
  515. func processHTTP(request *data.Request, appData []byte) error {
  516. reader := bufio.NewReader(strings.NewReader(string(appData)))
  517. req, err := http.ReadRequest(reader)
  518. if err != nil {
  519. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  520. }
  521. request.Url = req.URL.String()
  522. request.Method = req.Method
  523. request.Referer = req.Referer()
  524. request.Host = req.Host
  525. request.Protocol = req.Proto
  526. request.Origin = request.Host
  527. if _, ok := req.Header["Connection"]; ok {
  528. request.Connection = req.Header["Connection"][0]
  529. }
  530. if _, ok := req.Header["X-Forwarded-For"]; ok {
  531. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  532. }
  533. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  534. if _, ok := req.Header["True-Client-Ip"]; ok {
  535. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  536. }
  537. if _, ok := req.Header["X-Real-Ip"]; ok {
  538. request.XRealIP = req.Header["X-Real-Ip"][0]
  539. }
  540. if _, ok := req.Header["X-Requested-With"]; ok {
  541. request.XRequestedWith = req.Header["X-Requested-With"][0]
  542. }
  543. if _, ok := req.Header["Accept-Encoding"]; ok {
  544. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  545. }
  546. if _, ok := req.Header["Accept-Language"]; ok {
  547. request.AcceptLanguage = req.Header["Accept-Language"][0]
  548. }
  549. if _, ok := req.Header["User-Agent"]; ok {
  550. request.UserAgent = req.Header["User-Agent"][0]
  551. }
  552. if _, ok := req.Header["Accept"]; ok {
  553. request.Accept = req.Header["Accept"][0]
  554. }
  555. if _, ok := req.Header["Cookie"]; ok {
  556. request.Cookie = req.Header["Cookie"][0]
  557. }
  558. request.Source = request.IpSrc
  559. return nil
  560. }
  561. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  562. // e.g.
  563. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  564. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  565. func replayFile() {
  566. var req data.Request
  567. var startTs time.Time
  568. var endTs time.Time
  569. rand.Seed(time.Now().UnixNano())
  570. for {
  571. fh, err := os.Open(config.RequestsFile)
  572. if err != nil {
  573. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  574. }
  575. c := csv.NewReader(fh)
  576. c.Comma = ' '
  577. for {
  578. if config.SleepFor.Duration > time.Nanosecond {
  579. startTs = time.Now()
  580. }
  581. r, err := c.Read()
  582. if err == io.EOF {
  583. break
  584. }
  585. if err != nil {
  586. log.Println(err)
  587. continue
  588. }
  589. req.IpSrc = r[0]
  590. req.Source = r[0]
  591. req.Url = r[1]
  592. req.UserAgent = "Munch/1.0"
  593. req.Host = "demo.scraperwall.com"
  594. req.CreatedAt = time.Now().UnixNano()
  595. publishRequest(config.NatsQueue, &req)
  596. if strings.Index(r[1], ".") < 0 {
  597. hash := sha1.New()
  598. io.WriteString(hash, r[0])
  599. fp := data.Fingerprint{
  600. ClientID: "scw",
  601. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  602. Remote: r[0],
  603. Url: r[1],
  604. Source: r[0],
  605. CreatedAt: time.Now(),
  606. }
  607. if strings.HasPrefix(r[0], "50.31.") {
  608. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  609. natsJSONEC.Publish("fingerprints_scw", fp)
  610. } else if rand.Intn(10) < 5 {
  611. natsJSONEC.Publish("fingerprints_scw", fp)
  612. }
  613. }
  614. count++
  615. if config.SleepFor.Duration >= time.Nanosecond {
  616. endTs = time.Now()
  617. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  618. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  619. }
  620. }
  621. }
  622. }
  623. }
  624. func loadConfig() {
  625. // initialize with values from the command line / environment
  626. config.Live = *doLiveCapture
  627. config.Interface = *iface
  628. config.SnapshotLen = *snapshotLen
  629. config.Filter = *filter
  630. config.Promiscuous = *promiscuous
  631. config.NatsURL = *natsURL
  632. config.NatsQueue = *natsQueue
  633. config.NatsUser = *natsUser
  634. config.NatsPassword = *natsPassword
  635. config.NatsCA = *natsCA
  636. config.SleepFor.Duration = *sleepFor
  637. config.RequestsFile = *requestsFile
  638. config.UseXForwardedAsSource = *useXForwardedAsSource
  639. config.Protocol = *protocol
  640. config.ApacheLog = *apacheLog
  641. config.NginxLog = *nginxLog
  642. config.NginxLogFormat = *nginxFormat
  643. config.Quiet = *beQuiet
  644. config.Trace = *trace
  645. if *configFile == "" {
  646. return
  647. }
  648. _, err := os.Stat(*configFile)
  649. if err != nil {
  650. log.Printf("%s: %s\n", *configFile, err)
  651. return
  652. }
  653. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  654. log.Printf("%s: %s\n", *configFile, err)
  655. }
  656. if !config.Quiet {
  657. config.print()
  658. }
  659. }
  660. // version outputs build information
  661. func version() {
  662. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  663. }