main.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084
  1. package main
import (
	"bufio"
	"bytes"
	"crypto/sha1"
	"encoding/csv"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"os"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"git.scraperwall.com/scw/ajp13"
	"git.scraperwall.com/scw/data"
	"git.scraperwall.com/scw/ip"
	"github.com/BurntSushi/toml"
	"github.com/Songmu/axslogparser"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/hpcloud/tail"
	"github.com/nats-io/nats"
	"github.com/nats-io/nats/encoders/protobuf"
	"github.com/satyrius/gonx"
)
  32. var (
  33. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  34. iface = flag.String("interface", "eth0", "Interface to get packets from")
  35. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  36. filter = flag.String("filter", "tcp", "PCAP filter expression")
  37. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  38. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  39. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  40. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  41. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  42. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  43. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  44. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  45. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  46. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  47. trace = flag.Bool("trace", false, "Trace the packet capturing")
  48. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  49. apacheReplay = flag.String("apache-replay", "", "Apache log file to replay into the system")
  50. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  51. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  52. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  53. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  54. configFile = flag.String("config", "", "The location of the TOML config file")
  55. beQuiet = flag.Bool("quiet", true, "Be quiet")
  56. doVersion = flag.Bool("version", false, "Show version information")
  57. natsEC *nats.EncodedConn
  58. natsJSONEC *nats.EncodedConn
  59. natsErrorChan chan error
  60. natsIsAvailable bool
  61. count uint64
  62. timeout = -1 * time.Second
  63. ipPriv *ip.IP
  64. config Config
  65. // Version contains the program Version, e.g. 1.0.1
  66. Version string
  67. // BuildDate contains the date and time at which the program was compiled
  68. BuildDate string
  69. )
  70. // Config contains the program configuration
  71. type Config struct {
  72. Live bool
  73. Interface string
  74. SnapshotLen int
  75. Filter string
  76. Promiscuous bool
  77. NatsURL string
  78. NatsQueue string
  79. NatsUser string
  80. NatsPassword string
  81. NatsCA string
  82. SleepFor duration
  83. RequestsFile string
  84. UseXForwardedAsSource bool
  85. Quiet bool
  86. Protocol string
  87. Trace bool
  88. ApacheLog string
  89. ApacheReplay string
  90. NginxLog string
  91. NginxLogFormat string
  92. HostName string
  93. AccessWatchKey string
  94. }
  95. type duration struct {
  96. time.Duration
  97. }
  98. func (d *duration) UnmarshalText(text []byte) error {
  99. var err error
  100. d.Duration, err = time.ParseDuration(string(text))
  101. return err
  102. }
  103. func (c Config) print() {
  104. fmt.Printf("Live: %t\n", c.Live)
  105. fmt.Printf("Interface: %s\n", c.Interface)
  106. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  107. fmt.Printf("Filter: %s\n", c.Filter)
  108. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  109. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  110. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  111. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  112. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  113. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  114. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  115. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  116. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  117. fmt.Printf("Apache Replay: %s\n", c.ApacheReplay)
  118. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  119. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  120. fmt.Printf("HostName: %s\n", c.HostName)
  121. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  122. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  123. fmt.Printf("Protocol: %s\n", c.Protocol)
  124. fmt.Printf("Quiet: %t\n", c.Quiet)
  125. fmt.Printf("Trace: %t\n", c.Trace)
  126. }
  127. func init() {
  128. flag.Parse()
  129. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  130. }
  131. func main() {
  132. if *doVersion {
  133. version()
  134. os.Exit(0)
  135. }
  136. loadConfig()
  137. // Output how many requests per second were sent
  138. if !config.Quiet {
  139. go func(c *uint64) {
  140. for {
  141. fmt.Printf("%d requests per second\n", *c)
  142. *c = 0
  143. time.Sleep(time.Second)
  144. }
  145. }(&count)
  146. }
  147. // NATS
  148. //
  149. if config.NatsURL == "" && config.AccessWatchKey == "" {
  150. log.Fatal("No NATS URL specified (-nats-url)!")
  151. }
  152. natsIsAvailable = false
  153. natsErrorChan = make(chan error, 1)
  154. err := connectToNATS()
  155. if err != nil && config.AccessWatchKey == "" {
  156. log.Fatal(err)
  157. }
  158. go natsWatchdog(natsErrorChan)
  159. // What should I do?
  160. if config.RequestsFile != "" {
  161. replayFile()
  162. } else if config.ApacheReplay != "" {
  163. apacheLogReplay(config.ApacheReplay)
  164. } else if config.ApacheLog != "" {
  165. apacheLogCapture(config.ApacheLog)
  166. } else if config.Live {
  167. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  168. liveCapture()
  169. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  170. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  171. }
  172. }
  173. func natsWatchdog(closedChan chan error) {
  174. var lastError error
  175. for err := range closedChan {
  176. if lastError != err {
  177. lastError = err
  178. log.Println(err)
  179. }
  180. if err != nats.ErrConnectionClosed {
  181. continue
  182. }
  183. RECONNECT:
  184. for {
  185. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  186. err := connectToNATS()
  187. if err == nil {
  188. break RECONNECT
  189. }
  190. time.Sleep(1 * time.Second)
  191. }
  192. }
  193. }
  194. func connectToNATS() error {
  195. var natsConn *nats.Conn
  196. var err error
  197. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  198. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  199. } else {
  200. if config.NatsPassword != "" && config.NatsUser != "" {
  201. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  202. } else {
  203. natsConn, err = nats.Connect(config.NatsURL)
  204. }
  205. }
  206. if err != nil {
  207. return err
  208. }
  209. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  210. if err != nil {
  211. return fmt.Errorf("Encoded Connection: %v", err)
  212. }
  213. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  214. if err != nil {
  215. return fmt.Errorf("Encoded Connection: %v", err)
  216. }
  217. natsIsAvailable = true
  218. return nil
  219. }
  220. func nginxLogCapture(logfile, format string) {
  221. if _, err := os.Stat(logfile); err != nil {
  222. log.Fatalf("%s: %s", logfile, err)
  223. }
  224. t, err := tail.TailFile(logfile, tail.Config{
  225. Follow: true, // follow the file
  226. ReOpen: true, // reopen log file when it gets closed/rotated
  227. Logger: tail.DiscardingLogger, // don't log anything
  228. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  229. })
  230. if err != nil {
  231. log.Fatalf("%s: %s", logfile, err)
  232. }
  233. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  234. p := gonx.NewParser(format)
  235. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  236. for line := range t.Lines {
  237. l := line.Text
  238. logEntry, err := p.ParseString(l)
  239. if err != nil {
  240. log.Println(err)
  241. continue
  242. }
  243. remote, err := logEntry.Field("remote_addr")
  244. if err != nil {
  245. log.Println(err)
  246. continue
  247. }
  248. // only use the first host in case there are multiple hosts in the log
  249. if cidx := strings.Index(remote, ","); cidx >= 0 {
  250. remote = remote[0:cidx]
  251. }
  252. timestampStr, err := logEntry.Field("time_local")
  253. if err != nil {
  254. log.Println(err)
  255. continue
  256. }
  257. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  258. if err != nil {
  259. log.Println(err)
  260. continue
  261. }
  262. httpRequest, err := logEntry.Field("request")
  263. if err != nil {
  264. log.Println(err)
  265. continue
  266. }
  267. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  268. if len(reqData) < 4 {
  269. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  270. continue
  271. }
  272. host := config.HostName
  273. if host == "" {
  274. host = "[not available]"
  275. }
  276. request := data.Request{
  277. IpSrc: remote,
  278. Origin: remote,
  279. Source: remote,
  280. IpDst: "127.0.0.1",
  281. PortSrc: 0,
  282. PortDst: 0,
  283. TcpSeq: 0,
  284. CreatedAt: timeStamp.Unix(),
  285. Url: reqData[2],
  286. Method: reqData[1],
  287. Host: host,
  288. Protocol: reqData[3],
  289. }
  290. request.Referer, _ = logEntry.Field("http_referer")
  291. request.UserAgent, _ = logEntry.Field("http_user_agent")
  292. if config.Trace {
  293. log.Printf("[%s] %s\n", request.Source, request.Url)
  294. }
  295. count++
  296. publishRequest(config.NatsQueue, &request)
  297. }
  298. }
  299. func apacheLogReplay(logfile string) {
  300. file, err := os.Open(logfile)
  301. if err != nil {
  302. log.Fatalf("%s: %s", logfile, err)
  303. }
  304. defer file.Close()
  305. scanner := bufio.NewScanner(file)
  306. var p axslogparser.Parser
  307. parserSet := false
  308. var tOffset time.Duration
  309. for scanner.Scan() {
  310. l := scanner.Text()
  311. if err := scanner.Err(); err != nil {
  312. log.Fatal(err)
  313. }
  314. if !parserSet {
  315. p, _, err = axslogparser.GuessParser(l)
  316. if err != nil {
  317. log.Println(err)
  318. continue
  319. }
  320. parserSet = true
  321. }
  322. logEntry, err := p.Parse(l)
  323. if err != nil {
  324. log.Println(err)
  325. continue
  326. }
  327. if tOffset == 0 {
  328. tOffset = time.Now().Sub(logEntry.Time)
  329. }
  330. ts := logEntry.Time.Add(tOffset)
  331. if ts.After(time.Now()) {
  332. time.Sleep(ts.Sub(time.Now()))
  333. }
  334. // fmt.Println(l)
  335. remote := logEntry.Host
  336. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  337. remote = logEntry.ForwardedFor
  338. }
  339. // only use the first host in case there are multiple hosts in the log
  340. if cidx := strings.Index(remote, ","); cidx >= 0 {
  341. remote = remote[0:cidx]
  342. }
  343. // extract the virtual host
  344. var virtualHost string
  345. vhost := logEntry.VirtualHost
  346. if vhost != "" {
  347. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  348. virtualHost = vhostAndPort[0]
  349. } else {
  350. if config.HostName != "" {
  351. vhost = config.HostName
  352. } else {
  353. vhost = "[not available]"
  354. }
  355. }
  356. request := data.Request{
  357. IpSrc: remote,
  358. IpDst: "127.0.0.1",
  359. PortSrc: 0,
  360. PortDst: 0,
  361. TcpSeq: 0,
  362. CreatedAt: (logEntry.Time.Add(tOffset)).UnixNano(),
  363. Url: logEntry.RequestURI,
  364. Method: logEntry.Method,
  365. Host: virtualHost,
  366. Protocol: logEntry.Protocol,
  367. Origin: remote,
  368. Source: remote,
  369. Referer: logEntry.Referer,
  370. XForwardedFor: logEntry.ForwardedFor,
  371. UserAgent: logEntry.UserAgent,
  372. }
  373. if config.Trace {
  374. log.Printf("[%s] %s\n", request.Source, request.Url)
  375. }
  376. count++
  377. publishRequest(config.NatsQueue, &request)
  378. }
  379. }
  380. /*
  381. func apacheLogReplay(logfile string) {
  382. if _, err := os.Stat(logfile); err != nil {
  383. log.Fatalf("%s: %s", logfile, err)
  384. }
  385. file, err := os.Open(logfile)
  386. if err != nil {
  387. log.Fatalf("%s: %s", logfile, err)
  388. }
  389. defer file.Close()
  390. scanner := bufio.NewScanner(file)
  391. var p axslogparser.Parser
  392. parserSet := false
  393. var tOffset time.Duration
  394. for scanner.Scan() {
  395. l := scanner.Text()
  396. if err := scanner.Err(); err != nil {
  397. log.Fatal(err)
  398. }
  399. if !parserSet {
  400. p, _, err = axslogparser.GuessParser(l)
  401. if err != nil {
  402. log.Println(err)
  403. continue
  404. }
  405. parserSet = true
  406. }
  407. logEntry, err := p.Parse(l)
  408. if err != nil {
  409. log.Println(err)
  410. continue
  411. }
  412. remote := logEntry.Host
  413. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  414. remote = logEntry.ForwardedFor
  415. }
  416. // only use the first host in case there are multiple hosts in the log
  417. if cidx := strings.Index(remote, ","); cidx >= 0 {
  418. remote = remote[0:cidx]
  419. }
  420. // extract the virtual host
  421. var virtualHost string
  422. vhost := logEntry.VirtualHost
  423. if vhost != "" {
  424. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  425. virtualHost = vhostAndPort[0]
  426. } else {
  427. if config.HostName != "" {
  428. vhost = config.HostName
  429. } else {
  430. vhost = "[not available]"
  431. }
  432. }
  433. if tOffset == 0 {
  434. tOffset = time.Now().Sub(logEntry.Time)
  435. }
  436. request := data.Request{
  437. IpSrc: remote,
  438. IpDst: "127.0.0.1",
  439. PortSrc: 0,
  440. PortDst: 0,
  441. TcpSeq: 0,
  442. CreatedAt: (logEntry.Time.Add(tOffset)).UnixNano(),
  443. Url: logEntry.RequestURI,
  444. Method: logEntry.Method,
  445. Host: virtualHost,
  446. Protocol: logEntry.Protocol,
  447. Origin: remote,
  448. Source: remote,
  449. Referer: logEntry.Referer,
  450. XForwardedFor: logEntry.ForwardedFor,
  451. UserAgent: logEntry.UserAgent,
  452. }
  453. if config.Trace {
  454. log.Printf("[%s] %s\n", request.Source, request.Url)
  455. }
  456. count++
  457. publishRequest(config.NatsQueue, &request)
  458. }
  459. }
  460. */
  461. func apacheLogCapture(logfile string) {
  462. if _, err := os.Stat(logfile); err != nil {
  463. log.Fatalf("%s: %s", logfile, err)
  464. }
  465. t, err := tail.TailFile(logfile, tail.Config{
  466. Follow: true, // follow the file
  467. ReOpen: true, // reopen log file when it gets closed/rotated
  468. Logger: tail.DiscardingLogger, // don't log anything
  469. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  470. })
  471. if err != nil {
  472. log.Fatalf("%s: %s", logfile, err)
  473. }
  474. var p axslogparser.Parser
  475. parserSet := false
  476. for line := range t.Lines {
  477. l := line.Text
  478. if !parserSet {
  479. p, _, err = axslogparser.GuessParser(l)
  480. if err != nil {
  481. log.Println(err)
  482. continue
  483. }
  484. parserSet = true
  485. }
  486. logEntry, err := p.Parse(l)
  487. if err != nil {
  488. log.Println(err)
  489. continue
  490. }
  491. remote := logEntry.Host
  492. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  493. remote = logEntry.ForwardedFor
  494. }
  495. // only use the first host in case there are multiple hosts in the log
  496. if cidx := strings.Index(remote, ","); cidx >= 0 {
  497. remote = remote[0:cidx]
  498. }
  499. // extract the virtual host
  500. var virtualHost string
  501. vhost := logEntry.VirtualHost
  502. if vhost != "" {
  503. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  504. virtualHost = vhostAndPort[0]
  505. } else {
  506. if config.HostName != "" {
  507. vhost = config.HostName
  508. } else {
  509. vhost = "[not available]"
  510. }
  511. }
  512. request := data.Request{
  513. IpSrc: remote,
  514. IpDst: "127.0.0.1",
  515. PortSrc: 0,
  516. PortDst: 0,
  517. TcpSeq: 0,
  518. CreatedAt: logEntry.Time.UnixNano(),
  519. Url: logEntry.RequestURI,
  520. Method: logEntry.Method,
  521. Host: virtualHost,
  522. Protocol: logEntry.Protocol,
  523. Origin: remote,
  524. Source: remote,
  525. Referer: logEntry.Referer,
  526. XForwardedFor: logEntry.ForwardedFor,
  527. UserAgent: logEntry.UserAgent,
  528. }
  529. if config.Trace {
  530. log.Printf("[%s] %s\n", request.Source, request.Url)
  531. }
  532. count++
  533. publishRequest(config.NatsQueue, &request)
  534. }
  535. }
  536. func liveCapture() {
  537. ipPriv = ip.NewIP()
  538. // PCAP setup
  539. //
  540. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  541. if err != nil {
  542. log.Fatal(err)
  543. }
  544. defer handle.Close()
  545. err = handle.SetBPFFilter(config.Filter)
  546. if err != nil {
  547. log.Fatal(err)
  548. }
  549. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  550. for packet := range packetSource.Packets() {
  551. go processPacket(packet)
  552. }
  553. }
  554. func writeLogToWatch(r *data.Request) {
  555. h := map[string]string{}
  556. if r.AcceptEncoding != "" {
  557. h["Accept-Encoding"] = r.AcceptEncoding
  558. }
  559. if r.Accept != "" {
  560. h["Accept"] = r.Accept
  561. }
  562. if r.AcceptLanguage != "" {
  563. h["Accept-Language"] = r.AcceptLanguage
  564. }
  565. if r.Cookie != "" {
  566. h["Cookie"] = r.Cookie
  567. }
  568. if r.Host != "" {
  569. h["Host"] = r.Host
  570. }
  571. if r.Referer != "" {
  572. h["Referer"] = r.Referer
  573. }
  574. if r.UserAgent != "" {
  575. h["User-Agent"] = r.UserAgent
  576. }
  577. if r.Via != "" {
  578. h["Via"] = r.Via
  579. }
  580. if r.XForwardedFor != "" {
  581. h["X-Forwarded-For"] = r.XForwardedFor
  582. }
  583. if r.XRequestedWith != "" {
  584. h["X-Requested-With"] = r.XRequestedWith
  585. }
  586. data := map[string]interface{}{
  587. "request": map[string]interface{}{
  588. "time": time.Unix(0, r.CreatedAt),
  589. "address": r.Source,
  590. "protocol": r.Protocol,
  591. "scheme": "https",
  592. "method": r.Method,
  593. "url": r.Url,
  594. "headers": h,
  595. },
  596. "response": map[string]interface{}{"status": "200"},
  597. }
  598. jdata, err := json.Marshal(data)
  599. client := &http.Client{}
  600. fmt.Println(string(jdata))
  601. buf := bytes.NewBuffer(jdata)
  602. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  603. req.Header.Add("Api-Key", config.AccessWatchKey)
  604. req.Header.Add("Accept", "application/json")
  605. req.Header.Add("Content-Type", "application/json")
  606. resp, err := client.Do(req)
  607. if err != nil {
  608. log.Println(err)
  609. }
  610. resp.Body.Close()
  611. }
  612. func publishRequest(queue string, request *data.Request) {
  613. if config.AccessWatchKey != "" {
  614. writeLogToWatch(request)
  615. return
  616. }
  617. if !natsIsAvailable {
  618. return
  619. }
  620. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  621. natsErrorChan <- err
  622. if err == nats.ErrConnectionClosed {
  623. natsIsAvailable = false
  624. }
  625. }
  626. }
  627. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  628. func processPacket(packet gopacket.Packet) {
  629. hasIPv4 := false
  630. var ipSrc, ipDst string
  631. // IPv4
  632. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  633. ip := ipLayer.(*layers.IPv4)
  634. ipSrc = ip.SrcIP.String()
  635. ipDst = ip.DstIP.String()
  636. hasIPv4 = true
  637. }
  638. // IPv6
  639. if !hasIPv4 {
  640. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  641. ip := ipLayer.(*layers.IPv6)
  642. ipSrc = ip.SrcIP.String()
  643. ipDst = ip.DstIP.String()
  644. }
  645. }
  646. // TCP
  647. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  648. if tcpLayer == nil {
  649. return
  650. }
  651. tcp, _ := tcpLayer.(*layers.TCP)
  652. portSrc := tcp.SrcPort
  653. portDst := tcp.DstPort
  654. sequence := tcp.Seq
  655. applicationLayer := packet.ApplicationLayer()
  656. if applicationLayer == nil {
  657. return
  658. }
  659. count++
  660. if len(applicationLayer.Payload()) < 50 {
  661. log.Println("application layer too small!")
  662. return
  663. }
  664. request := data.Request{
  665. IpSrc: ipSrc,
  666. IpDst: ipDst,
  667. PortSrc: uint32(portSrc),
  668. PortDst: uint32(portDst),
  669. TcpSeq: uint32(sequence),
  670. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  671. }
  672. switch config.Protocol {
  673. case "http":
  674. err := processHTTP(&request, applicationLayer.Payload())
  675. if err != nil {
  676. log.Println(err)
  677. return
  678. }
  679. case "ajp13":
  680. err := processAJP13(&request, applicationLayer.Payload())
  681. if err != nil {
  682. log.Println(err)
  683. return
  684. }
  685. }
  686. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  687. if strings.Contains(request.XForwardedFor, ",") {
  688. ips := strings.Split(request.XForwardedFor, ",")
  689. for i := len(ips) - 1; i >= 0; i-- {
  690. ipRaw := strings.TrimSpace(ips[i])
  691. ipAddr := net.ParseIP(ipRaw)
  692. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  693. request.Source = ipRaw
  694. break
  695. }
  696. }
  697. } else {
  698. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  699. if !ipPriv.IsPrivate(ipAddr) {
  700. request.Source = request.XForwardedFor
  701. }
  702. }
  703. }
  704. if request.Source == request.IpSrc && request.XRealIP != "" {
  705. request.Source = request.XRealIP
  706. }
  707. if config.Trace {
  708. log.Printf("[%s] %s\n", request.Source, request.Url)
  709. }
  710. publishRequest(config.NatsQueue, &request)
  711. }
  712. func processAJP13(request *data.Request, appData []byte) error {
  713. a, err := ajp13.Parse(appData)
  714. if err != nil {
  715. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  716. }
  717. request.Url = a.URI
  718. request.Method = a.Method()
  719. request.Host = a.Server
  720. request.Protocol = a.Version
  721. request.Origin = a.RemoteAddr.String()
  722. request.Source = a.RemoteAddr.String()
  723. if v, ok := a.Header("Referer"); ok {
  724. request.Referer = v
  725. }
  726. if v, ok := a.Header("Connection"); ok {
  727. request.Connection = v
  728. }
  729. if v, ok := a.Header("X-Forwarded-For"); ok {
  730. request.XForwardedFor = v
  731. }
  732. if v, ok := a.Header("X-Real-IP"); ok {
  733. request.XRealIP = v
  734. }
  735. if v, ok := a.Header("X-Requested-With"); ok {
  736. request.XRequestedWith = v
  737. }
  738. if v, ok := a.Header("Accept-Encoding"); ok {
  739. request.AcceptEncoding = v
  740. }
  741. if v, ok := a.Header("Accept-Language"); ok {
  742. request.AcceptLanguage = v
  743. }
  744. if v, ok := a.Header("User-Agent"); ok {
  745. request.UserAgent = v
  746. }
  747. if v, ok := a.Header("Accept"); ok {
  748. request.Accept = v
  749. }
  750. if v, ok := a.Header("Cookie"); ok {
  751. request.Cookie = v
  752. }
  753. if v, ok := a.Header("X-Forwarded-Host"); ok {
  754. if v != request.Host {
  755. request.Host = v
  756. }
  757. }
  758. return nil
  759. }
  760. func processHTTP(request *data.Request, appData []byte) error {
  761. reader := bufio.NewReader(strings.NewReader(string(appData)))
  762. req, err := http.ReadRequest(reader)
  763. if err != nil {
  764. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  765. }
  766. request.Url = req.URL.String()
  767. request.Method = req.Method
  768. request.Referer = req.Referer()
  769. request.Host = req.Host
  770. request.Protocol = req.Proto
  771. request.Origin = request.Host
  772. if _, ok := req.Header["Connection"]; ok {
  773. request.Connection = req.Header["Connection"][0]
  774. }
  775. if _, ok := req.Header["X-Forwarded-For"]; ok {
  776. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  777. }
  778. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  779. if _, ok := req.Header["True-Client-Ip"]; ok {
  780. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  781. }
  782. if _, ok := req.Header["X-Real-Ip"]; ok {
  783. request.XRealIP = req.Header["X-Real-Ip"][0]
  784. }
  785. if _, ok := req.Header["X-Requested-With"]; ok {
  786. request.XRequestedWith = req.Header["X-Requested-With"][0]
  787. }
  788. if _, ok := req.Header["Accept-Encoding"]; ok {
  789. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  790. }
  791. if _, ok := req.Header["Accept-Language"]; ok {
  792. request.AcceptLanguage = req.Header["Accept-Language"][0]
  793. }
  794. if _, ok := req.Header["User-Agent"]; ok {
  795. request.UserAgent = req.Header["User-Agent"][0]
  796. }
  797. if _, ok := req.Header["Accept"]; ok {
  798. request.Accept = req.Header["Accept"][0]
  799. }
  800. if _, ok := req.Header["Cookie"]; ok {
  801. request.Cookie = req.Header["Cookie"][0]
  802. }
  803. request.Source = request.IpSrc
  804. return nil
  805. }
  806. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  807. // e.g.
  808. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  809. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  810. func replayFile() {
  811. var req data.Request
  812. var startTs time.Time
  813. var endTs time.Time
  814. rand.Seed(time.Now().UnixNano())
  815. for {
  816. fh, err := os.Open(config.RequestsFile)
  817. if err != nil {
  818. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  819. }
  820. c := csv.NewReader(fh)
  821. c.Comma = ' '
  822. for {
  823. if config.SleepFor.Duration > time.Nanosecond {
  824. startTs = time.Now()
  825. }
  826. r, err := c.Read()
  827. if err == io.EOF {
  828. break
  829. }
  830. if err != nil {
  831. log.Println(err)
  832. continue
  833. }
  834. req.IpSrc = r[0]
  835. req.Source = r[0]
  836. req.Url = r[1]
  837. req.UserAgent = "Munch/1.0"
  838. req.Host = "demo.scraperwall.com"
  839. req.CreatedAt = time.Now().UnixNano()
  840. publishRequest(config.NatsQueue, &req)
  841. if strings.Index(r[1], ".") < 0 {
  842. hash := sha1.New()
  843. io.WriteString(hash, r[0])
  844. fp := data.Fingerprint{
  845. ClientID: "scw",
  846. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  847. Remote: r[0],
  848. Url: r[1],
  849. Source: r[0],
  850. CreatedAt: time.Now(),
  851. }
  852. if strings.HasPrefix(r[0], "50.31.") {
  853. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  854. natsJSONEC.Publish("fingerprints_scw", fp)
  855. } else if rand.Intn(10) < 5 {
  856. natsJSONEC.Publish("fingerprints_scw", fp)
  857. }
  858. }
  859. count++
  860. if config.SleepFor.Duration >= time.Nanosecond {
  861. endTs = time.Now()
  862. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  863. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  864. }
  865. }
  866. }
  867. }
  868. }
  869. func loadConfig() {
  870. // initialize with values from the command line / environment
  871. config.Live = *doLiveCapture
  872. config.Interface = *iface
  873. config.SnapshotLen = *snapshotLen
  874. config.Filter = *filter
  875. config.Promiscuous = *promiscuous
  876. config.NatsURL = *natsURL
  877. config.NatsQueue = *natsQueue
  878. config.NatsUser = *natsUser
  879. config.NatsPassword = *natsPassword
  880. config.NatsCA = *natsCA
  881. config.SleepFor.Duration = *sleepFor
  882. config.RequestsFile = *requestsFile
  883. config.UseXForwardedAsSource = *useXForwardedAsSource
  884. config.Protocol = *protocol
  885. config.ApacheLog = *apacheLog
  886. config.ApacheReplay = *apacheReplay
  887. config.NginxLog = *nginxLog
  888. config.NginxLogFormat = *nginxFormat
  889. config.HostName = *hostName
  890. config.Quiet = *beQuiet
  891. config.Trace = *trace
  892. config.AccessWatchKey = *accessWatchKey
  893. if *configFile == "" {
  894. return
  895. }
  896. _, err := os.Stat(*configFile)
  897. if err != nil {
  898. log.Printf("%s: %s\n", *configFile, err)
  899. return
  900. }
  901. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  902. log.Printf("%s: %s\n", *configFile, err)
  903. }
  904. if !config.Quiet {
  905. config.print()
  906. }
  907. }
  908. // version outputs build information
  909. func version() {
  910. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  911. }