main.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635
  1. package main
import (
	"bufio"
	"crypto/sha1"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/Songmu/axslogparser"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/hpcloud/tail"
	"github.com/kr/pretty"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
)
  29. var (
  30. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  31. iface = flag.String("interface", "eth0", "Interface to get packets from")
  32. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  33. filter = flag.String("filter", "tcp", "PCAP filter expression")
  34. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  35. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  36. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  37. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  38. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  39. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  40. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  41. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  42. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  43. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  44. trace = flag.Bool("trace", false, "Trace the packet capturing")
  45. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  46. configFile = flag.String("config", "", "The location of the TOML config file")
  47. beQuiet = flag.Bool("quiet", true, "Be quiet")
  48. doVersion = flag.Bool("version", false, "Show version information")
  49. natsEC *nats.EncodedConn
  50. natsJsonEC *nats.EncodedConn
  51. count uint64
  52. timeout = -1 * time.Second
  53. ipPriv *ip.IP
  54. config Config
  55. // Version contains the program Version, e.g. 1.0.1
  56. Version string
  57. // BuildDate contains the date and time at which the program was compiled
  58. BuildDate string
  59. )
  60. // Config contains the program configuration
  61. type Config struct {
  62. Live bool
  63. Interface string
  64. SnapshotLen int
  65. Filter string
  66. Promiscuous bool
  67. NatsURL string
  68. NatsQueue string
  69. NatsUser string
  70. NatsPassword string
  71. NatsCA string
  72. SleepFor duration
  73. RequestsFile string
  74. UseXForwardedAsSource bool
  75. Quiet bool
  76. Protocol string
  77. Trace bool
  78. ApacheLog string
  79. }
  80. type duration struct {
  81. time.Duration
  82. }
  83. func (d *duration) UnmarshalText(text []byte) error {
  84. var err error
  85. d.Duration, err = time.ParseDuration(string(text))
  86. return err
  87. }
  88. func (c Config) print() {
  89. fmt.Printf("Live: %t\n", c.Live)
  90. fmt.Printf("Interface: %s\n", c.Interface)
  91. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  92. fmt.Printf("Filter: %s\n", c.Filter)
  93. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  94. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  95. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  96. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  97. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  98. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  99. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  100. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  101. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  102. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  103. fmt.Printf("Protocol: %s\n", c.Protocol)
  104. fmt.Printf("Quiet: %t\n", c.Quiet)
  105. fmt.Printf("Trace: %t\n", c.Trace)
  106. }
  107. func init() {
  108. flag.Parse()
  109. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  110. }
  111. func main() {
  112. if *doVersion {
  113. version()
  114. os.Exit(0)
  115. }
  116. loadConfig()
  117. // Output how many requests per second were sent
  118. if !config.Quiet {
  119. go func(c *uint64) {
  120. for {
  121. fmt.Printf("%d requests per second\n", *c)
  122. *c = 0
  123. time.Sleep(time.Second)
  124. }
  125. }(&count)
  126. }
  127. // NATS
  128. //
  129. if config.NatsURL == "" {
  130. log.Fatal("No NATS URL specified (-nats-url)!")
  131. }
  132. var natsConn *nats.Conn
  133. var err error
  134. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  135. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  136. } else {
  137. if config.NatsPassword != "" && config.NatsCA != "" {
  138. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  139. } else {
  140. natsConn, err = nats.Connect(config.NatsURL)
  141. }
  142. }
  143. if err != nil {
  144. log.Fatal(err)
  145. }
  146. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  147. if err != nil {
  148. log.Fatalf("Encoded Connection: %v!\n", err)
  149. }
  150. natsJsonEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  151. if err != nil {
  152. log.Fatalf("Encoded Connection: %v!\n", err)
  153. }
  154. // What should I do?
  155. if config.RequestsFile != "" {
  156. replayFile()
  157. } else if config.ApacheLog != "" {
  158. apacheLogCapture(config.ApacheLog)
  159. } else if config.Live {
  160. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  161. liveCapture()
  162. }
  163. }
  164. func apacheLogCapture(logfile string) {
  165. if _, err := os.Stat(logfile); err != nil {
  166. log.Fatalf("%s: %s", logfile, err)
  167. }
  168. t, err := tail.TailFile(logfile, tail.Config{
  169. Follow: true, // follow the file
  170. ReOpen: true, // reopen log file when it gets closed/rotated
  171. Logger: tail.DiscardingLogger, // don't log anything
  172. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  173. })
  174. if err != nil {
  175. log.Fatalf("%s: %s", logfile, err)
  176. }
  177. var p axslogparser.Parser
  178. parserSet := false
  179. for line := range t.Lines {
  180. l := line.Text
  181. if !parserSet {
  182. p, _, err = axslogparser.GuessParser(l)
  183. if err != nil {
  184. log.Println(err)
  185. continue
  186. }
  187. parserSet = true
  188. }
  189. logEntry, err := p.Parse(l)
  190. if err != nil {
  191. log.Println(err)
  192. continue
  193. }
  194. remote := logEntry.Host
  195. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  196. remote = logEntry.ForwardedFor
  197. }
  198. // only use the first host in case there are multiple hosts in the log
  199. if cidx := strings.Index(remote, ","); cidx >= 0 {
  200. remote = remote[0:cidx]
  201. }
  202. // extract the virtual host
  203. var virtualHost string
  204. vhost := logEntry.VirtualHost
  205. if vhost != "" {
  206. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  207. virtualHost = vhostAndPort[0]
  208. }
  209. request := data.Request{
  210. IpSrc: remote,
  211. IpDst: "127.0.0.1",
  212. PortSrc: 0,
  213. PortDst: 0,
  214. TcpSeq: 0,
  215. CreatedAt: logEntry.Time.UnixNano(),
  216. Url: logEntry.RequestURI,
  217. Method: logEntry.Method,
  218. Host: virtualHost,
  219. Protocol: logEntry.Protocol,
  220. Origin: remote,
  221. Source: remote,
  222. Referer: logEntry.Referer,
  223. XForwardedFor: logEntry.ForwardedFor,
  224. UserAgent: logEntry.UserAgent,
  225. }
  226. if config.Trace {
  227. log.Printf("[%s] %s\n", request.Source, request.Url)
  228. }
  229. natsEC.Publish(config.NatsQueue, &request)
  230. }
  231. }
  232. func liveCapture() {
  233. ipPriv = ip.NewIP()
  234. // PCAP setup
  235. //
  236. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  237. if err != nil {
  238. log.Fatal(err)
  239. }
  240. defer handle.Close()
  241. err = handle.SetBPFFilter(config.Filter)
  242. if err != nil {
  243. log.Fatal(err)
  244. }
  245. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  246. for packet := range packetSource.Packets() {
  247. go processPacket(packet)
  248. }
  249. }
  250. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  251. func processPacket(packet gopacket.Packet) {
  252. hasIPv4 := false
  253. var ipSrc, ipDst string
  254. // IPv4
  255. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  256. ip := ipLayer.(*layers.IPv4)
  257. ipSrc = ip.SrcIP.String()
  258. ipDst = ip.DstIP.String()
  259. hasIPv4 = true
  260. }
  261. // IPv6
  262. if !hasIPv4 {
  263. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  264. ip := ipLayer.(*layers.IPv6)
  265. ipSrc = ip.SrcIP.String()
  266. ipDst = ip.DstIP.String()
  267. }
  268. }
  269. // TCP
  270. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  271. if tcpLayer == nil {
  272. return
  273. }
  274. tcp, _ := tcpLayer.(*layers.TCP)
  275. portSrc := tcp.SrcPort
  276. portDst := tcp.DstPort
  277. sequence := tcp.Seq
  278. applicationLayer := packet.ApplicationLayer()
  279. if applicationLayer == nil {
  280. return
  281. }
  282. count++
  283. if len(applicationLayer.Payload()) < 50 {
  284. log.Println("application layer too small!")
  285. return
  286. }
  287. request := data.Request{
  288. IpSrc: ipSrc,
  289. IpDst: ipDst,
  290. PortSrc: uint32(portSrc),
  291. PortDst: uint32(portDst),
  292. TcpSeq: uint32(sequence),
  293. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  294. }
  295. switch config.Protocol {
  296. case "http":
  297. err := processHTTP(&request, applicationLayer.Payload())
  298. if err != nil {
  299. log.Println(err)
  300. return
  301. }
  302. case "ajp13":
  303. err := processAJP13(&request, applicationLayer.Payload())
  304. if err != nil {
  305. log.Println(err)
  306. return
  307. }
  308. }
  309. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  310. if strings.Contains(request.XForwardedFor, ",") {
  311. ips := strings.Split(request.XForwardedFor, ",")
  312. for i := len(ips) - 1; i >= 0; i-- {
  313. ipRaw := strings.TrimSpace(ips[i])
  314. ipAddr := net.ParseIP(ipRaw)
  315. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  316. request.Source = ipRaw
  317. break
  318. }
  319. }
  320. } else {
  321. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  322. if !ipPriv.IsPrivate(ipAddr) {
  323. request.Source = request.XForwardedFor
  324. }
  325. }
  326. }
  327. if request.Source == request.IpSrc && request.XRealIP != "" {
  328. request.Source = request.XRealIP
  329. }
  330. if config.Trace {
  331. log.Printf("[%s] %s\n", request.Source, request.Url)
  332. }
  333. natsEC.Publish(config.NatsQueue, &request)
  334. }
  335. func processAJP13(request *data.Request, appData []byte) error {
  336. a, err := ajp13.Parse(appData)
  337. if err != nil {
  338. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  339. }
  340. request.Url = a.URI
  341. request.Method = a.Method()
  342. request.Host = a.Server
  343. request.Protocol = a.Version
  344. request.Origin = a.RemoteAddr.String()
  345. request.Source = a.RemoteAddr.String()
  346. if v, ok := a.Header("Referer"); ok {
  347. request.Referer = v
  348. }
  349. if v, ok := a.Header("Connection"); ok {
  350. request.Connection = v
  351. }
  352. if v, ok := a.Header("X-Forwarded-For"); ok {
  353. request.XForwardedFor = v
  354. }
  355. if v, ok := a.Header("X-Real-IP"); ok {
  356. request.XRealIP = v
  357. }
  358. if v, ok := a.Header("X-Requested-With"); ok {
  359. request.XRequestedWith = v
  360. }
  361. if v, ok := a.Header("Accept-Encoding"); ok {
  362. request.AcceptEncoding = v
  363. }
  364. if v, ok := a.Header("Accept-Language"); ok {
  365. request.AcceptLanguage = v
  366. }
  367. if v, ok := a.Header("User-Agent"); ok {
  368. request.UserAgent = v
  369. }
  370. if v, ok := a.Header("Accept"); ok {
  371. request.Accept = v
  372. }
  373. if v, ok := a.Header("Cookie"); ok {
  374. request.Cookie = v
  375. }
  376. if v, ok := a.Header("X-Forwarded-Host"); ok {
  377. if v != request.Host {
  378. request.Host = v
  379. }
  380. }
  381. if false && config.Trace {
  382. log.Println("Request")
  383. pretty.Println(request)
  384. }
  385. return nil
  386. }
  387. func processHTTP(request *data.Request, appData []byte) error {
  388. reader := bufio.NewReader(strings.NewReader(string(appData)))
  389. req, err := http.ReadRequest(reader)
  390. if err != nil {
  391. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  392. }
  393. request.Url = req.URL.String()
  394. request.Method = req.Method
  395. request.Referer = req.Referer()
  396. request.Host = req.Host
  397. request.Protocol = req.Proto
  398. request.Origin = request.Host
  399. if _, ok := req.Header["Connection"]; ok {
  400. request.Connection = req.Header["Connection"][0]
  401. }
  402. if _, ok := req.Header["X-Forwarded-For"]; ok {
  403. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  404. }
  405. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  406. if _, ok := req.Header["True-Client-Ip"]; ok {
  407. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  408. }
  409. if _, ok := req.Header["X-Real-Ip"]; ok {
  410. request.XRealIP = req.Header["X-Real-Ip"][0]
  411. }
  412. if _, ok := req.Header["X-Requested-With"]; ok {
  413. request.XRequestedWith = req.Header["X-Requested-With"][0]
  414. }
  415. if _, ok := req.Header["Accept-Encoding"]; ok {
  416. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  417. }
  418. if _, ok := req.Header["Accept-Language"]; ok {
  419. request.AcceptLanguage = req.Header["Accept-Language"][0]
  420. }
  421. if _, ok := req.Header["User-Agent"]; ok {
  422. request.UserAgent = req.Header["User-Agent"][0]
  423. }
  424. if _, ok := req.Header["Accept"]; ok {
  425. request.Accept = req.Header["Accept"][0]
  426. }
  427. if _, ok := req.Header["Cookie"]; ok {
  428. request.Cookie = req.Header["Cookie"][0]
  429. }
  430. request.Source = request.IpSrc
  431. return nil
  432. }
  433. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  434. // e.g.
  435. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  436. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  437. func replayFile() {
  438. var req data.Request
  439. var startTs time.Time
  440. var endTs time.Time
  441. rand.Seed(time.Now().UnixNano())
  442. for {
  443. fh, err := os.Open(config.RequestsFile)
  444. if err != nil {
  445. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  446. }
  447. c := csv.NewReader(fh)
  448. c.Comma = ' '
  449. for {
  450. if config.SleepFor.Duration > time.Nanosecond {
  451. startTs = time.Now()
  452. }
  453. r, err := c.Read()
  454. if err == io.EOF {
  455. break
  456. }
  457. if err != nil {
  458. log.Println(err)
  459. continue
  460. }
  461. req.IpSrc = r[0]
  462. req.Source = r[0]
  463. req.Url = r[1]
  464. req.UserAgent = "Munch/1.0"
  465. req.Host = "demo.scraperwall.com"
  466. req.CreatedAt = time.Now().UnixNano()
  467. err = natsEC.Publish(config.NatsQueue, &req)
  468. if err != nil {
  469. log.Println(err)
  470. }
  471. if strings.Index(r[1], ".") < 0 {
  472. hash := sha1.New()
  473. io.WriteString(hash, r[0])
  474. fp := data.Fingerprint{
  475. ClientID: "scw",
  476. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  477. Remote: r[0],
  478. Url: r[1],
  479. Source: r[0],
  480. CreatedAt: time.Now(),
  481. }
  482. if strings.HasPrefix(r[0], "50.31.") {
  483. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  484. natsJsonEC.Publish("fingerprints_scw", fp)
  485. } else if rand.Intn(10) < 5 {
  486. natsJsonEC.Publish("fingerprints_scw", fp)
  487. }
  488. }
  489. count++
  490. if config.SleepFor.Duration >= time.Nanosecond {
  491. endTs = time.Now()
  492. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  493. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  494. }
  495. }
  496. }
  497. }
  498. }
  499. func loadConfig() {
  500. // initialize with values from the command line / environment
  501. config.Live = *doLiveCapture
  502. config.Interface = *iface
  503. config.SnapshotLen = *snapshotLen
  504. config.Filter = *filter
  505. config.Promiscuous = *promiscuous
  506. config.NatsURL = *natsURL
  507. config.NatsQueue = *natsQueue
  508. config.NatsUser = *natsUser
  509. config.NatsPassword = *natsPassword
  510. config.NatsCA = *natsCA
  511. config.SleepFor.Duration = *sleepFor
  512. config.RequestsFile = *requestsFile
  513. config.UseXForwardedAsSource = *useXForwardedAsSource
  514. config.Protocol = *protocol
  515. config.ApacheLog = *apacheLog
  516. config.Quiet = *beQuiet
  517. config.Trace = *trace
  518. if *configFile == "" {
  519. return
  520. }
  521. _, err := os.Stat(*configFile)
  522. if err != nil {
  523. log.Printf("%s: %s\n", *configFile, err)
  524. return
  525. }
  526. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  527. log.Printf("%s: %s\n", *configFile, err)
  528. }
  529. if !config.Quiet {
  530. config.print()
  531. }
  532. }
  533. // version outputs build information
  534. func version() {
  535. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  536. }