main.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. package main
import (
	"bufio"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
	"github.com/BurntSushi/toml"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/kr/pretty"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"
)
  25. var (
  26. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  27. iface = flag.String("interface", "eth0", "Interface to get packets from")
  28. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  29. filter = flag.String("filter", "tcp", "PCAP filter expression")
  30. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  31. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  32. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  33. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  34. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  35. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  36. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  37. trace = flag.Bool("trace", false, "Trace the packet capturing")
  38. configFile = flag.String("config", "", "The location of the TOML config file")
  39. beQuiet = flag.Bool("quiet", true, "Be quiet")
  40. doVersion = flag.Bool("version", false, "Show version information")
  41. natsEC *nats.EncodedConn
  42. count uint64
  43. timeout = -1 * time.Second
  44. ipPriv *ip.IP
  45. config Config
  46. // Version contains the program Version, e.g. 1.0.1
  47. Version string
  48. // BuildDate contains the date and time at which the program was compiled
  49. BuildDate string
  50. )
  51. // Config contains the program configuration
  52. type Config struct {
  53. Live bool
  54. Interface string
  55. SnapshotLen int
  56. Filter string
  57. Promiscuous bool
  58. NatsURL string
  59. NatsQueue string
  60. SleepFor duration
  61. RequestsFile string
  62. UseXForwardedAsSource bool
  63. Quiet bool
  64. Protocol string
  65. Trace bool
  66. }
  67. type duration struct {
  68. time.Duration
  69. }
  70. func (d *duration) UnmarshalText(text []byte) error {
  71. var err error
  72. d.Duration, err = time.ParseDuration(string(text))
  73. return err
  74. }
  75. func (c Config) print() {
  76. fmt.Printf("Live: %t\n", c.Live)
  77. fmt.Printf("Interface: %s\n", c.Interface)
  78. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  79. fmt.Printf("Filter: %s\n", c.Filter)
  80. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  81. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  82. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  83. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  84. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  85. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  86. fmt.Printf("Protocol: %s\n", c.Protocol)
  87. fmt.Printf("Quiet: %t\n", c.Quiet)
  88. fmt.Printf("Trace: %t\n", c.Trace)
  89. }
  90. func init() {
  91. flag.Parse()
  92. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  93. }
  94. func main() {
  95. if *doVersion {
  96. version()
  97. os.Exit(0)
  98. }
  99. loadConfig()
  100. // Output how many requests per second were sent
  101. if !config.Quiet {
  102. go func(c *uint64) {
  103. for {
  104. fmt.Printf("%d requests per second\n", *c)
  105. *c = 0
  106. time.Sleep(time.Second)
  107. }
  108. }(&count)
  109. }
  110. // NATS
  111. //
  112. if config.NatsURL == "" {
  113. log.Fatal("No NATS URL specified (-nats-url)!")
  114. }
  115. natsConn, err := nats.Connect(config.NatsURL)
  116. if err != nil {
  117. log.Fatal(err)
  118. }
  119. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  120. if err != nil {
  121. log.Fatalf("Encoded Connection: %v!\n", err)
  122. }
  123. // What should I do?
  124. if config.RequestsFile != "" {
  125. replayFile()
  126. } else if config.Live {
  127. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  128. liveCapture()
  129. }
  130. }
  131. func liveCapture() {
  132. ipPriv = ip.NewIP()
  133. // PCAP setup
  134. //
  135. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  136. if err != nil {
  137. log.Fatal(err)
  138. }
  139. defer handle.Close()
  140. err = handle.SetBPFFilter(config.Filter)
  141. if err != nil {
  142. log.Fatal(err)
  143. }
  144. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  145. for packet := range packetSource.Packets() {
  146. go processPacket(packet)
  147. }
  148. }
  149. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  150. func processPacket(packet gopacket.Packet) {
  151. hasIPv4 := false
  152. var ipSrc, ipDst string
  153. // IPv4
  154. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  155. ip := ipLayer.(*layers.IPv4)
  156. ipSrc = ip.SrcIP.String()
  157. ipDst = ip.DstIP.String()
  158. hasIPv4 = true
  159. }
  160. // IPv6
  161. if !hasIPv4 {
  162. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  163. ip := ipLayer.(*layers.IPv6)
  164. ipSrc = ip.SrcIP.String()
  165. ipDst = ip.DstIP.String()
  166. }
  167. }
  168. // TCP
  169. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  170. if tcpLayer == nil {
  171. return
  172. }
  173. tcp, _ := tcpLayer.(*layers.TCP)
  174. portSrc := tcp.SrcPort
  175. portDst := tcp.DstPort
  176. sequence := tcp.Seq
  177. applicationLayer := packet.ApplicationLayer()
  178. if applicationLayer == nil {
  179. return
  180. }
  181. count++
  182. if len(applicationLayer.Payload()) < 50 {
  183. log.Println("application layer too small!")
  184. return
  185. }
  186. request := data.Request{
  187. IpSrc: ipSrc,
  188. IpDst: ipDst,
  189. PortSrc: uint32(portSrc),
  190. PortDst: uint32(portDst),
  191. TcpSeq: uint32(sequence),
  192. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  193. }
  194. switch config.Protocol {
  195. case "http":
  196. err := processHTTP(&request, applicationLayer.Payload())
  197. if err != nil {
  198. log.Println(err)
  199. return
  200. }
  201. case "ajp13":
  202. err := processAJP13(&request, applicationLayer.Payload())
  203. if err != nil {
  204. log.Println(err)
  205. return
  206. }
  207. }
  208. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  209. if strings.Contains(request.XForwardedFor, ",") {
  210. ips := strings.Split(request.XForwardedFor, ",")
  211. for i := len(ips) - 1; i >= 0; i-- {
  212. ipRaw := strings.TrimSpace(ips[i])
  213. ipAddr := net.ParseIP(ipRaw)
  214. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  215. request.Source = ipRaw
  216. break
  217. }
  218. }
  219. } else {
  220. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  221. if !ipPriv.IsPrivate(ipAddr) {
  222. request.Source = request.XForwardedFor
  223. }
  224. }
  225. }
  226. if request.Source == request.IpSrc && request.XRealIP != "" {
  227. request.Source = request.XRealIP
  228. }
  229. if config.Trace {
  230. log.Printf("[%s] %s\n", request.Source, request.Url)
  231. }
  232. natsEC.Publish(config.NatsQueue, &request)
  233. }
  234. func processAJP13(request *data.Request, appData []byte) error {
  235. a, err := ajp13.Parse(appData)
  236. if err != nil {
  237. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  238. }
  239. request.Url = a.URI
  240. request.Method = a.Method()
  241. request.Host = a.Server
  242. request.Protocol = a.Version
  243. request.Origin = a.RemoteAddr.String()
  244. request.Source = a.RemoteAddr.String()
  245. if v, ok := a.Header("Referer"); ok {
  246. request.Referer = v
  247. }
  248. if v, ok := a.Header("Connection"); ok {
  249. request.Connection = v
  250. }
  251. if v, ok := a.Header("X-Forwarded-For"); ok {
  252. request.XForwardedFor = v
  253. }
  254. if v, ok := a.Header("X-Real-IP"); ok {
  255. request.XRealIP = v
  256. }
  257. if v, ok := a.Header("X-Requested-With"); ok {
  258. request.XRequestedWith = v
  259. }
  260. if v, ok := a.Header("Accept-Encoding"); ok {
  261. request.AcceptEncoding = v
  262. }
  263. if v, ok := a.Header("Accept-Language"); ok {
  264. request.AcceptLanguage = v
  265. }
  266. if v, ok := a.Header("User-Agent"); ok {
  267. request.UserAgent = v
  268. }
  269. if v, ok := a.Header("Accept"); ok {
  270. request.Accept = v
  271. }
  272. if v, ok := a.Header("Cookie"); ok {
  273. request.Cookie = v
  274. }
  275. if v, ok := a.Header("X-Forwarded-Host"); ok {
  276. if v != request.Host {
  277. request.Host = v
  278. }
  279. }
  280. if false && config.Trace {
  281. log.Println("Request")
  282. pretty.Println(request)
  283. }
  284. return nil
  285. }
  286. func processHTTP(request *data.Request, appData []byte) error {
  287. reader := bufio.NewReader(strings.NewReader(string(appData)))
  288. req, err := http.ReadRequest(reader)
  289. if err != nil {
  290. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  291. }
  292. request.Url = req.URL.String()
  293. request.Method = req.Method
  294. request.Referer = req.Referer()
  295. request.Host = req.Host
  296. request.Protocol = req.Proto
  297. request.Origin = request.Host
  298. if _, ok := req.Header["Connection"]; ok {
  299. request.Connection = req.Header["Connection"][0]
  300. }
  301. if _, ok := req.Header["X-Forwarded-For"]; ok {
  302. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  303. }
  304. if _, ok := req.Header["X-Real-IP"]; ok {
  305. request.XRealIP = req.Header["X-Real-IP"][0]
  306. }
  307. if _, ok := req.Header["X-Requested-With"]; ok {
  308. request.XRequestedWith = req.Header["X-Requested-With"][0]
  309. }
  310. if _, ok := req.Header["Accept-Encoding"]; ok {
  311. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  312. }
  313. if _, ok := req.Header["Accept-Language"]; ok {
  314. request.AcceptLanguage = req.Header["Accept-Language"][0]
  315. }
  316. if _, ok := req.Header["User-Agent"]; ok {
  317. request.UserAgent = req.Header["User-Agent"][0]
  318. }
  319. if _, ok := req.Header["Accept"]; ok {
  320. request.Accept = req.Header["Accept"][0]
  321. }
  322. if _, ok := req.Header["Cookie"]; ok {
  323. request.Cookie = req.Header["Cookie"][0]
  324. }
  325. request.Source = request.IpSrc
  326. return nil
  327. }
  328. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  329. // e.g.
  330. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  331. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  332. func replayFile() {
  333. var req data.Request
  334. var startTs time.Time
  335. var endTs time.Time
  336. for {
  337. fh, err := os.Open(config.RequestsFile)
  338. if err != nil {
  339. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  340. }
  341. c := csv.NewReader(fh)
  342. c.Comma = ' '
  343. for {
  344. if config.SleepFor.Duration > time.Nanosecond {
  345. startTs = time.Now()
  346. }
  347. r, err := c.Read()
  348. if err == io.EOF {
  349. break
  350. }
  351. if err != nil {
  352. log.Println(err)
  353. continue
  354. }
  355. req.IpSrc = r[0]
  356. req.Source = r[0]
  357. req.Url = r[1]
  358. req.UserAgent = "Munch/1.0"
  359. req.Host = "demo.scraperwall.com"
  360. req.CreatedAt = time.Now().UnixNano()
  361. natsEC.Publish(config.NatsQueue, &req)
  362. count++
  363. if config.SleepFor.Duration >= time.Nanosecond {
  364. endTs = time.Now()
  365. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  366. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  367. }
  368. }
  369. }
  370. }
  371. }
  372. func loadConfig() {
  373. // initialize with values from the command line / environment
  374. config.Live = *doLiveCapture
  375. config.Interface = *iface
  376. config.SnapshotLen = *snapshotLen
  377. config.Filter = *filter
  378. config.Promiscuous = *promiscuous
  379. config.NatsURL = *natsURL
  380. config.NatsQueue = *natsQueue
  381. config.SleepFor.Duration = *sleepFor
  382. config.RequestsFile = *requestsFile
  383. config.UseXForwardedAsSource = *useXForwardedAsSource
  384. config.Protocol = *protocol
  385. config.Quiet = *beQuiet
  386. config.Trace = *trace
  387. if *configFile == "" {
  388. return
  389. }
  390. _, err := os.Stat(*configFile)
  391. if err != nil {
  392. log.Printf("%s: %s\n", *configFile, err)
  393. return
  394. }
  395. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  396. log.Printf("%s: %s\n", *configFile, err)
  397. }
  398. if !config.Quiet {
  399. config.print()
  400. }
  401. }
  402. // version outputs build information
  403. func version() {
  404. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  405. }