main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. package main
import (
	"bufio"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/kr/pretty"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
)
  25. var (
  26. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  27. iface = flag.String("interface", "eth0", "Interface to get packets from")
  28. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  29. filter = flag.String("filter", "tcp", "PCAP filter expression")
  30. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  31. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  32. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  33. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  34. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  35. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  36. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  37. trace = flag.Bool("trace", false, "Trace the packet capturing")
  38. configFile = flag.String("config", "", "The location of the TOML config file")
  39. beQuiet = flag.Bool("quiet", true, "Be quiet")
  40. doVersion = flag.Bool("version", false, "Show version information")
  41. natsEC *nats.EncodedConn
  42. count uint64
  43. timeout = -1 * time.Second
  44. ipPriv *ip.IP
  45. config Config
  46. // Version contains the program Version, e.g. 1.0.1
  47. Version string
  48. // BuildDate contains the date and time at which the program was compiled
  49. BuildDate string
  50. )
  51. // Config contains the program configuration
  52. type Config struct {
  53. Live bool
  54. Interface string
  55. SnapshotLen int
  56. Filter string
  57. Promiscuous bool
  58. NatsURL string
  59. NatsQueue string
  60. SleepFor duration
  61. RequestsFile string
  62. UseXForwardedAsSource bool
  63. Quiet bool
  64. Protocol string
  65. Trace bool
  66. }
  67. type duration struct {
  68. time.Duration
  69. }
  70. func (d *duration) UnmarshalText(text []byte) error {
  71. var err error
  72. d.Duration, err = time.ParseDuration(string(text))
  73. return err
  74. }
  75. func (c Config) print() {
  76. fmt.Printf("Live: %t\n", c.Live)
  77. fmt.Printf("Interface: %s\n", c.Interface)
  78. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  79. fmt.Printf("Filter: %s\n", c.Filter)
  80. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  81. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  82. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  83. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  84. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  85. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  86. fmt.Printf("Protocol: %s\n", c.Protocol)
  87. fmt.Printf("Quiet: %t\n", c.Quiet)
  88. fmt.Printf("Trace: %t\n", c.Trace)
  89. }
  90. func init() {
  91. flag.Parse()
  92. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  93. }
  94. func main() {
  95. if *doVersion {
  96. version()
  97. os.Exit(0)
  98. }
  99. loadConfig()
  100. // Output how many requests per second were sent
  101. if !config.Quiet {
  102. go func(c *uint64) {
  103. for {
  104. fmt.Printf("%d requests per second\n", *c)
  105. *c = 0
  106. time.Sleep(time.Second)
  107. }
  108. }(&count)
  109. }
  110. // NATS
  111. //
  112. if config.NatsURL == "" {
  113. log.Fatal("No NATS URL specified (-nats-url)!")
  114. }
  115. natsConn, err := nats.Connect(config.NatsURL)
  116. if err != nil {
  117. log.Fatal(err)
  118. }
  119. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  120. if err != nil {
  121. log.Fatalf("Encoded Connection: %v!\n", err)
  122. }
  123. // What should I do?
  124. if config.RequestsFile != "" {
  125. replayFile()
  126. } else if config.Live {
  127. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  128. liveCapture()
  129. }
  130. }
  131. func liveCapture() {
  132. ipPriv = ip.NewIP()
  133. // PCAP setup
  134. //
  135. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  136. if err != nil {
  137. log.Fatal(err)
  138. }
  139. defer handle.Close()
  140. err = handle.SetBPFFilter(config.Filter)
  141. if err != nil {
  142. log.Fatal(err)
  143. }
  144. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  145. for packet := range packetSource.Packets() {
  146. go processPacket(packet)
  147. }
  148. }
  149. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  150. func processPacket(packet gopacket.Packet) {
  151. count++
  152. hasIPv4 := false
  153. var ipSrc, ipDst string
  154. // IPv4
  155. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  156. ip := ipLayer.(*layers.IPv4)
  157. ipSrc = ip.SrcIP.String()
  158. ipDst = ip.DstIP.String()
  159. hasIPv4 = true
  160. }
  161. // IPv6
  162. if !hasIPv4 {
  163. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  164. ip := ipLayer.(*layers.IPv6)
  165. ipSrc = ip.SrcIP.String()
  166. ipDst = ip.DstIP.String()
  167. }
  168. }
  169. // TCP
  170. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  171. if tcpLayer == nil {
  172. return
  173. }
  174. tcp, _ := tcpLayer.(*layers.TCP)
  175. portSrc := tcp.SrcPort
  176. portDst := tcp.DstPort
  177. sequence := tcp.Seq
  178. applicationLayer := packet.ApplicationLayer()
  179. if applicationLayer == nil {
  180. return
  181. }
  182. request := data.Request{
  183. IpSrc: ipSrc,
  184. IpDst: ipDst,
  185. PortSrc: uint32(portSrc),
  186. PortDst: uint32(portDst),
  187. TcpSeq: uint32(sequence),
  188. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  189. }
  190. switch *protocol {
  191. case "http":
  192. processHTTP(&request, applicationLayer.Payload())
  193. case "ajp13":
  194. processAJP13(&request, applicationLayer.Payload())
  195. }
  196. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  197. if strings.Contains(request.XForwardedFor, ",") {
  198. ips := strings.Split(request.XForwardedFor, ",")
  199. for i := len(ips) - 1; i >= 0; i-- {
  200. ipRaw := strings.TrimSpace(ips[i])
  201. ipAddr := net.ParseIP(ipRaw)
  202. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  203. request.Source = ipRaw
  204. break
  205. }
  206. }
  207. } else {
  208. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  209. if !ipPriv.IsPrivate(ipAddr) {
  210. request.Source = request.XForwardedFor
  211. }
  212. }
  213. }
  214. if request.Source == request.IpSrc && request.XRealIP != "" {
  215. request.Source = request.XRealIP
  216. }
  217. if *trace {
  218. fmt.Println("Request for NATS")
  219. pretty.Println(request)
  220. }
  221. natsEC.Publish(config.NatsQueue, &request)
  222. }
  223. func processAJP13(request *data.Request, appData []byte) error {
  224. if *trace {
  225. fmt.Printf("packet: %v\n", appData)
  226. }
  227. a, err := ajp13.Parse(appData)
  228. if err != nil {
  229. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  230. }
  231. if *trace {
  232. fmt.Println("AJP13")
  233. pretty.Println(a)
  234. }
  235. request.Url = a.URI
  236. request.Method = a.Method()
  237. request.Host = a.Server
  238. request.Protocol = a.Version
  239. request.Origin = a.RemoteAddr.String()
  240. if v, ok := a.Header("Referer"); ok {
  241. request.Referer = v
  242. }
  243. if v, ok := a.Header("Connection"); ok {
  244. request.Connection = v
  245. }
  246. if v, ok := a.Header("X-Forwarded-For"); ok {
  247. request.XForwardedFor = v
  248. }
  249. if v, ok := a.Header("X-Real-IP"); ok {
  250. request.XRealIP = v
  251. }
  252. if v, ok := a.Header("X-Requested-With"); ok {
  253. request.XRequestedWith = v
  254. }
  255. if v, ok := a.Header("Accept-Encoding"); ok {
  256. request.AcceptEncoding = v
  257. }
  258. if v, ok := a.Header("Accept-Language"); ok {
  259. request.AcceptLanguage = v
  260. }
  261. if v, ok := a.Header("User-Agent"); ok {
  262. request.UserAgent = v
  263. }
  264. if v, ok := a.Header("Accept"); ok {
  265. request.Accept = v
  266. }
  267. if v, ok := a.Header("Cookie"); ok {
  268. request.Cookie = v
  269. }
  270. if v, ok := a.Header("X-Forwarded-Host"); ok {
  271. if v != request.Host {
  272. request.Host = v
  273. }
  274. }
  275. if *trace {
  276. fmt.Println("Request")
  277. pretty.Println(request)
  278. }
  279. return nil
  280. }
  281. func processHTTP(request *data.Request, appData []byte) error {
  282. reader := bufio.NewReader(strings.NewReader(string(appData)))
  283. req, err := http.ReadRequest(reader)
  284. if err != nil {
  285. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  286. }
  287. request.Url = req.URL.String()
  288. request.Method = req.Method
  289. request.Referer = req.Referer()
  290. request.Host = req.Host
  291. request.Protocol = req.Proto
  292. request.Origin = request.Host
  293. if _, ok := req.Header["Connection"]; ok {
  294. request.Connection = req.Header["Connection"][0]
  295. }
  296. if _, ok := req.Header["X-Forwarded-For"]; ok {
  297. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  298. }
  299. if _, ok := req.Header["X-Real-IP"]; ok {
  300. request.XRealIP = req.Header["X-Real-IP"][0]
  301. }
  302. if _, ok := req.Header["X-Requested-With"]; ok {
  303. request.XRequestedWith = req.Header["X-Requested-With"][0]
  304. }
  305. if _, ok := req.Header["Accept-Encoding"]; ok {
  306. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  307. }
  308. if _, ok := req.Header["Accept-Language"]; ok {
  309. request.AcceptLanguage = req.Header["Accept-Language"][0]
  310. }
  311. if _, ok := req.Header["User-Agent"]; ok {
  312. request.UserAgent = req.Header["User-Agent"][0]
  313. }
  314. if _, ok := req.Header["Accept"]; ok {
  315. request.Accept = req.Header["Accept"][0]
  316. }
  317. if _, ok := req.Header["Cookie"]; ok {
  318. request.Cookie = req.Header["Cookie"][0]
  319. }
  320. request.Source = request.IpSrc
  321. return nil
  322. }
  323. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  324. // e.g.
  325. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  326. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  327. func replayFile() {
  328. var req data.Request
  329. var startTs time.Time
  330. var endTs time.Time
  331. for {
  332. fh, err := os.Open(config.RequestsFile)
  333. if err != nil {
  334. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  335. }
  336. c := csv.NewReader(fh)
  337. c.Comma = ' '
  338. for {
  339. if config.SleepFor.Duration > time.Nanosecond {
  340. startTs = time.Now()
  341. }
  342. r, err := c.Read()
  343. if err == io.EOF {
  344. break
  345. }
  346. if err != nil {
  347. log.Println(err)
  348. continue
  349. }
  350. req.IpSrc = r[0]
  351. req.Source = r[0]
  352. req.Url = r[1]
  353. req.UserAgent = "Munch/1.0"
  354. req.Host = "demo.scraperwall.com"
  355. req.CreatedAt = time.Now().UnixNano()
  356. natsEC.Publish(config.NatsQueue, &req)
  357. count++
  358. if config.SleepFor.Duration >= time.Nanosecond {
  359. endTs = time.Now()
  360. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  361. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  362. }
  363. }
  364. }
  365. }
  366. }
  367. func loadConfig() {
  368. // initialize with values from the command line / environment
  369. config.Live = *doLiveCapture
  370. config.Interface = *iface
  371. config.SnapshotLen = *snapshotLen
  372. config.Filter = *filter
  373. config.Promiscuous = *promiscuous
  374. config.NatsURL = *natsURL
  375. config.NatsQueue = *natsQueue
  376. config.SleepFor.Duration = *sleepFor
  377. config.RequestsFile = *requestsFile
  378. config.UseXForwardedAsSource = *useXForwardedAsSource
  379. config.Protocol = *protocol
  380. config.Quiet = *beQuiet
  381. if *configFile == "" {
  382. return
  383. }
  384. _, err := os.Stat(*configFile)
  385. if err != nil {
  386. log.Printf("%s: %s\n", *configFile, err)
  387. return
  388. }
  389. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  390. log.Printf("%s: %s\n", *configFile, err)
  391. }
  392. if !config.Quiet {
  393. config.print()
  394. }
  395. }
  396. // version outputs build information
  397. func version() {
  398. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  399. }