main.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897
  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/sha1"
  6. "encoding/csv"
  7. "encoding/json"
  8. "flag"
  9. "fmt"
  10. "io"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "net/http"
  15. "os"
  16. "regexp"
  17. "strings"
  18. "time"
  19. "github.com/BurntSushi/toml"
  20. "github.com/Songmu/axslogparser"
  21. "github.com/google/gopacket"
  22. "github.com/google/gopacket/layers"
  23. "github.com/google/gopacket/pcap"
  24. "github.com/hpcloud/tail"
  25. "github.com/nats-io/nats"
  26. "github.com/nats-io/nats/encoders/protobuf"
  27. "github.com/satyrius/gonx"
  28. "git.scraperwall.com/scw/ajp13"
  29. "git.scraperwall.com/scw/data"
  30. "git.scraperwall.com/scw/ip"
  31. )
  32. var (
  33. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  34. iface = flag.String("interface", "eth0", "Interface to get packets from")
  35. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  36. filter = flag.String("filter", "tcp", "PCAP filter expression")
  37. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  38. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  39. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  40. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  41. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  42. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  43. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  44. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  45. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  46. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  47. trace = flag.Bool("trace", false, "Trace the packet capturing")
  48. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  49. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  50. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  51. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  52. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  53. configFile = flag.String("config", "", "The location of the TOML config file")
  54. beQuiet = flag.Bool("quiet", true, "Be quiet")
  55. doVersion = flag.Bool("version", false, "Show version information")
  56. natsEC *nats.EncodedConn
  57. natsJSONEC *nats.EncodedConn
  58. natsErrorChan chan error
  59. natsIsAvailable bool
  60. count uint64
  61. timeout = -1 * time.Second
  62. ipPriv *ip.IP
  63. config Config
  64. // Version contains the program Version, e.g. 1.0.1
  65. Version string
  66. // BuildDate contains the date and time at which the program was compiled
  67. BuildDate string
  68. )
  69. // Config contains the program configuration
  70. type Config struct {
  71. Live bool
  72. Interface string
  73. SnapshotLen int
  74. Filter string
  75. Promiscuous bool
  76. NatsURL string
  77. NatsQueue string
  78. NatsUser string
  79. NatsPassword string
  80. NatsCA string
  81. SleepFor duration
  82. RequestsFile string
  83. UseXForwardedAsSource bool
  84. Quiet bool
  85. Protocol string
  86. Trace bool
  87. ApacheLog string
  88. NginxLog string
  89. NginxLogFormat string
  90. HostName string
  91. AccessWatchKey string
  92. }
  93. type duration struct {
  94. time.Duration
  95. }
  96. func (d *duration) UnmarshalText(text []byte) error {
  97. var err error
  98. d.Duration, err = time.ParseDuration(string(text))
  99. return err
  100. }
  101. func (c Config) print() {
  102. fmt.Printf("Live: %t\n", c.Live)
  103. fmt.Printf("Interface: %s\n", c.Interface)
  104. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  105. fmt.Printf("Filter: %s\n", c.Filter)
  106. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  107. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  108. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  109. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  110. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  111. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  112. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  113. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  114. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  115. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  116. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  117. fmt.Printf("HostName: %s\n", c.HostName)
  118. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  119. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  120. fmt.Printf("Protocol: %s\n", c.Protocol)
  121. fmt.Printf("Quiet: %t\n", c.Quiet)
  122. fmt.Printf("Trace: %t\n", c.Trace)
  123. }
  // init registers the protobuf encoder with the NATS client so that
// connectToNATS can create protobuf-encoded connections.
//
// Flag parsing deliberately does not happen here: calling flag.Parse from
// init runs before the testing package registers its own flags and breaks
// `go test` for this package.
func init() {
	nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
}

