main.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. package main
  import (
  	"bufio"
  	"encoding/csv"
  	"flag"
  	"fmt"
  	"io"
  	"log"
  	"math/rand"
  	"net"
  	"net/http"
  	"os"
  	"strings"
  	"sync/atomic"
  	"time"

  	"git.scraperwall.com/scw/ajp13"
  	"git.scraperwall.com/scw/data"
  	"git.scraperwall.com/scw/ip"
  	"github.com/BurntSushi/toml"
  	"github.com/Songmu/axslogparser"
  	"github.com/google/gopacket"
  	"github.com/google/gopacket/layers"
  	"github.com/google/gopacket/pcap"
  	"github.com/hpcloud/tail"
  	"github.com/kr/pretty"
  	"github.com/nats-io/nats"
  	"github.com/nats-io/nats/encoders/protobuf"
  )
  28. var (
  29. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  30. iface = flag.String("interface", "eth0", "Interface to get packets from")
  31. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  32. filter = flag.String("filter", "tcp", "PCAP filter expression")
  33. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  34. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  35. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  36. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  37. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  38. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  39. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  40. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  41. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  42. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  43. trace = flag.Bool("trace", false, "Trace the packet capturing")
  44. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  45. configFile = flag.String("config", "", "The location of the TOML config file")
  46. beQuiet = flag.Bool("quiet", true, "Be quiet")
  47. doVersion = flag.Bool("version", false, "Show version information")
  48. natsEC *nats.EncodedConn
  49. natsJsonEC *nats.EncodedConn
  50. count uint64
  51. timeout = -1 * time.Second
  52. ipPriv *ip.IP
  53. config Config
  54. // Version contains the program Version, e.g. 1.0.1
  55. Version string
  56. // BuildDate contains the date and time at which the program was compiled
  57. BuildDate string
  58. )
  59. // Config contains the program configuration
  60. type Config struct {
  61. Live bool
  62. Interface string
  63. SnapshotLen int
  64. Filter string
  65. Promiscuous bool
  66. NatsURL string
  67. NatsQueue string
  68. NatsUser string
  69. NatsPassword string
  70. NatsCA string
  71. SleepFor duration
  72. RequestsFile string
  73. UseXForwardedAsSource bool
  74. Quiet bool
  75. Protocol string
  76. Trace bool
  77. ApacheLog string
  78. }
  79. type duration struct {
  80. time.Duration
  81. }
  82. func (d *duration) UnmarshalText(text []byte) error {
  83. var err error
  84. d.Duration, err = time.ParseDuration(string(text))
  85. return err
  86. }
  87. func (c Config) print() {
  88. fmt.Printf("Live: %t\n", c.Live)
  89. fmt.Printf("Interface: %s\n", c.Interface)
  90. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  91. fmt.Printf("Filter: %s\n", c.Filter)
  92. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  93. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  94. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  95. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  96. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  97. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  98. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  99. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  100. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  101. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  102. fmt.Printf("Protocol: %s\n", c.Protocol)
  103. fmt.Printf("Quiet: %t\n", c.Quiet)
  104. fmt.Printf("Trace: %t\n", c.Trace)
  105. }
  // init registers the protobuf encoder with the NATS client, so main can build
  // an encoded connection for protobuf.PROTOBUF_ENCODER, and parses the
  // command-line flags.
  // NOTE(review): parsing flags inside init is unconventional (it runs before
  // main and before any test harness setup); moving flag.Parse to the top of
  // main would be the idiomatic choice.
  func init() {
  	nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  	flag.Parse()
  }
  // main loads the configuration, connects to NATS and then runs exactly one
  // mode, checked in this order: replaying a requests file (-requests),
  // following an Apache log (-apache-log) or live packet capture (-live).
  // Configuration and connection errors are fatal.
  func main() {
  	if *doVersion {
  		version()
  		os.Exit(0)
  	}

  	loadConfig()

  	// Output how many requests per second were sent. The counter is bumped
  	// concurrently by the capture/replay code, so it is read and reset in one
  	// atomic step; a separate read and "= 0" would race and lose increments.
  	if !config.Quiet {
  		go func(c *uint64) {
  			// Runs for the lifetime of the process; the ticker is never stopped.
  			ticker := time.NewTicker(time.Second)
  			for range ticker.C {
  				fmt.Printf("%d requests per second\n", atomic.SwapUint64(c, 0))
  			}
  		}(&count)
  	}

  	// NATS
  	if config.NatsURL == "" {
  		log.Fatal("No NATS URL specified (-nats-url)!")
  	}

  	// Credentials and the TLS CA are independent options, so user/password
  	// authentication also works without a custom CA and vice versa.
  	var opts []nats.Option
  	if config.NatsUser != "" {
  		opts = append(opts, nats.UserInfo(config.NatsUser, config.NatsPassword))
  	}
  	if config.NatsCA != "" {
  		opts = append(opts, nats.RootCAs(config.NatsCA))
  	}
  	natsConn, err := nats.Connect(config.NatsURL, opts...)
  	if err != nil {
  		log.Fatal(err)
  	}
  	// On a normal return, close the connection so the client gets a chance to
  	// flush publishes that are still buffered.
  	defer natsConn.Close()

  	natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  	if err != nil {
  		log.Fatalf("Encoded Connection: %v!\n", err)
  	}
  	natsJsonEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  	if err != nil {
  		log.Fatalf("Encoded Connection: %v!\n", err)
  	}

  	// What should I do?
  	switch {
  	case config.RequestsFile != "":
  		replayFile()
  	case config.ApacheLog != "":
  		apacheLogCapture(config.ApacheLog)
  	case config.Live:
  		// processPacket only understands these two; anything else would publish
  		// empty requests.
  		if config.Protocol != "http" && config.Protocol != "ajp13" {
  			log.Fatalf("Unsupported protocol %q (-protocol must be http or ajp13)", config.Protocol)
  		}
  		fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  		liveCapture()
  	default:
  		log.Fatal("Nothing to do: specify -requests, -apache-log or -live")
  	}
  }
  // apacheLogCapture follows logfile starting at its current end (so only new
  // lines are seen) and re-opens it when it is closed/rotated. Each access-log
  // line it can parse becomes a data.Request that is published to
  // config.NatsQueue. The log format is guessed once, from the first line
  // axslogparser recognises; unparsable lines are logged and skipped.
  //
  // A missing or untailable file is fatal. The function only returns if the
  // tail's line channel is closed.
  func apacheLogCapture(logfile string) {
  	if _, err := os.Stat(logfile); err != nil {
  		log.Fatalf("%s: %s", logfile, err)
  	}

  	t, err := tail.TailFile(logfile, tail.Config{
  		Follow:   true,                                        // follow the file
  		ReOpen:   true,                                        // reopen log file when it gets closed/rotated
  		Logger:   tail.DiscardingLogger,                       // don't log anything
  		Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}, // start at the end of the file
  	})
  	if err != nil {
  		log.Fatalf("%s: %s", logfile, err)
  	}

  	var (
  		parser    axslogparser.Parser
  		parserSet bool
  	)
  	for line := range t.Lines {
  		// tail reports read problems in-band on the line itself.
  		if line.Err != nil {
  			log.Printf("%s: %s", logfile, line.Err)
  			continue
  		}
  		text := line.Text

  		if !parserSet {
  			parser, _, err = axslogparser.GuessParser(text)
  			if err != nil {
  				log.Println(err)
  				continue
  			}
  			parserSet = true
  		}

  		logEntry, err := parser.Parse(text)
  		if err != nil {
  			log.Println(err)
  			continue
  		}

  		// Source address: the client host or, if configured, the X-Forwarded-For
  		// value. config (not the raw flag) is consulted so a TOML setting is
  		// honoured. Only the first (left-most) listed host is used.
  		remote := logEntry.Host
  		if config.UseXForwardedAsSource && logEntry.ForwardedFor != "" {
  			remote = logEntry.ForwardedFor
  		}
  		if idx := strings.Index(remote, ","); idx >= 0 {
  			remote = remote[:idx]
  		}
  		remote = strings.TrimSpace(remote)

  		// Virtual host without its port. SplitHostPort also handles bracketed
  		// IPv6 hosts; values without a port are kept as they are.
  		virtualHost := logEntry.VirtualHost
  		if host, _, splitErr := net.SplitHostPort(virtualHost); splitErr == nil {
  			virtualHost = host
  		}

  		// A log line carries no packet-level data: destination is fixed to
  		// localhost and ports/sequence stay zero.
  		request := data.Request{
  			IpSrc:         remote,
  			IpDst:         "127.0.0.1",
  			PortSrc:       0,
  			PortDst:       0,
  			TcpSeq:        0,
  			CreatedAt:     logEntry.Time.UnixNano(),
  			Url:           logEntry.RequestURI,
  			Method:        logEntry.Method,
  			Host:          virtualHost,
  			Protocol:      logEntry.Protocol,
  			Origin:        remote,
  			Source:        remote,
  			Referer:       logEntry.Referer,
  			XForwardedFor: logEntry.ForwardedFor,
  			UserAgent:     logEntry.UserAgent,
  		}

  		if config.Trace {
  			log.Printf("[%s] %s\n", request.Source, request.Url)
  		}
  		if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
  			log.Println(err)
  		}
  		// Feed the requests-per-second reporter like the other input modes do.
  		atomic.AddUint64(&count, 1)
  	}
  }
  227. func liveCapture() {
  228. ipPriv = ip.NewIP()
  229. // PCAP setup
  230. //
  231. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  232. if err != nil {
  233. log.Fatal(err)
  234. }
  235. defer handle.Close()
  236. err = handle.SetBPFFilter(config.Filter)
  237. if err != nil {
  238. log.Fatal(err)
  239. }
  240. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  241. for packet := range packetSource.Packets() {
  242. go processPacket(packet)
  243. }
  244. }
  // processPacket receives a raw packet from pcap, builds a Request item from it
  // and sends it to the queue.
  //
  // Packets without a TCP layer or application payload are ignored. Payloads
  // shorter than 50 bytes, and payloads that fail to parse as config.Protocol
  // ("http" or "ajp13"), are logged and dropped; any other protocol value drops
  // the packet silently. With config.UseXForwardedAsSource, the right-most public
  // address in X-Forwarded-For becomes the request's Source. If Source still
  // equals the packet's source IP afterwards, a non-empty X-Real-IP replaces it.
  func processPacket(packet gopacket.Packet) {
  	// Network layer: prefer IPv4, fall back to IPv6.
  	var ipSrc, ipDst string
  	if v4, ok := packet.Layer(layers.LayerTypeIPv4).(*layers.IPv4); ok {
  		ipSrc, ipDst = v4.SrcIP.String(), v4.DstIP.String()
  	} else if v6, ok := packet.Layer(layers.LayerTypeIPv6).(*layers.IPv6); ok {
  		ipSrc, ipDst = v6.SrcIP.String(), v6.DstIP.String()
  	}

  	// Transport layer: the checked assertion cannot panic on an unexpected layer.
  	tcp, ok := packet.Layer(layers.LayerTypeTCP).(*layers.TCP)
  	if !ok {
  		return
  	}

  	applicationLayer := packet.ApplicationLayer()
  	if applicationLayer == nil {
  		return
  	}
  	// Incremented from many goroutines concurrently, hence atomic.
  	atomic.AddUint64(&count, 1)

  	payload := applicationLayer.Payload()
  	if len(payload) < 50 {
  		log.Println("application layer too small!")
  		return
  	}

  	request := data.Request{
  		IpSrc:     ipSrc,
  		IpDst:     ipDst,
  		PortSrc:   uint32(tcp.SrcPort),
  		PortDst:   uint32(tcp.DstPort),
  		TcpSeq:    uint32(tcp.Seq),
  		CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  	}

  	var err error
  	switch config.Protocol {
  	case "http":
  		err = processHTTP(&request, payload)
  	case "ajp13":
  		err = processAJP13(&request, payload)
  	default:
  		// Nothing can be parsed, so publishing would only send an empty request.
  		return
  	}
  	if err != nil {
  		log.Println(err)
  		return
  	}

  	if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  		if src, found := publicIPFromXForwardedFor(request.XForwardedFor); found {
  			request.Source = src
  		}
  	}
  	if request.Source == request.IpSrc && request.XRealIP != "" {
  		request.Source = request.XRealIP
  	}

  	if config.Trace {
  		log.Printf("[%s] %s\n", request.Source, request.Url)
  	}
  	if err := natsEC.Publish(config.NatsQueue, &request); err != nil {
  		log.Println(err)
  	}
  }

  // publicIPFromXForwardedFor scans a comma-separated X-Forwarded-For value from
  // right to left (the right-most entries are the ones appended by the proxies
  // closest to us). It returns the first entry that parses as an IP address and
  // is not private according to ipPriv, trimmed of surrounding spaces. A
  // single-entry header is handled the same way.
  func publicIPFromXForwardedFor(xff string) (string, bool) {
  	hops := strings.Split(xff, ",")
  	for i := len(hops) - 1; i >= 0; i-- {
  		candidate := strings.TrimSpace(hops[i])
  		if addr := net.ParseIP(candidate); addr != nil && !ipPriv.IsPrivate(addr) {
  			return candidate, true
  		}
  	}
  	return "", false
  }
  // processAJP13 decodes appData with ajp13.Parse and copies the URI, method,
  // server name, protocol version, remote address and a fixed set of headers
  // into request. Origin and Source are both set to the AJP remote address. A
  // present X-Forwarded-Host header replaces the server name as request.Host.
  func processAJP13(request *data.Request, appData []byte) error {
  	msg, err := ajp13.Parse(appData)
  	if err != nil {
  		return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  	}

  	remoteAddr := msg.RemoteAddr.String()
  	request.Url = msg.URI
  	request.Method = msg.Method()
  	request.Host = msg.Server
  	request.Protocol = msg.Version
  	request.Origin = remoteAddr
  	request.Source = remoteAddr

  	// Header name -> destination field, looked up in this order. X-Forwarded-Host
  	// comes last so that, when present, it overrides the server name set above.
  	headerTargets := []struct {
  		name string
  		dst  *string
  	}{
  		{"Referer", &request.Referer},
  		{"Connection", &request.Connection},
  		{"X-Forwarded-For", &request.XForwardedFor},
  		{"X-Real-IP", &request.XRealIP},
  		{"X-Requested-With", &request.XRequestedWith},
  		{"Accept-Encoding", &request.AcceptEncoding},
  		{"Accept-Language", &request.AcceptLanguage},
  		{"User-Agent", &request.UserAgent},
  		{"Accept", &request.Accept},
  		{"Cookie", &request.Cookie},
  		{"X-Forwarded-Host", &request.Host},
  	}
  	for _, target := range headerTargets {
  		if value, found := msg.Header(target.name); found {
  			*target.dst = value
  		}
  	}

  	// Per-request dump, deliberately disabled by the constant false. It is also
  	// the only use of the pretty package visible in this file, so removing it
  	// would leave that import unused.
  	if false && config.Trace {
  		log.Println("Request")
  		pretty.Println(request)
  	}
  	return nil
  }
  // processHTTP parses appData as the head of an HTTP/1.x request and copies the
  // request line, Host, Referer and a fixed set of headers into request (the
  // first value of each header only). Source is set to the packet's source IP
  // (request.IpSrc); processPacket may still override it from
  // X-Forwarded-For / X-Real-IP.
  func processHTTP(request *data.Request, appData []byte) error {
  	reader := bufio.NewReader(strings.NewReader(string(appData)))
  	req, err := http.ReadRequest(reader)
  	if err != nil {
  		return fmt.Errorf("Failed to parse HTTP header: %s", err)
  	}

  	request.Url = req.URL.String()
  	request.Method = req.Method
  	request.Referer = req.Referer()
  	request.Host = req.Host
  	request.Protocol = req.Proto
  	request.Origin = request.Host

  	// http.ReadRequest stores header keys in canonical MIME form, e.g.
  	// "X-Real-IP" becomes "X-Real-Ip". Indexing req.Header with the literal
  	// spelling would therefore never find it, so every name is canonicalised
  	// before the (single) map lookup.
  	headerTargets := []struct {
  		name string
  		dst  *string
  	}{
  		{"Connection", &request.Connection},
  		{"X-Forwarded-For", &request.XForwardedFor},
  		{"X-Real-IP", &request.XRealIP},
  		{"X-Requested-With", &request.XRequestedWith},
  		{"Accept-Encoding", &request.AcceptEncoding},
  		{"Accept-Language", &request.AcceptLanguage},
  		{"User-Agent", &request.UserAgent},
  		{"Accept", &request.Accept},
  		{"Cookie", &request.Cookie},
  	}
  	for _, target := range headerTargets {
  		if values := req.Header[http.CanonicalHeaderKey(target.name)]; len(values) > 0 {
  			*target.dst = values[0]
  		}
  	}

  	request.Source = request.IpSrc
  	return nil
  }
  424. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  425. // e.g.
  426. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  427. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  428. func replayFile() {
  429. var req data.Request
  430. var startTs time.Time
  431. var endTs time.Time
  432. rand.Seed(time.Now().UnixNano())
  433. for {
  434. fh, err := os.Open(config.RequestsFile)
  435. if err != nil {
  436. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  437. }
  438. c := csv.NewReader(fh)
  439. c.Comma = ' '
  440. for {
  441. if config.SleepFor.Duration > time.Nanosecond {
  442. startTs = time.Now()
  443. }
  444. r, err := c.Read()
  445. if err == io.EOF {
  446. break
  447. }
  448. if err != nil {
  449. log.Println(err)
  450. continue
  451. }
  452. req.IpSrc = r[0]
  453. req.Source = r[0]
  454. req.Url = r[1]
  455. req.UserAgent = "Munch/1.0"
  456. req.Host = "demo.scraperwall.com"
  457. req.CreatedAt = time.Now().UnixNano()
  458. natsEC.Publish(config.NatsQueue, &req)
  459. if strings.Index(r[1], ".") < 0 {
  460. fp := data.Fingerprint{
  461. ClientID: "scw",
  462. Fingerprint: fmt.Sprintf("%x%x%x%x\n", rand.Int(), rand.Int(), rand.Int(), rand.Int()),
  463. Remote: r[0],
  464. Url: r[1],
  465. Source: r[0],
  466. CreatedAt: time.Now(),
  467. }
  468. if strings.HasPrefix(r[0], "50.31.") {
  469. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  470. natsJsonEC.Publish("fingerprints_scw", fp)
  471. } else if rand.Intn(10) < 8 {
  472. natsJsonEC.Publish("fingerprints_scw", fp)
  473. }
  474. }
  475. count++
  476. if config.SleepFor.Duration >= time.Nanosecond {
  477. endTs = time.Now()
  478. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  479. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  480. }
  481. }
  482. }
  483. }
  484. }
  // loadConfig fills the package-level config from the command-line flags. When
  // -config names a file, the values in that TOML file then override the flags.
  //
  // A missing or malformed config file is logged and otherwise ignored. Keys in
  // the file that match no Config field are reported. Unless Quiet, the
  // effective configuration is printed, but only on the path where a config
  // file was read.
  func loadConfig() {
  	// initialize with values from the command line
  	config = Config{
  		Live:                  *doLiveCapture,
  		Interface:             *iface,
  		SnapshotLen:           *snapshotLen,
  		Filter:                *filter,
  		Promiscuous:           *promiscuous,
  		NatsURL:               *natsURL,
  		NatsQueue:             *natsQueue,
  		NatsUser:              *natsUser,
  		NatsPassword:          *natsPassword,
  		NatsCA:                *natsCA,
  		SleepFor:              duration{Duration: *sleepFor},
  		RequestsFile:          *requestsFile,
  		UseXForwardedAsSource: *useXForwardedAsSource,
  		Protocol:              *protocol,
  		ApacheLog:             *apacheLog,
  		Quiet:                 *beQuiet,
  		Trace:                 *trace,
  	}

  	if *configFile == "" {
  		return
  	}
  	if _, err := os.Stat(*configFile); err != nil {
  		log.Printf("%s: %s\n", *configFile, err)
  		return
  	}

  	meta, err := toml.DecodeFile(*configFile, &config)
  	if err != nil {
  		log.Printf("%s: %s\n", *configFile, err)
  	} else if unknown := meta.Undecoded(); len(unknown) > 0 {
  		// A misspelt key would otherwise be ignored without any hint.
  		log.Printf("%s: ignoring unknown keys %v\n", *configFile, unknown)
  	}

  	if !config.Quiet {
  		config.print()
  	}
  }
  // version prints the program name followed by the Version and BuildDate
  // build metadata on a single line to stdout.
  func version() {
  	fmt.Println("munchclient " + Version + ", built on " + BuildDate)
  }