main.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894
  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/sha1"
  6. "encoding/csv"
  7. "encoding/json"
  8. "flag"
  9. "fmt"
  10. "io"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "net/http"
  15. "os"
  16. "regexp"
  17. "strings"
  18. "time"
  19. "github.com/BurntSushi/toml"
  20. "github.com/Songmu/axslogparser"
  21. "github.com/google/gopacket"
  22. "github.com/google/gopacket/layers"
  23. "github.com/google/gopacket/pcap"
  24. "github.com/hpcloud/tail"
  25. "github.com/nats-io/nats"
  26. "github.com/nats-io/nats/encoders/protobuf"
  27. "github.com/satyrius/gonx"
  28. "git.scraperwall.com/scw/ajp13"
  29. "git.scraperwall.com/scw/data"
  30. "git.scraperwall.com/scw/ip"
  31. )
  32. var (
  33. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  34. iface = flag.String("interface", "eth0", "Interface to get packets from")
  35. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  36. filter = flag.String("filter", "tcp", "PCAP filter expression")
  37. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  38. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  39. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  40. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  41. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  42. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  43. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  44. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  45. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  46. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  47. trace = flag.Bool("trace", false, "Trace the packet capturing")
  48. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  49. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  50. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  51. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  52. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  53. configFile = flag.String("config", "", "The location of the TOML config file")
  54. beQuiet = flag.Bool("quiet", true, "Be quiet")
  55. doVersion = flag.Bool("version", false, "Show version information")
  56. natsEC *nats.EncodedConn
  57. natsJSONEC *nats.EncodedConn
  58. natsErrorChan chan error
  59. natsIsAvailable bool
  60. count uint64
  61. timeout = -1 * time.Second
  62. ipPriv *ip.IP
  63. config Config
  64. // Version contains the program Version, e.g. 1.0.1
  65. Version string
  66. // BuildDate contains the date and time at which the program was compiled
  67. BuildDate string
  68. )
  69. // Config contains the program configuration
  70. type Config struct {
  71. Live bool
  72. Interface string
  73. SnapshotLen int
  74. Filter string
  75. Promiscuous bool
  76. NatsURL string
  77. NatsQueue string
  78. NatsUser string
  79. NatsPassword string
  80. NatsCA string
  81. SleepFor duration
  82. RequestsFile string
  83. UseXForwardedAsSource bool
  84. Quiet bool
  85. Protocol string
  86. Trace bool
  87. ApacheLog string
  88. NginxLog string
  89. NginxLogFormat string
  90. HostName string
  91. AccessWatchKey string
  92. }
  93. type duration struct {
  94. time.Duration
  95. }
  96. func (d *duration) UnmarshalText(text []byte) error {
  97. var err error
  98. d.Duration, err = time.ParseDuration(string(text))
  99. return err
  100. }
  101. func (c Config) print() {
  102. fmt.Printf("Live: %t\n", c.Live)
  103. fmt.Printf("Interface: %s\n", c.Interface)
  104. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  105. fmt.Printf("Filter: %s\n", c.Filter)
  106. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  107. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  108. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  109. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  110. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  111. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  112. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  113. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  114. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  115. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  116. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  117. fmt.Printf("HostName: %s\n", c.HostName)
  118. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  119. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  120. fmt.Printf("Protocol: %s\n", c.Protocol)
  121. fmt.Printf("Quiet: %t\n", c.Quiet)
  122. fmt.Printf("Trace: %t\n", c.Trace)
  123. }
  124. func init() {
  125. flag.Parse()
  126. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  127. }
  128. func main() {
  129. if *doVersion {
  130. version()
  131. os.Exit(0)
  132. }
  133. loadConfig()
  134. // Output how many requests per second were sent
  135. if !config.Quiet {
  136. go func(c *uint64) {
  137. for {
  138. fmt.Printf("%d requests per second\n", *c)
  139. *c = 0
  140. time.Sleep(time.Second)
  141. }
  142. }(&count)
  143. }
  144. // NATS
  145. //
  146. if config.NatsURL == "" && config.AccessWatchKey == "" {
  147. log.Fatal("No NATS URL specified (-nats-url)!")
  148. }
  149. natsIsAvailable = false
  150. natsErrorChan = make(chan error, 1)
  151. err := connectToNATS()
  152. if err != nil && config.AccessWatchKey == "" {
  153. log.Fatal(err)
  154. }
  155. go natsWatchdog(natsErrorChan)
  156. // What should I do?
  157. if config.RequestsFile != "" {
  158. replayFile()
  159. } else if config.ApacheLog != "" {
  160. apacheLogCapture(config.ApacheLog)
  161. } else if config.Live {
  162. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  163. liveCapture()
  164. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  165. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  166. }
  167. }
  168. func natsWatchdog(closedChan chan error) {
  169. var lastError error
  170. for err := range closedChan {
  171. if lastError != err {
  172. lastError = err
  173. log.Println(err)
  174. }
  175. if err != nats.ErrConnectionClosed {
  176. continue
  177. }
  178. RECONNECT:
  179. for {
  180. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  181. err := connectToNATS()
  182. if err == nil {
  183. break RECONNECT
  184. }
  185. time.Sleep(1 * time.Second)
  186. }
  187. }
  188. }
  189. func connectToNATS() error {
  190. var natsConn *nats.Conn
  191. var err error
  192. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  193. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  194. } else {
  195. if config.NatsPassword != "" && config.NatsUser != "" {
  196. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  197. } else {
  198. natsConn, err = nats.Connect(config.NatsURL)
  199. }
  200. }
  201. if err != nil {
  202. return err
  203. }
  204. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  205. if err != nil {
  206. return fmt.Errorf("Encoded Connection: %v", err)
  207. }
  208. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  209. if err != nil {
  210. return fmt.Errorf("Encoded Connection: %v", err)
  211. }
  212. natsIsAvailable = true
  213. return nil
  214. }
  215. func nginxLogCapture(logfile, format string) {
  216. if _, err := os.Stat(logfile); err != nil {
  217. log.Fatalf("%s: %s", logfile, err)
  218. }
  219. t, err := tail.TailFile(logfile, tail.Config{
  220. Follow: true, // follow the file
  221. ReOpen: true, // reopen log file when it gets closed/rotated
  222. Logger: tail.DiscardingLogger, // don't log anything
  223. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  224. })
  225. if err != nil {
  226. log.Fatalf("%s: %s", logfile, err)
  227. }
  228. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  229. p := gonx.NewParser(format)
  230. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  231. for line := range t.Lines {
  232. l := line.Text
  233. logEntry, err := p.ParseString(l)
  234. if err != nil {
  235. log.Println(err)
  236. continue
  237. }
  238. remote, err := logEntry.Field("remote_addr")
  239. if err != nil {
  240. log.Println(err)
  241. continue
  242. }
  243. if *useXForwardedAsSource {
  244. xff, err := logEntry.Field("http_x_forwarded_for")
  245. if err != nil {
  246. remote = xff
  247. }
  248. }
  249. if remote == "" {
  250. log.Println("remote is empty: ignoring request.")
  251. continue
  252. }
  253. // only use the first host in case there are multiple hosts in the log
  254. if cidx := strings.Index(remote, ","); cidx >= 0 {
  255. remote = remote[0:cidx]
  256. }
  257. timestampStr, err := logEntry.Field("time_local")
  258. if err != nil {
  259. log.Println(err)
  260. continue
  261. }
  262. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  263. if err != nil {
  264. log.Println(err)
  265. continue
  266. }
  267. httpRequest, err := logEntry.Field("request")
  268. if err != nil {
  269. log.Println(err)
  270. continue
  271. }
  272. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  273. if len(reqData) < 4 {
  274. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  275. continue
  276. }
  277. host := config.HostName
  278. if host == "" {
  279. host = "[not available]"
  280. }
  281. request := data.Request{
  282. IpSrc: remote,
  283. Origin: remote,
  284. Source: remote,
  285. IpDst: "127.0.0.1",
  286. PortSrc: 0,
  287. PortDst: 0,
  288. TcpSeq: 0,
  289. CreatedAt: timeStamp.Unix(),
  290. Url: reqData[2],
  291. Method: reqData[1],
  292. Host: host,
  293. Protocol: reqData[3],
  294. }
  295. request.Referer, _ = logEntry.Field("http_referer")
  296. request.UserAgent, _ = logEntry.Field("http_user_agent")
  297. if config.Trace {
  298. log.Printf("[%s] %s\n", request.Source, request.Url)
  299. }
  300. count++
  301. publishRequest(config.NatsQueue, &request)
  302. }
  303. }
  304. func apacheLogCapture(logfile string) {
  305. if _, err := os.Stat(logfile); err != nil {
  306. log.Fatalf("%s: %s", logfile, err)
  307. }
  308. t, err := tail.TailFile(logfile, tail.Config{
  309. Follow: true, // follow the file
  310. ReOpen: true, // reopen log file when it gets closed/rotated
  311. Logger: tail.DiscardingLogger, // don't log anything
  312. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  313. })
  314. if err != nil {
  315. log.Fatalf("%s: %s", logfile, err)
  316. }
  317. var p axslogparser.Parser
  318. parserSet := false
  319. for line := range t.Lines {
  320. l := line.Text
  321. if !parserSet {
  322. p, _, err = axslogparser.GuessParser(l)
  323. if err != nil {
  324. log.Println(err)
  325. continue
  326. }
  327. parserSet = true
  328. }
  329. logEntry, err := p.Parse(l)
  330. if err != nil {
  331. log.Println(err)
  332. continue
  333. }
  334. remote := logEntry.Host
  335. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  336. remote = logEntry.ForwardedFor
  337. }
  338. // only use the first host in case there are multiple hosts in the log
  339. if cidx := strings.Index(remote, ","); cidx >= 0 {
  340. remote = remote[0:cidx]
  341. }
  342. // extract the virtual host
  343. var virtualHost string
  344. vhost := logEntry.VirtualHost
  345. if vhost != "" {
  346. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  347. virtualHost = vhostAndPort[0]
  348. } else {
  349. if config.HostName != "" {
  350. vhost = config.HostName
  351. } else {
  352. vhost = "[not available]"
  353. }
  354. }
  355. request := data.Request{
  356. IpSrc: remote,
  357. IpDst: "127.0.0.1",
  358. PortSrc: 0,
  359. PortDst: 0,
  360. TcpSeq: 0,
  361. CreatedAt: logEntry.Time.UnixNano(),
  362. Url: logEntry.RequestURI,
  363. Method: logEntry.Method,
  364. Host: virtualHost,
  365. Protocol: logEntry.Protocol,
  366. Origin: remote,
  367. Source: remote,
  368. Referer: logEntry.Referer,
  369. XForwardedFor: logEntry.ForwardedFor,
  370. UserAgent: logEntry.UserAgent,
  371. }
  372. if config.Trace {
  373. log.Printf("[%s] %s\n", request.Source, request.Url)
  374. }
  375. count++
  376. publishRequest(config.NatsQueue, &request)
  377. }
  378. }
  379. func liveCapture() {
  380. ipPriv = ip.NewIP()
  381. // PCAP setup
  382. //
  383. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  384. if err != nil {
  385. log.Fatal(err)
  386. }
  387. defer handle.Close()
  388. err = handle.SetBPFFilter(config.Filter)
  389. if err != nil {
  390. log.Fatal(err)
  391. }
  392. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  393. for packet := range packetSource.Packets() {
  394. go processPacket(packet)
  395. }
  396. }
  397. func writeLogToWatch(r *data.Request) {
  398. h := map[string]string{}
  399. if r.AcceptEncoding != "" {
  400. h["Accept-Encoding"] = r.AcceptEncoding
  401. }
  402. if r.Accept != "" {
  403. h["Accept"] = r.Accept
  404. }
  405. if r.AcceptLanguage != "" {
  406. h["Accept-Language"] = r.AcceptLanguage
  407. }
  408. if r.Cookie != "" {
  409. h["Cookie"] = r.Cookie
  410. }
  411. if r.Host != "" {
  412. h["Host"] = r.Host
  413. }
  414. if r.Referer != "" {
  415. h["Referer"] = r.Referer
  416. }
  417. if r.UserAgent != "" {
  418. h["User-Agent"] = r.UserAgent
  419. }
  420. if r.Via != "" {
  421. h["Via"] = r.Via
  422. }
  423. if r.XForwardedFor != "" {
  424. h["X-Forwarded-For"] = r.XForwardedFor
  425. }
  426. if r.XRequestedWith != "" {
  427. h["X-Requested-With"] = r.XRequestedWith
  428. }
  429. if r.XRequestedWith != "" {
  430. h["X-Requested-With"] = r.XRequestedWith
  431. }
  432. data := map[string]interface{}{
  433. "request": map[string]interface{}{
  434. "time": time.Unix(0, r.CreatedAt),
  435. "address": r.Source,
  436. // "scheme": r.Protocol,
  437. "method": r.Method,
  438. "url": r.Url,
  439. "headers": h,
  440. },
  441. "response": map[string]interface{}{"status": 200},
  442. }
  443. jdata, err := json.Marshal(data)
  444. client := &http.Client{}
  445. buf := bytes.NewBuffer(jdata)
  446. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  447. req.Header.Add("Api-Key", *accessWatchKey)
  448. resp, err := client.Do(req)
  449. if err != nil {
  450. log.Println(err)
  451. }
  452. resp.Body.Close()
  453. }
  454. func publishRequest(queue string, request *data.Request) {
  455. if config.AccessWatchKey != "" {
  456. writeLogToWatch(request)
  457. return
  458. }
  459. if !natsIsAvailable {
  460. return
  461. }
  462. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  463. natsErrorChan <- err
  464. if err == nats.ErrConnectionClosed {
  465. natsIsAvailable = false
  466. }
  467. }
  468. }
  469. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  470. func processPacket(packet gopacket.Packet) {
  471. hasIPv4 := false
  472. var ipSrc, ipDst string
  473. // IPv4
  474. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  475. ip := ipLayer.(*layers.IPv4)
  476. ipSrc = ip.SrcIP.String()
  477. ipDst = ip.DstIP.String()
  478. hasIPv4 = true
  479. }
  480. // IPv6
  481. if !hasIPv4 {
  482. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  483. ip := ipLayer.(*layers.IPv6)
  484. ipSrc = ip.SrcIP.String()
  485. ipDst = ip.DstIP.String()
  486. }
  487. }
  488. // TCP
  489. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  490. if tcpLayer == nil {
  491. return
  492. }
  493. tcp, _ := tcpLayer.(*layers.TCP)
  494. portSrc := tcp.SrcPort
  495. portDst := tcp.DstPort
  496. sequence := tcp.Seq
  497. applicationLayer := packet.ApplicationLayer()
  498. if applicationLayer == nil {
  499. return
  500. }
  501. count++
  502. if len(applicationLayer.Payload()) < 50 {
  503. log.Println("application layer too small!")
  504. return
  505. }
  506. request := data.Request{
  507. IpSrc: ipSrc,
  508. IpDst: ipDst,
  509. PortSrc: uint32(portSrc),
  510. PortDst: uint32(portDst),
  511. TcpSeq: uint32(sequence),
  512. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  513. }
  514. switch config.Protocol {
  515. case "http":
  516. err := processHTTP(&request, applicationLayer.Payload())
  517. if err != nil {
  518. log.Println(err)
  519. return
  520. }
  521. case "ajp13":
  522. err := processAJP13(&request, applicationLayer.Payload())
  523. if err != nil {
  524. log.Println(err)
  525. return
  526. }
  527. }
  528. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  529. if strings.Contains(request.XForwardedFor, ",") {
  530. ips := strings.Split(request.XForwardedFor, ",")
  531. for i := len(ips) - 1; i >= 0; i-- {
  532. ipRaw := strings.TrimSpace(ips[i])
  533. ipAddr := net.ParseIP(ipRaw)
  534. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  535. request.Source = ipRaw
  536. break
  537. }
  538. }
  539. } else {
  540. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  541. if !ipPriv.IsPrivate(ipAddr) {
  542. request.Source = request.XForwardedFor
  543. }
  544. }
  545. }
  546. if request.Source == request.IpSrc && request.XRealIP != "" {
  547. request.Source = request.XRealIP
  548. }
  549. if config.Trace {
  550. log.Printf("[%s] %s\n", request.Source, request.Url)
  551. }
  552. publishRequest(config.NatsQueue, &request)
  553. }
  554. func processAJP13(request *data.Request, appData []byte) error {
  555. a, err := ajp13.Parse(appData)
  556. if err != nil {
  557. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  558. }
  559. request.Url = a.URI
  560. request.Method = a.Method()
  561. request.Host = a.Server
  562. request.Protocol = a.Version
  563. request.Origin = a.RemoteAddr.String()
  564. request.Source = a.RemoteAddr.String()
  565. if v, ok := a.Header("Referer"); ok {
  566. request.Referer = v
  567. }
  568. if v, ok := a.Header("Connection"); ok {
  569. request.Connection = v
  570. }
  571. if v, ok := a.Header("X-Forwarded-For"); ok {
  572. request.XForwardedFor = v
  573. }
  574. if v, ok := a.Header("X-Real-IP"); ok {
  575. request.XRealIP = v
  576. }
  577. if v, ok := a.Header("X-Requested-With"); ok {
  578. request.XRequestedWith = v
  579. }
  580. if v, ok := a.Header("Accept-Encoding"); ok {
  581. request.AcceptEncoding = v
  582. }
  583. if v, ok := a.Header("Accept-Language"); ok {
  584. request.AcceptLanguage = v
  585. }
  586. if v, ok := a.Header("User-Agent"); ok {
  587. request.UserAgent = v
  588. }
  589. if v, ok := a.Header("Accept"); ok {
  590. request.Accept = v
  591. }
  592. if v, ok := a.Header("Cookie"); ok {
  593. request.Cookie = v
  594. }
  595. if v, ok := a.Header("X-Forwarded-Host"); ok {
  596. if v != request.Host {
  597. request.Host = v
  598. }
  599. }
  600. return nil
  601. }
  602. func processHTTP(request *data.Request, appData []byte) error {
  603. reader := bufio.NewReader(strings.NewReader(string(appData)))
  604. req, err := http.ReadRequest(reader)
  605. if err != nil {
  606. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  607. }
  608. request.Url = req.URL.String()
  609. request.Method = req.Method
  610. request.Referer = req.Referer()
  611. request.Host = req.Host
  612. request.Protocol = req.Proto
  613. request.Origin = request.Host
  614. if _, ok := req.Header["Connection"]; ok {
  615. request.Connection = req.Header["Connection"][0]
  616. }
  617. if _, ok := req.Header["X-Forwarded-For"]; ok {
  618. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  619. }
  620. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  621. if _, ok := req.Header["True-Client-Ip"]; ok {
  622. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  623. }
  624. if _, ok := req.Header["X-Real-Ip"]; ok {
  625. request.XRealIP = req.Header["X-Real-Ip"][0]
  626. }
  627. if _, ok := req.Header["X-Requested-With"]; ok {
  628. request.XRequestedWith = req.Header["X-Requested-With"][0]
  629. }
  630. if _, ok := req.Header["Accept-Encoding"]; ok {
  631. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  632. }
  633. if _, ok := req.Header["Accept-Language"]; ok {
  634. request.AcceptLanguage = req.Header["Accept-Language"][0]
  635. }
  636. if _, ok := req.Header["User-Agent"]; ok {
  637. request.UserAgent = req.Header["User-Agent"][0]
  638. }
  639. if _, ok := req.Header["Accept"]; ok {
  640. request.Accept = req.Header["Accept"][0]
  641. }
  642. if _, ok := req.Header["Cookie"]; ok {
  643. request.Cookie = req.Header["Cookie"][0]
  644. }
  645. request.Source = request.IpSrc
  646. return nil
  647. }
  648. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  649. // e.g.
  650. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  651. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  652. func replayFile() {
  653. var req data.Request
  654. var startTs time.Time
  655. var endTs time.Time
  656. rand.Seed(time.Now().UnixNano())
  657. for {
  658. fh, err := os.Open(config.RequestsFile)
  659. if err != nil {
  660. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  661. }
  662. c := csv.NewReader(fh)
  663. c.Comma = ' '
  664. for {
  665. if config.SleepFor.Duration > time.Nanosecond {
  666. startTs = time.Now()
  667. }
  668. r, err := c.Read()
  669. if err == io.EOF {
  670. break
  671. }
  672. if err != nil {
  673. log.Println(err)
  674. continue
  675. }
  676. req.IpSrc = r[0]
  677. req.Source = r[0]
  678. req.Url = r[1]
  679. req.UserAgent = "Munch/1.0"
  680. req.Host = "demo.scraperwall.com"
  681. req.CreatedAt = time.Now().UnixNano()
  682. publishRequest(config.NatsQueue, &req)
  683. if strings.Index(r[1], ".") < 0 {
  684. hash := sha1.New()
  685. io.WriteString(hash, r[0])
  686. fp := data.Fingerprint{
  687. ClientID: "scw",
  688. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  689. Remote: r[0],
  690. Url: r[1],
  691. Source: r[0],
  692. CreatedAt: time.Now(),
  693. }
  694. if strings.HasPrefix(r[0], "50.31.") {
  695. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  696. natsJSONEC.Publish("fingerprints_scw", fp)
  697. } else if rand.Intn(10) < 5 {
  698. natsJSONEC.Publish("fingerprints_scw", fp)
  699. }
  700. }
  701. count++
  702. if config.SleepFor.Duration >= time.Nanosecond {
  703. endTs = time.Now()
  704. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  705. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  706. }
  707. }
  708. }
  709. }
  710. }
  711. func loadConfig() {
  712. // initialize with values from the command line / environment
  713. config.Live = *doLiveCapture
  714. config.Interface = *iface
  715. config.SnapshotLen = *snapshotLen
  716. config.Filter = *filter
  717. config.Promiscuous = *promiscuous
  718. config.NatsURL = *natsURL
  719. config.NatsQueue = *natsQueue
  720. config.NatsUser = *natsUser
  721. config.NatsPassword = *natsPassword
  722. config.NatsCA = *natsCA
  723. config.SleepFor.Duration = *sleepFor
  724. config.RequestsFile = *requestsFile
  725. config.UseXForwardedAsSource = *useXForwardedAsSource
  726. config.Protocol = *protocol
  727. config.ApacheLog = *apacheLog
  728. config.NginxLog = *nginxLog
  729. config.NginxLogFormat = *nginxFormat
  730. config.HostName = *hostName
  731. config.Quiet = *beQuiet
  732. config.Trace = *trace
  733. config.AccessWatchKey = *accessWatchKey
  734. if *configFile == "" {
  735. return
  736. }
  737. _, err := os.Stat(*configFile)
  738. if err != nil {
  739. log.Printf("%s: %s\n", *configFile, err)
  740. return
  741. }
  742. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  743. log.Printf("%s: %s\n", *configFile, err)
  744. }
  745. if !config.Quiet {
  746. config.print()
  747. }
  748. }
  749. // version outputs build information
  750. func version() {
  751. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  752. }