nginx.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package main
  2. import (
  3. "bufio"
  4. "log"
  5. "os"
  6. "regexp"
  7. "strings"
  8. "time"
  9. "git.scraperwall.com/scw/data"
  10. "github.com/hpcloud/tail"
  11. "github.com/kr/pretty"
  12. "github.com/satyrius/gonx"
  13. )
  14. func nginxLogCapture(logfile, format string) {
  15. if _, err := os.Stat(logfile); err != nil {
  16. log.Fatalf("%s: %s", logfile, err)
  17. }
  18. t, err := tail.TailFile(logfile, tail.Config{
  19. Follow: true, // follow the file
  20. ReOpen: true, // reopen log file when it gets closed/rotated
  21. Poll: config.TailPoll, // use file polling to detect a file rollover
  22. Logger: tail.DiscardingLogger, // don't log anything
  23. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  24. })
  25. if err != nil {
  26. log.Fatalf("%s: %s", logfile, err)
  27. }
  28. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  29. p := gonx.NewParser(format)
  30. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  31. for line := range t.Lines {
  32. var remote string
  33. var err error
  34. l := line.Text
  35. logEntry, err := p.ParseString(l)
  36. if err != nil {
  37. log.Println(err)
  38. continue
  39. }
  40. if config.Trace {
  41. pretty.Println(logEntry)
  42. }
  43. remote, err = logEntry.Field("remote_addr")
  44. if err != nil {
  45. log.Println(err)
  46. continue
  47. }
  48. xff, err := logEntry.Field("http_x_forwarded_for")
  49. if err != nil && xff != "" {
  50. if config.Trace {
  51. log.Printf("Using XFF: %s\n", xff)
  52. }
  53. remote = xff
  54. }
  55. if remote == "" {
  56. log.Println("remote is empty: ignoring request.")
  57. continue
  58. }
  59. // only use the first host in case there are multiple hosts in the log
  60. if cidx := strings.Index(remote, ","); cidx >= 0 {
  61. remote = remote[0:cidx]
  62. }
  63. timestampStr, err := logEntry.Field("time_local")
  64. if err != nil {
  65. log.Println(err)
  66. continue
  67. }
  68. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  69. if err != nil {
  70. log.Println(err)
  71. continue
  72. }
  73. httpRequest, err := logEntry.Field("request")
  74. if err != nil {
  75. log.Println(err)
  76. continue
  77. }
  78. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  79. if len(reqData) < 4 {
  80. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  81. continue
  82. }
  83. host, err := logEntry.Field("host")
  84. if err != nil {
  85. host = config.HostName
  86. if host == "" {
  87. host = "[not available]"
  88. }
  89. }
  90. customer, err := logEntry.Field("sc_customer")
  91. if err != nil {
  92. customer = "[not set]"
  93. }
  94. request := data.Request{
  95. IpSrc: remote,
  96. Origin: remote,
  97. Source: remote,
  98. IpDst: "127.0.0.1",
  99. PortSrc: 0,
  100. PortDst: 0,
  101. TcpSeq: 0,
  102. CreatedAt: timeStamp.Unix(),
  103. Url: reqData[2],
  104. Method: reqData[1],
  105. Host: host,
  106. Protocol: reqData[3],
  107. Customer: customer,
  108. }
  109. request.Referer, _ = logEntry.Field("http_referer")
  110. request.UserAgent, _ = logEntry.Field("http_user_agent")
  111. if config.Trace {
  112. log.Printf("[%s] %s\n", request.Source, request.Url)
  113. }
  114. count++
  115. publishRequest(config.NatsQueue, &request)
  116. }
  117. }
  118. func nginxLogReplay(logfile, format string) {
  119. file, err := os.Open(logfile)
  120. if err != nil {
  121. log.Fatalf("%s: %s", logfile, err)
  122. }
  123. defer file.Close()
  124. scanner := bufio.NewScanner(file)
  125. var tOffset time.Duration
  126. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  127. p := gonx.NewParser(format)
  128. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  129. for scanner.Scan() {
  130. l := scanner.Text()
  131. if err := scanner.Err(); err != nil {
  132. log.Fatal(err)
  133. }
  134. var remote string
  135. var err error
  136. logEntry, err := p.ParseString(l)
  137. if err != nil {
  138. log.Println(err)
  139. continue
  140. }
  141. if config.Trace {
  142. pretty.Println(logEntry)
  143. }
  144. remote, err = logEntry.Field("remote_addr")
  145. if err != nil {
  146. log.Println(err)
  147. continue
  148. }
  149. xff, err := logEntry.Field("http_x_forwarded_for")
  150. if err != nil && xff != "" {
  151. if config.Trace {
  152. log.Printf("Using XFF: %s\n", xff)
  153. }
  154. remote = xff
  155. }
  156. if remote == "" {
  157. log.Println("remote is empty: ignoring request.")
  158. continue
  159. }
  160. // only use the first host in case there are multiple hosts in the log
  161. if cidx := strings.Index(remote, ","); cidx >= 0 {
  162. remote = remote[0:cidx]
  163. }
  164. timestampStr, err := logEntry.Field("time_local")
  165. if err != nil {
  166. log.Println(err)
  167. continue
  168. }
  169. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  170. if err != nil {
  171. log.Println(err)
  172. continue
  173. }
  174. if tOffset == 0 {
  175. tOffset = time.Now().Sub(timeStamp)
  176. }
  177. if timeStamp.Add(tOffset).After(time.Now()) {
  178. time.Sleep(timeStamp.Add(tOffset).Sub(time.Now()))
  179. }
  180. httpRequest, err := logEntry.Field("request")
  181. if err != nil {
  182. log.Println(err)
  183. continue
  184. }
  185. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  186. if len(reqData) < 4 {
  187. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  188. continue
  189. }
  190. host := config.HostName
  191. if host == "" {
  192. host = "[not available]"
  193. }
  194. request := data.Request{
  195. IpSrc: remote,
  196. Origin: remote,
  197. Source: remote,
  198. IpDst: "127.0.0.1",
  199. PortSrc: 0,
  200. PortDst: 0,
  201. TcpSeq: 0,
  202. CreatedAt: timeStamp.Add(tOffset).UnixNano(),
  203. Url: reqData[2],
  204. Method: reqData[1],
  205. Host: host,
  206. Protocol: reqData[3],
  207. }
  208. request.Referer, _ = logEntry.Field("http_referer")
  209. request.UserAgent, _ = logEntry.Field("http_user_agent")
  210. if config.Trace {
  211. log.Printf("[%s] %s\n", request.Source, request.Url)
  212. }
  213. count++
  214. publishRequest(config.NatsQueue, &request)
  215. }
  216. }