main.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628
  1. package main
  import (
	"bufio"
	"crypto/sha1"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
	"github.com/BurntSushi/toml"
	"github.com/Songmu/axslogparser"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/hpcloud/tail"
	"github.com/kr/pretty"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"
)
  29. var (
  30. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  31. iface = flag.String("interface", "eth0", "Interface to get packets from")
  32. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  33. filter = flag.String("filter", "tcp", "PCAP filter expression")
  34. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  35. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  36. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  37. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  38. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  39. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  40. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  41. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  42. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  43. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  44. trace = flag.Bool("trace", false, "Trace the packet capturing")
  45. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  46. configFile = flag.String("config", "", "The location of the TOML config file")
  47. beQuiet = flag.Bool("quiet", true, "Be quiet")
  48. doVersion = flag.Bool("version", false, "Show version information")
  49. natsEC *nats.EncodedConn
  50. natsJsonEC *nats.EncodedConn
  51. count uint64
  52. timeout = -1 * time.Second
  53. ipPriv *ip.IP
  54. config Config
  55. // Version contains the program Version, e.g. 1.0.1
  56. Version string
  57. // BuildDate contains the date and time at which the program was compiled
  58. BuildDate string
  59. )
  // Config contains the program configuration. loadConfig fills it from the
// command-line flags and then lets a TOML config file override the values.
type Config struct {
	Live                  bool     // capture packets from Interface in real time
	Interface             string   // network interface to capture from
	SnapshotLen           int      // pcap snapshot length in bytes
	Filter                string   // BPF filter expression
	Promiscuous           bool     // put the interface into promiscuous mode
	NatsURL               string   // URL of the NATS server
	NatsQueue             string   // NATS subject requests are published to
	NatsUser              string   // user for NATS authentication
	NatsPassword          string   // password for NATS authentication; never printed verbatim
	NatsCA                string   // CA chain for NATS TLS
	SleepFor              duration // pacing interval between requests when replaying a file
	RequestsFile          string   // file of "IP URL" lines to replay
	UseXForwardedAsSource bool     // take the request source from X-Forwarded-For
	Quiet                 bool     // suppress the throughput and configuration output
	Protocol              string   // payload protocol to parse: "http" or "ajp13"
	Trace                 bool     // log every published request
	ApacheLog             string   // Apache log file to follow
}

// duration wraps time.Duration so that it can be decoded from text such as
// "250ms" (it implements encoding.TextUnmarshaler).
type duration struct {
	time.Duration
}

// UnmarshalText parses text with time.ParseDuration. On error the wrapped
// duration is left at the zero value returned by ParseDuration.
func (d *duration) UnmarshalText(text []byte) error {
	var err error
	d.Duration, err = time.ParseDuration(string(text))
	return err
}

// print writes the effective configuration to standard output, one setting
// per line. The NATS password is masked so it never ends up in terminal
// scrollback or collected logs.
func (c Config) print() {
	fmt.Printf("Live: %t\n", c.Live)
	fmt.Printf("Interface: %s\n", c.Interface)
	fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
	fmt.Printf("Filter: %s\n", c.Filter)
	fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
	fmt.Printf("NatsURL: %s\n", c.NatsURL)
	fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
	fmt.Printf("NatsUser: %s\n", c.NatsUser)
	fmt.Printf("NatsPassword: %s\n", redact(c.NatsPassword))
	fmt.Printf("NatsCA: %s\n", c.NatsCA)
	fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
	fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
	fmt.Printf("Apache Log: %s\n", c.ApacheLog)
	fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
	fmt.Printf("Protocol: %s\n", c.Protocol)
	fmt.Printf("Quiet: %t\n", c.Quiet)
	fmt.Printf("Trace: %t\n", c.Trace)
}

