main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451
  1. package main
  import (
  	"bufio"
  	"encoding/csv"
  	"flag"
  	"fmt"
  	"io"
  	"log"
  	"net"
  	"net/http"
  	"os"
  	"strings"
  	"sync/atomic"
  	"time"

  	"github.com/BurntSushi/toml"
  	"github.com/google/gopacket"
  	"github.com/google/gopacket/layers"
  	"github.com/google/gopacket/pcap"
  	"github.com/nats-io/nats"
  	"github.com/nats-io/nats/encoders/protobuf"

  	"git.scraperwall.com/scw/ajp13"
  	"git.scraperwall.com/scw/data"
  	"git.scraperwall.com/scw/ip"
  )
  24. var (
  25. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  26. iface = flag.String("interface", "eth0", "Interface to get packets from")
  27. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  28. filter = flag.String("filter", "tcp", "PCAP filter expression")
  29. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  30. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  31. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  32. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  33. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  34. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  35. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  36. configFile = flag.String("config", "", "The location of the TOML config file")
  37. beQuiet = flag.Bool("quiet", true, "Be quiet")
  38. doVersion = flag.Bool("version", false, "Show version information")
  39. natsEC *nats.EncodedConn
  40. count uint64
  41. timeout = -1 * time.Second
  42. ipPriv *ip.IP
  43. config Config
  44. // Version contains the program Version, e.g. 1.0.1
  45. Version string
  46. // BuildDate contains the date and time at which the program was compiled
  47. BuildDate string
  48. )
  49. // Config contains the program configuration
  50. type Config struct {
  51. Live bool
  52. Interface string
  53. SnapshotLen int
  54. Filter string
  55. Promiscuous bool
  56. NatsURL string
  57. NatsQueue string
  58. SleepFor duration
  59. RequestsFile string
  60. UseXForwardedAsSource bool
  61. Quiet bool
  62. Protocol string
  63. }
  // duration wraps time.Duration so that it can be decoded from text written in
  // time.ParseDuration syntax, e.g. "250ms" or "1m30s" in the TOML config file.
  type duration struct {
  	time.Duration
  }

  // UnmarshalText parses text with time.ParseDuration and stores the result in
  // d. If text is not a valid duration, the parse error is returned and d keeps
  // its previous value (for example the one taken from the -sleep flag) instead
  // of being reset to zero.
  func (d *duration) UnmarshalText(text []byte) error {
  	parsed, err := time.ParseDuration(string(text))
  	if err != nil {
  		return err
  	}
  	d.Duration = parsed
  	return nil
  }
  72. func (c Config) print() {
  73. fmt.Printf("Live: %t\n", c.Live)
  74. fmt.Printf("Interface: %s\n", c.Interface)
  75. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  76. fmt.Printf("Filter: %s\n", c.Filter)
  77. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  78. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  79. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  80. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  81. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  82. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  83. fmt.Printf("Protocol: %s\n", c.Protocol)
  84. fmt.Printf("Quiet: %t\n", c.Quiet)
  85. }
  86. func init() {
  87. flag.Parse()
  88. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  89. }
  90. func main() {
  91. if *doVersion {
  92. version()
  93. os.Exit(0)
  94. }
  95. loadConfig()
  96. // Output how many requests per second were sent
  97. if !config.Quiet {
  98. go func(c *uint64) {
  99. for {
  100. fmt.Printf("%d requests per second\n", *c)
  101. *c = 0
  102. time.Sleep(time.Second)
  103. }
  104. }(&count)
  105. }
  106. // NATS
  107. //
  108. if config.NatsURL == "" {
  109. log.Fatal("No NATS URL specified (-nats-url)!")
  110. }
  111. natsConn, err := nats.Connect(config.NatsURL)
  112. if err != nil {
  113. log.Fatal(err)
  114. }
  115. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  116. if err != nil {
  117. log.Fatalf("Encoded Connection: %v!\n", err)
  118. }
  119. // What should I do?
  120. if config.RequestsFile != "" {
  121. replayFile()
  122. } else if config.Live {
  123. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  124. liveCapture()
  125. }
  126. }
  127. func liveCapture() {
  128. ipPriv = ip.NewIP()
  129. // PCAP setup
  130. //
  131. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  132. if err != nil {
  133. log.Fatal(err)
  134. }
  135. defer handle.Close()
  136. err = handle.SetBPFFilter(config.Filter)
  137. if err != nil {
  138. log.Fatal(err)
  139. }
  140. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  141. for packet := range packetSource.Packets() {
  142. go processPacket(packet)
  143. }
  144. }
  145. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  146. func processPacket(packet gopacket.Packet) {
  147. count++
  148. ipLayer := packet.Layer(layers.LayerTypeIPv4)
  149. if ipLayer == nil {
  150. log.Println("No IPv4 Layer!")
  151. return
  152. }
  153. ip, _ := ipLayer.(*layers.IPv4)
  154. if ip.Protocol != layers.IPProtocolTCP {
  155. log.Println("No TCP Protocol!")
  156. return
  157. }
  158. ipSrc := ip.SrcIP.String()
  159. ipDst := ip.DstIP.String()
  160. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  161. if tcpLayer == nil {
  162. return
  163. }
  164. tcp, _ := tcpLayer.(*layers.TCP)
  165. portSrc := tcp.SrcPort
  166. portDst := tcp.DstPort
  167. sequence := tcp.Seq
  168. applicationLayer := packet.ApplicationLayer()
  169. if applicationLayer == nil {
  170. return
  171. }
  172. request := data.Request{
  173. IpSrc: ipSrc,
  174. IpDst: ipDst,
  175. PortSrc: uint32(portSrc),
  176. PortDst: uint32(portDst),
  177. TcpSeq: uint32(sequence),
  178. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  179. }
  180. switch *protocol {
  181. case "http":
  182. processHTTP(&request, applicationLayer.Payload())
  183. case "ajp13":
  184. processAJP13(&request, applicationLayer.Payload())
  185. }
  186. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  187. if strings.Contains(request.XForwardedFor, ",") {
  188. ips := strings.Split(request.XForwardedFor, ",")
  189. for i := len(ips) - 1; i >= 0; i-- {
  190. ipRaw := strings.TrimSpace(ips[i])
  191. ipAddr := net.ParseIP(ipRaw)
  192. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  193. request.Source = ipRaw
  194. break
  195. }
  196. }
  197. } else {
  198. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  199. if !ipPriv.IsPrivate(ipAddr) {
  200. request.Source = request.XForwardedFor
  201. }
  202. }
  203. }
  204. if request.Source == request.IpSrc && request.XRealIP != "" {
  205. request.Source = request.XRealIP
  206. }
  207. natsEC.Publish(config.NatsQueue, &request)
  208. }
  209. func processAJP13(request *data.Request, appData []byte) error {
  210. a, err := ajp13.Parse(appData)
  211. if err != nil {
  212. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  213. }
  214. request.Url = a.URI
  215. request.Method = a.Method()
  216. request.Host = a.Server
  217. request.Protocol = a.Version
  218. request.Origin = a.RemoteAddr.String()
  219. if v, ok := a.Header("Referer"); ok {
  220. request.Referer = v
  221. }
  222. if v, ok := a.Header("Connection"); ok {
  223. request.Connection = v
  224. }
  225. if v, ok := a.Header("X-Forwarded-For"); ok {
  226. request.XForwardedFor = v
  227. }
  228. if v, ok := a.Header("X-Real-IP"); ok {
  229. request.XRealIP = v
  230. }
  231. if v, ok := a.Header("X-Requested-With"); ok {
  232. request.XRequestedWith = v
  233. }
  234. if v, ok := a.Header("Accept-Encoding"); ok {
  235. request.AcceptEncoding = v
  236. }
  237. if v, ok := a.Header("Accept-Language"); ok {
  238. request.AcceptLanguage = v
  239. }
  240. if v, ok := a.Header("User-Agent"); ok {
  241. request.UserAgent = v
  242. }
  243. if v, ok := a.Header("Accept"); ok {
  244. request.Accept = v
  245. }
  246. if v, ok := a.Header("Cookie"); ok {
  247. request.Cookie = v
  248. }
  249. if v, ok := a.Header("X-Forwarded-Host"); ok {
  250. if v != request.Host {
  251. request.Host = v
  252. }
  253. }
  254. return nil
  255. }
  256. func processHTTP(request *data.Request, appData []byte) error {
  257. reader := bufio.NewReader(strings.NewReader(string(appData)))
  258. req, err := http.ReadRequest(reader)
  259. if err != nil {
  260. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  261. }
  262. request.Url = req.URL.String()
  263. request.Method = req.Method
  264. request.Referer = req.Referer()
  265. request.Host = req.Host
  266. request.Protocol = req.Proto
  267. request.Origin = request.Host
  268. if _, ok := req.Header["Connection"]; ok {
  269. request.Connection = req.Header["Connection"][0]
  270. }
  271. if _, ok := req.Header["X-Forwarded-For"]; ok {
  272. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  273. }
  274. if _, ok := req.Header["X-Real-IP"]; ok {
  275. request.XRealIP = req.Header["X-Real-IP"][0]
  276. }
  277. if _, ok := req.Header["X-Requested-With"]; ok {
  278. request.XRequestedWith = req.Header["X-Requested-With"][0]
  279. }
  280. if _, ok := req.Header["Accept-Encoding"]; ok {
  281. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  282. }
  283. if _, ok := req.Header["Accept-Language"]; ok {
  284. request.AcceptLanguage = req.Header["Accept-Language"][0]
  285. }
  286. if _, ok := req.Header["User-Agent"]; ok {
  287. request.UserAgent = req.Header["User-Agent"][0]
  288. }
  289. if _, ok := req.Header["Accept"]; ok {
  290. request.Accept = req.Header["Accept"][0]
  291. }
  292. if _, ok := req.Header["Cookie"]; ok {
  293. request.Cookie = req.Header["Cookie"][0]
  294. }
  295. request.Source = request.IpSrc
  296. return nil
  297. }
  298. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  299. // e.g.
  300. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  301. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  302. func replayFile() {
  303. var req data.Request
  304. var startTs time.Time
  305. var endTs time.Time
  306. for {
  307. fh, err := os.Open(config.RequestsFile)
  308. if err != nil {
  309. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  310. }
  311. c := csv.NewReader(fh)
  312. c.Comma = ' '
  313. for {
  314. if config.SleepFor.Duration > time.Nanosecond {
  315. startTs = time.Now()
  316. }
  317. r, err := c.Read()
  318. if err == io.EOF {
  319. break
  320. }
  321. if err != nil {
  322. log.Println(err)
  323. continue
  324. }
  325. req.IpSrc = r[0]
  326. req.Source = r[0]
  327. req.Url = r[1]
  328. req.UserAgent = "Munch/1.0"
  329. req.Host = "demo.scraperwall.com"
  330. req.CreatedAt = time.Now().UnixNano()
  331. natsEC.Publish(config.NatsQueue, &req)
  332. count++
  333. if config.SleepFor.Duration >= time.Nanosecond {
  334. endTs = time.Now()
  335. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  336. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  337. }
  338. }
  339. }
  340. }
  341. }
  342. func loadConfig() {
  343. // initialize with values from the command line / environment
  344. config.Live = *doLiveCapture
  345. config.Interface = *iface
  346. config.SnapshotLen = *snapshotLen
  347. config.Filter = *filter
  348. config.Promiscuous = *promiscuous
  349. config.NatsURL = *natsURL
  350. config.NatsQueue = *natsQueue
  351. config.SleepFor.Duration = *sleepFor
  352. config.RequestsFile = *requestsFile
  353. config.UseXForwardedAsSource = *useXForwardedAsSource
  354. config.Protocol = *protocol
  355. config.Quiet = *beQuiet
  356. if *configFile == "" {
  357. return
  358. }
  359. _, err := os.Stat(*configFile)
  360. if err != nil {
  361. log.Printf("%s: %s\n", *configFile, err)
  362. return
  363. }
  364. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  365. log.Printf("%s: %s\n", *configFile, err)
  366. }
  367. if !config.Quiet {
  368. config.print()
  369. }
  370. }
  371. // version outputs build information
  372. func version() {
  373. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  374. }