// main parses the flags and loads the configuration. It then connects to NATS,
// which is optional when an access.watch key is configured. Finally it runs
// exactly one input mode: replaying a request file, tailing an Apache log,
// live packet capture, or tailing an nginx log.
func main() {
	flag.Parse()

	if *doVersion {
		version()
		os.Exit(0)
	}

	loadConfig()

	// Output how many requests per second were sent.
	// NOTE(review): count is incremented by the capture code without
	// synchronization, so these figures are approximate.
	if !config.Quiet {
		go func(c *uint64) {
			for {
				fmt.Printf("%d requests per second\n", *c)
				*c = 0
				time.Sleep(time.Second)
			}
		}(&count)
	}

	// NATS
	if config.NatsURL == "" && config.AccessWatchKey == "" {
		log.Fatal("No NATS URL specified (-nats-url)!")
	}
	natsIsAvailable = false
	natsErrorChan = make(chan error, 1)
	if err := connectToNATS(); err != nil {
		if config.AccessWatchKey == "" {
			log.Fatal(err)
		}
		// access.watch mode can run without NATS; report and continue.
		log.Println(err)
	}
	go natsWatchdog(natsErrorChan)

	// What should I do?
	switch {
	case config.RequestsFile != "":
		replayFile()
	case config.ApacheLog != "":
		apacheLogCapture(config.ApacheLog)
	case config.Live:
		fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
		liveCapture()
	case config.NginxLog != "" && config.NginxLogFormat != "":
		nginxLogCapture(config.NginxLog, config.NginxLogFormat)
	case config.NginxLog != "":
		log.Fatal("-nginx-log requires -nginx-format")
	default:
		log.Fatal("Nothing to do: specify -requests, -apache-log, -live or -nginx-log")
	}
}
  // natsWatchdog consumes the errors that publishRequest reports on closedChan.
// It logs each error unless it equals the previously received one. On
// nats.ErrConnectionClosed it reconnects, retrying once per second until
// connectToNATS succeeds. It returns only when closedChan is closed.
func natsWatchdog(closedChan chan error) {
	var lastError error
	for err := range closedChan {
		if err != lastError {
			lastError = err
			log.Println(err)
		}
		if err != nats.ErrConnectionClosed {
			continue
		}
		for {
			// Use the effective configuration (possibly overridden by the
			// config file), not the raw -nats-url flag.
			log.Printf("Reconnecting to NATS at %s\n", config.NatsURL)
			connErr := connectToNATS()
			if connErr == nil {
				break
			}
			log.Println(connErr)
			time.Sleep(time.Second)
		}
		// Forget the close error so that a later disconnect is logged again.
		lastError = nil
	}
}
  // connectToNATS dials config.NatsURL. On success it replaces the global