// redact hides a secret for display. It returns "" for an empty secret, so
// it stays visible that none is configured, and a fixed mask otherwise.
func redact(secret string) string {
	if secret == "" {
		return ""
	}
	return "********"
}
  107. func init() {
  108. flag.Parse()
  109. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  110. }
  111. func main() {
  112. if *doVersion {
  113. version()
  114. os.Exit(0)
  115. }
  116. loadConfig()
  117. // Output how many requests per second were sent
  118. if !config.Quiet {
  119. go func(c *uint64) {
  120. for {
  121. fmt.Printf("%d requests per second\n", *c)
  122. *c = 0
  123. time.Sleep(time.Second)
  124. }
  125. }(&count)
  126. }
  127. // NATS
  128. //
  129. if config.NatsURL == "" {
  130. log.Fatal("No NATS URL specified (-nats-url)!")
  131. }
  132. var natsConn *nats.Conn
  133. var err error
  134. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  135. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  136. } else {
  137. natsConn, err = nats.Connect(config.NatsURL)
  138. }
  139. if err != nil {
  140. log.Fatal(err)
  141. }
  142. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  143. if err != nil {
  144. log.Fatalf("Encoded Connection: %v!\n", err)
  145. }
  146. natsJsonEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  147. if err != nil {
  148. log.Fatalf("Encoded Connection: %v!\n", err)
  149. }
  150. // What should I do?
  151. if config.RequestsFile != "" {
  152. replayFile()
  153. } else if config.ApacheLog != "" {
  154. apacheLogCapture(config.ApacheLog)
  155. } else if config.Live {
  156. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  157. liveCapture()
  158. }
  159. }
  160. func apacheLogCapture(logfile string) {
  161. if _, err := os.Stat(logfile); err != nil {
  162. log.Fatalf("%s: %s", logfile, err)
  163. }
  164. t, err := tail.TailFile(logfile, tail.Config{
  165. Follow: true, // follow the file
  166. ReOpen: true, // reopen log file when it gets closed/rotated
  167. Logger: tail.DiscardingLogger, // don't log anything
  168. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  169. })
  170. if err != nil {
  171. log.Fatalf("%s: %s", logfile, err)
  172. }
  173. var p axslogparser.Parser
  174. parserSet := false
  175. for line := range t.Lines {
  176. l := line.Text
  177. if !parserSet {
  178. p, _, err = axslogparser.GuessParser(l)
  179. if err != nil {
  180. log.Println(err)
  181. continue
  182. }
  183. parserSet = true
  184. }
  185. logEntry, err := p.Parse(l)
  186. if err != nil {
  187. log.Println(err)
  188. continue
  189. }
  190. remote := logEntry.Host
  191. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  192. remote = logEntry.ForwardedFor
  193. }
  194. // only use the first host in case there are multiple hosts in the log
  195. if cidx := strings.Index(remote, ","); cidx >= 0 {
  196. remote = remote[0:cidx]
  197. }
  198. // extract the virtual host
  199. var virtualHost string
  200. vhost := logEntry.VirtualHost
  201. if vhost != "" {
  202. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  203. virtualHost = vhostAndPort[0]
  204. }
  205. request := data.Request{
  206. IpSrc: remote,
  207. IpDst: "127.0.0.1",
  208. PortSrc: 0,
  209. PortDst: 0,
  210. TcpSeq: 0,
  211. CreatedAt: logEntry.Time.UnixNano(),
  212. Url: logEntry.RequestURI,
  213. Method: logEntry.Method,
  214. Host: virtualHost,
  215. Protocol: logEntry.Protocol,
  216. Origin: remote,
  217. Source: remote,
  218. Referer: logEntry.Referer,
  219. XForwardedFor: logEntry.ForwardedFor,
  220. UserAgent: logEntry.UserAgent,
  221. }
  222. if config.Trace {
  223. log.Printf("[%s] %s\n", request.Source, request.Url)
  224. }
  225. natsEC.Publish(config.NatsQueue, &request)
  226. }
  227. }
  228. func liveCapture() {
  229. ipPriv = ip.NewIP()
  230. // PCAP setup
  231. //
  232. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  233. if err != nil {
  234. log.Fatal(err)
  235. }
  236. defer handle.Close()
  237. err = handle.SetBPFFilter(config.Filter)
  238. if err != nil {
  239. log.Fatal(err)
  240. }
  241. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  242. for packet := range packetSource.Packets() {
  243. go processPacket(packet)
  244. }
  245. }
  246. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  247. func processPacket(packet gopacket.Packet) {
  248. hasIPv4 := false
  249. var ipSrc, ipDst string
  250. // IPv4
  251. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  252. ip := ipLayer.(*layers.IPv4)
  253. ipSrc = ip.SrcIP.String()
  254. ipDst = ip.DstIP.String()
  255. hasIPv4 = true
  256. }
  257. // IPv6
  258. if !hasIPv4 {
  259. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  260. ip := ipLayer.(*layers.IPv6)
  261. ipSrc = ip.SrcIP.String()
  262. ipDst = ip.DstIP.String()
  263. }
  264. }
  265. // TCP
  266. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  267. if tcpLayer == nil {
  268. return
  269. }
  270. tcp, _ := tcpLayer.(*layers.TCP)
  271. portSrc := tcp.SrcPort
  272. portDst := tcp.DstPort
  273. sequence := tcp.Seq
  274. applicationLayer := packet.ApplicationLayer()
  275. if applicationLayer == nil {
  276. return
  277. }
  278. count++
  279. if len(applicationLayer.Payload()) < 50 {
  280. log.Println("application layer too small!")
  281. return
  282. }
  283. request := data.Request{
  284. IpSrc: ipSrc,
  285. IpDst: ipDst,
  286. PortSrc: uint32(portSrc),
  287. PortDst: uint32(portDst),
  288. TcpSeq: uint32(sequence),
  289. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  290. }
  291. switch config.Protocol {
  292. case "http":
  293. err := processHTTP(&request, applicationLayer.Payload())
  294. if err != nil {
  295. log.Println(err)
  296. return
  297. }
  298. case "ajp13":
  299. err := processAJP13(&request, applicationLayer.Payload())
  300. if err != nil {
  301. log.Println(err)
  302. return
  303. }
  304. }
  305. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  306. if strings.Contains(request.XForwardedFor, ",") {
  307. ips := strings.Split(request.XForwardedFor, ",")
  308. for i := len(ips) - 1; i >= 0; i-- {
  309. ipRaw := strings.TrimSpace(ips[i])
  310. ipAddr := net.ParseIP(ipRaw)
  311. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  312. request.Source = ipRaw
  313. break
  314. }
  315. }
  316. } else {
  317. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  318. if !ipPriv.IsPrivate(ipAddr) {
  319. request.Source = request.XForwardedFor
  320. }
  321. }
  322. }
  323. if request.Source == request.IpSrc && request.XRealIP != "" {
  324. request.Source = request.XRealIP
  325. }
  326. if config.Trace {
  327. log.Printf("[%s] %s\n", request.Source, request.Url)
  328. }
  329. natsEC.Publish(config.NatsQueue, &request)
  330. }
  331. func processAJP13(request *data.Request, appData []byte) error {
  332. a, err := ajp13.Parse(appData)
  333. if err != nil {
  334. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  335. }
  336. request.Url = a.URI
  337. request.Method = a.Method()
  338. request.Host = a.Server
  339. request.Protocol = a.Version
  340. request.Origin = a.RemoteAddr.String()
  341. request.Source = a.RemoteAddr.String()
  342. if v, ok := a.Header("Referer"); ok {
  343. request.Referer = v
  344. }
  345. if v, ok := a.Header("Connection"); ok {
  346. request.Connection = v
  347. }
  348. if v, ok := a.Header("X-Forwarded-For"); ok {
  349. request.XForwardedFor = v
  350. }
  351. if v, ok := a.Header("X-Real-IP"); ok {
  352. request.XRealIP = v
  353. }
  354. if v, ok := a.Header("X-Requested-With"); ok {
  355. request.XRequestedWith = v
  356. }
  357. if v, ok := a.Header("Accept-Encoding"); ok {
  358. request.AcceptEncoding = v
  359. }
  360. if v, ok := a.Header("Accept-Language"); ok {
  361. request.AcceptLanguage = v
  362. }
  363. if v, ok := a.Header("User-Agent"); ok {
  364. request.UserAgent = v
  365. }
  366. if v, ok := a.Header("Accept"); ok {
  367. request.Accept = v
  368. }
  369. if v, ok := a.Header("Cookie"); ok {
  370. request.Cookie = v
  371. }
  372. if v, ok := a.Header("X-Forwarded-Host"); ok {
  373. if v != request.Host {
  374. request.Host = v
  375. }
  376. }
  377. if false && config.Trace {
  378. log.Println("Request")
  379. pretty.Println(request)
  380. }
  381. return nil
  382. }
  383. func processHTTP(request *data.Request, appData []byte) error {
  384. reader := bufio.NewReader(strings.NewReader(string(appData)))
  385. req, err := http.ReadRequest(reader)
  386. if err != nil {
  387. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  388. }
  389. request.Url = req.URL.String()
  390. request.Method = req.Method
  391. request.Referer = req.Referer()
  392. request.Host = req.Host
  393. request.Protocol = req.Proto
  394. request.Origin = request.Host
  395. if _, ok := req.Header["Connection"]; ok {
  396. request.Connection = req.Header["Connection"][0]
  397. }
  398. if _, ok := req.Header["X-Forwarded-For"]; ok {
  399. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  400. }
  401. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  402. if _, ok := req.Header["True-Client-Ip"]; ok {
  403. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  404. }
  405. if _, ok := req.Header["X-Real-Ip"]; ok {
  406. request.XRealIP = req.Header["X-Real-Ip"][0]
  407. }
  408. if _, ok := req.Header["X-Requested-With"]; ok {
  409. request.XRequestedWith = req.Header["X-Requested-With"][0]
  410. }
  411. if _, ok := req.Header["Accept-Encoding"]; ok {
  412. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  413. }
  414. if _, ok := req.Header["Accept-Language"]; ok {
  415. request.AcceptLanguage = req.Header["Accept-Language"][0]
  416. }
  417. if _, ok := req.Header["User-Agent"]; ok {
  418. request.UserAgent = req.Header["User-Agent"][0]
  419. }
  420. if _, ok := req.Header["Accept"]; ok {
  421. request.Accept = req.Header["Accept"][0]
  422. }
  423. if _, ok := req.Header["Cookie"]; ok {
  424. request.Cookie = req.Header["Cookie"][0]
  425. }
  426. request.Source = request.IpSrc
  427. return nil
  428. }
  429. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  430. // e.g.
  431. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  432. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  433. func replayFile() {
  434. var req data.Request
  435. var startTs time.Time
  436. var endTs time.Time
  437. rand.Seed(time.Now().UnixNano())
  438. for {
  439. fh, err := os.Open(config.RequestsFile)
  440. if err != nil {
  441. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  442. }
  443. c := csv.NewReader(fh)
  444. c.Comma = ' '
  445. for {
  446. if config.SleepFor.Duration > time.Nanosecond {
  447. startTs = time.Now()
  448. }
  449. r, err := c.Read()
  450. if err == io.EOF {
  451. break
  452. }
  453. if err != nil {
  454. log.Println(err)
  455. continue
  456. }
  457. req.IpSrc = r[0]
  458. req.Source = r[0]
  459. req.Url = r[1]
  460. req.UserAgent = "Munch/1.0"
  461. req.Host = "demo.scraperwall.com"
  462. req.CreatedAt = time.Now().UnixNano()
  463. natsEC.Publish(config.NatsQueue, &req)
  464. if strings.Index(r[1], ".") < 0 {
  465. hash := sha1.New()
  466. io.WriteString(hash, r[0])
  467. fp := data.Fingerprint{
  468. ClientID: "scw",
  469. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  470. Remote: r[0],
  471. Url: r[1],
  472. Source: r[0],
  473. CreatedAt: time.Now(),
  474. }
  475. if strings.HasPrefix(r[0], "50.31.") {
  476. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  477. natsJsonEC.Publish("fingerprints_scw", fp)
  478. } else if rand.Intn(10) < 5 {
  479. natsJsonEC.Publish("fingerprints_scw", fp)
  480. }
  481. }
  482. count++
  483. if config.SleepFor.Duration >= time.Nanosecond {
  484. endTs = time.Now()
  485. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  486. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  487. }
  488. }
  489. }
  490. }
  491. }
  492. func loadConfig() {
  493. // initialize with values from the command line / environment
  494. config.Live = *doLiveCapture
  495. config.Interface = *iface
  496. config.SnapshotLen = *snapshotLen
  497. config.Filter = *filter
  498. config.Promiscuous = *promiscuous
  499. config.NatsURL = *natsURL
  500. config.NatsQueue = *natsQueue
  501. config.NatsUser = *natsUser
  502. config.NatsPassword = *natsPassword
  503. config.NatsCA = *natsCA
  504. config.SleepFor.Duration = *sleepFor
  505. config.RequestsFile = *requestsFile
  506. config.UseXForwardedAsSource = *useXForwardedAsSource
  507. config.Protocol = *protocol
  508. config.ApacheLog = *apacheLog
  509. config.Quiet = *beQuiet
  510. config.Trace = *trace
  511. if *configFile == "" {
  512. return
  513. }
  514. _, err := os.Stat(*configFile)
  515. if err != nil {
  516. log.Printf("%s: %s\n", *configFile, err)
  517. return
  518. }
  519. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  520. log.Printf("%s: %s\n", *configFile, err)
  521. }
  522. if !config.Quiet {
  523. config.print()
  524. }
  525. }
  526. // version outputs build information
  527. func version() {
  528. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  529. }