main.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624
  1. package main
import (
	"bufio"
	"crypto/sha1"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/Songmu/axslogparser"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/hpcloud/tail"
	"github.com/kr/pretty"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
)
  29. var (
  30. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  31. iface = flag.String("interface", "eth0", "Interface to get packets from")
  32. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  33. filter = flag.String("filter", "tcp", "PCAP filter expression")
  34. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  35. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  36. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  37. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  38. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  39. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  40. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  41. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  42. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  43. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  44. trace = flag.Bool("trace", false, "Trace the packet capturing")
  45. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  46. configFile = flag.String("config", "", "The location of the TOML config file")
  47. beQuiet = flag.Bool("quiet", true, "Be quiet")
  48. doVersion = flag.Bool("version", false, "Show version information")
  49. natsEC *nats.EncodedConn
  50. natsJsonEC *nats.EncodedConn
  51. count uint64
  52. timeout = -1 * time.Second
  53. ipPriv *ip.IP
  54. config Config
  55. // Version contains the program Version, e.g. 1.0.1
  56. Version string
  57. // BuildDate contains the date and time at which the program was compiled
  58. BuildDate string
  59. )
  60. // Config contains the program configuration
  61. type Config struct {
  62. Live bool
  63. Interface string
  64. SnapshotLen int
  65. Filter string
  66. Promiscuous bool
  67. NatsURL string
  68. NatsQueue string
  69. NatsUser string
  70. NatsPassword string
  71. NatsCA string
  72. SleepFor duration
  73. RequestsFile string
  74. UseXForwardedAsSource bool
  75. Quiet bool
  76. Protocol string
  77. Trace bool
  78. ApacheLog string
  79. }
  80. type duration struct {
  81. time.Duration
  82. }
  83. func (d *duration) UnmarshalText(text []byte) error {
  84. var err error
  85. d.Duration, err = time.ParseDuration(string(text))
  86. return err
  87. }
  88. func (c Config) print() {
  89. fmt.Printf("Live: %t\n", c.Live)
  90. fmt.Printf("Interface: %s\n", c.Interface)
  91. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  92. fmt.Printf("Filter: %s\n", c.Filter)
  93. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  94. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  95. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  96. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  97. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  98. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  99. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  100. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  101. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  102. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  103. fmt.Printf("Protocol: %s\n", c.Protocol)
  104. fmt.Printf("Quiet: %t\n", c.Quiet)
  105. fmt.Printf("Trace: %t\n", c.Trace)
  106. }
  107. func init() {
  108. flag.Parse()
  109. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  110. }
  111. func main() {
  112. if *doVersion {
  113. version()
  114. os.Exit(0)
  115. }
  116. loadConfig()
  117. // Output how many requests per second were sent
  118. if !config.Quiet {
  119. go func(c *uint64) {
  120. for {
  121. fmt.Printf("%d requests per second\n", *c)
  122. *c = 0
  123. time.Sleep(time.Second)
  124. }
  125. }(&count)
  126. }
  127. // NATS
  128. //
  129. if config.NatsURL == "" {
  130. log.Fatal("No NATS URL specified (-nats-url)!")
  131. }
  132. var natsConn *nats.Conn
  133. var err error
  134. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  135. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  136. } else {
  137. natsConn, err = nats.Connect(config.NatsURL)
  138. }
  139. if err != nil {
  140. log.Fatal(err)
  141. }
  142. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  143. if err != nil {
  144. log.Fatalf("Encoded Connection: %v!\n", err)
  145. }
  146. natsJsonEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  147. if err != nil {
  148. log.Fatalf("Encoded Connection: %v!\n", err)
  149. }
  150. // What should I do?
  151. if config.RequestsFile != "" {
  152. replayFile()
  153. } else if config.ApacheLog != "" {
  154. apacheLogCapture(config.ApacheLog)
  155. } else if config.Live {
  156. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  157. liveCapture()
  158. }
  159. }
  160. func apacheLogCapture(logfile string) {
  161. if _, err := os.Stat(logfile); err != nil {
  162. log.Fatalf("%s: %s", logfile, err)
  163. }
  164. t, err := tail.TailFile(logfile, tail.Config{
  165. Follow: true, // follow the file
  166. ReOpen: true, // reopen log file when it gets closed/rotated
  167. Logger: tail.DiscardingLogger, // don't log anything
  168. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  169. })
  170. if err != nil {
  171. log.Fatalf("%s: %s", logfile, err)
  172. }
  173. var p axslogparser.Parser
  174. parserSet := false
  175. for line := range t.Lines {
  176. l := line.Text
  177. if !parserSet {
  178. p, _, err = axslogparser.GuessParser(l)
  179. if err != nil {
  180. log.Println(err)
  181. continue
  182. }
  183. parserSet = true
  184. }
  185. logEntry, err := p.Parse(l)
  186. if err != nil {
  187. log.Println(err)
  188. continue
  189. }
  190. remote := logEntry.Host
  191. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  192. remote = logEntry.ForwardedFor
  193. }
  194. // only use the first host in case there are multiple hosts in the log
  195. if cidx := strings.Index(remote, ","); cidx >= 0 {
  196. remote = remote[0:cidx]
  197. }
  198. // extract the virtual host
  199. var virtualHost string
  200. vhost := logEntry.VirtualHost
  201. if vhost != "" {
  202. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  203. virtualHost = vhostAndPort[0]
  204. }
  205. request := data.Request{
  206. IpSrc: remote,
  207. IpDst: "127.0.0.1",
  208. PortSrc: 0,
  209. PortDst: 0,
  210. TcpSeq: 0,
  211. CreatedAt: logEntry.Time.UnixNano(),
  212. Url: logEntry.RequestURI,
  213. Method: logEntry.Method,
  214. Host: virtualHost,
  215. Protocol: logEntry.Protocol,
  216. Origin: remote,
  217. Source: remote,
  218. Referer: logEntry.Referer,
  219. XForwardedFor: logEntry.ForwardedFor,
  220. UserAgent: logEntry.UserAgent,
  221. }
  222. if config.Trace {
  223. log.Printf("[%s] %s\n", request.Source, request.Url)
  224. }
  225. natsEC.Publish(config.NatsQueue, &request)
  226. }
  227. }
  228. func liveCapture() {
  229. ipPriv = ip.NewIP()
  230. // PCAP setup
  231. //
  232. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  233. if err != nil {
  234. log.Fatal(err)
  235. }
  236. defer handle.Close()
  237. err = handle.SetBPFFilter(config.Filter)
  238. if err != nil {
  239. log.Fatal(err)
  240. }
  241. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  242. for packet := range packetSource.Packets() {
  243. go processPacket(packet)
  244. }
  245. }
  246. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  247. func processPacket(packet gopacket.Packet) {
  248. hasIPv4 := false
  249. var ipSrc, ipDst string
  250. // IPv4
  251. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  252. ip := ipLayer.(*layers.IPv4)
  253. ipSrc = ip.SrcIP.String()
  254. ipDst = ip.DstIP.String()
  255. hasIPv4 = true
  256. }
  257. // IPv6
  258. if !hasIPv4 {
  259. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  260. ip := ipLayer.(*layers.IPv6)
  261. ipSrc = ip.SrcIP.String()
  262. ipDst = ip.DstIP.String()
  263. }
  264. }
  265. // TCP
  266. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  267. if tcpLayer == nil {
  268. return
  269. }
  270. tcp, _ := tcpLayer.(*layers.TCP)
  271. portSrc := tcp.SrcPort
  272. portDst := tcp.DstPort
  273. sequence := tcp.Seq
  274. applicationLayer := packet.ApplicationLayer()
  275. if applicationLayer == nil {
  276. return
  277. }
  278. count++
  279. if len(applicationLayer.Payload()) < 50 {
  280. log.Println("application layer too small!")
  281. return
  282. }
  283. request := data.Request{
  284. IpSrc: ipSrc,
  285. IpDst: ipDst,
  286. PortSrc: uint32(portSrc),
  287. PortDst: uint32(portDst),
  288. TcpSeq: uint32(sequence),
  289. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  290. }
  291. switch config.Protocol {
  292. case "http":
  293. err := processHTTP(&request, applicationLayer.Payload())
  294. if err != nil {
  295. log.Println(err)
  296. return
  297. }
  298. case "ajp13":
  299. err := processAJP13(&request, applicationLayer.Payload())
  300. if err != nil {
  301. log.Println(err)
  302. return
  303. }
  304. }
  305. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  306. if strings.Contains(request.XForwardedFor, ",") {
  307. ips := strings.Split(request.XForwardedFor, ",")
  308. for i := len(ips) - 1; i >= 0; i-- {
  309. ipRaw := strings.TrimSpace(ips[i])
  310. ipAddr := net.ParseIP(ipRaw)
  311. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  312. request.Source = ipRaw
  313. break
  314. }
  315. }
  316. } else {
  317. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  318. if !ipPriv.IsPrivate(ipAddr) {
  319. request.Source = request.XForwardedFor
  320. }
  321. }
  322. }
  323. if request.Source == request.IpSrc && request.XRealIP != "" {
  324. request.Source = request.XRealIP
  325. }
  326. if config.Trace {
  327. log.Printf("[%s] %s\n", request.Source, request.Url)
  328. }
  329. natsEC.Publish(config.NatsQueue, &request)
  330. }
  331. func processAJP13(request *data.Request, appData []byte) error {
  332. a, err := ajp13.Parse(appData)
  333. if err != nil {
  334. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  335. }
  336. request.Url = a.URI
  337. request.Method = a.Method()
  338. request.Host = a.Server
  339. request.Protocol = a.Version
  340. request.Origin = a.RemoteAddr.String()
  341. request.Source = a.RemoteAddr.String()
  342. if v, ok := a.Header("Referer"); ok {
  343. request.Referer = v
  344. }
  345. if v, ok := a.Header("Connection"); ok {
  346. request.Connection = v
  347. }
  348. if v, ok := a.Header("X-Forwarded-For"); ok {
  349. request.XForwardedFor = v
  350. }
  351. if v, ok := a.Header("X-Real-IP"); ok {
  352. request.XRealIP = v
  353. }
  354. if v, ok := a.Header("X-Requested-With"); ok {
  355. request.XRequestedWith = v
  356. }
  357. if v, ok := a.Header("Accept-Encoding"); ok {
  358. request.AcceptEncoding = v
  359. }
  360. if v, ok := a.Header("Accept-Language"); ok {
  361. request.AcceptLanguage = v
  362. }
  363. if v, ok := a.Header("User-Agent"); ok {
  364. request.UserAgent = v
  365. }
  366. if v, ok := a.Header("Accept"); ok {
  367. request.Accept = v
  368. }
  369. if v, ok := a.Header("Cookie"); ok {
  370. request.Cookie = v
  371. }
  372. if v, ok := a.Header("X-Forwarded-Host"); ok {
  373. if v != request.Host {
  374. request.Host = v
  375. }
  376. }
  377. if false && config.Trace {
  378. log.Println("Request")
  379. pretty.Println(request)
  380. }
  381. return nil
  382. }
  383. func processHTTP(request *data.Request, appData []byte) error {
  384. reader := bufio.NewReader(strings.NewReader(string(appData)))
  385. req, err := http.ReadRequest(reader)
  386. if err != nil {
  387. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  388. }
  389. request.Url = req.URL.String()
  390. request.Method = req.Method
  391. request.Referer = req.Referer()
  392. request.Host = req.Host
  393. request.Protocol = req.Proto
  394. request.Origin = request.Host
  395. if _, ok := req.Header["Connection"]; ok {
  396. request.Connection = req.Header["Connection"][0]
  397. }
  398. if _, ok := req.Header["X-Forwarded-For"]; ok {
  399. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  400. }
  401. if _, ok := req.Header["X-Real-IP"]; ok {
  402. request.XRealIP = req.Header["X-Real-IP"][0]
  403. }
  404. if _, ok := req.Header["X-Requested-With"]; ok {
  405. request.XRequestedWith = req.Header["X-Requested-With"][0]
  406. }
  407. if _, ok := req.Header["Accept-Encoding"]; ok {
  408. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  409. }
  410. if _, ok := req.Header["Accept-Language"]; ok {
  411. request.AcceptLanguage = req.Header["Accept-Language"][0]
  412. }
  413. if _, ok := req.Header["User-Agent"]; ok {
  414. request.UserAgent = req.Header["User-Agent"][0]
  415. }
  416. if _, ok := req.Header["Accept"]; ok {
  417. request.Accept = req.Header["Accept"][0]
  418. }
  419. if _, ok := req.Header["Cookie"]; ok {
  420. request.Cookie = req.Header["Cookie"][0]
  421. }
  422. request.Source = request.IpSrc
  423. return nil
  424. }
  425. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  426. // e.g.
  427. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  428. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  429. func replayFile() {
  430. var req data.Request
  431. var startTs time.Time
  432. var endTs time.Time
  433. rand.Seed(time.Now().UnixNano())
  434. for {
  435. fh, err := os.Open(config.RequestsFile)
  436. if err != nil {
  437. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  438. }
  439. c := csv.NewReader(fh)
  440. c.Comma = ' '
  441. for {
  442. if config.SleepFor.Duration > time.Nanosecond {
  443. startTs = time.Now()
  444. }
  445. r, err := c.Read()
  446. if err == io.EOF {
  447. break
  448. }
  449. if err != nil {
  450. log.Println(err)
  451. continue
  452. }
  453. req.IpSrc = r[0]
  454. req.Source = r[0]
  455. req.Url = r[1]
  456. req.UserAgent = "Munch/1.0"
  457. req.Host = "demo.scraperwall.com"
  458. req.CreatedAt = time.Now().UnixNano()
  459. natsEC.Publish(config.NatsQueue, &req)
  460. if strings.Index(r[1], ".") < 0 {
  461. hash := sha1.New()
  462. io.WriteString(hash, r[0])
  463. fp := data.Fingerprint{
  464. ClientID: "scw",
  465. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  466. Remote: r[0],
  467. Url: r[1],
  468. Source: r[0],
  469. CreatedAt: time.Now(),
  470. }
  471. if strings.HasPrefix(r[0], "50.31.") {
  472. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  473. natsJsonEC.Publish("fingerprints_scw", fp)
  474. } else if rand.Intn(10) < 5 {
  475. natsJsonEC.Publish("fingerprints_scw", fp)
  476. }
  477. }
  478. count++
  479. if config.SleepFor.Duration >= time.Nanosecond {
  480. endTs = time.Now()
  481. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  482. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  483. }
  484. }
  485. }
  486. }
  487. }
  488. func loadConfig() {
  489. // initialize with values from the command line / environment
  490. config.Live = *doLiveCapture
  491. config.Interface = *iface
  492. config.SnapshotLen = *snapshotLen
  493. config.Filter = *filter
  494. config.Promiscuous = *promiscuous
  495. config.NatsURL = *natsURL
  496. config.NatsQueue = *natsQueue
  497. config.NatsUser = *natsUser
  498. config.NatsPassword = *natsPassword
  499. config.NatsCA = *natsCA
  500. config.SleepFor.Duration = *sleepFor
  501. config.RequestsFile = *requestsFile
  502. config.UseXForwardedAsSource = *useXForwardedAsSource
  503. config.Protocol = *protocol
  504. config.ApacheLog = *apacheLog
  505. config.Quiet = *beQuiet
  506. config.Trace = *trace
  507. if *configFile == "" {
  508. return
  509. }
  510. _, err := os.Stat(*configFile)
  511. if err != nil {
  512. log.Printf("%s: %s\n", *configFile, err)
  513. return
  514. }
  515. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  516. log.Printf("%s: %s\n", *configFile, err)
  517. }
  518. if !config.Quiet {
  519. config.print()
  520. }
  521. }
  522. // version outputs build information
  523. func version() {
  524. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  525. }