apache.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "log"
  6. "net"
  7. "os"
  8. "strings"
  9. "time"
  10. "git.scraperwall.com/scw/data"
  11. "github.com/Songmu/axslogparser"
  12. "github.com/hpcloud/tail"
  13. )
  14. func apacheLogReplay(logfile string) {
  15. file, err := os.Open(logfile)
  16. if err != nil {
  17. log.Fatalf("%s: %s", logfile, err)
  18. }
  19. defer file.Close()
  20. scanner := bufio.NewScanner(file)
  21. var p axslogparser.Parser
  22. parserSet := false
  23. var tOffset time.Duration
  24. for scanner.Scan() {
  25. l := scanner.Text()
  26. if err := scanner.Err(); err != nil {
  27. log.Fatal(err)
  28. }
  29. if !parserSet {
  30. p, _, err = axslogparser.GuessParser(l)
  31. if err != nil {
  32. log.Println(err)
  33. continue
  34. }
  35. parserSet = true
  36. }
  37. logEntry, err := p.Parse(l)
  38. if err != nil {
  39. log.Println(err)
  40. continue
  41. }
  42. if tOffset == 0 {
  43. tOffset = time.Now().Sub(logEntry.Time)
  44. }
  45. ts := logEntry.Time.Add(tOffset)
  46. if ts.After(time.Now()) {
  47. time.Sleep(ts.Sub(time.Now()))
  48. }
  49. remote := logEntry.Host
  50. if logEntry.VirtualHost != "" && config.UseVhostAsSource {
  51. commaIdx := strings.Index(logEntry.VirtualHost, ",")
  52. if commaIdx > 0 {
  53. logEntry.VirtualHost = logEntry.VirtualHost[0:commaIdx]
  54. }
  55. vhIP := net.ParseIP(logEntry.VirtualHost)
  56. if vhIP != nil {
  57. remote = logEntry.VirtualHost
  58. }
  59. }
  60. if *useXForwardedAsSource && logEntry.ForwardedFor != "" {
  61. remote = logEntry.ForwardedFor
  62. }
  63. // only use the first host in case there are multiple hosts in the log
  64. if cidx := strings.Index(remote, ","); cidx >= 0 {
  65. remote = remote[0:cidx]
  66. }
  67. // extract the virtual host
  68. var virtualHost string
  69. vhost := logEntry.VirtualHost
  70. if vhost != "" {
  71. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  72. virtualHost = vhostAndPort[0]
  73. } else {
  74. if config.HostName != "" {
  75. vhost = config.HostName
  76. } else {
  77. vhost = "[not available]"
  78. }
  79. }
  80. request := data.Request{
  81. IpSrc: remote,
  82. IpDst: "127.0.0.1",
  83. PortSrc: 0,
  84. PortDst: 0,
  85. TcpSeq: 0,
  86. CreatedAt: (logEntry.Time.Add(tOffset)).UnixNano(),
  87. Url: logEntry.RequestURI,
  88. Method: logEntry.Method,
  89. Host: virtualHost,
  90. Protocol: logEntry.Protocol,
  91. Origin: remote,
  92. Source: remote,
  93. Referer: logEntry.Referer,
  94. XForwardedFor: logEntry.ForwardedFor,
  95. UserAgent: logEntry.UserAgent,
  96. }
  97. if config.Trace {
  98. log.Printf("[%s] %s\n", request.Source, request.Url)
  99. }
  100. count++
  101. publishRequest(config.NatsQueue, &request)
  102. }
  103. }
  104. func apacheLogCapture(logfile string) {
  105. if _, err := os.Stat(logfile); err != nil {
  106. log.Fatalf("%s: %s", logfile, err)
  107. }
  108. t, err := tail.TailFile(logfile, tail.Config{
  109. Follow: true, // follow the file
  110. ReOpen: true, // reopen log file when it gets closed/rotated
  111. Logger: tail.DiscardingLogger, // don't log anything
  112. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  113. })
  114. if err != nil {
  115. log.Fatalf("%s: %s", logfile, err)
  116. }
  117. var p axslogparser.Parser
  118. parserSet := false
  119. for line := range t.Lines {
  120. l := line.Text
  121. if !parserSet {
  122. p, _, err = axslogparser.GuessParser(l)
  123. if err != nil {
  124. log.Println(err)
  125. continue
  126. }
  127. parserSet = true
  128. }
  129. logEntry, err := p.Parse(l)
  130. if err != nil {
  131. log.Println(err)
  132. continue
  133. }
  134. remote := logEntry.Host
  135. if config.UseXForwardedAsSource && logEntry.ForwardedFor != "" {
  136. remote = logEntry.ForwardedFor
  137. }
  138. // only use the first host in case there are multiple hosts in the log
  139. if cidx := strings.Index(remote, ","); cidx >= 0 {
  140. remote = remote[0:cidx]
  141. }
  142. // extract the virtual host
  143. var virtualHost string
  144. vhost := logEntry.VirtualHost
  145. if vhost != "" {
  146. vhostAndPort := strings.Split(logEntry.VirtualHost, ":")
  147. virtualHost = vhostAndPort[0]
  148. if config.UseVhostAsSource {
  149. virtualHost = virtualHost[0:strings.Index(virtualHost, ",")]
  150. vhIP := net.ParseIP(virtualHost)
  151. if vhIP != nil {
  152. remote = virtualHost
  153. }
  154. }
  155. } else {
  156. if config.HostName != "" {
  157. vhost = config.HostName
  158. } else {
  159. vhost = "[not available]"
  160. }
  161. }
  162. request := data.Request{
  163. IpSrc: remote,
  164. IpDst: "127.0.0.1",
  165. PortSrc: 0,
  166. PortDst: 0,
  167. TcpSeq: 0,
  168. CreatedAt: logEntry.Time.UnixNano(),
  169. Url: logEntry.RequestURI,
  170. Method: logEntry.Method,
  171. Host: virtualHost,
  172. Protocol: logEntry.Protocol,
  173. Origin: remote,
  174. Source: remote,
  175. Referer: logEntry.Referer,
  176. XForwardedFor: logEntry.ForwardedFor,
  177. UserAgent: logEntry.UserAgent,
  178. }
  179. if config.Trace {
  180. log.Printf("[%s] %s\n", request.Source, request.Url)
  181. }
  182. count++
  183. publishRequest(config.NatsQueue, &request)
  184. }
  185. }