main.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902
  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/sha1"
  6. "encoding/csv"
  7. "encoding/json"
  8. "flag"
  9. "fmt"
  10. "io"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "net/http"
  15. "os"
  16. "regexp"
  17. "strings"
  18. "time"
  19. "github.com/BurntSushi/toml"
  20. "github.com/Songmu/axslogparser"
  21. "github.com/google/gopacket"
  22. "github.com/google/gopacket/layers"
  23. "github.com/google/gopacket/pcap"
  24. "github.com/hpcloud/tail"
  25. "github.com/kr/pretty"
  26. "github.com/nats-io/nats"
  27. "github.com/nats-io/nats/encoders/protobuf"
  28. "github.com/satyrius/gonx"
  29. "git.scraperwall.com/scw/ajp13"
  30. "git.scraperwall.com/scw/data"
  31. "git.scraperwall.com/scw/ip"
  32. )
  33. var (
  34. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  35. iface = flag.String("interface", "eth0", "Interface to get packets from")
  36. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  37. filter = flag.String("filter", "tcp", "PCAP filter expression")
  38. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  39. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  40. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  41. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  42. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  43. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  44. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  45. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  46. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  47. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  48. trace = flag.Bool("trace", false, "Trace the packet capturing")
  49. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  50. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  51. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  52. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  53. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  54. configFile = flag.String("config", "", "The location of the TOML config file")
  55. beQuiet = flag.Bool("quiet", true, "Be quiet")
  56. doVersion = flag.Bool("version", false, "Show version information")
  57. natsEC *nats.EncodedConn
  58. natsJSONEC *nats.EncodedConn
  59. natsErrorChan chan error
  60. natsIsAvailable bool
  61. count uint64
  62. timeout = -1 * time.Second
  63. ipPriv *ip.IP
  64. config Config
  65. // Version contains the program Version, e.g. 1.0.1
  66. Version string
  67. // BuildDate contains the date and time at which the program was compiled
  68. BuildDate string
  69. )
  70. // Config contains the program configuration
  71. type Config struct {
  72. Live bool
  73. Interface string
  74. SnapshotLen int
  75. Filter string
  76. Promiscuous bool
  77. NatsURL string
  78. NatsQueue string
  79. NatsUser string
  80. NatsPassword string
  81. NatsCA string
  82. SleepFor duration
  83. RequestsFile string
  84. UseXForwardedAsSource bool
  85. Quiet bool
  86. Protocol string
  87. Trace bool
  88. ApacheLog string
  89. NginxLog string
  90. NginxLogFormat string
  91. HostName string
  92. AccessWatchKey string
  93. }
  94. type duration struct {
  95. time.Duration
  96. }
  97. func (d *duration) UnmarshalText(text []byte) error {
  98. var err error
  99. d.Duration, err = time.ParseDuration(string(text))
  100. return err
  101. }
  102. func (c Config) print() {
  103. fmt.Printf("Live: %t\n", c.Live)
  104. fmt.Printf("Interface: %s\n", c.Interface)
  105. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  106. fmt.Printf("Filter: %s\n", c.Filter)
  107. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  108. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  109. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  110. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  111. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  112. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  113. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  114. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  115. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  116. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  117. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  118. fmt.Printf("HostName: %s\n", c.HostName)
  119. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  120. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  121. fmt.Printf("Protocol: %s\n", c.Protocol)
  122. fmt.Printf("Quiet: %t\n", c.Quiet)
  123. fmt.Printf("Trace: %t\n", c.Trace)
  124. }
  125. func init() {
  126. flag.Parse()
  127. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  128. }
  129. func main() {
  130. if *doVersion {
  131. version()
  132. os.Exit(0)
  133. }
  134. loadConfig()
  135. // Output how many requests per second were sent
  136. if !config.Quiet {
  137. go func(c *uint64) {
  138. for {
  139. fmt.Printf("%d requests per second\n", *c)
  140. *c = 0
  141. time.Sleep(time.Second)
  142. }
  143. }(&count)
  144. }
  145. // NATS
  146. //
  147. if config.NatsURL == "" && config.AccessWatchKey == "" {
  148. log.Fatal("No NATS URL specified (-nats-url)!")
  149. }
  150. natsIsAvailable = false
  151. natsErrorChan = make(chan error, 1)
  152. err := connectToNATS()
  153. if err != nil && config.AccessWatchKey == "" {
  154. log.Fatal(err)
  155. }
  156. go natsWatchdog(natsErrorChan)
  157. // What should I do?
  158. if config.RequestsFile != "" {
  159. replayFile()
  160. } else if config.ApacheLog != "" {
  161. apacheLogCapture(config.ApacheLog)
  162. } else if config.Live {
  163. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  164. liveCapture()
  165. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  166. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  167. }
  168. }
  169. func natsWatchdog(closedChan chan error) {
  170. var lastError error
  171. for err := range closedChan {
  172. if lastError != err {
  173. lastError = err
  174. log.Println(err)
  175. }
  176. if err != nats.ErrConnectionClosed {
  177. continue
  178. }
  179. RECONNECT:
  180. for {
  181. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  182. err := connectToNATS()
  183. if err == nil {
  184. break RECONNECT
  185. }
  186. time.Sleep(1 * time.Second)
  187. }
  188. }
  189. }
  190. func connectToNATS() error {
  191. var natsConn *nats.Conn
  192. var err error
  193. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  194. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  195. } else {
  196. if config.NatsPassword != "" && config.NatsUser != "" {
  197. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  198. } else {
  199. natsConn, err = nats.Connect(config.NatsURL)
  200. }
  201. }
  202. if err != nil {
  203. return err
  204. }
  205. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  206. if err != nil {
  207. return fmt.Errorf("Encoded Connection: %v", err)
  208. }
  209. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  210. if err != nil {
  211. return fmt.Errorf("Encoded Connection: %v", err)
  212. }
  213. natsIsAvailable = true
  214. return nil
  215. }
  216. func nginxLogCapture(logfile, format string) {
  217. if _, err := os.Stat(logfile); err != nil {
  218. log.Fatalf("%s: %s", logfile, err)
  219. }
  220. t, err := tail.TailFile(logfile, tail.Config{
  221. Follow: true, // follow the file
  222. ReOpen: true, // reopen log file when it gets closed/rotated
  223. Logger: tail.DiscardingLogger, // don't log anything
  224. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  225. })
  226. if err != nil {
  227. log.Fatalf("%s: %s", logfile, err)
  228. }
  229. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  230. p := gonx.NewParser(format)
  231. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  232. for line := range t.Lines {
  233. l := line.Text
  234. logEntry, err := p.ParseString(l)
  235. if err != nil {
  236. log.Println(err)
  237. continue
  238. }
  239. if config.Trace {
  240. pretty.Println(logEntry)
  241. }
  242. remote, err := logEntry.Field("remote_addr")
  243. if err != nil {
  244. log.Println(err)
  245. continue
  246. }
  247. if config.UseXForwardedAsSource {
  248. xff, err := logEntry.Field("http_x_forwarded_for")
  249. if err != nil && xff != "" {
  250. ips := strings.Split(xff, ",")
  251. if len(ips) > 0 {
  252. remote = strings.TrimSpace(ips[0])
  253. }
  254. }
  255. }
  256. if remote == "" {
  257. log.Println("remote is empty: ignoring request.")
  258. continue
  259. }
  260. // only use the first host in case there are multiple hosts in the log
  261. if cidx := strings.Index(remote, ","); cidx >= 0 {
  262. remote = remote[0:cidx]
  263. }
  264. timestampStr, err := logEntry.Field("time_local")
  265. if err != nil {
  266. log.Println(err)
  267. continue
  268. }
  269. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  270. if err != nil {
  271. log.Println(err)
  272. continue
  273. }
  274. httpRequest, err := logEntry.Field("request")
  275. if err != nil {
  276. log.Println(err)
  277. continue
  278. }
  279. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  280. if len(reqData) < 4 {
  281. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  282. continue
  283. }
  284. host := config.HostName
  285. if host == "" {
  286. host = "[not available]"
  287. }
  288. request := data.Request{
  289. IpSrc: remote,
  290. Origin: remote,
  291. Source: remote,
  292. IpDst: "127.0.0.1",
  293. PortSrc: 0,
  294. PortDst: 0,
  295. TcpSeq: 0,
  296. CreatedAt: timeStamp.Unix(),
  297. Url: reqData[2],
  298. Method: reqData[1],
  299. Host: host,
  300. Protocol: reqData[3],
  301. }
  302. request.Referer, _ = logEntry.Field("http_referer")
  303. request.UserAgent, _ = logEntry.Field("http_user_agent")
  304. if config.Trace {
  305. log.Printf("[%s] %s\n", request.Source, request.Url)
  306. }
  307. count++
  308. publishRequest(config.NatsQueue, &request)
  309. }
  310. }
  311. func apacheLogCapture(logfile string) {
  312. if _, err := os.Stat(logfile); err != nil {
  313. log.Fatalf("%s: %s", logfile, err)
  314. }
  315. t, err := tail.TailFile(logfile, tail.Config{
  316. Follow: true, // follow the file
  317. ReOpen: true, // reopen log file when it gets closed/rotated
  318. Logger: tail.DiscardingLogger, // don't log anything
  319. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  320. })
  321. if err != nil {
  322. log.Fatalf("%s: %s", logfile, err)
  323. }
  324. var p axslogparser.Parser
  325. parserSet := false
  326. for line := range t.Lines {
  327. l := line.Text
  328. if !parserSet {
  329. p, _, err = axslogparser.GuessParser(l)
  330. if err != nil {
  331. log.Println(err)
  332. continue
  333. }
  334. parserSet = true
  335. }
  336. logEntry, err := p.Parse(l)
  337. if err != nil {
  338. log.Println(err)
  339. continue
  340. }
  341. remote := logEntry.Host
  342. if config.UseXForwardedAsSource && logEntry.ForwardedFor != "" {
  343. remote = logEntry.ForwardedFor
  344. }
  345. // only use the first host in case there are multiple hosts in the log
  346. if cidx := strings.Index(remote, ","); cidx >= 0 {
  347. remote = remote[0:cidx]
  348. }
  349. // extract the virtual host
  350. var virtualHost string
  351. vhost := logEntry.VirtualHost
  352. if vhost != "" {
  353. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  354. virtualHost = vhostAndPort[0]
  355. } else {
  356. if config.HostName != "" {
  357. vhost = config.HostName
  358. } else {
  359. vhost = "[not available]"
  360. }
  361. }
  362. request := data.Request{
  363. IpSrc: remote,
  364. IpDst: "127.0.0.1",
  365. PortSrc: 0,
  366. PortDst: 0,
  367. TcpSeq: 0,
  368. CreatedAt: logEntry.Time.UnixNano(),
  369. Url: logEntry.RequestURI,
  370. Method: logEntry.Method,
  371. Host: virtualHost,
  372. Protocol: logEntry.Protocol,
  373. Origin: remote,
  374. Source: remote,
  375. Referer: logEntry.Referer,
  376. XForwardedFor: logEntry.ForwardedFor,
  377. UserAgent: logEntry.UserAgent,
  378. }
  379. if config.Trace {
  380. log.Printf("[%s] %s\n", request.Source, request.Url)
  381. }
  382. count++
  383. publishRequest(config.NatsQueue, &request)
  384. }
  385. }
  386. func liveCapture() {
  387. ipPriv = ip.NewIP()
  388. // PCAP setup
  389. //
  390. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  391. if err != nil {
  392. log.Fatal(err)
  393. }
  394. defer handle.Close()
  395. err = handle.SetBPFFilter(config.Filter)
  396. if err != nil {
  397. log.Fatal(err)
  398. }
  399. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  400. for packet := range packetSource.Packets() {
  401. go processPacket(packet)
  402. }
  403. }
  404. func writeLogToWatch(r *data.Request) {
  405. h := map[string]string{}
  406. if r.AcceptEncoding != "" {
  407. h["Accept-Encoding"] = r.AcceptEncoding
  408. }
  409. if r.Accept != "" {
  410. h["Accept"] = r.Accept
  411. }
  412. if r.AcceptLanguage != "" {
  413. h["Accept-Language"] = r.AcceptLanguage
  414. }
  415. if r.Cookie != "" {
  416. h["Cookie"] = r.Cookie
  417. }
  418. if r.Host != "" {
  419. h["Host"] = r.Host
  420. }
  421. if r.Referer != "" {
  422. h["Referer"] = r.Referer
  423. }
  424. if r.UserAgent != "" {
  425. h["User-Agent"] = r.UserAgent
  426. }
  427. if r.Via != "" {
  428. h["Via"] = r.Via
  429. }
  430. if r.XForwardedFor != "" {
  431. h["X-Forwarded-For"] = r.XForwardedFor
  432. }
  433. if r.XRequestedWith != "" {
  434. h["X-Requested-With"] = r.XRequestedWith
  435. }
  436. if r.XRequestedWith != "" {
  437. h["X-Requested-With"] = r.XRequestedWith
  438. }
  439. data := map[string]interface{}{
  440. "request": map[string]interface{}{
  441. "time": time.Unix(0, r.CreatedAt),
  442. "address": r.Source,
  443. // "scheme": r.Protocol,
  444. "method": r.Method,
  445. "url": r.Url,
  446. "headers": h,
  447. },
  448. "response": map[string]interface{}{"status": 200},
  449. }
  450. jdata, err := json.Marshal(data)
  451. client := &http.Client{}
  452. buf := bytes.NewBuffer(jdata)
  453. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  454. req.Header.Add("Api-Key", *accessWatchKey)
  455. resp, err := client.Do(req)
  456. if err != nil {
  457. log.Println(err)
  458. }
  459. resp.Body.Close()
  460. }
  461. func publishRequest(queue string, request *data.Request) {
  462. if config.AccessWatchKey != "" {
  463. writeLogToWatch(request)
  464. return
  465. }
  466. if !natsIsAvailable {
  467. return
  468. }
  469. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  470. natsErrorChan <- err
  471. if err == nats.ErrConnectionClosed {
  472. natsIsAvailable = false
  473. }
  474. }
  475. }
  476. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  477. func processPacket(packet gopacket.Packet) {
  478. hasIPv4 := false
  479. var ipSrc, ipDst string
  480. // IPv4
  481. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  482. ip := ipLayer.(*layers.IPv4)
  483. ipSrc = ip.SrcIP.String()
  484. ipDst = ip.DstIP.String()
  485. hasIPv4 = true
  486. }
  487. // IPv6
  488. if !hasIPv4 {
  489. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  490. ip := ipLayer.(*layers.IPv6)
  491. ipSrc = ip.SrcIP.String()
  492. ipDst = ip.DstIP.String()
  493. }
  494. }
  495. // TCP
  496. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  497. if tcpLayer == nil {
  498. return
  499. }
  500. tcp, _ := tcpLayer.(*layers.TCP)
  501. portSrc := tcp.SrcPort
  502. portDst := tcp.DstPort
  503. sequence := tcp.Seq
  504. applicationLayer := packet.ApplicationLayer()
  505. if applicationLayer == nil {
  506. return
  507. }
  508. count++
  509. if len(applicationLayer.Payload()) < 50 {
  510. log.Println("application layer too small!")
  511. return
  512. }
  513. request := data.Request{
  514. IpSrc: ipSrc,
  515. IpDst: ipDst,
  516. PortSrc: uint32(portSrc),
  517. PortDst: uint32(portDst),
  518. TcpSeq: uint32(sequence),
  519. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  520. }
  521. switch config.Protocol {
  522. case "http":
  523. err := processHTTP(&request, applicationLayer.Payload())
  524. if err != nil {
  525. log.Println(err)
  526. return
  527. }
  528. case "ajp13":
  529. err := processAJP13(&request, applicationLayer.Payload())
  530. if err != nil {
  531. log.Println(err)
  532. return
  533. }
  534. }
  535. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  536. if strings.Contains(request.XForwardedFor, ",") {
  537. ips := strings.Split(request.XForwardedFor, ",")
  538. for i := len(ips) - 1; i >= 0; i-- {
  539. ipRaw := strings.TrimSpace(ips[i])
  540. ipAddr := net.ParseIP(ipRaw)
  541. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  542. request.Source = ipRaw
  543. break
  544. }
  545. }
  546. } else {
  547. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  548. if !ipPriv.IsPrivate(ipAddr) {
  549. request.Source = request.XForwardedFor
  550. }
  551. }
  552. }
  553. if request.Source == request.IpSrc && request.XRealIP != "" {
  554. request.Source = request.XRealIP
  555. }
  556. if config.Trace {
  557. log.Printf("[%s] %s\n", request.Source, request.Url)
  558. }
  559. publishRequest(config.NatsQueue, &request)
  560. }
  561. func processAJP13(request *data.Request, appData []byte) error {
  562. a, err := ajp13.Parse(appData)
  563. if err != nil {
  564. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  565. }
  566. request.Url = a.URI
  567. request.Method = a.Method()
  568. request.Host = a.Server
  569. request.Protocol = a.Version
  570. request.Origin = a.RemoteAddr.String()
  571. request.Source = a.RemoteAddr.String()
  572. if v, ok := a.Header("Referer"); ok {
  573. request.Referer = v
  574. }
  575. if v, ok := a.Header("Connection"); ok {
  576. request.Connection = v
  577. }
  578. if v, ok := a.Header("X-Forwarded-For"); ok {
  579. request.XForwardedFor = v
  580. }
  581. if v, ok := a.Header("X-Real-IP"); ok {
  582. request.XRealIP = v
  583. }
  584. if v, ok := a.Header("X-Requested-With"); ok {
  585. request.XRequestedWith = v
  586. }
  587. if v, ok := a.Header("Accept-Encoding"); ok {
  588. request.AcceptEncoding = v
  589. }
  590. if v, ok := a.Header("Accept-Language"); ok {
  591. request.AcceptLanguage = v
  592. }
  593. if v, ok := a.Header("User-Agent"); ok {
  594. request.UserAgent = v
  595. }
  596. if v, ok := a.Header("Accept"); ok {
  597. request.Accept = v
  598. }
  599. if v, ok := a.Header("Cookie"); ok {
  600. request.Cookie = v
  601. }
  602. if v, ok := a.Header("X-Forwarded-Host"); ok {
  603. if v != request.Host {
  604. request.Host = v
  605. }
  606. }
  607. return nil
  608. }
  609. func processHTTP(request *data.Request, appData []byte) error {
  610. reader := bufio.NewReader(strings.NewReader(string(appData)))
  611. req, err := http.ReadRequest(reader)
  612. if err != nil {
  613. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  614. }
  615. request.Url = req.URL.String()
  616. request.Method = req.Method
  617. request.Referer = req.Referer()
  618. request.Host = req.Host
  619. request.Protocol = req.Proto
  620. request.Origin = request.Host
  621. if _, ok := req.Header["Connection"]; ok {
  622. request.Connection = req.Header["Connection"][0]
  623. }
  624. if _, ok := req.Header["X-Forwarded-For"]; ok {
  625. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  626. }
  627. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  628. if _, ok := req.Header["True-Client-Ip"]; ok {
  629. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  630. }
  631. if _, ok := req.Header["X-Real-Ip"]; ok {
  632. request.XRealIP = req.Header["X-Real-Ip"][0]
  633. }
  634. if _, ok := req.Header["X-Requested-With"]; ok {
  635. request.XRequestedWith = req.Header["X-Requested-With"][0]
  636. }
  637. if _, ok := req.Header["Accept-Encoding"]; ok {
  638. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  639. }
  640. if _, ok := req.Header["Accept-Language"]; ok {
  641. request.AcceptLanguage = req.Header["Accept-Language"][0]
  642. }
  643. if _, ok := req.Header["User-Agent"]; ok {
  644. request.UserAgent = req.Header["User-Agent"][0]
  645. }
  646. if _, ok := req.Header["Accept"]; ok {
  647. request.Accept = req.Header["Accept"][0]
  648. }
  649. if _, ok := req.Header["Cookie"]; ok {
  650. request.Cookie = req.Header["Cookie"][0]
  651. }
  652. request.Source = request.IpSrc
  653. return nil
  654. }
  655. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  656. // e.g.
  657. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  658. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  659. func replayFile() {
  660. var req data.Request
  661. var startTs time.Time
  662. var endTs time.Time
  663. rand.Seed(time.Now().UnixNano())
  664. for {
  665. fh, err := os.Open(config.RequestsFile)
  666. if err != nil {
  667. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  668. }
  669. c := csv.NewReader(fh)
  670. c.Comma = ' '
  671. for {
  672. if config.SleepFor.Duration > time.Nanosecond {
  673. startTs = time.Now()
  674. }
  675. r, err := c.Read()
  676. if err == io.EOF {
  677. break
  678. }
  679. if err != nil {
  680. log.Println(err)
  681. continue
  682. }
  683. req.IpSrc = r[0]
  684. req.Source = r[0]
  685. req.Url = r[1]
  686. req.UserAgent = "Munch/1.0"
  687. req.Host = "demo.scraperwall.com"
  688. req.CreatedAt = time.Now().UnixNano()
  689. publishRequest(config.NatsQueue, &req)
  690. if strings.Index(r[1], ".") < 0 {
  691. hash := sha1.New()
  692. io.WriteString(hash, r[0])
  693. fp := data.Fingerprint{
  694. ClientID: "scw",
  695. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  696. Remote: r[0],
  697. Url: r[1],
  698. Source: r[0],
  699. CreatedAt: time.Now(),
  700. }
  701. if strings.HasPrefix(r[0], "50.31.") {
  702. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  703. natsJSONEC.Publish("fingerprints_scw", fp)
  704. } else if rand.Intn(10) < 5 {
  705. natsJSONEC.Publish("fingerprints_scw", fp)
  706. }
  707. }
  708. count++
  709. if config.SleepFor.Duration >= time.Nanosecond {
  710. endTs = time.Now()
  711. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  712. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  713. }
  714. }
  715. }
  716. }
  717. }
  718. func loadConfig() {
  719. // initialize with values from the command line / environment
  720. config.Live = *doLiveCapture
  721. config.Interface = *iface
  722. config.SnapshotLen = *snapshotLen
  723. config.Filter = *filter
  724. config.Promiscuous = *promiscuous
  725. config.NatsURL = *natsURL
  726. config.NatsQueue = *natsQueue
  727. config.NatsUser = *natsUser
  728. config.NatsPassword = *natsPassword
  729. config.NatsCA = *natsCA
  730. config.SleepFor.Duration = *sleepFor
  731. config.RequestsFile = *requestsFile
  732. config.UseXForwardedAsSource = *useXForwardedAsSource
  733. config.Protocol = *protocol
  734. config.ApacheLog = *apacheLog
  735. config.NginxLog = *nginxLog
  736. config.NginxLogFormat = *nginxFormat
  737. config.HostName = *hostName
  738. config.Quiet = *beQuiet
  739. config.Trace = *trace
  740. config.AccessWatchKey = *accessWatchKey
  741. if *configFile == "" {
  742. return
  743. }
  744. _, err := os.Stat(*configFile)
  745. if err != nil {
  746. log.Printf("%s: %s\n", *configFile, err)
  747. return
  748. }
  749. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  750. log.Printf("%s: %s\n", *configFile, err)
  751. }
  752. if !config.Quiet {
  753. config.print()
  754. }
  755. }
  756. // version outputs build information...
  757. func version() {
  758. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  759. }