main.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. package main
  2. import (
  3. "bufio"
  4. "encoding/csv"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net"
  10. "net/http"
  11. "os"
  12. "strings"
  13. "time"
  14. "github.com/BurntSushi/toml"
  15. "github.com/google/gopacket"
  16. "github.com/google/gopacket/layers"
  17. "github.com/google/gopacket/pcap"
  18. "github.com/kr/pretty"
  19. "github.com/nats-io/nats"
  20. "github.com/nats-io/nats/encoders/protobuf"
  21. "git.scraperwall.com/scw/ajp13"
  22. "git.scraperwall.com/scw/data"
  23. "git.scraperwall.com/scw/ip"
  24. )
  25. var (
  26. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  27. iface = flag.String("interface", "eth0", "Interface to get packets from")
  28. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  29. filter = flag.String("filter", "tcp", "PCAP filter expression")
  30. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  31. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  32. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  33. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  34. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  35. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  36. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  37. trace = flag.Bool("trace", false, "Trace the packet capturing")
  38. configFile = flag.String("config", "", "The location of the TOML config file")
  39. beQuiet = flag.Bool("quiet", true, "Be quiet")
  40. doVersion = flag.Bool("version", false, "Show version information")
  41. natsEC *nats.EncodedConn
  42. count uint64
  43. timeout = -1 * time.Second
  44. ipPriv *ip.IP
  45. config Config
  46. // Version contains the program Version, e.g. 1.0.1
  47. Version string
  48. // BuildDate contains the date and time at which the program was compiled
  49. BuildDate string
  50. )
  51. // Config contains the program configuration
  52. type Config struct {
  53. Live bool
  54. Interface string
  55. SnapshotLen int
  56. Filter string
  57. Promiscuous bool
  58. NatsURL string
  59. NatsQueue string
  60. SleepFor duration
  61. RequestsFile string
  62. UseXForwardedAsSource bool
  63. Quiet bool
  64. Protocol string
  65. Trace bool
  66. }
  67. type duration struct {
  68. time.Duration
  69. }
  70. func (d *duration) UnmarshalText(text []byte) error {
  71. var err error
  72. d.Duration, err = time.ParseDuration(string(text))
  73. return err
  74. }
  75. func (c Config) print() {
  76. fmt.Printf("Live: %t\n", c.Live)
  77. fmt.Printf("Interface: %s\n", c.Interface)
  78. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  79. fmt.Printf("Filter: %s\n", c.Filter)
  80. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  81. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  82. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  83. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  84. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  85. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  86. fmt.Printf("Protocol: %s\n", c.Protocol)
  87. fmt.Printf("Quiet: %t\n", c.Quiet)
  88. fmt.Printf("Trace: %t\n", c.Trace)
  89. }
  90. func init() {
  91. flag.Parse()
  92. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  93. }
  94. func main() {
  95. if *doVersion {
  96. version()
  97. os.Exit(0)
  98. }
  99. loadConfig()
  100. // Output how many requests per second were sent
  101. if !config.Quiet {
  102. go func(c *uint64) {
  103. for {
  104. fmt.Printf("%d requests per second\n", *c)
  105. *c = 0
  106. time.Sleep(time.Second)
  107. }
  108. }(&count)
  109. }
  110. // NATS
  111. //
  112. if config.NatsURL == "" {
  113. log.Fatal("No NATS URL specified (-nats-url)!")
  114. }
  115. natsConn, err := nats.Connect(config.NatsURL)
  116. if err != nil {
  117. log.Fatal(err)
  118. }
  119. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  120. if err != nil {
  121. log.Fatalf("Encoded Connection: %v!\n", err)
  122. }
  123. // What should I do?
  124. if config.RequestsFile != "" {
  125. replayFile()
  126. } else if config.Live {
  127. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  128. liveCapture()
  129. }
  130. }
  131. func liveCapture() {
  132. ipPriv = ip.NewIP()
  133. // PCAP setup
  134. //
  135. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  136. if err != nil {
  137. log.Fatal(err)
  138. }
  139. defer handle.Close()
  140. err = handle.SetBPFFilter(config.Filter)
  141. if err != nil {
  142. log.Fatal(err)
  143. }
  144. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  145. for packet := range packetSource.Packets() {
  146. go processPacket(packet)
  147. }
  148. }
  149. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  150. func processPacket(packet gopacket.Packet) {
  151. count++
  152. hasIPv4 := false
  153. var ipSrc, ipDst string
  154. // IPv4
  155. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  156. ip := ipLayer.(*layers.IPv4)
  157. ipSrc = ip.SrcIP.String()
  158. ipDst = ip.DstIP.String()
  159. hasIPv4 = true
  160. if config.Trace {
  161. log.Printf("IPv4 packet %s -> %s\n", ipSrc, ipDst)
  162. }
  163. }
  164. // IPv6
  165. if !hasIPv4 {
  166. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  167. ip := ipLayer.(*layers.IPv6)
  168. ipSrc = ip.SrcIP.String()
  169. ipDst = ip.DstIP.String()
  170. if config.Trace {
  171. log.Printf("IPv6 packet %s -> %s\n", ipSrc, ipDst)
  172. }
  173. }
  174. }
  175. // TCP
  176. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  177. if tcpLayer == nil {
  178. return
  179. }
  180. tcp, _ := tcpLayer.(*layers.TCP)
  181. portSrc := tcp.SrcPort
  182. portDst := tcp.DstPort
  183. sequence := tcp.Seq
  184. applicationLayer := packet.ApplicationLayer()
  185. if applicationLayer == nil {
  186. return
  187. }
  188. request := data.Request{
  189. IpSrc: ipSrc,
  190. IpDst: ipDst,
  191. PortSrc: uint32(portSrc),
  192. PortDst: uint32(portDst),
  193. TcpSeq: uint32(sequence),
  194. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  195. }
  196. switch config.Protocol {
  197. case "http":
  198. processHTTP(&request, applicationLayer.Payload())
  199. case "ajp13":
  200. processAJP13(&request, applicationLayer.Payload())
  201. }
  202. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  203. if strings.Contains(request.XForwardedFor, ",") {
  204. ips := strings.Split(request.XForwardedFor, ",")
  205. for i := len(ips) - 1; i >= 0; i-- {
  206. ipRaw := strings.TrimSpace(ips[i])
  207. ipAddr := net.ParseIP(ipRaw)
  208. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  209. request.Source = ipRaw
  210. break
  211. }
  212. }
  213. } else {
  214. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  215. if !ipPriv.IsPrivate(ipAddr) {
  216. request.Source = request.XForwardedFor
  217. }
  218. }
  219. }
  220. if request.Source == request.IpSrc && request.XRealIP != "" {
  221. request.Source = request.XRealIP
  222. }
  223. if config.Trace {
  224. log.Println("Request for NATS")
  225. pretty.Println(request)
  226. }
  227. natsEC.Publish(config.NatsQueue, &request)
  228. }
  229. func processAJP13(request *data.Request, appData []byte) error {
  230. if config.Trace {
  231. log.Printf("packet: %v\n", appData)
  232. }
  233. a, err := ajp13.Parse(appData)
  234. if err != nil {
  235. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  236. }
  237. if config.Trace {
  238. log.Println("AJP13")
  239. pretty.Println(a)
  240. }
  241. request.Url = a.URI
  242. request.Method = a.Method()
  243. request.Host = a.Server
  244. request.Protocol = a.Version
  245. request.Origin = a.RemoteAddr.String()
  246. if v, ok := a.Header("Referer"); ok {
  247. request.Referer = v
  248. }
  249. if v, ok := a.Header("Connection"); ok {
  250. request.Connection = v
  251. }
  252. if v, ok := a.Header("X-Forwarded-For"); ok {
  253. request.XForwardedFor = v
  254. }
  255. if v, ok := a.Header("X-Real-IP"); ok {
  256. request.XRealIP = v
  257. }
  258. if v, ok := a.Header("X-Requested-With"); ok {
  259. request.XRequestedWith = v
  260. }
  261. if v, ok := a.Header("Accept-Encoding"); ok {
  262. request.AcceptEncoding = v
  263. }
  264. if v, ok := a.Header("Accept-Language"); ok {
  265. request.AcceptLanguage = v
  266. }
  267. if v, ok := a.Header("User-Agent"); ok {
  268. request.UserAgent = v
  269. }
  270. if v, ok := a.Header("Accept"); ok {
  271. request.Accept = v
  272. }
  273. if v, ok := a.Header("Cookie"); ok {
  274. request.Cookie = v
  275. }
  276. if v, ok := a.Header("X-Forwarded-Host"); ok {
  277. if v != request.Host {
  278. request.Host = v
  279. }
  280. }
  281. if config.Trace {
  282. log.Println("Request")
  283. pretty.Println(request)
  284. }
  285. return nil
  286. }
  287. func processHTTP(request *data.Request, appData []byte) error {
  288. reader := bufio.NewReader(strings.NewReader(string(appData)))
  289. req, err := http.ReadRequest(reader)
  290. if err != nil {
  291. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  292. }
  293. request.Url = req.URL.String()
  294. request.Method = req.Method
  295. request.Referer = req.Referer()
  296. request.Host = req.Host
  297. request.Protocol = req.Proto
  298. request.Origin = request.Host
  299. if _, ok := req.Header["Connection"]; ok {
  300. request.Connection = req.Header["Connection"][0]
  301. }
  302. if _, ok := req.Header["X-Forwarded-For"]; ok {
  303. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  304. }
  305. if _, ok := req.Header["X-Real-IP"]; ok {
  306. request.XRealIP = req.Header["X-Real-IP"][0]
  307. }
  308. if _, ok := req.Header["X-Requested-With"]; ok {
  309. request.XRequestedWith = req.Header["X-Requested-With"][0]
  310. }
  311. if _, ok := req.Header["Accept-Encoding"]; ok {
  312. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  313. }
  314. if _, ok := req.Header["Accept-Language"]; ok {
  315. request.AcceptLanguage = req.Header["Accept-Language"][0]
  316. }
  317. if _, ok := req.Header["User-Agent"]; ok {
  318. request.UserAgent = req.Header["User-Agent"][0]
  319. }
  320. if _, ok := req.Header["Accept"]; ok {
  321. request.Accept = req.Header["Accept"][0]
  322. }
  323. if _, ok := req.Header["Cookie"]; ok {
  324. request.Cookie = req.Header["Cookie"][0]
  325. }
  326. request.Source = request.IpSrc
  327. return nil
  328. }
  329. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  330. // e.g.
  331. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  332. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  333. func replayFile() {
  334. var req data.Request
  335. var startTs time.Time
  336. var endTs time.Time
  337. for {
  338. fh, err := os.Open(config.RequestsFile)
  339. if err != nil {
  340. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  341. }
  342. c := csv.NewReader(fh)
  343. c.Comma = ' '
  344. for {
  345. if config.SleepFor.Duration > time.Nanosecond {
  346. startTs = time.Now()
  347. }
  348. r, err := c.Read()
  349. if err == io.EOF {
  350. break
  351. }
  352. if err != nil {
  353. log.Println(err)
  354. continue
  355. }
  356. req.IpSrc = r[0]
  357. req.Source = r[0]
  358. req.Url = r[1]
  359. req.UserAgent = "Munch/1.0"
  360. req.Host = "demo.scraperwall.com"
  361. req.CreatedAt = time.Now().UnixNano()
  362. natsEC.Publish(config.NatsQueue, &req)
  363. count++
  364. if config.SleepFor.Duration >= time.Nanosecond {
  365. endTs = time.Now()
  366. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  367. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  368. }
  369. }
  370. }
  371. }
  372. }
  373. func loadConfig() {
  374. // initialize with values from the command line / environment
  375. config.Live = *doLiveCapture
  376. config.Interface = *iface
  377. config.SnapshotLen = *snapshotLen
  378. config.Filter = *filter
  379. config.Promiscuous = *promiscuous
  380. config.NatsURL = *natsURL
  381. config.NatsQueue = *natsQueue
  382. config.SleepFor.Duration = *sleepFor
  383. config.RequestsFile = *requestsFile
  384. config.UseXForwardedAsSource = *useXForwardedAsSource
  385. config.Protocol = *protocol
  386. config.Quiet = *beQuiet
  387. config.Trace = *trace
  388. if *configFile == "" {
  389. return
  390. }
  391. _, err := os.Stat(*configFile)
  392. if err != nil {
  393. log.Printf("%s: %s\n", *configFile, err)
  394. return
  395. }
  396. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  397. log.Printf("%s: %s\n", *configFile, err)
  398. }
  399. if !config.Quiet {
  400. config.print()
  401. }
  402. }
  403. // version outputs build information
  404. func version() {
  405. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  406. }