main.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106
  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/sha1"
  6. "encoding/csv"
  7. "encoding/json"
  8. "flag"
  9. "fmt"
  10. "io"
  11. "log"
  12. "math/rand"
  13. "net"
  14. "net/http"
  15. "os"
  16. "regexp"
  17. "strings"
  18. "time"
  19. "github.com/BurntSushi/toml"
  20. "github.com/Songmu/axslogparser"
  21. "github.com/google/gopacket"
  22. "github.com/google/gopacket/layers"
  23. "github.com/google/gopacket/pcap"
  24. "github.com/hpcloud/tail"
  25. "github.com/kr/pretty"
  26. "github.com/nats-io/nats"
  27. "github.com/nats-io/nats/encoders/protobuf"
  28. "github.com/satyrius/gonx"
  29. "git.scraperwall.com/scw/ajp13"
  30. "git.scraperwall.com/scw/data"
  31. "git.scraperwall.com/scw/ip"
  32. )
  33. var (
  34. doLiveCapture = flag.Bool("live", false, "Capture data in real time from a given interface")
  35. iface = flag.String("interface", "eth0", "Interface to get packets from")
  36. snapshotLen = flag.Int("snapshot-len", 8192, "Snapshot Length in Bytes")
  37. filter = flag.String("filter", "tcp", "PCAP filter expression")
  38. promiscuous = flag.Bool("promiscuous", false, "Switch interface into promiscuous mode?")
  39. natsURL = flag.String("nats-url", "nats://127.0.0.1:4222", "The URL of the NATS server")
  40. natsUser = flag.String("nats-user", "", "The user for NATS authentication")
  41. natsPassword = flag.String("nats-password", "", "The password for NATS authentication")
  42. natsQueue = flag.String("nats-queue", "requests", "The NATS queue name")
  43. natsCA = flag.String("nats-ca", "", "CA chain for NATS TLS")
  44. sleepFor = flag.Duration("sleep", 0, "Sleep this long between sending data (only when replaying a file)")
  45. requestsFile = flag.String("requests", "", "CSV file containing requests (IP and URL)")
  46. protocol = flag.String("protocol", "http", "which protocol to parse: http or ajp13")
  47. useXForwardedAsSource = flag.Bool("use-x-forwarded", false, "Use the IP address in X-Forwarded-For as source")
  48. trace = flag.Bool("trace", false, "Trace the packet capturing")
  49. apacheLog = flag.String("apache-log", "", "Parse an Apache Log file")
  50. apacheReplay = flag.String("apache-replay", "", "Apache log file to replay into the system")
  51. nginxLog = flag.String("nginx-log", "", "Nginx log file to tail")
  52. nginxFormat = flag.String("nginx-format", "", "The nginx log file format")
  53. hostName = flag.String("hostname", "", "Override the captured hostname with this one")
  54. accessWatchKey = flag.String("access-watch-key", "", "access.watch API key")
  55. configFile = flag.String("config", "", "The location of the TOML config file")
  56. beQuiet = flag.Bool("quiet", true, "Be quiet")
  57. doVersion = flag.Bool("version", false, "Show version information")
  58. natsEC *nats.EncodedConn
  59. natsJSONEC *nats.EncodedConn
  60. natsErrorChan chan error
  61. natsIsAvailable bool
  62. count uint64
  63. timeout = -1 * time.Second
  64. ipPriv *ip.IP
  65. config Config
  66. // Version contains the program Version, e.g. 1.0.1
  67. Version string
  68. // BuildDate contains the date and time at which the program was compiled
  69. BuildDate string
  70. )