// natsEC (protobuf encoder) and natsJSONEC (JSON encoder) connections and sets
// natsIsAvailable to true. On failure the globals are left untouched.
//
// User credentials are sent when both NatsUser and NatsPassword are set.
// NatsCA, when set, is used as the TLS root CA file whether or not
// credentials are configured.
func connectToNATS() error {
	var opts []nats.Option
	if config.NatsUser != "" && config.NatsPassword != "" {
		opts = append(opts, nats.UserInfo(config.NatsUser, config.NatsPassword))
	}
	if config.NatsCA != "" {
		opts = append(opts, nats.RootCAs(config.NatsCA))
	}

	natsConn, err := nats.Connect(config.NatsURL, opts...)
	if err != nil {
		return err
	}

	protoEC, err := nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
	if err != nil {
		natsConn.Close() // do not leak the raw connection
		return fmt.Errorf("Encoded Connection: %v", err)
	}
	jsonEC, err := nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
	if err != nil {
		natsConn.Close()
		return fmt.Errorf("Encoded Connection: %v", err)
	}

	natsEC = protoEC
	natsJSONEC = jsonEC
	natsIsAvailable = true
	return nil
}
  215. func nginxLogCapture(logfile, format string) {
  216. if _, err := os.Stat(logfile); err != nil {
  217. log.Fatalf("%s: %s", logfile, err)
  218. }
  219. t, err := tail.TailFile(logfile, tail.Config{
  220. Follow: true, // follow the file
  221. ReOpen: true, // reopen log file when it gets closed/rotated
  222. Logger: tail.DiscardingLogger, // don't log anything
  223. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  224. })
  225. if err != nil {
  226. log.Fatalf("%s: %s", logfile, err)
  227. }
  228. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  229. p := gonx.NewParser(format)
  230. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  231. for line := range t.Lines {
  232. l := line.Text
  233. logEntry, err := p.ParseString(l)
  234. if err != nil {
  235. log.Println(err)
  236. continue
  237. }
  238. remote, err := logEntry.Field("remote_addr")
  239. if err != nil {
  240. log.Println(err)
  241. continue
  242. }
  243. if config.UseXForwardedAsSource {
  244. xff, err := logEntry.Field("http_x_forwarded_for")
  245. if err != nil && xff != "" {
  246. ips := strings.Split(xff, ",")
  247. if len(ips) > 0 {
  248. remote = strings.TrimSpace(ips[0])
  249. }
  250. }
  251. }
  252. if remote == "" {
  253. log.Println("remote is empty: ignoring request.")
  254. continue
  255. }
  256. // only use the first host in case there are multiple hosts in the log
  257. if cidx := strings.Index(remote, ","); cidx >= 0 {
  258. remote = remote[0:cidx]
  259. }
  260. timestampStr, err := logEntry.Field("time_local")
  261. if err != nil {
  262. log.Println(err)
  263. continue
  264. }
  265. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  266. if err != nil {
  267. log.Println(err)
  268. continue
  269. }
  270. httpRequest, err := logEntry.Field("request")
  271. if err != nil {
  272. log.Println(err)
  273. continue
  274. }
  275. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  276. if len(reqData) < 4 {
  277. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  278. continue
  279. }
  280. host := config.HostName
  281. if host == "" {
  282. host = "[not available]"
  283. }
  284. request := data.Request{
  285. IpSrc: remote,
  286. Origin: remote,
  287. Source: remote,
  288. IpDst: "127.0.0.1",
  289. PortSrc: 0,
  290. PortDst: 0,
  291. TcpSeq: 0,
  292. CreatedAt: timeStamp.Unix(),
  293. Url: reqData[2],
  294. Method: reqData[1],
  295. Host: host,
  296. Protocol: reqData[3],
  297. }
  298. request.Referer, _ = logEntry.Field("http_referer")
  299. request.UserAgent, _ = logEntry.Field("http_user_agent")
  300. if config.Trace {
  301. log.Printf("[%s] %s\n", request.Source, request.Url)
  302. }
  303. count++
  304. publishRequest(config.NatsQueue, &request)
  305. }
  306. }
  // apacheLogCapture tails logfile and publishes one data.Request per parsed
