main.go 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747
  1. package main
import (
	"bufio"
	"bytes"
	"crypto/sha1"
	"encoding/csv"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
	"github.com/BurntSushi/toml"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"
)
// Command line flags and package-level state.
//
// The flag values are copied into the global config by loadConfig; most of
// the program reads config rather than the flag pointers directly.
var (
	// Capture settings (used by liveCapture).
	doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
	iface = flag.String("interface", "eth0", "Interface to get packets from")
	snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
	filter = flag.String("filter", "tcp", "PCAP filter expression")
	promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
	// NATS connection settings (used by connectToNATS).
	natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
	natsUser = flag.String("nats-user", "", "The user for NATS authentication")
	natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
	natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
	natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
	// When > 0, main closes the NATS connections at this interval.
	reconnectToNatsAfter = flag.Duration("reconnect-to-nats-after", 0, "reconnect to nats after this long periodically")
	// Replay and log-file input settings.
	sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
	requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
	protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
	useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
	trace = flag.Bool("trace", false, "Trace the packet capturing")
	apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
	apacheReplay = flag.String("apache-replay", "", "Apache log file to replay into the system")
	nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
	nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
	nginxReplay = flag.String("nginx-replay", "", "Replay this nginx logfile")
	hostName = flag.String("hostname", "", "Override the captured hostname with this one")
	// When set, publishRequest sends requests to access.watch instead of NATS.
	accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
	configFile = flag.String("config", "", "The location of the TOML config file")
	// When set, publishRequest hands requests to the RPC client instead of NATS.
	rpcAddr = flag.String("rpc-address", "", "The address where the RPC server is listening")
	// Note: defaults to true, so the per-second statistics are off unless
	// -quiet=false is given.
	beQuiet = flag.Bool("quiet", true, "Be quiet")
	doVersion = flag.Bool("version", false, "Show version information")
	// natsEC publishes protobuf-encoded messages; set by connectToNATS.
	natsEC *nats.EncodedConn
	// natsJSONEC publishes JSON-encoded messages; set by connectToNATS.
	natsJSONEC *nats.EncodedConn
	// rpcClient is only non-nil when an RPC address is configured (see main).
	rpcClient *ScwRPC
	// natsErrorChan carries publish errors from publishRequest to natsWatchdog.
	natsErrorChan chan error
	// natsIsAvailable is set by connectToNATS and cleared by publishRequest
	// when a publish fails because the connection is closed.
	natsIsAvailable bool
	// count is the number of requests seen since the statistics goroutine in
	// main last printed and reset it.
	count uint64
	// timeout is passed to pcap.OpenLive. It is negative; presumably this
	// means "block forever" — confirm against the gopacket/pcap docs.
	timeout = -1 * time.Second
	// ipPriv is used to detect private IP addresses; initialised in liveCapture.
	ipPriv *ip.IP
	// config holds the effective configuration, filled in by loadConfig.
	config Config
	// Version contains the program Version, e.g. 1.0.1
	Version string
	// BuildDate contains the date and time at which the program was compiled
	BuildDate string
)
// Config contains the program configuration.
//
// loadConfig first fills every field from the corresponding command line
// flag and then, if a config file was given, decodes that TOML file on top
// of those values.
type Config struct {
	// Live packet capture (see liveCapture).
	Live bool
	Interface string
	SnapshotLen int
	Filter string
	Promiscuous bool
	// NATS connection (see connectToNATS).
	NatsURL string
	NatsQueue string
	NatsUser string
	NatsPassword string
	NatsCA string
	// SleepFor is the minimum time spent per record in replayFile.
	SleepFor duration
	// RequestsFile is the space-separated "IP URL" file replayed by replayFile.
	RequestsFile string
	// UseXForwardedAsSource makes processPacket take the request source from
	// the X-Forwarded-For header when it contains a non-private address.
	UseXForwardedAsSource bool
	// Quiet suppresses the per-second statistics and the config dump.
	Quiet bool
	// Protocol selects the payload parser in processPacket: "http" or "ajp13".
	Protocol string
	// Trace makes processPacket log the source and URL of every request.
	Trace bool
	// Log file inputs; main picks the first mode whose setting is non-empty.
	ApacheLog string
	ApacheReplay string
	NginxLog string
	NginxLogFormat string
	NginxReplay string
	HostName string
	// AccessWatchKey, when set, routes requests to access.watch (see publishRequest).
	AccessWatchKey string
	// RPCAddress, when set, routes requests to the RPC client (see main).
	RPCAddress string
}
  97. type duration struct {
  98. time.Duration
  99. }
  100. func (d *duration) UnmarshalText(text []byte) error {
  101. var err error
  102. d.Duration, err = time.ParseDuration(string(text))
  103. return err
  104. }
  105. func (c Config) print() {
  106. fmt.Printf("Live: %t\n", c.Live)
  107. fmt.Printf("Interface: %s\n", c.Interface)
  108. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  109. fmt.Printf("Filter: %s\n", c.Filter)
  110. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  111. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  112. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  113. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  114. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  115. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  116. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  117. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  118. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  119. fmt.Printf("Apache Replay: %s\n", c.ApacheReplay)
  120. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  121. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  122. fmt.Printf("NginxReplay: %s\n", c.NginxReplay)
  123. fmt.Printf("HostName: %s\n", c.HostName)
  124. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  125. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  126. fmt.Printf("Protocol: %s\n", c.Protocol)
  127. fmt.Printf("RPCAddress: %s\n", c.RPCAddress)
  128. fmt.Printf("Quiet: %t\n", c.Quiet)
  129. fmt.Printf("Trace: %t\n", c.Trace)
  130. }
  131. func init() {
  132. flag.Parse()
  133. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  134. }
  135. func main() {
  136. if *doVersion {
  137. version()
  138. os.Exit(0)
  139. }
  140. loadConfig()
  141. // Output how many requests per second were sent
  142. if !config.Quiet {
  143. go func(c *uint64) {
  144. for {
  145. fmt.Printf("%d requests per second\n", *c)
  146. *c = 0
  147. time.Sleep(time.Second)
  148. }
  149. }(&count)
  150. }
  151. // NATS
  152. //
  153. if config.NatsURL == "" && config.AccessWatchKey == "" {
  154. log.Fatal("No NATS URL specified (-nats-url)!")
  155. }
  156. natsIsAvailable = false
  157. natsErrorChan = make(chan error, 1)
  158. err := connectToNATS()
  159. if err != nil && config.AccessWatchKey == "" {
  160. log.Fatal(err)
  161. }
  162. // reconnect to nats periodically
  163. //
  164. if *reconnectToNatsAfter > 0 {
  165. go func(interval time.Duration) {
  166. for range time.Tick(interval) {
  167. natsEC.Conn.Close()
  168. natsJSONEC.Conn.Close()
  169. }
  170. }(*reconnectToNatsAfter)
  171. }
  172. go natsWatchdog(natsErrorChan)
  173. // RPC
  174. //
  175. if config.RPCAddress != "" {
  176. rpcClient, err = NewScwRPC(config.RPCAddress)
  177. if err != nil {
  178. log.Fatal(err)
  179. }
  180. }
  181. // What should I do?
  182. if config.RequestsFile != "" {
  183. replayFile()
  184. } else if config.ApacheReplay != "" {
  185. apacheLogReplay(config.ApacheReplay)
  186. } else if config.NginxReplay != "" {
  187. nginxLogReplay(config.NginxReplay, config.NginxLogFormat)
  188. } else if config.ApacheLog != "" {
  189. apacheLogCapture(config.ApacheLog)
  190. } else if config.Live {
  191. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  192. liveCapture()
  193. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  194. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  195. }
  196. }
  197. func natsWatchdog(closedChan chan error) {
  198. var lastError error
  199. for err := range closedChan {
  200. if lastError != err {
  201. lastError = err
  202. log.Println(err)
  203. }
  204. if err != nats.ErrConnectionClosed {
  205. continue
  206. }
  207. RECONNECT:
  208. for {
  209. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  210. err := connectToNATS()
  211. if err == nil {
  212. break RECONNECT
  213. }
  214. time.Sleep(1 * time.Second)
  215. }
  216. }
  217. }
  218. func connectToNATS() error {
  219. var natsConn *nats.Conn
  220. var err error
  221. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  222. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  223. } else {
  224. if config.NatsPassword != "" && config.NatsUser != "" {
  225. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  226. } else {
  227. natsConn, err = nats.Connect(config.NatsURL)
  228. }
  229. }
  230. if err != nil {
  231. return err
  232. }
  233. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  234. if err != nil {
  235. return fmt.Errorf("Encoded Connection: %v", err)
  236. }
  237. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  238. if err != nil {
  239. return fmt.Errorf("Encoded Connection: %v", err)
  240. }
  241. natsIsAvailable = true
  242. return nil
  243. }
  244. func liveCapture() {
  245. ipPriv = ip.NewIP()
  246. // PCAP setup
  247. //
  248. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  249. if err != nil {
  250. log.Fatal(err)
  251. }
  252. defer handle.Close()
  253. err = handle.SetBPFFilter(config.Filter)
  254. if err != nil {
  255. log.Fatal(err)
  256. }
  257. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  258. for packet := range packetSource.Packets() {
  259. go processPacket(packet)
  260. }
  261. }
  262. func writeLogToWatch(r *data.Request) {
  263. h := map[string]string{}
  264. if r.AcceptEncoding != "" {
  265. h["Accept-Encoding"] = r.AcceptEncoding
  266. }
  267. if r.Accept != "" {
  268. h["Accept"] = r.Accept
  269. }
  270. if r.AcceptLanguage != "" {
  271. h["Accept-Language"] = r.AcceptLanguage
  272. }
  273. if r.Cookie != "" {
  274. h["Cookie"] = r.Cookie
  275. }
  276. if r.Host != "" {
  277. h["Host"] = r.Host
  278. }
  279. if r.Referer != "" {
  280. h["Referer"] = r.Referer
  281. }
  282. if r.UserAgent != "" {
  283. h["User-Agent"] = r.UserAgent
  284. }
  285. if r.Via != "" {
  286. h["Via"] = r.Via
  287. }
  288. if r.XForwardedFor != "" {
  289. h["X-Forwarded-For"] = r.XForwardedFor
  290. }
  291. if r.XRequestedWith != "" {
  292. h["X-Requested-With"] = r.XRequestedWith
  293. }
  294. data := map[string]interface{}{
  295. "request": map[string]interface{}{
  296. "time": time.Unix(0, r.CreatedAt),
  297. "address": r.Source,
  298. "protocol": r.Protocol,
  299. "scheme": "https",
  300. "method": r.Method,
  301. "url": r.Url,
  302. "headers": h,
  303. },
  304. "response": map[string]interface{}{"status": "200"},
  305. }
  306. jdata, err := json.Marshal(data)
  307. client := &http.Client{}
  308. fmt.Println(string(jdata))
  309. buf := bytes.NewBuffer(jdata)
  310. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  311. req.Header.Add("Api-Key", config.AccessWatchKey)
  312. req.Header.Add("Accept", "application/json")
  313. req.Header.Add("Content-Type", "application/json")
  314. resp, err := client.Do(req)
  315. if err != nil {
  316. log.Println(err)
  317. }
  318. resp.Body.Close()
  319. }
  320. func publishRequest(queue string, request *data.Request) {
  321. if config.AccessWatchKey != "" {
  322. writeLogToWatch(request)
  323. return
  324. }
  325. if rpcClient != nil {
  326. select {
  327. case rpcClient.RChan <- request:
  328. default:
  329. }
  330. return
  331. }
  332. if !natsIsAvailable {
  333. if rand.Intn(100) == 0 {
  334. log.Println("nats connection is not available")
  335. }
  336. return
  337. }
  338. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  339. natsErrorChan <- err
  340. if err == nats.ErrConnectionClosed {
  341. natsIsAvailable = false
  342. }
  343. }
  344. }
  345. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  346. func processPacket(packet gopacket.Packet) {
  347. hasIPv4 := false
  348. var ipSrc, ipDst string
  349. // IPv4
  350. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  351. ip := ipLayer.(*layers.IPv4)
  352. ipSrc = ip.SrcIP.String()
  353. ipDst = ip.DstIP.String()
  354. hasIPv4 = true
  355. }
  356. // IPv6
  357. if !hasIPv4 {
  358. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  359. ip := ipLayer.(*layers.IPv6)
  360. ipSrc = ip.SrcIP.String()
  361. ipDst = ip.DstIP.String()
  362. }
  363. }
  364. // TCP
  365. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  366. if tcpLayer == nil {
  367. return
  368. }
  369. tcp, _ := tcpLayer.(*layers.TCP)
  370. portSrc := tcp.SrcPort
  371. portDst := tcp.DstPort
  372. sequence := tcp.Seq
  373. applicationLayer := packet.ApplicationLayer()
  374. if applicationLayer == nil {
  375. return
  376. }
  377. count++
  378. if len(applicationLayer.Payload()) < 50 {
  379. log.Println("application layer too small!")
  380. return
  381. }
  382. request := data.Request{
  383. IpSrc: ipSrc,
  384. IpDst: ipDst,
  385. PortSrc: uint32(portSrc),
  386. PortDst: uint32(portDst),
  387. TcpSeq: uint32(sequence),
  388. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  389. }
  390. switch config.Protocol {
  391. case "http":
  392. err := processHTTP(&request, applicationLayer.Payload())
  393. if err != nil {
  394. log.Println(err)
  395. return
  396. }
  397. case "ajp13":
  398. err := processAJP13(&request, applicationLayer.Payload())
  399. if err != nil {
  400. log.Println(err)
  401. return
  402. }
  403. }
  404. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  405. if strings.Contains(request.XForwardedFor, ",") {
  406. ips := strings.Split(request.XForwardedFor, ",")
  407. for i := len(ips) - 1; i >= 0; i-- {
  408. ipRaw := strings.TrimSpace(ips[i])
  409. ipAddr := net.ParseIP(ipRaw)
  410. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  411. request.Source = ipRaw
  412. break
  413. }
  414. }
  415. } else {
  416. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  417. if !ipPriv.IsPrivate(ipAddr) {
  418. request.Source = request.XForwardedFor
  419. }
  420. }
  421. }
  422. if request.Source == request.IpSrc && request.XRealIP != "" {
  423. request.Source = request.XRealIP
  424. }
  425. if config.Trace {
  426. log.Printf("[%s] %s\n", request.Source, request.Url)
  427. }
  428. publishRequest(config.NatsQueue, &request)
  429. }
  430. func processAJP13(request *data.Request, appData []byte) error {
  431. a, err := ajp13.Parse(appData)
  432. if err != nil {
  433. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  434. }
  435. request.Url = a.URI
  436. request.Method = a.Method()
  437. request.Host = a.Server
  438. request.Protocol = a.Version
  439. request.Origin = a.RemoteAddr.String()
  440. request.Source = a.RemoteAddr.String()
  441. if v, ok := a.Header("Referer"); ok {
  442. request.Referer = v
  443. }
  444. if v, ok := a.Header("Connection"); ok {
  445. request.Connection = v
  446. }
  447. if v, ok := a.Header("X-Forwarded-For"); ok {
  448. request.XForwardedFor = v
  449. }
  450. if v, ok := a.Header("X-Real-IP"); ok {
  451. request.XRealIP = v
  452. }
  453. if v, ok := a.Header("X-Requested-With"); ok {
  454. request.XRequestedWith = v
  455. }
  456. if v, ok := a.Header("Accept-Encoding"); ok {
  457. request.AcceptEncoding = v
  458. }
  459. if v, ok := a.Header("Accept-Language"); ok {
  460. request.AcceptLanguage = v
  461. }
  462. if v, ok := a.Header("User-Agent"); ok {
  463. request.UserAgent = v
  464. }
  465. if v, ok := a.Header("Accept"); ok {
  466. request.Accept = v
  467. }
  468. if v, ok := a.Header("Cookie"); ok {
  469. request.Cookie = v
  470. }
  471. if v, ok := a.Header("X-Forwarded-Host"); ok {
  472. if v != request.Host {
  473. request.Host = v
  474. }
  475. }
  476. return nil
  477. }
  478. func processHTTP(request *data.Request, appData []byte) error {
  479. reader := bufio.NewReader(strings.NewReader(string(appData)))
  480. req, err := http.ReadRequest(reader)
  481. if err != nil {
  482. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  483. }
  484. request.Url = req.URL.String()
  485. request.Method = req.Method
  486. request.Referer = req.Referer()
  487. request.Host = req.Host
  488. request.Protocol = req.Proto
  489. request.Origin = request.Host
  490. if _, ok := req.Header["Connection"]; ok {
  491. request.Connection = req.Header["Connection"][0]
  492. }
  493. if _, ok := req.Header["X-Forwarded-For"]; ok {
  494. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  495. }
  496. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  497. if _, ok := req.Header["True-Client-Ip"]; ok {
  498. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  499. }
  500. if _, ok := req.Header["X-Real-Ip"]; ok {
  501. request.XRealIP = req.Header["X-Real-Ip"][0]
  502. }
  503. if _, ok := req.Header["X-Requested-With"]; ok {
  504. request.XRequestedWith = req.Header["X-Requested-With"][0]
  505. }
  506. if _, ok := req.Header["Accept-Encoding"]; ok {
  507. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  508. }
  509. if _, ok := req.Header["Accept-Language"]; ok {
  510. request.AcceptLanguage = req.Header["Accept-Language"][0]
  511. }
  512. if _, ok := req.Header["User-Agent"]; ok {
  513. request.UserAgent = req.Header["User-Agent"][0]
  514. }
  515. if _, ok := req.Header["Accept"]; ok {
  516. request.Accept = req.Header["Accept"][0]
  517. }
  518. if _, ok := req.Header["Cookie"]; ok {
  519. request.Cookie = req.Header["Cookie"][0]
  520. }
  521. request.Source = request.IpSrc
  522. return nil
  523. }
  524. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  525. // e.g.
  526. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  527. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  528. func replayFile() {
  529. var req data.Request
  530. var startTs time.Time
  531. var endTs time.Time
  532. rand.Seed(time.Now().UnixNano())
  533. for {
  534. fh, err := os.Open(config.RequestsFile)
  535. if err != nil {
  536. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  537. }
  538. c := csv.NewReader(fh)
  539. c.Comma = ' '
  540. for {
  541. if config.SleepFor.Duration > time.Nanosecond {
  542. startTs = time.Now()
  543. }
  544. r, err := c.Read()
  545. if err == io.EOF {
  546. break
  547. }
  548. if err != nil {
  549. log.Println(err)
  550. continue
  551. }
  552. req.IpSrc = r[0]
  553. req.Source = r[0]
  554. req.Url = r[1]
  555. req.UserAgent = "Munch/1.0"
  556. req.Host = "demo.scraperwall.com"
  557. req.CreatedAt = time.Now().UnixNano()
  558. publishRequest(config.NatsQueue, &req)
  559. if strings.Index(r[1], ".") < 0 {
  560. hash := sha1.New()
  561. io.WriteString(hash, r[0])
  562. fp := data.Fingerprint{
  563. ClientID: "scw",
  564. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  565. Remote: r[0],
  566. Url: r[1],
  567. Source: r[0],
  568. CreatedAt: time.Now(),
  569. }
  570. if strings.HasPrefix(r[0], "50.31.") {
  571. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  572. natsJSONEC.Publish("fingerprints_scw", fp)
  573. } else if rand.Intn(10) < 5 {
  574. natsJSONEC.Publish("fingerprints_scw", fp)
  575. }
  576. }
  577. count++
  578. if config.SleepFor.Duration >= time.Nanosecond {
  579. endTs = time.Now()
  580. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  581. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  582. }
  583. }
  584. }
  585. }
  586. }
  587. func loadConfig() {
  588. // initialize with values from the command line / environment
  589. config.Live = *doLiveCapture
  590. config.Interface = *iface
  591. config.SnapshotLen = *snapshotLen
  592. config.Filter = *filter
  593. config.Promiscuous = *promiscuous
  594. config.NatsURL = *natsURL
  595. config.NatsQueue = *natsQueue
  596. config.NatsUser = *natsUser
  597. config.NatsPassword = *natsPassword
  598. config.NatsCA = *natsCA
  599. config.SleepFor.Duration = *sleepFor
  600. config.RequestsFile = *requestsFile
  601. config.UseXForwardedAsSource = *useXForwardedAsSource
  602. config.Protocol = *protocol
  603. config.ApacheLog = *apacheLog
  604. config.ApacheReplay = *apacheReplay
  605. config.NginxLog = *nginxLog
  606. config.NginxLogFormat = *nginxFormat
  607. config.NginxReplay = *nginxReplay
  608. config.HostName = *hostName
  609. config.Quiet = *beQuiet
  610. config.Trace = *trace
  611. config.AccessWatchKey = *accessWatchKey
  612. config.RPCAddress = *rpcAddr
  613. if *configFile == "" {
  614. return
  615. }
  616. _, err := os.Stat(*configFile)
  617. if err != nil {
  618. log.Printf("%s: %s\n", *configFile, err)
  619. return
  620. }
  621. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  622. log.Printf("%s: %s\n", *configFile, err)
  623. }
  624. if !config.Quiet {
  625. config.print()
  626. }
  627. }
  628. // version outputs build information...
  629. func version() {
  630. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  631. }