main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. package main
import (
	"bufio"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
	"github.com/BurntSushi/toml"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/kr/pretty"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"
)
  25. var (
  26. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  27. iface = flag.String("interface", "eth0", "Interface to get packets from")
  28. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  29. filter = flag.String("filter", "tcp", "PCAP filter expression")
  30. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  31. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  32. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  33. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  34. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  35. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  36. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  37. trace = flag.Bool("trace", false, "Trace the packet capturing")
  38. configFile = flag.String("config", "", "The location of the TOML config file")
  39. beQuiet = flag.Bool("quiet", true, "Be quiet")
  40. doVersion = flag.Bool("version", false, "Show version information")
  41. natsEC *nats.EncodedConn
  42. count uint64
  43. timeout = -1 * time.Second
  44. ipPriv *ip.IP
  45. config Config
  46. // Version contains the program Version, e.g. 1.0.1
  47. Version string
  48. // BuildDate contains the date and time at which the program was compiled
  49. BuildDate string
  50. )
  51. // Config contains the program configuration
  52. type Config struct {
  53. Live bool
  54. Interface string
  55. SnapshotLen int
  56. Filter string
  57. Promiscuous bool
  58. NatsURL string
  59. NatsQueue string
  60. SleepFor duration
  61. RequestsFile string
  62. UseXForwardedAsSource bool
  63. Quiet bool
  64. Protocol string
  65. Trace bool
  66. }
  67. type duration struct {
  68. time.Duration
  69. }
  70. func (d *duration) UnmarshalText(text []byte) error {
  71. var err error
  72. d.Duration, err = time.ParseDuration(string(text))
  73. return err
  74. }
  75. func (c Config) print() {
  76. fmt.Printf("Live: %t\n", c.Live)
  77. fmt.Printf("Interface: %s\n", c.Interface)
  78. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  79. fmt.Printf("Filter: %s\n", c.Filter)
  80. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  81. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  82. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  83. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  84. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  85. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  86. fmt.Printf("Protocol: %s\n", c.Protocol)
  87. fmt.Printf("Quiet: %t\n", c.Quiet)
  88. fmt.Printf("Trace: %t\n", c.Trace)
  89. }
  90. func init() {
  91. flag.Parse()
  92. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  93. }
  94. func main() {
  95. if *doVersion {
  96. version()
  97. os.Exit(0)
  98. }
  99. loadConfig()
  100. // Output how many requests per second were sent
  101. if !config.Quiet {
  102. go func(c *uint64) {
  103. for {
  104. fmt.Printf("%d requests per second\n", *c)
  105. *c = 0
  106. time.Sleep(time.Second)
  107. }
  108. }(&count)
  109. }
  110. // NATS
  111. //
  112. if config.NatsURL == "" {
  113. log.Fatal("No NATS URL specified (-nats-url)!")
  114. }
  115. natsConn, err := nats.Connect(config.NatsURL)
  116. if err != nil {
  117. log.Fatal(err)
  118. }
  119. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  120. if err != nil {
  121. log.Fatalf("Encoded Connection: %v!\n", err)
  122. }
  123. // What should I do?
  124. if config.RequestsFile != "" {
  125. replayFile()
  126. } else if config.Live {
  127. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  128. liveCapture()
  129. }
  130. }
  131. func liveCapture() {
  132. ipPriv = ip.NewIP()
  133. // PCAP setup
  134. //
  135. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  136. if err != nil {
  137. log.Fatal(err)
  138. }
  139. defer handle.Close()
  140. err = handle.SetBPFFilter(config.Filter)
  141. if err != nil {
  142. log.Fatal(err)
  143. }
  144. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  145. for packet := range packetSource.Packets() {
  146. go processPacket(packet)
  147. }
  148. }
  149. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  150. func processPacket(packet gopacket.Packet) {
  151. hasIPv4 := false
  152. var ipSrc, ipDst string
  153. // IPv4
  154. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  155. ip := ipLayer.(*layers.IPv4)
  156. ipSrc = ip.SrcIP.String()
  157. ipDst = ip.DstIP.String()
  158. hasIPv4 = true
  159. }
  160. // IPv6
  161. if !hasIPv4 {
  162. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  163. ip := ipLayer.(*layers.IPv6)
  164. ipSrc = ip.SrcIP.String()
  165. ipDst = ip.DstIP.String()
  166. }
  167. }
  168. // TCP
  169. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  170. if tcpLayer == nil {
  171. return
  172. }
  173. tcp, _ := tcpLayer.(*layers.TCP)
  174. portSrc := tcp.SrcPort
  175. portDst := tcp.DstPort
  176. sequence := tcp.Seq
  177. applicationLayer := packet.ApplicationLayer()
  178. if applicationLayer == nil {
  179. return
  180. }
  181. count++
  182. if len(applicationLayer.Payload()) < 50 {
  183. log.Println("application layer too small!")
  184. return
  185. }
  186. request := data.Request{
  187. IpSrc: ipSrc,
  188. IpDst: ipDst,
  189. PortSrc: uint32(portSrc),
  190. PortDst: uint32(portDst),
  191. TcpSeq: uint32(sequence),
  192. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  193. }
  194. switch config.Protocol {
  195. case "http":
  196. processHTTP(&request, applicationLayer.Payload())
  197. case "ajp13":
  198. processAJP13(&request, applicationLayer.Payload())
  199. }
  200. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  201. if strings.Contains(request.XForwardedFor, ",") {
  202. ips := strings.Split(request.XForwardedFor, ",")
  203. for i := len(ips) - 1; i >= 0; i-- {
  204. ipRaw := strings.TrimSpace(ips[i])
  205. ipAddr := net.ParseIP(ipRaw)
  206. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  207. request.Source = ipRaw
  208. break
  209. }
  210. }
  211. } else {
  212. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  213. if !ipPriv.IsPrivate(ipAddr) {
  214. request.Source = request.XForwardedFor
  215. }
  216. }
  217. }
  218. if request.Source == request.IpSrc && request.XRealIP != "" {
  219. request.Source = request.XRealIP
  220. }
  221. if config.Trace {
  222. log.Printf("[%s] %s\n", request.Source, request.Url)
  223. }
  224. natsEC.Publish(config.NatsQueue, &request)
  225. }
  226. func processAJP13(request *data.Request, appData []byte) error {
  227. a, err := ajp13.Parse(appData)
  228. if err != nil {
  229. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  230. }
  231. request.Url = a.URI
  232. request.Method = a.Method()
  233. request.Host = a.Server
  234. request.Protocol = a.Version
  235. request.Origin = a.RemoteAddr.String()
  236. request.Source = a.RemoteAddr.String()
  237. if v, ok := a.Header("Referer"); ok {
  238. request.Referer = v
  239. }
  240. if v, ok := a.Header("Connection"); ok {
  241. request.Connection = v
  242. }
  243. if v, ok := a.Header("X-Forwarded-For"); ok {
  244. request.XForwardedFor = v
  245. }
  246. if v, ok := a.Header("X-Real-IP"); ok {
  247. request.XRealIP = v
  248. }
  249. if v, ok := a.Header("X-Requested-With"); ok {
  250. request.XRequestedWith = v
  251. }
  252. if v, ok := a.Header("Accept-Encoding"); ok {
  253. request.AcceptEncoding = v
  254. }
  255. if v, ok := a.Header("Accept-Language"); ok {
  256. request.AcceptLanguage = v
  257. }
  258. if v, ok := a.Header("User-Agent"); ok {
  259. request.UserAgent = v
  260. }
  261. if v, ok := a.Header("Accept"); ok {
  262. request.Accept = v
  263. }
  264. if v, ok := a.Header("Cookie"); ok {
  265. request.Cookie = v
  266. }
  267. if v, ok := a.Header("X-Forwarded-Host"); ok {
  268. if v != request.Host {
  269. request.Host = v
  270. }
  271. }
  272. if false && config.Trace {
  273. log.Println("Request")
  274. pretty.Println(request)
  275. }
  276. return nil
  277. }
  278. func processHTTP(request *data.Request, appData []byte) error {
  279. reader := bufio.NewReader(strings.NewReader(string(appData)))
  280. req, err := http.ReadRequest(reader)
  281. if err != nil {
  282. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  283. }
  284. request.Url = req.URL.String()
  285. request.Method = req.Method
  286. request.Referer = req.Referer()
  287. request.Host = req.Host
  288. request.Protocol = req.Proto
  289. request.Origin = request.Host
  290. if _, ok := req.Header["Connection"]; ok {
  291. request.Connection = req.Header["Connection"][0]
  292. }
  293. if _, ok := req.Header["X-Forwarded-For"]; ok {
  294. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  295. }
  296. if _, ok := req.Header["X-Real-IP"]; ok {
  297. request.XRealIP = req.Header["X-Real-IP"][0]
  298. }
  299. if _, ok := req.Header["X-Requested-With"]; ok {
  300. request.XRequestedWith = req.Header["X-Requested-With"][0]
  301. }
  302. if _, ok := req.Header["Accept-Encoding"]; ok {
  303. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  304. }
  305. if _, ok := req.Header["Accept-Language"]; ok {
  306. request.AcceptLanguage = req.Header["Accept-Language"][0]
  307. }
  308. if _, ok := req.Header["User-Agent"]; ok {
  309. request.UserAgent = req.Header["User-Agent"][0]
  310. }
  311. if _, ok := req.Header["Accept"]; ok {
  312. request.Accept = req.Header["Accept"][0]
  313. }
  314. if _, ok := req.Header["Cookie"]; ok {
  315. request.Cookie = req.Header["Cookie"][0]
  316. }
  317. request.Source = request.IpSrc
  318. return nil
  319. }
  320. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  321. // e.g.
  322. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  323. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  324. func replayFile() {
  325. var req data.Request
  326. var startTs time.Time
  327. var endTs time.Time
  328. for {
  329. fh, err := os.Open(config.RequestsFile)
  330. if err != nil {
  331. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  332. }
  333. c := csv.NewReader(fh)
  334. c.Comma = ' '
  335. for {
  336. if config.SleepFor.Duration > time.Nanosecond {
  337. startTs = time.Now()
  338. }
  339. r, err := c.Read()
  340. if err == io.EOF {
  341. break
  342. }
  343. if err != nil {
  344. log.Println(err)
  345. continue
  346. }
  347. req.IpSrc = r[0]
  348. req.Source = r[0]
  349. req.Url = r[1]
  350. req.UserAgent = "Munch/1.0"
  351. req.Host = "demo.scraperwall.com"
  352. req.CreatedAt = time.Now().UnixNano()
  353. natsEC.Publish(config.NatsQueue, &req)
  354. count++
  355. if config.SleepFor.Duration >= time.Nanosecond {
  356. endTs = time.Now()
  357. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  358. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  359. }
  360. }
  361. }
  362. }
  363. }
  364. func loadConfig() {
  365. // initialize with values from the command line / environment
  366. config.Live = *doLiveCapture
  367. config.Interface = *iface
  368. config.SnapshotLen = *snapshotLen
  369. config.Filter = *filter
  370. config.Promiscuous = *promiscuous
  371. config.NatsURL = *natsURL
  372. config.NatsQueue = *natsQueue
  373. config.SleepFor.Duration = *sleepFor
  374. config.RequestsFile = *requestsFile
  375. config.UseXForwardedAsSource = *useXForwardedAsSource
  376. config.Protocol = *protocol
  377. config.Quiet = *beQuiet
  378. config.Trace = *trace
  379. if *configFile == "" {
  380. return
  381. }
  382. _, err := os.Stat(*configFile)
  383. if err != nil {
  384. log.Printf("%s: %s\n", *configFile, err)
  385. return
  386. }
  387. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  388. log.Printf("%s: %s\n", *configFile, err)
  389. }
  390. if !config.Quiet {
  391. config.print()
  392. }
  393. }
  394. // version outputs build information
  395. func version() {
  396. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  397. }