// line. Tailing starts at the file's current end and follows rotation.
//
// The log format is guessed by axslogparser from the first line it
// recognizes; lines are skipped until a guess succeeds. Lines that fail to
// parse are logged and skipped. Setup errors are fatal.
func apacheLogCapture(logfile string) {
	if _, err := os.Stat(logfile); err != nil {
		log.Fatalf("%s: %s", logfile, err)
	}
	t, err := tail.TailFile(logfile, tail.Config{
		Follow:   true,                                  // follow the file
		ReOpen:   true,                                  // reopen log file when it gets closed/rotated
		Logger:   tail.DiscardingLogger,                 // don't log anything
		Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
	})
	if err != nil {
		log.Fatalf("%s: %s", logfile, err)
	}

	// Host used for lines without a logged virtual host.
	defaultHost := config.HostName
	if defaultHost == "" {
		defaultHost = "[not available]"
	}

	var p axslogparser.Parser
	parserSet := false
	for line := range t.Lines {
		l := line.Text
		if !parserSet {
			guessed, _, gerr := axslogparser.GuessParser(l)
			if gerr != nil {
				log.Println(gerr)
				continue
			}
			p = guessed
			parserSet = true
		}
		logEntry, err := p.Parse(l)
		if err != nil {
			log.Println(err)
			continue
		}

		remote := logEntry.Host
		if config.UseXForwardedAsSource && logEntry.ForwardedFor != "" && logEntry.ForwardedFor != "-" {
			remote = logEntry.ForwardedFor
		}
		// only use the first host in case there are multiple hosts in the log
		if cidx := strings.Index(remote, ","); cidx >= 0 {
			remote = remote[:cidx]
		}
		remote = strings.TrimSpace(remote)

		// The logged virtual host without its port, else the fallback host.
		virtualHost := defaultHost
		if vhost := logEntry.VirtualHost; vhost != "" {
			virtualHost = vhost
			if h, _, serr := net.SplitHostPort(vhost); serr == nil {
				virtualHost = h
			}
		}

		request := data.Request{
			IpSrc:         remote,
			IpDst:         "127.0.0.1",
			PortSrc:       0,
			PortDst:       0,
			TcpSeq:        0,
			CreatedAt:     logEntry.Time.UnixNano(),
			Url:           logEntry.RequestURI,
			Method:        logEntry.Method,
			Host:          virtualHost,
			Protocol:      logEntry.Protocol,
			Origin:        remote,
			Source:        remote,
			Referer:       logEntry.Referer,
			XForwardedFor: logEntry.ForwardedFor,
			UserAgent:     logEntry.UserAgent,
		}
		if config.Trace {
			log.Printf("[%s] %s\n", request.Source, request.Url)
		}
		count++
		publishRequest(config.NatsQueue, &request)
	}
}
  // liveCapture initialises ipPriv, opens config.Interface with pcap (snapshot
// length, promiscuous mode and the package-level read timeout) and installs
// config.Filter as a BPF filter. It then passes every captured packet to
// processPacket in a new goroutine. Any setup error is fatal.
//
// NOTE(review): one goroutine per packet is unbounded under heavy traffic.
func liveCapture() {
	ipPriv = ip.NewIP()

	handle, openErr := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
	if openErr != nil {
		log.Fatal(openErr)
	}
	defer handle.Close()

	if filterErr := handle.SetBPFFilter(config.Filter); filterErr != nil {
		log.Fatal(filterErr)
	}

	packets := gopacket.NewPacketSource(handle, handle.LinkType()).Packets()
	for pkt := range packets {
		go processPacket(pkt)
	}
}
  // accessWatchLogURL is the access.watch log ingestion endpoint.
const accessWatchLogURL = "https://log.access.watch/1.1/log"

// accessWatchClient is shared by all writeLogToWatch calls so that keep-alive
// connections are reused. Its timeout keeps a slow endpoint from stalling the
// caller indefinitely.
var accessWatchClient = &http.Client{Timeout: 10 * time.Second}

// writeLogToWatch POSTs r to the access.watch log API as JSON of the form
//
//	{"request": {"time", "address", "method", "url", "headers"}, "response": {"status": 200}}
//
// It authenticates with config.AccessWatchKey, which the TOML config file may
// set. Only non-empty headers are sent. Delivery is best effort: every
// failure, including a non-2xx response, is logged and otherwise ignored.
func writeLogToWatch(r *data.Request) {
	headers := map[string]string{}
	for _, hv := range []struct{ name, value string }{
		{"Accept-Encoding", r.AcceptEncoding},
		{"Accept", r.Accept},
		{"Accept-Language", r.AcceptLanguage},
		{"Cookie", r.Cookie},
		{"Host", r.Host},
		{"Referer", r.Referer},
		{"User-Agent", r.UserAgent},
		{"Via", r.Via},
		{"X-Forwarded-For", r.XForwardedFor},
		{"X-Requested-With", r.XRequestedWith},
	} {
		if hv.value != "" {
			headers[hv.name] = hv.value
		}
	}

	payload := map[string]interface{}{
		"request": map[string]interface{}{
			"time":    time.Unix(0, r.CreatedAt),
			"address": r.Source,
			// "scheme": r.Protocol,
			"method":  r.Method,
			"url":     r.Url,
			"headers": headers,
		},
		// The response status is not available here; it is hard-coded.
		"response": map[string]interface{}{"status": 200},
	}
	body, err := json.Marshal(payload)
	if err != nil {
		log.Println(err)
		return
	}

	req, err := http.NewRequest("POST", accessWatchLogURL, bytes.NewReader(body))
	if err != nil {
		log.Println(err)
		return
	}
	req.Header.Set("Api-Key", config.AccessWatchKey)

	resp, err := accessWatchClient.Do(req)
	if err != nil {
		// resp is nil when err != nil; there is no body to close.
		log.Println(err)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		log.Printf("access.watch: unexpected response status %s\n", resp.Status)
	}
}
  457. func publishRequest(queue string, request *data.Request) {
  458. if config.AccessWatchKey != "" {
  459. writeLogToWatch(request)
  460. return
  461. }
  462. if !natsIsAvailable {
  463. return
  464. }
  465. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  466. natsErrorChan <- err
  467. if err == nats.ErrConnectionClosed {
  468. natsIsAvailable = false
  469. }
  470. }
  471. }
  // processPacket receives a raw packet from pcap, builds a Request item from
