main.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750
  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/sha1"
  6. "encoding/csv"
  7. "encoding/json"
  8. "flag"
  9. "fmt"
  10. "io"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "net/http"
  15. "os"
  16. "strings"
  17. "time"
  18. "github.com/BurntSushi/toml"
  19. "github.com/google/gopacket"
  20. "github.com/google/gopacket/layers"
  21. "github.com/google/gopacket/pcap"
  22. "github.com/nats-io/nats"
  23. "github.com/nats-io/nats/encoders/protobuf"
  24. "git.scraperwall.com/scw/ajp13"
  25. "git.scraperwall.com/scw/data"
  26. "git.scraperwall.com/scw/ip"
  27. )
  28. var (
  29. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  30. iface = flag.String("interface", "eth0", "Interface to get packets from")
  31. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  32. filter = flag.String("filter", "tcp", "PCAP filter expression")
  33. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  34. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  35. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  36. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  37. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  38. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  39. reconnectToNatsAfter = flag.Duration("reconnect-to-nats-after", 0, "reconnect to nats after this long periodically")
  40. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  41. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  42. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  43. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  44. trace = flag.Bool("trace", false, "Trace the packet capturing")
  45. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  46. apacheReplay = flag.String("apache-replay", "", "Apache log file to replay into the system")
  47. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  48. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  49. nginxReplay = flag.String("nginx-replay", "", "Replay this nginx logfile")
  50. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  51. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  52. configFile = flag.String("config", "", "The location of the TOML config file")
  53. rpcAddr = flag.String("rpc-address", "", "The address where the RPC server is listening")
  54. beQuiet = flag.Bool("quiet", true, "Be quiet")
  55. doVersion = flag.Bool("version", false, "Show version information")
  56. natsEC *nats.EncodedConn
  57. natsJSONEC *nats.EncodedConn
  58. rpcClient *ScwRPC
  59. natsErrorChan chan error
  60. natsIsAvailable bool
  61. count uint64
  62. timeout = -1 * time.Second
  63. ipPriv *ip.IP
  64. config Config
  65. // Version contains the program Version, e.g. 1.0.1
  66. Version string
  67. // BuildDate contains the date and time at which the program was compiled
  68. BuildDate string
  69. )
  70. // Config contains the program configuration
  71. type Config struct {
  72. Live bool
  73. Interface string
  74. SnapshotLen int
  75. Filter string
  76. Promiscuous bool
  77. NatsURL string
  78. NatsQueue string
  79. NatsUser string
  80. NatsPassword string
  81. NatsCA string
  82. SleepFor duration
  83. RequestsFile string
  84. UseXForwardedAsSource bool
  85. Quiet bool
  86. Protocol string
  87. Trace bool
  88. ApacheLog string
  89. ApacheReplay string
  90. NginxLog string
  91. NginxLogFormat string
  92. NginxReplay string
  93. HostName string
  94. AccessWatchKey string
  95. RPCAddress string
  96. ReconnectToNatsAfter duration
  97. }
  98. type duration struct {
  99. time.Duration
  100. }
  101. func (d *duration) UnmarshalText(text []byte) error {
  102. var err error
  103. d.Duration, err = time.ParseDuration(string(text))
  104. return err
  105. }
  106. func (c Config) print() {
  107. fmt.Printf("Live: %t\n", c.Live)
  108. fmt.Printf("Interface: %s\n", c.Interface)
  109. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  110. fmt.Printf("Filter: %s\n", c.Filter)
  111. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  112. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  113. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  114. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  115. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  116. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  117. fmt.Printf("ReconnectToNatsAfter: %s\n", c.ReconnectToNatsAfter.String())
  118. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  119. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  120. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  121. fmt.Printf("Apache Replay: %s\n", c.ApacheReplay)
  122. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  123. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  124. fmt.Printf("NginxReplay: %s\n", c.NginxReplay)
  125. fmt.Printf("HostName: %s\n", c.HostName)
  126. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  127. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  128. fmt.Printf("Protocol: %s\n", c.Protocol)
  129. fmt.Printf("RPCAddress: %s\n", c.RPCAddress)
  130. fmt.Printf("Quiet: %t\n", c.Quiet)
  131. fmt.Printf("Trace: %t\n", c.Trace)
  132. }
  133. func init() {
  134. flag.Parse()
  135. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  136. }
  137. func main() {
  138. if *doVersion {
  139. version()
  140. os.Exit(0)
  141. }
  142. loadConfig()
  143. // Output how many requests per second were sent
  144. if !config.Quiet {
  145. go func(c *uint64) {
  146. for {
  147. fmt.Printf("%d requests per second\n", *c)
  148. *c = 0
  149. time.Sleep(time.Second)
  150. }
  151. }(&count)
  152. }
  153. // NATS
  154. //
  155. if config.NatsURL == "" && config.AccessWatchKey == "" {
  156. log.Fatal("No NATS URL specified (-nats-url)!")
  157. }
  158. natsIsAvailable = false
  159. natsErrorChan = make(chan error, 1)
  160. err := connectToNATS()
  161. if err != nil && config.AccessWatchKey == "" {
  162. log.Fatal(err)
  163. }
  164. // reconnect to nats periodically
  165. //
  166. if *reconnectToNatsAfter > 0 {
  167. go func(interval time.Duration) {
  168. for range time.Tick(interval) {
  169. natsEC.Conn.Close()
  170. natsJSONEC.Conn.Close()
  171. }
  172. }(*reconnectToNatsAfter)
  173. }
  174. go natsWatchdog(natsErrorChan)
  175. // RPC
  176. //
  177. if config.RPCAddress != "" {
  178. rpcClient, err = NewScwRPC(config.RPCAddress)
  179. if err != nil {
  180. log.Fatal(err)
  181. }
  182. }
  183. // What should I do?
  184. if config.RequestsFile != "" {
  185. replayFile()
  186. } else if config.ApacheReplay != "" {
  187. apacheLogReplay(config.ApacheReplay)
  188. } else if config.NginxReplay != "" {
  189. nginxLogReplay(config.NginxReplay, config.NginxLogFormat)
  190. } else if config.ApacheLog != "" {
  191. apacheLogCapture(config.ApacheLog)
  192. } else if config.Live {
  193. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  194. liveCapture()
  195. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  196. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  197. }
  198. }
  199. func natsWatchdog(closedChan chan error) {
  200. var lastError error
  201. for err := range closedChan {
  202. if lastError != err {
  203. lastError = err
  204. log.Println(err)
  205. }
  206. if err != nats.ErrConnectionClosed {
  207. continue
  208. }
  209. RECONNECT:
  210. for {
  211. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  212. err := connectToNATS()
  213. if err == nil {
  214. break RECONNECT
  215. }
  216. time.Sleep(1 * time.Second)
  217. }
  218. }
  219. }
  220. func connectToNATS() error {
  221. var natsConn *nats.Conn
  222. var err error
  223. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  224. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  225. } else {
  226. if config.NatsPassword != "" && config.NatsUser != "" {
  227. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  228. } else {
  229. natsConn, err = nats.Connect(config.NatsURL)
  230. }
  231. }
  232. if err != nil {
  233. return err
  234. }
  235. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  236. if err != nil {
  237. return fmt.Errorf("Encoded Connection: %v", err)
  238. }
  239. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  240. if err != nil {
  241. return fmt.Errorf("Encoded Connection: %v", err)
  242. }
  243. natsIsAvailable = true
  244. return nil
  245. }
  246. func liveCapture() {
  247. ipPriv = ip.NewIP()
  248. // PCAP setup
  249. //
  250. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  251. if err != nil {
  252. log.Fatal(err)
  253. }
  254. defer handle.Close()
  255. err = handle.SetBPFFilter(config.Filter)
  256. if err != nil {
  257. log.Fatal(err)
  258. }
  259. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  260. for packet := range packetSource.Packets() {
  261. go processPacket(packet)
  262. }
  263. }
  264. func writeLogToWatch(r *data.Request) {
  265. h := map[string]string{}
  266. if r.AcceptEncoding != "" {
  267. h["Accept-Encoding"] = r.AcceptEncoding
  268. }
  269. if r.Accept != "" {
  270. h["Accept"] = r.Accept
  271. }
  272. if r.AcceptLanguage != "" {
  273. h["Accept-Language"] = r.AcceptLanguage
  274. }
  275. if r.Cookie != "" {
  276. h["Cookie"] = r.Cookie
  277. }
  278. if r.Host != "" {
  279. h["Host"] = r.Host
  280. }
  281. if r.Referer != "" {
  282. h["Referer"] = r.Referer
  283. }
  284. if r.UserAgent != "" {
  285. h["User-Agent"] = r.UserAgent
  286. }
  287. if r.Via != "" {
  288. h["Via"] = r.Via
  289. }
  290. if r.XForwardedFor != "" {
  291. h["X-Forwarded-For"] = r.XForwardedFor
  292. }
  293. if r.XRequestedWith != "" {
  294. h["X-Requested-With"] = r.XRequestedWith
  295. }
  296. data := map[string]interface{}{
  297. "request": map[string]interface{}{
  298. "time": time.Unix(0, r.CreatedAt),
  299. "address": r.Source,
  300. "protocol": r.Protocol,
  301. "scheme": "https",
  302. "method": r.Method,
  303. "url": r.Url,
  304. "headers": h,
  305. },
  306. "response": map[string]interface{}{"status": "200"},
  307. }
  308. jdata, err := json.Marshal(data)
  309. client := &http.Client{}
  310. fmt.Println(string(jdata))
  311. buf := bytes.NewBuffer(jdata)
  312. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  313. req.Header.Add("Api-Key", config.AccessWatchKey)
  314. req.Header.Add("Accept", "application/json")
  315. req.Header.Add("Content-Type", "application/json")
  316. resp, err := client.Do(req)
  317. if err != nil {
  318. log.Println(err)
  319. }
  320. resp.Body.Close()
  321. }
  322. func publishRequest(queue string, request *data.Request) {
  323. if config.AccessWatchKey != "" {
  324. writeLogToWatch(request)
  325. return
  326. }
  327. if rpcClient != nil {
  328. select {
  329. case rpcClient.RChan <- request:
  330. default:
  331. }
  332. return
  333. }
  334. if !natsIsAvailable {
  335. if rand.Intn(100) == 0 {
  336. log.Println("nats connection is not available")
  337. }
  338. return
  339. }
  340. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  341. natsErrorChan <- err
  342. if err == nats.ErrConnectionClosed {
  343. natsIsAvailable = false
  344. }
  345. }
  346. }
  347. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  348. func processPacket(packet gopacket.Packet) {
  349. hasIPv4 := false
  350. var ipSrc, ipDst string
  351. // IPv4
  352. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  353. ip := ipLayer.(*layers.IPv4)
  354. ipSrc = ip.SrcIP.String()
  355. ipDst = ip.DstIP.String()
  356. hasIPv4 = true
  357. }
  358. // IPv6
  359. if !hasIPv4 {
  360. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  361. ip := ipLayer.(*layers.IPv6)
  362. ipSrc = ip.SrcIP.String()
  363. ipDst = ip.DstIP.String()
  364. }
  365. }
  366. // TCP
  367. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  368. if tcpLayer == nil {
  369. return
  370. }
  371. tcp, _ := tcpLayer.(*layers.TCP)
  372. portSrc := tcp.SrcPort
  373. portDst := tcp.DstPort
  374. sequence := tcp.Seq
  375. applicationLayer := packet.ApplicationLayer()
  376. if applicationLayer == nil {
  377. return
  378. }
  379. count++
  380. if len(applicationLayer.Payload()) < 50 {
  381. log.Println("application layer too small!")
  382. return
  383. }
  384. request := data.Request{
  385. IpSrc: ipSrc,
  386. IpDst: ipDst,
  387. PortSrc: uint32(portSrc),
  388. PortDst: uint32(portDst),
  389. TcpSeq: uint32(sequence),
  390. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  391. }
  392. switch config.Protocol {
  393. case "http":
  394. err := processHTTP(&request, applicationLayer.Payload())
  395. if err != nil {
  396. log.Println(err)
  397. return
  398. }
  399. case "ajp13":
  400. err := processAJP13(&request, applicationLayer.Payload())
  401. if err != nil {
  402. log.Println(err)
  403. return
  404. }
  405. }
  406. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  407. if strings.Contains(request.XForwardedFor, ",") {
  408. ips := strings.Split(request.XForwardedFor, ",")
  409. for i := len(ips) - 1; i >= 0; i-- {
  410. ipRaw := strings.TrimSpace(ips[i])
  411. ipAddr := net.ParseIP(ipRaw)
  412. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  413. request.Source = ipRaw
  414. break
  415. }
  416. }
  417. } else {
  418. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  419. if !ipPriv.IsPrivate(ipAddr) {
  420. request.Source = request.XForwardedFor
  421. }
  422. }
  423. }
  424. if request.Source == request.IpSrc && request.XRealIP != "" {
  425. request.Source = request.XRealIP
  426. }
  427. if config.Trace {
  428. log.Printf("[%s] %s\n", request.Source, request.Url)
  429. }
  430. publishRequest(config.NatsQueue, &request)
  431. }
  432. func processAJP13(request *data.Request, appData []byte) error {
  433. a, err := ajp13.Parse(appData)
  434. if err != nil {
  435. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  436. }
  437. request.Url = a.URI
  438. request.Method = a.Method()
  439. request.Host = a.Server
  440. request.Protocol = a.Version
  441. request.Origin = a.RemoteAddr.String()
  442. request.Source = a.RemoteAddr.String()
  443. if v, ok := a.Header("Referer"); ok {
  444. request.Referer = v
  445. }
  446. if v, ok := a.Header("Connection"); ok {
  447. request.Connection = v
  448. }
  449. if v, ok := a.Header("X-Forwarded-For"); ok {
  450. request.XForwardedFor = v
  451. }
  452. if v, ok := a.Header("X-Real-IP"); ok {
  453. request.XRealIP = v
  454. }
  455. if v, ok := a.Header("X-Requested-With"); ok {
  456. request.XRequestedWith = v
  457. }
  458. if v, ok := a.Header("Accept-Encoding"); ok {
  459. request.AcceptEncoding = v
  460. }
  461. if v, ok := a.Header("Accept-Language"); ok {
  462. request.AcceptLanguage = v
  463. }
  464. if v, ok := a.Header("User-Agent"); ok {
  465. request.UserAgent = v
  466. }
  467. if v, ok := a.Header("Accept"); ok {
  468. request.Accept = v
  469. }
  470. if v, ok := a.Header("Cookie"); ok {
  471. request.Cookie = v
  472. }
  473. if v, ok := a.Header("X-Forwarded-Host"); ok {
  474. if v != request.Host {
  475. request.Host = v
  476. }
  477. }
  478. return nil
  479. }
  480. func processHTTP(request *data.Request, appData []byte) error {
  481. reader := bufio.NewReader(strings.NewReader(string(appData)))
  482. req, err := http.ReadRequest(reader)
  483. if err != nil {
  484. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  485. }
  486. request.Url = req.URL.String()
  487. request.Method = req.Method
  488. request.Referer = req.Referer()
  489. request.Host = req.Host
  490. request.Protocol = req.Proto
  491. request.Origin = request.Host
  492. if _, ok := req.Header["Connection"]; ok {
  493. request.Connection = req.Header["Connection"][0]
  494. }
  495. if _, ok := req.Header["X-Forwarded-For"]; ok {
  496. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  497. }
  498. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  499. if _, ok := req.Header["True-Client-Ip"]; ok {
  500. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  501. }
  502. if _, ok := req.Header["X-Real-Ip"]; ok {
  503. request.XRealIP = req.Header["X-Real-Ip"][0]
  504. }
  505. if _, ok := req.Header["X-Requested-With"]; ok {
  506. request.XRequestedWith = req.Header["X-Requested-With"][0]
  507. }
  508. if _, ok := req.Header["Accept-Encoding"]; ok {
  509. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  510. }
  511. if _, ok := req.Header["Accept-Language"]; ok {
  512. request.AcceptLanguage = req.Header["Accept-Language"][0]
  513. }
  514. if _, ok := req.Header["User-Agent"]; ok {
  515. request.UserAgent = req.Header["User-Agent"][0]
  516. }
  517. if _, ok := req.Header["Accept"]; ok {
  518. request.Accept = req.Header["Accept"][0]
  519. }
  520. if _, ok := req.Header["Cookie"]; ok {
  521. request.Cookie = req.Header["Cookie"][0]
  522. }
  523. request.Source = request.IpSrc
  524. return nil
  525. }
  526. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  527. // e.g.
  528. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  529. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  530. func replayFile() {
  531. var req data.Request
  532. var startTs time.Time
  533. var endTs time.Time
  534. rand.Seed(time.Now().UnixNano())
  535. for {
  536. fh, err := os.Open(config.RequestsFile)
  537. if err != nil {
  538. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  539. }
  540. c := csv.NewReader(fh)
  541. c.Comma = ' '
  542. for {
  543. if config.SleepFor.Duration > time.Nanosecond {
  544. startTs = time.Now()
  545. }
  546. r, err := c.Read()
  547. if err == io.EOF {
  548. break
  549. }
  550. if err != nil {
  551. log.Println(err)
  552. continue
  553. }
  554. req.IpSrc = r[0]
  555. req.Source = r[0]
  556. req.Url = r[1]
  557. req.UserAgent = "Munch/1.0"
  558. req.Host = "demo.scraperwall.com"
  559. req.CreatedAt = time.Now().UnixNano()
  560. publishRequest(config.NatsQueue, &req)
  561. if strings.Index(r[1], ".") < 0 {
  562. hash := sha1.New()
  563. io.WriteString(hash, r[0])
  564. fp := data.Fingerprint{
  565. ClientID: "scw",
  566. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  567. Remote: r[0],
  568. Url: r[1],
  569. Source: r[0],
  570. CreatedAt: time.Now(),
  571. }
  572. if strings.HasPrefix(r[0], "50.31.") {
  573. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  574. natsJSONEC.Publish("fingerprints_scw", fp)
  575. } else if rand.Intn(10) < 5 {
  576. natsJSONEC.Publish("fingerprints_scw", fp)
  577. }
  578. }
  579. count++
  580. if config.SleepFor.Duration >= time.Nanosecond {
  581. endTs = time.Now()
  582. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  583. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  584. }
  585. }
  586. }
  587. }
  588. }
  589. func loadConfig() {
  590. // initialize with values from the command line / environment
  591. config.Live = *doLiveCapture
  592. config.Interface = *iface
  593. config.SnapshotLen = *snapshotLen
  594. config.Filter = *filter
  595. config.Promiscuous = *promiscuous
  596. config.NatsURL = *natsURL
  597. config.NatsQueue = *natsQueue
  598. config.NatsUser = *natsUser
  599. config.NatsPassword = *natsPassword
  600. config.NatsCA = *natsCA
  601. config.SleepFor.Duration = *sleepFor
  602. config.RequestsFile = *requestsFile
  603. config.UseXForwardedAsSource = *useXForwardedAsSource
  604. config.Protocol = *protocol
  605. config.ApacheLog = *apacheLog
  606. config.ApacheReplay = *apacheReplay
  607. config.NginxLog = *nginxLog
  608. config.NginxLogFormat = *nginxFormat
  609. config.NginxReplay = *nginxReplay
  610. config.HostName = *hostName
  611. config.Quiet = *beQuiet
  612. config.Trace = *trace
  613. config.AccessWatchKey = *accessWatchKey
  614. config.RPCAddress = *rpcAddr
  615. config.ReconnectToNatsAfter.Duration = *reconnectToNatsAfter
  616. if *configFile == "" {
  617. return
  618. }
  619. _, err := os.Stat(*configFile)
  620. if err != nil {
  621. log.Printf("%s: %s\n", *configFile, err)
  622. return
  623. }
  624. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  625. log.Printf("%s: %s\n", *configFile, err)
  626. }
  627. if !config.Quiet {
  628. config.print()
  629. }
  630. }
  631. // version outputs build information...
  632. func version() {
  633. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  634. }