main.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. package main
  2. import (
  3. "bufio"
  4. "crypto/sha1"
  5. "encoding/csv"
  6. "flag"
  7. "fmt"
  8. "io"
  9. "log"
  10. "math/rand"
  11. "net"
  12. "net/http"
  13. "os"
  14. "strings"
  15. "time"
  16. "github.com/BurntSushi/toml"
  17. "github.com/Songmu/axslogparser"
  18. "github.com/google/gopacket"
  19. "github.com/google/gopacket/layers"
  20. "github.com/google/gopacket/pcap"
  21. "github.com/hpcloud/tail"
  22. "github.com/kr/pretty"
  23. "github.com/nats-io/nats"
  24. "github.com/nats-io/nats/encoders/protobuf"
  25. "git.scraperwall.com/scw/ajp13"
  26. "git.scraperwall.com/scw/data"
  27. "git.scraperwall.com/scw/ip"
  28. )
  29. var (
  30. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  31. iface = flag.String("interface", "eth0", "Interface to get packets from")
  32. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  33. filter = flag.String("filter", "tcp", "PCAP filter expression")
  34. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  35. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  36. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  37. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  38. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  39. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  40. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  41. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  42. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  43. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  44. trace = flag.Bool("trace", false, "Trace the packet capturing")
  45. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  46. configFile = flag.String("config", "", "The location of the TOML config file")
  47. beQuiet = flag.Bool("quiet", true, "Be quiet")
  48. doVersion = flag.Bool("version", false, "Show version information")
  49. natsEC *nats.EncodedConn
  50. natsJsonEC *nats.EncodedConn
  51. count uint64
  52. timeout = -1 * time.Second
  53. ipPriv *ip.IP
  54. config Config
  55. // Version contains the program Version, e.g. 1.0.1
  56. Version string
  57. // BuildDate contains the date and time at which the program was compiled
  58. BuildDate string
  59. )
  60. // Config contains the program configuration
  61. type Config struct {
  62. Live bool
  63. Interface string
  64. SnapshotLen int
  65. Filter string
  66. Promiscuous bool
  67. NatsURL string
  68. NatsQueue string
  69. NatsUser string
  70. NatsPassword string
  71. NatsCA string
  72. SleepFor duration
  73. RequestsFile string
  74. UseXForwardedAsSource bool
  75. Quiet bool
  76. Protocol string
  77. Trace bool
  78. ApacheLog string
  79. }
  80. type duration struct {
  81. time.Duration
  82. }
  83. func (d *duration) UnmarshalText(text []byte) error {
  84. var err error
  85. d.Duration, err = time.ParseDuration(string(text))
  86. return err
  87. }
  88. func (c Config) print() {
  89. fmt.Printf("Live: %t\n", c.Live)
  90. fmt.Printf("Interface: %s\n", c.Interface)
  91. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  92. fmt.Printf("Filter: %s\n", c.Filter)
  93. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  94. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  95. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  96. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  97. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  98. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  99. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  100. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  101. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  102. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  103. fmt.Printf("Protocol: %s\n", c.Protocol)
  104. fmt.Printf("Quiet: %t\n", c.Quiet)
  105. fmt.Printf("Trace: %t\n", c.Trace)
  106. }
  107. func init() {
  108. flag.Parse()
  109. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  110. }
  111. func main() {
  112. if *doVersion {
  113. version()
  114. os.Exit(0)
  115. }
  116. loadConfig()
  117. // Output how many requests per second were sent
  118. if !config.Quiet {
  119. go func(c *uint64) {
  120. for {
  121. fmt.Printf("%d requests per second\n", *c)
  122. *c = 0
  123. time.Sleep(time.Second)
  124. }
  125. }(&count)
  126. }
  127. // NATS
  128. //
  129. if config.NatsURL == "" {
  130. log.Fatal("No NATS URL specified (-nats-url)!")
  131. }
  132. var natsConn *nats.Conn
  133. var err error
  134. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  135. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  136. } else {
  137. if config.NatsPassword != "" && config.NatsUser != "" {
  138. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  139. } else {
  140. natsConn, err = nats.Connect(config.NatsURL)
  141. }
  142. }
  143. if err != nil {
  144. log.Fatal(err)
  145. }
  146. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  147. if err != nil {
  148. log.Fatalf("Encoded Connection: %v!\n", err)
  149. }
  150. natsJsonEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  151. if err != nil {
  152. log.Fatalf("Encoded Connection: %v!\n", err)
  153. }
  154. // What should I do?
  155. if config.RequestsFile != "" {
  156. replayFile()
  157. } else if config.ApacheLog != "" {
  158. apacheLogCapture(config.ApacheLog)
  159. } else if config.Live {
  160. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  161. liveCapture()
  162. }
  163. }
  164. func apacheLogCapture(logfile string) {
  165. if _, err := os.Stat(logfile); err != nil {
  166. log.Fatalf("%s: %s", logfile, err)
  167. }
  168. t, err := tail.TailFile(logfile, tail.Config{
  169. Follow: true, // follow the file
  170. ReOpen: true, // reopen log file when it gets closed/rotated
  171. Logger: tail.DiscardingLogger, // don't log anything
  172. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  173. })
  174. if err != nil {
  175. log.Fatalf("%s: %s", logfile, err)
  176. }
  177. var p axslogparser.Parser
  178. parserSet := false
  179. for line := range t.Lines {
  180. l := line.Text
  181. if !parserSet {
  182. p, _, err = axslogparser.GuessParser(l)
  183. if err != nil {
  184. log.Println(err)
  185. continue
  186. }
  187. parserSet = true
  188. }
  189. logEntry, err := p.Parse(l)
  190. if err != nil {
  191. log.Println(err)
  192. continue
  193. }
  194. remote := logEntry.Host
  195. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  196. remote = logEntry.ForwardedFor
  197. }
  198. // only use the first host in case there are multiple hosts in the log
  199. if cidx := strings.Index(remote, ","); cidx >= 0 {
  200. remote = remote[0:cidx]
  201. }
  202. // extract the virtual host
  203. var virtualHost string
  204. vhost := logEntry.VirtualHost
  205. if vhost != "" {
  206. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  207. virtualHost = vhostAndPort[0]
  208. }
  209. request := data.Request{
  210. IpSrc: remote,
  211. IpDst: "127.0.0.1",
  212. PortSrc: 0,
  213. PortDst: 0,
  214. TcpSeq: 0,
  215. CreatedAt: logEntry.Time.UnixNano(),
  216. Url: logEntry.RequestURI,
  217. Method: logEntry.Method,
  218. Host: virtualHost,
  219. Protocol: logEntry.Protocol,
  220. Origin: remote,
  221. Source: remote,
  222. Referer: logEntry.Referer,
  223. XForwardedFor: logEntry.ForwardedFor,
  224. UserAgent: logEntry.UserAgent,
  225. }
  226. if config.Trace {
  227. log.Printf("[%s] %s\n", request.Source, request.Url)
  228. }
  229. if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
  230. log.Println(err)
  231. }
  232. }
  233. }
  234. func liveCapture() {
  235. ipPriv = ip.NewIP()
  236. // PCAP setup
  237. //
  238. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  239. if err != nil {
  240. log.Fatal(err)
  241. }
  242. defer handle.Close()
  243. err = handle.SetBPFFilter(config.Filter)
  244. if err != nil {
  245. log.Fatal(err)
  246. }
  247. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  248. for packet := range packetSource.Packets() {
  249. go processPacket(packet)
  250. }
  251. }
  252. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  253. func processPacket(packet gopacket.Packet) {
  254. hasIPv4 := false
  255. var ipSrc, ipDst string
  256. // IPv4
  257. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  258. ip := ipLayer.(*layers.IPv4)
  259. ipSrc = ip.SrcIP.String()
  260. ipDst = ip.DstIP.String()
  261. hasIPv4 = true
  262. }
  263. // IPv6
  264. if !hasIPv4 {
  265. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  266. ip := ipLayer.(*layers.IPv6)
  267. ipSrc = ip.SrcIP.String()
  268. ipDst = ip.DstIP.String()
  269. }
  270. }
  271. // TCP
  272. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  273. if tcpLayer == nil {
  274. return
  275. }
  276. tcp, _ := tcpLayer.(*layers.TCP)
  277. portSrc := tcp.SrcPort
  278. portDst := tcp.DstPort
  279. sequence := tcp.Seq
  280. applicationLayer := packet.ApplicationLayer()
  281. if applicationLayer == nil {
  282. return
  283. }
  284. count++
  285. if len(applicationLayer.Payload()) < 50 {
  286. log.Println("application layer too small!")
  287. return
  288. }
  289. request := data.Request{
  290. IpSrc: ipSrc,
  291. IpDst: ipDst,
  292. PortSrc: uint32(portSrc),
  293. PortDst: uint32(portDst),
  294. TcpSeq: uint32(sequence),
  295. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  296. }
  297. switch config.Protocol {
  298. case "http":
  299. err := processHTTP(&request, applicationLayer.Payload())
  300. if err != nil {
  301. log.Println(err)
  302. return
  303. }
  304. case "ajp13":
  305. err := processAJP13(&request, applicationLayer.Payload())
  306. if err != nil {
  307. log.Println(err)
  308. return
  309. }
  310. }
  311. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  312. if strings.Contains(request.XForwardedFor, ",") {
  313. ips := strings.Split(request.XForwardedFor, ",")
  314. for i := len(ips) - 1; i >= 0; i-- {
  315. ipRaw := strings.TrimSpace(ips[i])
  316. ipAddr := net.ParseIP(ipRaw)
  317. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  318. request.Source = ipRaw
  319. break
  320. }
  321. }
  322. } else {
  323. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  324. if !ipPriv.IsPrivate(ipAddr) {
  325. request.Source = request.XForwardedFor
  326. }
  327. }
  328. }
  329. if request.Source == request.IpSrc && request.XRealIP != "" {
  330. request.Source = request.XRealIP
  331. }
  332. if config.Trace {
  333. log.Printf("[%s] %s\n", request.Source, request.Url)
  334. }
  335. natsEC.Publish(config.NatsQueue, &request)
  336. }
  337. func processAJP13(request *data.Request, appData []byte) error {
  338. a, err := ajp13.Parse(appData)
  339. if err != nil {
  340. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  341. }
  342. request.Url = a.URI
  343. request.Method = a.Method()
  344. request.Host = a.Server
  345. request.Protocol = a.Version
  346. request.Origin = a.RemoteAddr.String()
  347. request.Source = a.RemoteAddr.String()
  348. if v, ok := a.Header("Referer"); ok {
  349. request.Referer = v
  350. }
  351. if v, ok := a.Header("Connection"); ok {
  352. request.Connection = v
  353. }
  354. if v, ok := a.Header("X-Forwarded-For"); ok {
  355. request.XForwardedFor = v
  356. }
  357. if v, ok := a.Header("X-Real-IP"); ok {
  358. request.XRealIP = v
  359. }
  360. if v, ok := a.Header("X-Requested-With"); ok {
  361. request.XRequestedWith = v
  362. }
  363. if v, ok := a.Header("Accept-Encoding"); ok {
  364. request.AcceptEncoding = v
  365. }
  366. if v, ok := a.Header("Accept-Language"); ok {
  367. request.AcceptLanguage = v
  368. }
  369. if v, ok := a.Header("User-Agent"); ok {
  370. request.UserAgent = v
  371. }
  372. if v, ok := a.Header("Accept"); ok {
  373. request.Accept = v
  374. }
  375. if v, ok := a.Header("Cookie"); ok {
  376. request.Cookie = v
  377. }
  378. if v, ok := a.Header("X-Forwarded-Host"); ok {
  379. if v != request.Host {
  380. request.Host = v
  381. }
  382. }
  383. if false && config.Trace {
  384. log.Println("Request")
  385. pretty.Println(request)
  386. }
  387. return nil
  388. }
  389. func processHTTP(request *data.Request, appData []byte) error {
  390. reader := bufio.NewReader(strings.NewReader(string(appData)))
  391. req, err := http.ReadRequest(reader)
  392. if err != nil {
  393. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  394. }
  395. request.Url = req.URL.String()
  396. request.Method = req.Method
  397. request.Referer = req.Referer()
  398. request.Host = req.Host
  399. request.Protocol = req.Proto
  400. request.Origin = request.Host
  401. if _, ok := req.Header["Connection"]; ok {
  402. request.Connection = req.Header["Connection"][0]
  403. }
  404. if _, ok := req.Header["X-Forwarded-For"]; ok {
  405. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  406. }
  407. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  408. if _, ok := req.Header["True-Client-Ip"]; ok {
  409. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  410. }
  411. if _, ok := req.Header["X-Real-Ip"]; ok {
  412. request.XRealIP = req.Header["X-Real-Ip"][0]
  413. }
  414. if _, ok := req.Header["X-Requested-With"]; ok {
  415. request.XRequestedWith = req.Header["X-Requested-With"][0]
  416. }
  417. if _, ok := req.Header["Accept-Encoding"]; ok {
  418. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  419. }
  420. if _, ok := req.Header["Accept-Language"]; ok {
  421. request.AcceptLanguage = req.Header["Accept-Language"][0]
  422. }
  423. if _, ok := req.Header["User-Agent"]; ok {
  424. request.UserAgent = req.Header["User-Agent"][0]
  425. }
  426. if _, ok := req.Header["Accept"]; ok {
  427. request.Accept = req.Header["Accept"][0]
  428. }
  429. if _, ok := req.Header["Cookie"]; ok {
  430. request.Cookie = req.Header["Cookie"][0]
  431. }
  432. request.Source = request.IpSrc
  433. return nil
  434. }
  435. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  436. // e.g.
  437. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  438. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  439. func replayFile() {
  440. var req data.Request
  441. var startTs time.Time
  442. var endTs time.Time
  443. rand.Seed(time.Now().UnixNano())
  444. for {
  445. fh, err := os.Open(config.RequestsFile)
  446. if err != nil {
  447. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  448. }
  449. c := csv.NewReader(fh)
  450. c.Comma = ' '
  451. for {
  452. if config.SleepFor.Duration > time.Nanosecond {
  453. startTs = time.Now()
  454. }
  455. r, err := c.Read()
  456. if err == io.EOF {
  457. break
  458. }
  459. if err != nil {
  460. log.Println(err)
  461. continue
  462. }
  463. req.IpSrc = r[0]
  464. req.Source = r[0]
  465. req.Url = r[1]
  466. req.UserAgent = "Munch/1.0"
  467. req.Host = "demo.scraperwall.com"
  468. req.CreatedAt = time.Now().UnixNano()
  469. err = natsEC.Publish(config.NatsQueue, &req)
  470. if err != nil {
  471. log.Println(err)
  472. }
  473. if strings.Index(r[1], ".") < 0 {
  474. hash := sha1.New()
  475. io.WriteString(hash, r[0])
  476. fp := data.Fingerprint{
  477. ClientID: "scw",
  478. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  479. Remote: r[0],
  480. Url: r[1],
  481. Source: r[0],
  482. CreatedAt: time.Now(),
  483. }
  484. if strings.HasPrefix(r[0], "50.31.") {
  485. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  486. natsJsonEC.Publish("fingerprints_scw", fp)
  487. } else if rand.Intn(10) < 5 {
  488. natsJsonEC.Publish("fingerprints_scw", fp)
  489. }
  490. }
  491. count++
  492. if config.SleepFor.Duration >= time.Nanosecond {
  493. endTs = time.Now()
  494. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  495. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  496. }
  497. }
  498. }
  499. }
  500. }
  501. func loadConfig() {
  502. // initialize with values from the command line / environment
  503. config.Live = *doLiveCapture
  504. config.Interface = *iface
  505. config.SnapshotLen = *snapshotLen
  506. config.Filter = *filter
  507. config.Promiscuous = *promiscuous
  508. config.NatsURL = *natsURL
  509. config.NatsQueue = *natsQueue
  510. config.NatsUser = *natsUser
  511. config.NatsPassword = *natsPassword
  512. config.NatsCA = *natsCA
  513. config.SleepFor.Duration = *sleepFor
  514. config.RequestsFile = *requestsFile
  515. config.UseXForwardedAsSource = *useXForwardedAsSource
  516. config.Protocol = *protocol
  517. config.ApacheLog = *apacheLog
  518. config.Quiet = *beQuiet
  519. config.Trace = *trace
  520. if *configFile == "" {
  521. return
  522. }
  523. _, err := os.Stat(*configFile)
  524. if err != nil {
  525. log.Printf("%s: %s\n", *configFile, err)
  526. return
  527. }
  528. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  529. log.Printf("%s: %s\n", *configFile, err)
  530. }
  531. if !config.Quiet {
  532. config.print()
  533. }
  534. }
  535. // version outputs build information
  536. func version() {
  537. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  538. }