// it and sends it to the queue.
//
// Packets without a TCP layer or application payload are ignored. Payloads
// shorter than 50 bytes are counted, logged and dropped. config.Protocol
// selects the payload parser ("http" or "ajp13"). With
// UseXForwardedAsSource, Source becomes the right-most public address in
// X-Forwarded-For. If Source still equals the packet source IP, X-Real-IP
// (when present) is used instead.
func processPacket(packet gopacket.Packet) {
	var ipSrc, ipDst string
	if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
		ip4 := ipLayer.(*layers.IPv4)
		ipSrc, ipDst = ip4.SrcIP.String(), ip4.DstIP.String()
	} else if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
		ip6 := ipLayer.(*layers.IPv6)
		ipSrc, ipDst = ip6.SrcIP.String(), ip6.DstIP.String()
	}

	tcpLayer := packet.Layer(layers.LayerTypeTCP)
	if tcpLayer == nil {
		return
	}
	tcp, ok := tcpLayer.(*layers.TCP)
	if !ok {
		return
	}

	applicationLayer := packet.ApplicationLayer()
	if applicationLayer == nil {
		return
	}
	payload := applicationLayer.Payload()
	count++
	if len(payload) < 50 {
		log.Println("application layer too small!")
		return
	}

	request := data.Request{
		IpSrc:     ipSrc,
		IpDst:     ipDst,
		PortSrc:   uint32(tcp.SrcPort),
		PortDst:   uint32(tcp.DstPort),
		TcpSeq:    uint32(tcp.Seq),
		CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
	}

	var err error
	switch config.Protocol {
	case "http":
		err = processHTTP(&request, payload)
	case "ajp13":
		err = processAJP13(&request, payload)
	}
	if err != nil {
		log.Println(err)
		return
	}

	if config.UseXForwardedAsSource && request.XForwardedFor != "" {
		if src := lastPublicIP(request.XForwardedFor); src != "" {
			request.Source = src
		}
	}
	if request.Source == request.IpSrc && request.XRealIP != "" {
		request.Source = request.XRealIP
	}
	if config.Trace {
		log.Printf("[%s] %s\n", request.Source, request.Url)
	}
	publishRequest(config.NatsQueue, &request)
}

// lastPublicIP scans the comma-separated X-Forwarded-For list xff from right
// to left. It returns the first entry that parses as an IP address and that
// ipPriv does not classify as private, or "" if there is none. Entries are
// whitespace-trimmed, and unparseable entries are skipped rather than passed
// to ipPriv.
func lastPublicIP(xff string) string {
	parts := strings.Split(xff, ",")
	for i := len(parts) - 1; i >= 0; i-- {
		candidate := strings.TrimSpace(parts[i])
		if addr := net.ParseIP(candidate); addr != nil && !ipPriv.IsPrivate(addr) {
			return candidate
		}
	}
	return ""
}
  // processAJP13 parses appData as an AJP13 request and copies the URI, method,
