main.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882
  1. package main
import (
	"bufio"
	"bytes"
	"crypto/sha1"
	"encoding/csv"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"os"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
	"github.com/BurntSushi/toml"
	"github.com/Songmu/axslogparser"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/hpcloud/tail"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"
	"github.com/satyrius/gonx"
)
  32. var (
  33. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  34. iface = flag.String("interface", "eth0", "Interface to get packets from")
  35. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  36. filter = flag.String("filter", "tcp", "PCAP filter expression")
  37. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  38. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  39. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  40. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  41. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  42. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  43. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  44. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  45. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  46. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  47. trace = flag.Bool("trace", false, "Trace the packet capturing")
  48. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  49. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  50. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  51. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  52. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  53. configFile = flag.String("config", "", "The location of the TOML config file")
  54. beQuiet = flag.Bool("quiet", true, "Be quiet")
  55. doVersion = flag.Bool("version", false, "Show version information")
  56. natsEC *nats.EncodedConn
  57. natsJSONEC *nats.EncodedConn
  58. natsErrorChan chan error
  59. natsIsAvailable bool
  60. count uint64
  61. timeout = -1 * time.Second
  62. ipPriv *ip.IP
  63. config Config
  64. // Version contains the program Version, e.g. 1.0.1
  65. Version string
  66. // BuildDate contains the date and time at which the program was compiled
  67. BuildDate string
  68. )
  69. // Config contains the program configuration
  70. type Config struct {
  71. Live bool
  72. Interface string
  73. SnapshotLen int
  74. Filter string
  75. Promiscuous bool
  76. NatsURL string
  77. NatsQueue string
  78. NatsUser string
  79. NatsPassword string
  80. NatsCA string
  81. SleepFor duration
  82. RequestsFile string
  83. UseXForwardedAsSource bool
  84. Quiet bool
  85. Protocol string
  86. Trace bool
  87. ApacheLog string
  88. NginxLog string
  89. NginxLogFormat string
  90. HostName string
  91. AccessWatchKey string
  92. }
  93. type duration struct {
  94. time.Duration
  95. }
  96. func (d *duration) UnmarshalText(text []byte) error {
  97. var err error
  98. d.Duration, err = time.ParseDuration(string(text))
  99. return err
  100. }
  101. func (c Config) print() {
  102. fmt.Printf("Live: %t\n", c.Live)
  103. fmt.Printf("Interface: %s\n", c.Interface)
  104. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  105. fmt.Printf("Filter: %s\n", c.Filter)
  106. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  107. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  108. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  109. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  110. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  111. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  112. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  113. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  114. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  115. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  116. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  117. fmt.Printf("HostName: %s\n", c.HostName)
  118. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  119. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  120. fmt.Printf("Protocol: %s\n", c.Protocol)
  121. fmt.Printf("Quiet: %t\n", c.Quiet)
  122. fmt.Printf("Trace: %t\n", c.Trace)
  123. }
  124. func init() {
  125. flag.Parse()
  126. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  127. }
  128. func main() {
  129. if *doVersion {
  130. version()
  131. os.Exit(0)
  132. }
  133. loadConfig()
  134. // Output how many requests per second were sent
  135. if !config.Quiet {
  136. go func(c *uint64) {
  137. for {
  138. fmt.Printf("%d requests per second\n", *c)
  139. *c = 0
  140. time.Sleep(time.Second)
  141. }
  142. }(&count)
  143. }
  144. // NATS
  145. //
  146. if config.NatsURL == "" && config.AccessWatchKey == "" {
  147. log.Fatal("No NATS URL specified (-nats-url)!")
  148. }
  149. natsIsAvailable = false
  150. natsErrorChan = make(chan error, 1)
  151. err := connectToNATS()
  152. if err != nil && config.AccessWatchKey == "" {
  153. log.Fatal(err)
  154. }
  155. go natsWatchdog(natsErrorChan)
  156. // What should I do?
  157. if config.RequestsFile != "" {
  158. replayFile()
  159. } else if config.ApacheLog != "" {
  160. apacheLogCapture(config.ApacheLog)
  161. } else if config.Live {
  162. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  163. liveCapture()
  164. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  165. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  166. }
  167. }
  168. func natsWatchdog(closedChan chan error) {
  169. var lastError error
  170. for err := range closedChan {
  171. if lastError != err {
  172. lastError = err
  173. log.Println(err)
  174. }
  175. if err != nats.ErrConnectionClosed {
  176. continue
  177. }
  178. RECONNECT:
  179. for {
  180. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  181. err := connectToNATS()
  182. if err == nil {
  183. break RECONNECT
  184. }
  185. time.Sleep(1 * time.Second)
  186. }
  187. }
  188. }
  189. func connectToNATS() error {
  190. var natsConn *nats.Conn
  191. var err error
  192. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  193. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  194. } else {
  195. if config.NatsPassword != "" && config.NatsUser != "" {
  196. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  197. } else {
  198. natsConn, err = nats.Connect(config.NatsURL)
  199. }
  200. }
  201. if err != nil {
  202. return err
  203. }
  204. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  205. if err != nil {
  206. return fmt.Errorf("Encoded Connection: %v", err)
  207. }
  208. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  209. if err != nil {
  210. return fmt.Errorf("Encoded Connection: %v", err)
  211. }
  212. natsIsAvailable = true
  213. return nil
  214. }
  215. func nginxLogCapture(logfile, format string) {
  216. if _, err := os.Stat(logfile); err != nil {
  217. log.Fatalf("%s: %s", logfile, err)
  218. }
  219. t, err := tail.TailFile(logfile, tail.Config{
  220. Follow: true, // follow the file
  221. ReOpen: true, // reopen log file when it gets closed/rotated
  222. Logger: tail.DiscardingLogger, // don't log anything
  223. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  224. })
  225. if err != nil {
  226. log.Fatalf("%s: %s", logfile, err)
  227. }
  228. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  229. p := gonx.NewParser(format)
  230. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  231. for line := range t.Lines {
  232. l := line.Text
  233. logEntry, err := p.ParseString(l)
  234. if err != nil {
  235. log.Println(err)
  236. continue
  237. }
  238. remote, err := logEntry.Field("remote_addr")
  239. if err != nil {
  240. log.Println(err)
  241. continue
  242. }
  243. // only use the first host in case there are multiple hosts in the log
  244. if cidx := strings.Index(remote, ","); cidx >= 0 {
  245. remote = remote[0:cidx]
  246. }
  247. timestampStr, err := logEntry.Field("time_local")
  248. if err != nil {
  249. log.Println(err)
  250. continue
  251. }
  252. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  253. if err != nil {
  254. log.Println(err)
  255. continue
  256. }
  257. httpRequest, err := logEntry.Field("request")
  258. if err != nil {
  259. log.Println(err)
  260. continue
  261. }
  262. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  263. if len(reqData) < 4 {
  264. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  265. continue
  266. }
  267. host := config.HostName
  268. if host == "" {
  269. host = "[not available]"
  270. }
  271. request := data.Request{
  272. IpSrc: remote,
  273. Origin: remote,
  274. Source: remote,
  275. IpDst: "127.0.0.1",
  276. PortSrc: 0,
  277. PortDst: 0,
  278. TcpSeq: 0,
  279. CreatedAt: timeStamp.Unix(),
  280. Url: reqData[2],
  281. Method: reqData[1],
  282. Host: host,
  283. Protocol: reqData[3],
  284. }
  285. request.Referer, _ = logEntry.Field("http_referer")
  286. request.UserAgent, _ = logEntry.Field("http_user_agent")
  287. if config.Trace {
  288. log.Printf("[%s] %s\n", request.Source, request.Url)
  289. }
  290. count++
  291. publishRequest(config.NatsQueue, &request)
  292. }
  293. }
  294. func apacheLogCapture(logfile string) {
  295. if _, err := os.Stat(logfile); err != nil {
  296. log.Fatalf("%s: %s", logfile, err)
  297. }
  298. t, err := tail.TailFile(logfile, tail.Config{
  299. Follow: true, // follow the file
  300. ReOpen: true, // reopen log file when it gets closed/rotated
  301. Logger: tail.DiscardingLogger, // don't log anything
  302. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  303. })
  304. if err != nil {
  305. log.Fatalf("%s: %s", logfile, err)
  306. }
  307. var p axslogparser.Parser
  308. parserSet := false
  309. for line := range t.Lines {
  310. l := line.Text
  311. if !parserSet {
  312. p, _, err = axslogparser.GuessParser(l)
  313. if err != nil {
  314. log.Println(err)
  315. continue
  316. }
  317. parserSet = true
  318. }
  319. logEntry, err := p.Parse(l)
  320. if err != nil {
  321. log.Println(err)
  322. continue
  323. }
  324. remote := logEntry.Host
  325. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  326. remote = logEntry.ForwardedFor
  327. }
  328. // only use the first host in case there are multiple hosts in the log
  329. if cidx := strings.Index(remote, ","); cidx >= 0 {
  330. remote = remote[0:cidx]
  331. }
  332. // extract the virtual host
  333. var virtualHost string
  334. vhost := logEntry.VirtualHost
  335. if vhost != "" {
  336. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  337. virtualHost = vhostAndPort[0]
  338. } else {
  339. if config.HostName != "" {
  340. vhost = config.HostName
  341. } else {
  342. vhost = "[not available]"
  343. }
  344. }
  345. request := data.Request{
  346. IpSrc: remote,
  347. IpDst: "127.0.0.1",
  348. PortSrc: 0,
  349. PortDst: 0,
  350. TcpSeq: 0,
  351. CreatedAt: logEntry.Time.UnixNano(),
  352. Url: logEntry.RequestURI,
  353. Method: logEntry.Method,
  354. Host: virtualHost,
  355. Protocol: logEntry.Protocol,
  356. Origin: remote,
  357. Source: remote,
  358. Referer: logEntry.Referer,
  359. XForwardedFor: logEntry.ForwardedFor,
  360. UserAgent: logEntry.UserAgent,
  361. }
  362. if config.Trace {
  363. log.Printf("[%s] %s\n", request.Source, request.Url)
  364. }
  365. count++
  366. publishRequest(config.NatsQueue, &request)
  367. }
  368. }
  369. func liveCapture() {
  370. ipPriv = ip.NewIP()
  371. // PCAP setup
  372. //
  373. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  374. if err != nil {
  375. log.Fatal(err)
  376. }
  377. defer handle.Close()
  378. err = handle.SetBPFFilter(config.Filter)
  379. if err != nil {
  380. log.Fatal(err)
  381. }
  382. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  383. for packet := range packetSource.Packets() {
  384. go processPacket(packet)
  385. }
  386. }
  387. func writeLogToWatch(r *data.Request) {
  388. h := map[string]string{}
  389. if r.AcceptEncoding != "" {
  390. h["Accept-Encoding"] = r.AcceptEncoding
  391. }
  392. if r.Accept != "" {
  393. h["Accept"] = r.Accept
  394. }
  395. if r.AcceptLanguage != "" {
  396. h["Accept-Language"] = r.AcceptLanguage
  397. }
  398. if r.Cookie != "" {
  399. h["Cookie"] = r.Cookie
  400. }
  401. if r.Host != "" {
  402. h["Host"] = r.Host
  403. }
  404. if r.Referer != "" {
  405. h["Referer"] = r.Referer
  406. }
  407. if r.UserAgent != "" {
  408. h["User-Agent"] = r.UserAgent
  409. }
  410. if r.Via != "" {
  411. h["Via"] = r.Via
  412. }
  413. if r.XForwardedFor != "" {
  414. h["X-Forwarded-For"] = r.XForwardedFor
  415. }
  416. if r.XRequestedWith != "" {
  417. h["X-Requested-With"] = r.XRequestedWith
  418. }
  419. if r.XRequestedWith != "" {
  420. h["X-Requested-With"] = r.XRequestedWith
  421. }
  422. data := map[string]interface{}{
  423. "request": map[string]interface{}{
  424. "time": time.Unix(0, r.CreatedAt),
  425. "address": r.Source,
  426. // "scheme": r.Protocol,
  427. "method": r.Method,
  428. "url": r.Url,
  429. "headers": h,
  430. },
  431. "response": map[string]interface{}{"status": 200},
  432. }
  433. jdata, err := json.Marshal(data)
  434. client := &http.Client{}
  435. buf := bytes.NewBuffer(jdata)
  436. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  437. req.Header.Add("Api-Key", *accessWatchKey)
  438. resp, err := client.Do(req)
  439. if err != nil {
  440. log.Println(err)
  441. }
  442. resp.Body.Close()
  443. }
  444. func publishRequest(queue string, request *data.Request) {
  445. if config.AccessWatchKey != "" {
  446. writeLogToWatch(request)
  447. return
  448. }
  449. if !natsIsAvailable {
  450. return
  451. }
  452. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  453. natsErrorChan <- err
  454. if err == nats.ErrConnectionClosed {
  455. natsIsAvailable = false
  456. }
  457. }
  458. }
  459. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  460. func processPacket(packet gopacket.Packet) {
  461. hasIPv4 := false
  462. var ipSrc, ipDst string
  463. // IPv4
  464. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  465. ip := ipLayer.(*layers.IPv4)
  466. ipSrc = ip.SrcIP.String()
  467. ipDst = ip.DstIP.String()
  468. hasIPv4 = true
  469. }
  470. // IPv6
  471. if !hasIPv4 {
  472. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  473. ip := ipLayer.(*layers.IPv6)
  474. ipSrc = ip.SrcIP.String()
  475. ipDst = ip.DstIP.String()
  476. }
  477. }
  478. // TCP
  479. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  480. if tcpLayer == nil {
  481. return
  482. }
  483. tcp, _ := tcpLayer.(*layers.TCP)
  484. portSrc := tcp.SrcPort
  485. portDst := tcp.DstPort
  486. sequence := tcp.Seq
  487. applicationLayer := packet.ApplicationLayer()
  488. if applicationLayer == nil {
  489. return
  490. }
  491. count++
  492. if len(applicationLayer.Payload()) < 50 {
  493. log.Println("application layer too small!")
  494. return
  495. }
  496. request := data.Request{
  497. IpSrc: ipSrc,
  498. IpDst: ipDst,
  499. PortSrc: uint32(portSrc),
  500. PortDst: uint32(portDst),
  501. TcpSeq: uint32(sequence),
  502. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  503. }
  504. switch config.Protocol {
  505. case "http":
  506. err := processHTTP(&request, applicationLayer.Payload())
  507. if err != nil {
  508. log.Println(err)
  509. return
  510. }
  511. case "ajp13":
  512. err := processAJP13(&request, applicationLayer.Payload())
  513. if err != nil {
  514. log.Println(err)
  515. return
  516. }
  517. }
  518. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  519. if strings.Contains(request.XForwardedFor, ",") {
  520. ips := strings.Split(request.XForwardedFor, ",")
  521. for i := len(ips) - 1; i >= 0; i-- {
  522. ipRaw := strings.TrimSpace(ips[i])
  523. ipAddr := net.ParseIP(ipRaw)
  524. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  525. request.Source = ipRaw
  526. break
  527. }
  528. }
  529. } else {
  530. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  531. if !ipPriv.IsPrivate(ipAddr) {
  532. request.Source = request.XForwardedFor
  533. }
  534. }
  535. }
  536. if request.Source == request.IpSrc && request.XRealIP != "" {
  537. request.Source = request.XRealIP
  538. }
  539. if config.Trace {
  540. log.Printf("[%s] %s\n", request.Source, request.Url)
  541. }
  542. publishRequest(config.NatsQueue, &request)
  543. }
  544. func processAJP13(request *data.Request, appData []byte) error {
  545. a, err := ajp13.Parse(appData)
  546. if err != nil {
  547. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  548. }
  549. request.Url = a.URI
  550. request.Method = a.Method()
  551. request.Host = a.Server
  552. request.Protocol = a.Version
  553. request.Origin = a.RemoteAddr.String()
  554. request.Source = a.RemoteAddr.String()
  555. if v, ok := a.Header("Referer"); ok {
  556. request.Referer = v
  557. }
  558. if v, ok := a.Header("Connection"); ok {
  559. request.Connection = v
  560. }
  561. if v, ok := a.Header("X-Forwarded-For"); ok {
  562. request.XForwardedFor = v
  563. }
  564. if v, ok := a.Header("X-Real-IP"); ok {
  565. request.XRealIP = v
  566. }
  567. if v, ok := a.Header("X-Requested-With"); ok {
  568. request.XRequestedWith = v
  569. }
  570. if v, ok := a.Header("Accept-Encoding"); ok {
  571. request.AcceptEncoding = v
  572. }
  573. if v, ok := a.Header("Accept-Language"); ok {
  574. request.AcceptLanguage = v
  575. }
  576. if v, ok := a.Header("User-Agent"); ok {
  577. request.UserAgent = v
  578. }
  579. if v, ok := a.Header("Accept"); ok {
  580. request.Accept = v
  581. }
  582. if v, ok := a.Header("Cookie"); ok {
  583. request.Cookie = v
  584. }
  585. if v, ok := a.Header("X-Forwarded-Host"); ok {
  586. if v != request.Host {
  587. request.Host = v
  588. }
  589. }
  590. return nil
  591. }
  592. func processHTTP(request *data.Request, appData []byte) error {
  593. reader := bufio.NewReader(strings.NewReader(string(appData)))
  594. req, err := http.ReadRequest(reader)
  595. if err != nil {
  596. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  597. }
  598. request.Url = req.URL.String()
  599. request.Method = req.Method
  600. request.Referer = req.Referer()
  601. request.Host = req.Host
  602. request.Protocol = req.Proto
  603. request.Origin = request.Host
  604. if _, ok := req.Header["Connection"]; ok {
  605. request.Connection = req.Header["Connection"][0]
  606. }
  607. if _, ok := req.Header["X-Forwarded-For"]; ok {
  608. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  609. }
  610. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  611. if _, ok := req.Header["True-Client-Ip"]; ok {
  612. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  613. }
  614. if _, ok := req.Header["X-Real-Ip"]; ok {
  615. request.XRealIP = req.Header["X-Real-Ip"][0]
  616. }
  617. if _, ok := req.Header["X-Requested-With"]; ok {
  618. request.XRequestedWith = req.Header["X-Requested-With"][0]
  619. }
  620. if _, ok := req.Header["Accept-Encoding"]; ok {
  621. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  622. }
  623. if _, ok := req.Header["Accept-Language"]; ok {
  624. request.AcceptLanguage = req.Header["Accept-Language"][0]
  625. }
  626. if _, ok := req.Header["User-Agent"]; ok {
  627. request.UserAgent = req.Header["User-Agent"][0]
  628. }
  629. if _, ok := req.Header["Accept"]; ok {
  630. request.Accept = req.Header["Accept"][0]
  631. }
  632. if _, ok := req.Header["Cookie"]; ok {
  633. request.Cookie = req.Header["Cookie"][0]
  634. }
  635. request.Source = request.IpSrc
  636. return nil
  637. }
  638. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  639. // e.g.
  640. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  641. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  642. func replayFile() {
  643. var req data.Request
  644. var startTs time.Time
  645. var endTs time.Time
  646. rand.Seed(time.Now().UnixNano())
  647. for {
  648. fh, err := os.Open(config.RequestsFile)
  649. if err != nil {
  650. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  651. }
  652. c := csv.NewReader(fh)
  653. c.Comma = ' '
  654. for {
  655. if config.SleepFor.Duration > time.Nanosecond {
  656. startTs = time.Now()
  657. }
  658. r, err := c.Read()
  659. if err == io.EOF {
  660. break
  661. }
  662. if err != nil {
  663. log.Println(err)
  664. continue
  665. }
  666. req.IpSrc = r[0]
  667. req.Source = r[0]
  668. req.Url = r[1]
  669. req.UserAgent = "Munch/1.0"
  670. req.Host = "demo.scraperwall.com"
  671. req.CreatedAt = time.Now().UnixNano()
  672. publishRequest(config.NatsQueue, &req)
  673. if strings.Index(r[1], ".") < 0 {
  674. hash := sha1.New()
  675. io.WriteString(hash, r[0])
  676. fp := data.Fingerprint{
  677. ClientID: "scw",
  678. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  679. Remote: r[0],
  680. Url: r[1],
  681. Source: r[0],
  682. CreatedAt: time.Now(),
  683. }
  684. if strings.HasPrefix(r[0], "50.31.") {
  685. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  686. natsJSONEC.Publish("fingerprints_scw", fp)
  687. } else if rand.Intn(10) < 5 {
  688. natsJSONEC.Publish("fingerprints_scw", fp)
  689. }
  690. }
  691. count++
  692. if config.SleepFor.Duration >= time.Nanosecond {
  693. endTs = time.Now()
  694. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  695. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  696. }
  697. }
  698. }
  699. }
  700. }
  701. func loadConfig() {
  702. // initialize with values from the command line / environment
  703. config.Live = *doLiveCapture
  704. config.Interface = *iface
  705. config.SnapshotLen = *snapshotLen
  706. config.Filter = *filter
  707. config.Promiscuous = *promiscuous
  708. config.NatsURL = *natsURL
  709. config.NatsQueue = *natsQueue
  710. config.NatsUser = *natsUser
  711. config.NatsPassword = *natsPassword
  712. config.NatsCA = *natsCA
  713. config.SleepFor.Duration = *sleepFor
  714. config.RequestsFile = *requestsFile
  715. config.UseXForwardedAsSource = *useXForwardedAsSource
  716. config.Protocol = *protocol
  717. config.ApacheLog = *apacheLog
  718. config.NginxLog = *nginxLog
  719. config.NginxLogFormat = *nginxFormat
  720. config.HostName = *hostName
  721. config.Quiet = *beQuiet
  722. config.Trace = *trace
  723. config.AccessWatchKey = *accessWatchKey
  724. if *configFile == "" {
  725. return
  726. }
  727. _, err := os.Stat(*configFile)
  728. if err != nil {
  729. log.Printf("%s: %s\n", *configFile, err)
  730. return
  731. }
  732. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  733. log.Printf("%s: %s\n", *configFile, err)
  734. }
  735. if !config.Quiet {
  736. config.print()
  737. }
  738. }
  739. // version outputs build information
  740. func version() {
  741. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  742. }