// Config contains the program configuration. loadConfig seeds it from the
// command-line flags and then, if -config is given, decodes the TOML file on
// top of those values.
type Config struct {
	// Live enables real-time capture from Interface (-live).
	Live bool
	// Interface is the network interface to capture from (-interface).
	Interface string
	// SnapshotLen is the pcap snapshot length in bytes (-snapshot-len).
	SnapshotLen int
	// Filter is the pcap/BPF filter expression (-filter).
	Filter string
	// Promiscuous switches the interface into promiscuous mode (-promiscuous).
	Promiscuous bool
	// NatsURL is the URL of the NATS server (-nats-url).
	NatsURL string
	// NatsQueue is the NATS subject requests are published to (-nats-queue).
	NatsQueue string
	// NatsUser and NatsPassword are the NATS credentials (-nats-user, -nats-password).
	NatsUser string
	NatsPassword string
	// NatsCA is the CA chain file used for NATS TLS (-nats-ca).
	NatsCA string
	// SleepFor is the pause between records when replaying a requests file (-sleep).
	SleepFor duration
	// RequestsFile is the CSV file of requests to replay (-requests).
	RequestsFile string
	// UseXForwardedAsSource makes X-Forwarded-For the request source (-use-x-forwarded).
	UseXForwardedAsSource bool
	// Quiet suppresses the per-second statistics and the config dump (-quiet).
	Quiet bool
	// Protocol selects the payload parser for live capture: http or ajp13 (-protocol).
	Protocol string
	// Trace logs every processed request (-trace).
	Trace bool
	// ApacheLog is an Apache log file to tail (-apache-log).
	ApacheLog string
	// ApacheReplay is an Apache log file to replay (-apache-replay).
	ApacheReplay string
	// NginxLog is an nginx log file to tail (-nginx-log).
	NginxLog string
	// NginxLogFormat is the nginx log format of NginxLog (-nginx-format).
	NginxLogFormat string
	// HostName is the host name reported for log-based sources (-hostname).
	HostName string
	// AccessWatchKey is the access.watch API key; when set, requests are sent
	// there instead of to NATS (-access-watch-key).
	AccessWatchKey string
}
  96. type duration struct {
  97. time.Duration
  98. }
  99. func (d *duration) UnmarshalText(text []byte) error {
  100. var err error
  101. d.Duration, err = time.ParseDuration(string(text))
  102. return err
  103. }
  104. func (c Config) print() {
  105. fmt.Printf("Live: %t\n", c.Live)
  106. fmt.Printf("Interface: %s\n", c.Interface)
  107. fmt.Printf("SnapshotLen: %d\n", c.SnapshotLen)
  108. fmt.Printf("Filter: %s\n", c.Filter)
  109. fmt.Printf("Promiscuous: %t\n", c.Promiscuous)
  110. fmt.Printf("NatsURL: %s\n", c.NatsURL)
  111. fmt.Printf("NatsQueue: %s\n", c.NatsQueue)
  112. fmt.Printf("NatsUser: %s\n", c.NatsUser)
  113. fmt.Printf("NatsPassword: %s\n", c.NatsPassword)
  114. fmt.Printf("NatsCA: %s\n", c.NatsCA)
  115. fmt.Printf("SleepFor: %s\n", c.SleepFor.String())
  116. fmt.Printf("RequestsFile: %s\n", c.RequestsFile)
  117. fmt.Printf("Apache Log: %s\n", c.ApacheLog)
  118. fmt.Printf("Apache Replay: %s\n", c.ApacheReplay)
  119. fmt.Printf("Nginx Log: %s\n", c.NginxLog)
  120. fmt.Printf("Nginx Log Format: %s\n", c.NginxLogFormat)
  121. fmt.Printf("HostName: %s\n", c.HostName)
  122. fmt.Printf("AccessWatchKey: %s\n", c.AccessWatchKey)
  123. fmt.Printf("UseXForwardedAsSource: %t\n", c.UseXForwardedAsSource)
  124. fmt.Printf("Protocol: %s\n", c.Protocol)
  125. fmt.Printf("Quiet: %t\n", c.Quiet)
  126. fmt.Printf("Trace: %t\n", c.Trace)
  127. }
  128. func init() {
  129. flag.Parse()
  130. nats.RegisterEncoder(protobuf.PROTOBUF_ENCODER, &protobuf.ProtobufEncoder{})
  131. }
  132. func main() {
  133. if *doVersion {
  134. version()
  135. os.Exit(0)
  136. }
  137. loadConfig()
  138. // Output how many requests per second were sent
  139. if !config.Quiet {
  140. go func(c *uint64) {
  141. for {
  142. fmt.Printf("%d requests per second\n", *c)
  143. *c = 0
  144. time.Sleep(time.Second)
  145. }
  146. }(&count)
  147. }
  148. // NATS
  149. //
  150. if config.NatsURL == "" && config.AccessWatchKey == "" {
  151. log.Fatal("No NATS URL specified (-nats-url)!")
  152. }
  153. natsIsAvailable = false
  154. natsErrorChan = make(chan error, 1)
  155. err := connectToNATS()
  156. if err != nil && config.AccessWatchKey == "" {
  157. log.Fatal(err)
  158. }
  159. go natsWatchdog(natsErrorChan)
  160. // What should I do?
  161. if config.RequestsFile != "" {
  162. replayFile()
  163. } else if config.ApacheReplay != "" {
  164. apacheLogReplay(config.ApacheReplay)
  165. } else if config.ApacheLog != "" {
  166. apacheLogCapture(config.ApacheLog)
  167. } else if config.Live {
  168. fmt.Printf("live capture (%s, %s) to %s\n", config.Interface, config.Filter, config.NatsURL)
  169. liveCapture()
  170. } else if config.NginxLog != "" && config.NginxLogFormat != "" {
  171. nginxLogCapture(config.NginxLog, config.NginxLogFormat)
  172. }
  173. }
  174. func natsWatchdog(closedChan chan error) {
  175. var lastError error
  176. for err := range closedChan {
  177. if lastError != err {
  178. lastError = err
  179. log.Println(err)
  180. }
  181. if err != nats.ErrConnectionClosed {
  182. continue
  183. }
  184. RECONNECT:
  185. for {
  186. log.Printf("Reconnecting to NATS at %s\n", *natsURL)
  187. err := connectToNATS()
  188. if err == nil {
  189. break RECONNECT
  190. }
  191. time.Sleep(1 * time.Second)
  192. }
  193. }
  194. }
  195. func connectToNATS() error {
  196. var natsConn *nats.Conn
  197. var err error
  198. if config.NatsUser != "" && config.NatsPassword != "" && config.NatsCA != "" {
  199. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword), nats.RootCAs(config.NatsCA))
  200. } else {
  201. if config.NatsPassword != "" && config.NatsUser != "" {
  202. natsConn, err = nats.Connect(config.NatsURL, nats.UserInfo(config.NatsUser, config.NatsPassword))
  203. } else {
  204. natsConn, err = nats.Connect(config.NatsURL)
  205. }
  206. }
  207. if err != nil {
  208. return err
  209. }
  210. natsEC, err = nats.NewEncodedConn(natsConn, protobuf.PROTOBUF_ENCODER)
  211. if err != nil {
  212. return fmt.Errorf("Encoded Connection: %v", err)
  213. }
  214. natsJSONEC, err = nats.NewEncodedConn(natsConn, nats.JSON_ENCODER)
  215. if err != nil {
  216. return fmt.Errorf("Encoded Connection: %v", err)
  217. }
  218. natsIsAvailable = true
  219. return nil
  220. }
  221. func nginxLogCapture(logfile, format string) {
  222. if _, err := os.Stat(logfile); err != nil {
  223. log.Fatalf("%s: %s", logfile, err)
  224. }
  225. t, err := tail.TailFile(logfile, tail.Config{
  226. Follow: true, // follow the file
  227. ReOpen: true, // reopen log file when it gets closed/rotated
  228. Logger: tail.DiscardingLogger, // don't log anything
  229. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  230. })
  231. if err != nil {
  232. log.Fatalf("%s: %s", logfile, err)
  233. }
  234. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  235. p := gonx.NewParser(format)
  236. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  237. for line := range t.Lines {
  238. var remote string
  239. var err error
  240. l := line.Text
  241. logEntry, err := p.ParseString(l)
  242. if err != nil {
  243. log.Println(err)
  244. continue
  245. }
  246. if config.Trace {
  247. pretty.Println(logEntry)
  248. }
  249. remote, err = logEntry.Field("remote_addr")
  250. if err != nil {
  251. log.Println(err)
  252. continue
  253. }
  254. xff, err := logEntry.Field("http_x_forwarded_for")
  255. if err != nil && xff != "" {
  256. if config.Trace {
  257. log.Printf("Using XFF: %s\n", xff)
  258. }
  259. remote = xff
  260. }
  261. if remote == "" {
  262. log.Println("remote is empty: ignoring request.")
  263. continue
  264. }
  265. // only use the first host in case there are multiple hosts in the log
  266. if cidx := strings.Index(remote, ","); cidx >= 0 {
  267. remote = remote[0:cidx]
  268. }
  269. timestampStr, err := logEntry.Field("time_local")
  270. if err != nil {
  271. log.Println(err)
  272. continue
  273. }
  274. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  275. if err != nil {
  276. log.Println(err)
  277. continue
  278. }
  279. httpRequest, err := logEntry.Field("request")
  280. if err != nil {
  281. log.Println(err)
  282. continue
  283. }
  284. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  285. if len(reqData) < 4 {
  286. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  287. continue
  288. }
  289. host := config.HostName
  290. if host == "" {
  291. host = "[not available]"
  292. }
  293. request := data.Request{
  294. IpSrc: remote,
  295. Origin: remote,
  296. Source: remote,
  297. IpDst: "127.0.0.1",
  298. PortSrc: 0,
  299. PortDst: 0,
  300. TcpSeq: 0,
  301. CreatedAt: timeStamp.Unix(),
  302. Url: reqData[2],
  303. Method: reqData[1],
  304. Host: host,
  305. Protocol: reqData[3],
  306. }
  307. request.Referer, _ = logEntry.Field("http_referer")
  308. request.UserAgent, _ = logEntry.Field("http_user_agent")
  309. if config.Trace {
  310. log.Printf("[%s] %s\n", request.Source, request.Url)
  311. }
  312. count++
  313. publishRequest(config.NatsQueue, &request)
  314. }
  315. }
  316. func apacheLogReplay(logfile string) {
  317. file, err := os.Open(logfile)
  318. if err != nil {
  319. log.Fatalf("%s: %s", logfile, err)
  320. }
  321. defer file.Close()
  322. scanner := bufio.NewScanner(file)
  323. var p axslogparser.Parser
  324. parserSet := false
  325. var tOffset time.Duration
  326. for scanner.Scan() {
  327. l := scanner.Text()
  328. if err := scanner.Err(); err != nil {
  329. log.Fatal(err)
  330. }
  331. if !parserSet {
  332. p, _, err = axslogparser.GuessParser(l)
  333. if err != nil {
  334. log.Println(err)
  335. continue
  336. }
  337. parserSet = true
  338. }
  339. logEntry, err := p.Parse(l)
  340. if err != nil {
  341. log.Println(err)
  342. continue
  343. }
  344. if tOffset == 0 {
  345. tOffset = time.Now().Sub(logEntry.Time)
  346. }
  347. ts := logEntry.Time.Add(tOffset)
  348. if ts.After(time.Now()) {
  349. time.Sleep(ts.Sub(time.Now()))
  350. }
  351. // fmt.Println(l)
  352. remote := logEntry.Host
  353. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  354. remote = logEntry.ForwardedFor
  355. }
  356. // only use the first host in case there are multiple hosts in the log
  357. if cidx := strings.Index(remote, ","); cidx >= 0 {
  358. remote = remote[0:cidx]
  359. }
  360. // extract the virtual host
  361. var virtualHost string
  362. vhost := logEntry.VirtualHost
  363. if vhost != "" {
  364. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  365. virtualHost = vhostAndPort[0]
  366. } else {
  367. if config.HostName != "" {
  368. vhost = config.HostName
  369. } else {
  370. vhost = "[not available]"
  371. }
  372. }
  373. request := data.Request{
  374. IpSrc: remote,
  375. IpDst: "127.0.0.1",
  376. PortSrc: 0,
  377. PortDst: 0,
  378. TcpSeq: 0,
  379. CreatedAt: (logEntry.Time.Add(tOffset)).UnixNano(),
  380. Url: logEntry.RequestURI,
  381. Method: logEntry.Method,
  382. Host: virtualHost,
  383. Protocol: logEntry.Protocol,
  384. Origin: remote,
  385. Source: remote,
  386. Referer: logEntry.Referer,
  387. XForwardedFor: logEntry.ForwardedFor,
  388. UserAgent: logEntry.UserAgent,
  389. }
  390. if config.Trace {
  391. log.Printf("[%s] %s\n", request.Source, request.Url)
  392. }
  393. count++
  394. publishRequest(config.NatsQueue, &request)
  395. }
  396. }
  397. /*
  398. func apacheLogReplay(logfile string) {
  399. if _, err := os.Stat(logfile); err != nil {
  400. log.Fatalf("%s: %s", logfile, err)
  401. }
  402. file, err := os.Open(logfile)
  403. if err != nil {
  404. log.Fatalf("%s: %s", logfile, err)
  405. }
  406. defer file.Close()
  407. scanner := bufio.NewScanner(file)
  408. var p axslogparser.Parser
  409. parserSet := false
  410. var tOffset time.Duration
  411. for scanner.Scan() {
  412. l := scanner.Text()
  413. if err := scanner.Err(); err != nil {
  414. log.Fatal(err)
  415. }
  416. if !parserSet {
  417. p, _, err = axslogparser.GuessParser(l)
  418. if err != nil {
  419. log.Println(err)
  420. continue
  421. }
  422. parserSet = true
  423. }
  424. logEntry, err := p.Parse(l)
  425. if err != nil {
  426. log.Println(err)
  427. continue
  428. }
  429. remote := logEntry.Host
  430. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  431. remote = logEntry.ForwardedFor
  432. }
  433. // only use the first host in case there are multiple hosts in the log
  434. if cidx := strings.Index(remote, ","); cidx >= 0 {
  435. remote = remote[0:cidx]
  436. }
  437. // extract the virtual host
  438. var virtualHost string
  439. vhost := logEntry.VirtualHost
  440. if vhost != "" {
  441. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  442. virtualHost = vhostAndPort[0]
  443. } else {
  444. if config.HostName != "" {
  445. vhost = config.HostName
  446. } else {
  447. vhost = "[not available]"
  448. }
  449. }
  450. if tOffset == 0 {
  451. tOffset = time.Now().Sub(logEntry.Time)
  452. }
  453. request := data.Request{
  454. IpSrc: remote,
  455. IpDst: "127.0.0.1",
  456. PortSrc: 0,
  457. PortDst: 0,
  458. TcpSeq: 0,
  459. CreatedAt: (logEntry.Time.Add(tOffset)).UnixNano(),
  460. Url: logEntry.RequestURI,
  461. Method: logEntry.Method,
  462. Host: virtualHost,
  463. Protocol: logEntry.Protocol,
  464. Origin: remote,
  465. Source: remote,
  466. Referer: logEntry.Referer,
  467. XForwardedFor: logEntry.ForwardedFor,
  468. UserAgent: logEntry.UserAgent,
  469. }
  470. if config.Trace {
  471. log.Printf("[%s] %s\n", request.Source, request.Url)
  472. }
  473. count++
  474. publishRequest(config.NatsQueue, &request)
  475. }
  476. }
  477. */
  478. func apacheLogCapture(logfile string) {
  479. if _, err := os.Stat(logfile); err != nil {
  480. log.Fatalf("%s: %s", logfile, err)
  481. }
  482. t, err := tail.TailFile(logfile, tail.Config{
  483. Follow: true, // follow the file
  484. ReOpen: true, // reopen log file when it gets closed/rotated
  485. Logger: tail.DiscardingLogger, // don't log anything
  486. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  487. })
  488. if err != nil {
  489. log.Fatalf("%s: %s", logfile, err)
  490. }
  491. var p axslogparser.Parser
  492. parserSet := false
  493. for line := range t.Lines {
  494. l := line.Text
  495. if !parserSet {
  496. p, _, err = axslogparser.GuessParser(l)
  497. if err != nil {
  498. log.Println(err)
  499. continue
  500. }
  501. parserSet = true
  502. }
  503. logEntry, err := p.Parse(l)
  504. if err != nil {
  505. log.Println(err)
  506. continue
  507. }
  508. remote := logEntry.Host
  509. if config.UseXForwardedAsSource && logEntry.ForwardedFor != "" {
  510. remote = logEntry.ForwardedFor
  511. }
  512. // only use the first host in case there are multiple hosts in the log
  513. if cidx := strings.Index(remote, ","); cidx >= 0 {
  514. remote = remote[0:cidx]
  515. }
  516. // extract the virtual host
  517. var virtualHost string
  518. vhost := logEntry.VirtualHost
  519. if vhost != "" {
  520. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  521. virtualHost = vhostAndPort[0]
  522. } else {
  523. if config.HostName != "" {
  524. vhost = config.HostName
  525. } else {
  526. vhost = "[not available]"
  527. }
  528. }
  529. request := data.Request{
  530. IpSrc: remote,
  531. IpDst: "127.0.0.1",
  532. PortSrc: 0,
  533. PortDst: 0,
  534. TcpSeq: 0,
  535. CreatedAt: logEntry.Time.UnixNano(),
  536. Url: logEntry.RequestURI,
  537. Method: logEntry.Method,
  538. Host: virtualHost,
  539. Protocol: logEntry.Protocol,
  540. Origin: remote,
  541. Source: remote,
  542. Referer: logEntry.Referer,
  543. XForwardedFor: logEntry.ForwardedFor,
  544. UserAgent: logEntry.UserAgent,
  545. }
  546. if config.Trace {
  547. log.Printf("[%s] %s\n", request.Source, request.Url)
  548. }
  549. count++
  550. publishRequest(config.NatsQueue, &request)
  551. }
  552. }
  553. func liveCapture() {
  554. ipPriv = ip.NewIP()
  555. // PCAP setup
  556. //
  557. handle, err := pcap.OpenLive(config.Interface, int32(config.SnapshotLen), config.Promiscuous, timeout)
  558. if err != nil {
  559. log.Fatal(err)
  560. }
  561. defer handle.Close()
  562. err = handle.SetBPFFilter(config.Filter)
  563. if err != nil {
  564. log.Fatal(err)
  565. }
  566. packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
  567. for packet := range packetSource.Packets() {
  568. go processPacket(packet)
  569. }
  570. }
  571. func writeLogToWatch(r *data.Request) {
  572. h := map[string]string{}
  573. if r.AcceptEncoding != "" {
  574. h["Accept-Encoding"] = r.AcceptEncoding
  575. }
  576. if r.Accept != "" {
  577. h["Accept"] = r.Accept
  578. }
  579. if r.AcceptLanguage != "" {
  580. h["Accept-Language"] = r.AcceptLanguage
  581. }
  582. if r.Cookie != "" {
  583. h["Cookie"] = r.Cookie
  584. }
  585. if r.Host != "" {
  586. h["Host"] = r.Host
  587. }
  588. if r.Referer != "" {
  589. h["Referer"] = r.Referer
  590. }
  591. if r.UserAgent != "" {
  592. h["User-Agent"] = r.UserAgent
  593. }
  594. if r.Via != "" {
  595. h["Via"] = r.Via
  596. }
  597. if r.XForwardedFor != "" {
  598. h["X-Forwarded-For"] = r.XForwardedFor
  599. }
  600. if r.XRequestedWith != "" {
  601. h["X-Requested-With"] = r.XRequestedWith
  602. }
  603. data := map[string]interface{}{
  604. "request": map[string]interface{}{
  605. "time": time.Unix(0, r.CreatedAt),
  606. "address": r.Source,
  607. "protocol": r.Protocol,
  608. "scheme": "https",
  609. "method": r.Method,
  610. "url": r.Url,
  611. "headers": h,
  612. },
  613. "response": map[string]interface{}{"status": "200"},
  614. }
  615. jdata, err := json.Marshal(data)
  616. client := &http.Client{}
  617. fmt.Println(string(jdata))
  618. buf := bytes.NewBuffer(jdata)
  619. req, err := http.NewRequest("POST", "https://log.access.watch/1.1/log", buf)
  620. req.Header.Add("Api-Key", config.AccessWatchKey)
  621. req.Header.Add("Accept", "application/json")
  622. req.Header.Add("Content-Type", "application/json")
  623. resp, err := client.Do(req)
  624. if err != nil {
  625. log.Println(err)
  626. }
  627. resp.Body.Close()
  628. }
  629. func publishRequest(queue string, request *data.Request) {
  630. if config.AccessWatchKey != "" {
  631. writeLogToWatch(request)
  632. return
  633. }
  634. if !natsIsAvailable {
  635. return
  636. }
  637. if err := natsEC.Publish(config.NatsQueue, request); err != nil {
  638. natsErrorChan <- err
  639. if err == nats.ErrConnectionClosed {
  640. natsIsAvailable = false
  641. }
  642. }
  643. }
  644. // processPacket receives a raw packet from pcap, builds a Request item from it and sends it to the queue
  645. func processPacket(packet gopacket.Packet) {
  646. hasIPv4 := false
  647. var ipSrc, ipDst string
  648. // IPv4
  649. if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
  650. ip := ipLayer.(*layers.IPv4)
  651. ipSrc = ip.SrcIP.String()
  652. ipDst = ip.DstIP.String()
  653. hasIPv4 = true
  654. }
  655. // IPv6
  656. if !hasIPv4 {
  657. if ipLayer := packet.Layer(layers.LayerTypeIPv6); ipLayer != nil {
  658. ip := ipLayer.(*layers.IPv6)
  659. ipSrc = ip.SrcIP.String()
  660. ipDst = ip.DstIP.String()
  661. }
  662. }
  663. // TCP
  664. tcpLayer := packet.Layer(layers.LayerTypeTCP)
  665. if tcpLayer == nil {
  666. return
  667. }
  668. tcp, _ := tcpLayer.(*layers.TCP)
  669. portSrc := tcp.SrcPort
  670. portDst := tcp.DstPort
  671. sequence := tcp.Seq
  672. applicationLayer := packet.ApplicationLayer()
  673. if applicationLayer == nil {
  674. return
  675. }
  676. count++
  677. if len(applicationLayer.Payload()) < 50 {
  678. log.Println("application layer too small!")
  679. return
  680. }
  681. request := data.Request{
  682. IpSrc: ipSrc,
  683. IpDst: ipDst,
  684. PortSrc: uint32(portSrc),
  685. PortDst: uint32(portDst),
  686. TcpSeq: uint32(sequence),
  687. CreatedAt: packet.Metadata().CaptureInfo.Timestamp.UnixNano(),
  688. }
  689. switch config.Protocol {
  690. case "http":
  691. err := processHTTP(&request, applicationLayer.Payload())
  692. if err != nil {
  693. log.Println(err)
  694. return
  695. }
  696. case "ajp13":
  697. err := processAJP13(&request, applicationLayer.Payload())
  698. if err != nil {
  699. log.Println(err)
  700. return
  701. }
  702. }
  703. if config.UseXForwardedAsSource && request.XForwardedFor != "" {
  704. if strings.Contains(request.XForwardedFor, ",") {
  705. ips := strings.Split(request.XForwardedFor, ",")
  706. for i := len(ips) - 1; i >= 0; i-- {
  707. ipRaw := strings.TrimSpace(ips[i])
  708. ipAddr := net.ParseIP(ipRaw)
  709. if ipAddr != nil && !ipPriv.IsPrivate(ipAddr) {
  710. request.Source = ipRaw
  711. break
  712. }
  713. }
  714. } else {
  715. ipAddr := net.ParseIP(strings.TrimSpace(request.XForwardedFor))
  716. if !ipPriv.IsPrivate(ipAddr) {
  717. request.Source = request.XForwardedFor
  718. }
  719. }
  720. }
  721. if request.Source == request.IpSrc && request.XRealIP != "" {
  722. request.Source = request.XRealIP
  723. }
  724. if config.Trace {
  725. log.Printf("[%s] %s\n", request.Source, request.Url)
  726. }
  727. publishRequest(config.NatsQueue, &request)
  728. }
  729. func processAJP13(request *data.Request, appData []byte) error {
  730. a, err := ajp13.Parse(appData)
  731. if err != nil {
  732. return fmt.Errorf("Failed to parse AJP13 request: %s", err)
  733. }
  734. request.Url = a.URI
  735. request.Method = a.Method()
  736. request.Host = a.Server
  737. request.Protocol = a.Version
  738. request.Origin = a.RemoteAddr.String()
  739. request.Source = a.RemoteAddr.String()
  740. if v, ok := a.Header("Referer"); ok {
  741. request.Referer = v
  742. }
  743. if v, ok := a.Header("Connection"); ok {
  744. request.Connection = v
  745. }
  746. if v, ok := a.Header("X-Forwarded-For"); ok {
  747. request.XForwardedFor = v
  748. }
  749. if v, ok := a.Header("X-Real-IP"); ok {
  750. request.XRealIP = v
  751. }
  752. if v, ok := a.Header("X-Requested-With"); ok {
  753. request.XRequestedWith = v
  754. }
  755. if v, ok := a.Header("Accept-Encoding"); ok {
  756. request.AcceptEncoding = v
  757. }
  758. if v, ok := a.Header("Accept-Language"); ok {
  759. request.AcceptLanguage = v
  760. }
  761. if v, ok := a.Header("User-Agent"); ok {
  762. request.UserAgent = v
  763. }
  764. if v, ok := a.Header("Accept"); ok {
  765. request.Accept = v
  766. }
  767. if v, ok := a.Header("Cookie"); ok {
  768. request.Cookie = v
  769. }
  770. if v, ok := a.Header("X-Forwarded-Host"); ok {
  771. if v != request.Host {
  772. request.Host = v
  773. }
  774. }
  775. return nil
  776. }
  777. func processHTTP(request *data.Request, appData []byte) error {
  778. reader := bufio.NewReader(strings.NewReader(string(appData)))
  779. req, err := http.ReadRequest(reader)
  780. if err != nil {
  781. return fmt.Errorf("Failed to parse HTTP header: %s", err)
  782. }
  783. request.Url = req.URL.String()
  784. request.Method = req.Method
  785. request.Referer = req.Referer()
  786. request.Host = req.Host
  787. request.Protocol = req.Proto
  788. request.Origin = request.Host
  789. if _, ok := req.Header["Connection"]; ok {
  790. request.Connection = req.Header["Connection"][0]
  791. }
  792. if _, ok := req.Header["X-Forwarded-For"]; ok {
  793. request.XForwardedFor = req.Header["X-Forwarded-For"][0]
  794. }
  795. // CloudFlare: override X-Forwarded for since it is tainted by cloudflare
  796. if _, ok := req.Header["True-Client-Ip"]; ok {
  797. request.XForwardedFor = req.Header["True-Client-Ip"][0]
  798. }
  799. if _, ok := req.Header["X-Real-Ip"]; ok {
  800. request.XRealIP = req.Header["X-Real-Ip"][0]
  801. }
  802. if _, ok := req.Header["X-Requested-With"]; ok {
  803. request.XRequestedWith = req.Header["X-Requested-With"][0]
  804. }
  805. if _, ok := req.Header["Accept-Encoding"]; ok {
  806. request.AcceptEncoding = req.Header["Accept-Encoding"][0]
  807. }
  808. if _, ok := req.Header["Accept-Language"]; ok {
  809. request.AcceptLanguage = req.Header["Accept-Language"][0]
  810. }
  811. if _, ok := req.Header["User-Agent"]; ok {
  812. request.UserAgent = req.Header["User-Agent"][0]
  813. }
  814. if _, ok := req.Header["Accept"]; ok {
  815. request.Accept = req.Header["Accept"][0]
  816. }
  817. if _, ok := req.Header["Cookie"]; ok {
  818. request.Cookie = req.Header["Cookie"][0]
  819. }
  820. request.Source = request.IpSrc
  821. return nil
  822. }
  823. // replayFile takes a file containing a list of requests (SourceIP Url) and queues the requests
  824. // e.g.
  825. // 157.55.39.229 /gross-gerau/12012260-beate-anstatt
  826. // 103.232.100.98 /weinsheim-eifel/13729444-plus-warenhandelsges-mbh
  827. func replayFile() {
  828. var req data.Request
  829. var startTs time.Time
  830. var endTs time.Time
  831. rand.Seed(time.Now().UnixNano())
  832. for {
  833. fh, err := os.Open(config.RequestsFile)
  834. if err != nil {
  835. log.Fatalf("Failed to open request file '%s': %s", config.RequestsFile, err)
  836. }
  837. c := csv.NewReader(fh)
  838. c.Comma = ' '
  839. for {
  840. if config.SleepFor.Duration > time.Nanosecond {
  841. startTs = time.Now()
  842. }
  843. r, err := c.Read()
  844. if err == io.EOF {
  845. break
  846. }
  847. if err != nil {
  848. log.Println(err)
  849. continue
  850. }
  851. req.IpSrc = r[0]
  852. req.Source = r[0]
  853. req.Url = r[1]
  854. req.UserAgent = "Munch/1.0"
  855. req.Host = "demo.scraperwall.com"
  856. req.CreatedAt = time.Now().UnixNano()
  857. publishRequest(config.NatsQueue, &req)
  858. if strings.Index(r[1], ".") < 0 {
  859. hash := sha1.New()
  860. io.WriteString(hash, r[0])
  861. fp := data.Fingerprint{
  862. ClientID: "scw",
  863. Fingerprint: fmt.Sprintf("%x", hash.Sum(nil)),
  864. Remote: r[0],
  865. Url: r[1],
  866. Source: r[0],
  867. CreatedAt: time.Now(),
  868. }
  869. if strings.HasPrefix(r[0], "50.31.") {
  870. fp.Fingerprint = "a1f2c2ee560ce6580d66d451a9c8dfbf"
  871. natsJSONEC.Publish("fingerprints_scw", fp)
  872. } else if rand.Intn(10) < 5 {
  873. natsJSONEC.Publish("fingerprints_scw", fp)
  874. }
  875. }
  876. count++
  877. if config.SleepFor.Duration >= time.Nanosecond {
  878. endTs = time.Now()
  879. if endTs.Before(startTs.Add(config.SleepFor.Duration)) {
  880. time.Sleep(config.SleepFor.Duration - endTs.Sub(startTs))
  881. }
  882. }
  883. }
  884. }
  885. }
  886. func loadConfig() {
  887. // initialize with values from the command line / environment
  888. config.Live = *doLiveCapture
  889. config.Interface = *iface
  890. config.SnapshotLen = *snapshotLen
  891. config.Filter = *filter
  892. config.Promiscuous = *promiscuous
  893. config.NatsURL = *natsURL
  894. config.NatsQueue = *natsQueue
  895. config.NatsUser = *natsUser
  896. config.NatsPassword = *natsPassword
  897. config.NatsCA = *natsCA
  898. config.SleepFor.Duration = *sleepFor
  899. config.RequestsFile = *requestsFile
  900. config.UseXForwardedAsSource = *useXForwardedAsSource
  901. config.Protocol = *protocol
  902. config.ApacheLog = *apacheLog
  903. config.ApacheReplay = *apacheReplay
  904. config.NginxLog = *nginxLog
  905. config.NginxLogFormat = *nginxFormat
  906. config.HostName = *hostName
  907. config.Quiet = *beQuiet
  908. config.Trace = *trace
  909. config.AccessWatchKey = *accessWatchKey
  910. if *configFile == "" {
  911. return
  912. }
  913. _, err := os.Stat(*configFile)
  914. if err != nil {
  915. log.Printf("%s: %s\n", *configFile, err)
  916. return
  917. }
  918. if _, err = toml.DecodeFile(*configFile, &config); err != nil {
  919. log.Printf("%s: %s\n", *configFile, err)
  920. }
  921. if !config.Quiet {
  922. config.print()
  923. }
  924. }
  925. // version outputs build information...
  926. func version() {
  927. fmt.Printf("munchclient %s, built on %s\n", Version, BuildDate)
  928. }