// server name, protocol version and remote address into request, together
// with a fixed set of headers. Headers that are absent leave their field
// untouched. A present X-Forwarded-Host replaces Host.
func processAJP13(request *data.Request, appData []byte) error {
	msg, err := ajp13.Parse(appData)
	if err != nil {
		return fmt.Errorf("Failed to parse AJP13 request: %s", err)
	}

	remote := msg.RemoteAddr.String()
	request.Url = msg.URI
	request.Method = msg.Method()
	request.Host = msg.Server
	request.Protocol = msg.Version
	request.Origin = remote
	request.Source = remote

	copied := []struct {
		name string
		dst  *string
	}{
		{"Referer", &request.Referer},
		{"Connection", &request.Connection},
		{"X-Forwarded-For", &request.XForwardedFor},
		{"X-Real-IP", &request.XRealIP},
		{"X-Requested-With", &request.XRequestedWith},
		{"Accept-Encoding", &request.AcceptEncoding},
		{"Accept-Language", &request.AcceptLanguage},
		{"User-Agent", &request.UserAgent},
		{"Accept", &request.Accept},
		{"Cookie", &request.Cookie},
	}
	for _, h := range copied {
		if v, ok := msg.Header(h.name); ok {
			*h.dst = v
		}
	}

	if fwdHost, ok := msg.Header("X-Forwarded-Host"); ok {
		request.Host = fwdHost
	}
	return nil
}
  // processHTTP parses appData as an HTTP/1.x request head. It copies the URL,
// method, Referer, Host and protocol into request, and sets Origin to the
// Host. Selected headers are copied using their first value. True-Client-Ip
// (set by CloudFlare) takes precedence over X-Forwarded-For. Source is set to
// the packet's source IP.
func processHTTP(request *data.Request, appData []byte) error {
	req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(appData)))
	if err != nil {
		return fmt.Errorf("Failed to parse HTTP header: %s", err)
	}

	request.Url = req.URL.String()
	request.Method = req.Method
	request.Referer = req.Referer()
	request.Host = req.Host
	request.Protocol = req.Proto
	request.Origin = req.Host

	// firstValue stores the first value of header name in dst when the header
	// is present; otherwise dst is left unchanged.
	firstValue := func(name string, dst *string) {
		if values, ok := req.Header[name]; ok {
			*dst = values[0]
		}
	}

	firstValue("Connection", &request.Connection)
	firstValue("X-Forwarded-For", &request.XForwardedFor)
	// CloudFlare: override X-Forwarded for since it is tainted by cloudflare
	firstValue("True-Client-Ip", &request.XForwardedFor)
	firstValue("X-Real-Ip", &request.XRealIP)
	firstValue("X-Requested-With", &request.XRequestedWith)
	firstValue("Accept-Encoding", &request.AcceptEncoding)
	firstValue("Accept-Language", &request.AcceptLanguage)
	firstValue("User-Agent", &request.UserAgent)
	firstValue("Accept", &request.Accept)
	firstValue("Cookie", &request.Cookie)

	request.Source = request.IpSrc
	return nil
}
  // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
// e.g.
// 157.55.39.229 /gross-gerau/12012260-beate-anstatt
// 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
//
// Fields are separated by a single space. The file is replayed in an
// endless loop. It is fatal if the file cannot be opened, or if a full pass
// yields no usable record (this avoids a busy loop).
func replayFile() {
	rand.Seed(time.Now().UnixNano())
	for {
		if replayFileOnce() == 0 {
			log.Fatalf("%s: no usable requests found", config.RequestsFile)
		}
	}
}

// replayFileOnce opens config.RequestsFile, publishes each record once, closes
// the file, and returns the number of records published.
//
// Malformed records are logged and skipped. With -sleep, each record takes
// at least config.SleepFor to process. For URLs without a "." a fingerprint
// is also published via publishFingerprint.
func replayFileOnce() int {
	fh, err := os.Open(config.RequestsFile)
	if err != nil {
		log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
	}
	defer fh.Close()

	c := csv.NewReader(fh)
	c.Comma = ' '

	published := 0
	var req data.Request
	for {
		startTs := time.Now()
		r, err := c.Read()
		if err == io.EOF {
			return published
		}
		if err != nil {
			log.Println(err)
			continue
		}
		if len(r) < 2 {
			log.Printf("%s: expected \"IP URL\", got %q\n", config.RequestsFile, r)
			continue
		}

		req.IpSrc = r[0]
		req.Source = r[0]
		req.Url = r[1]
		req.UserAgent = "Munch/1.0"
		req.Host = "demo.scraperwall.com"
		req.CreatedAt = time.Now().UnixNano()
		publishRequest(config.NatsQueue, &req)
		if !strings.Contains(r[1], ".") {
			publishFingerprint(r[0], r[1])
		}
		count++
		published++

		// Pace the replay: top up the time spent on this record to SleepFor.
		if wait := config.SleepFor.Duration - time.Since(startTs); wait > 0 {
			time.Sleep(wait)
		}
	}
}

// publishFingerprint publishes a data.Fingerprint for remote/uri as JSON on
// "fingerprints_scw". For remotes in 50.31.* it is always sent, with a fixed
// fingerprint. Otherwise it is sent for about half of the calls, with the
// hex SHA-1 of remote. Nothing is sent while NATS is not connected;
// publish errors are logged.
func publishFingerprint(remote, uri string) {
	if !natsIsAvailable || natsJSONEC == nil {
		return
	}
	hash := sha1.New()
	io.WriteString(hash, remote)
	fp := data.Fingerprint{
		ClientID:    "scw",
		Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
		Remote:      remote,
		Url:         uri,
		Source:      remote,
		CreatedAt:   time.Now(),
	}
	if strings.HasPrefix(remote, "50.31.") {
		fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
	} else if rand.Intn(10) >= 5 {
		return
	}
	if err := natsJSONEC.Publish("fingerprints_scw", fp); err != nil {
		log.Println(err)
	}
}
  // loadConfig builds the global config from the command-line flags. If -config
// names an existing file, it then overlays the values decoded from that TOML
// file. Problems with the file are logged and do not abort. After a config
// file has been processed, the result is printed unless it is quiet.
func loadConfig() {
	config = Config{
		Live:                  *doLiveCapture,
		Interface:             *iface,
		SnapshotLen:           *snapshotLen,
		Filter:                *filter,
		Promiscuous:           *promiscuous,
		NatsURL:               *natsURL,
		NatsQueue:             *natsQueue,
		NatsUser:              *natsUser,
		NatsPassword:          *natsPassword,
		NatsCA:                *natsCA,
		SleepFor:              duration{Duration: *sleepFor},
		RequestsFile:          *requestsFile,
		UseXForwardedAsSource: *useXForwardedAsSource,
		Protocol:              *protocol,
		ApacheLog:             *apacheLog,
		NginxLog:              *nginxLog,
		NginxLogFormat:        *nginxFormat,
		HostName:              *hostName,
		Quiet:                 *beQuiet,
		Trace:                 *trace,
		AccessWatchKey:        *accessWatchKey,
	}

	path := *configFile
	if path == "" {
		return
	}
	if _, statErr := os.Stat(path); statErr != nil {
		log.Printf("%s: %s\n", path, statErr)
		return
	}
	if _, decodeErr := toml.DecodeFile(path, &config); decodeErr != nil {
		log.Printf("%s: %s\n", path, decodeErr)
	}
	if config.Quiet {
		return
	}
	config.print()
}
  // version prints the program name with the package-level Version and
// BuildDate strings, e.g. "munchclient 1.0.1, built on <date>".
func version() {
	info := fmt.Sprintf("munchclient %s, built on %s\n", Version, BuildDate)
	fmt.Print(info)
}