nginx.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package main
  2. import (
  3. "bufio"
  4. "log"
  5. "os"
  6. "regexp"
  7. "strings"
  8. "time"
  9. "git.scraperwall.com/scw/data/v2"
  10. "github.com/hpcloud/tail"
  11. "github.com/kr/pretty"
  12. "github.com/satyrius/gonx"
  13. )
  14. func nginxLogCapture(logfile, format, serverHost string) {
  15. if _, err := os.Stat(logfile); err != nil {
  16. log.Fatalf("%s: %s", logfile, err)
  17. }
  18. t, err := tail.TailFile(logfile, tail.Config{
  19. Follow: true, // follow the file
  20. ReOpen: true, // reopen log file when it gets closed/rotated
  21. Poll: config.TailPoll, // use file polling to detect a file rollover
  22. Logger: tail.DiscardingLogger, // don't log anything
  23. Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file
  24. })
  25. if err != nil {
  26. log.Fatalf("%s: %s", logfile, err)
  27. }
  28. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  29. p := gonx.NewParser(format)
  30. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  31. for line := range t.Lines {
  32. var remote string
  33. var err error
  34. l := line.Text
  35. logEntry, err := p.ParseString(l)
  36. if err != nil {
  37. log.Println(err)
  38. continue
  39. }
  40. remote, err = logEntry.Field("remote_addr")
  41. if err != nil {
  42. log.Println(err)
  43. continue
  44. }
  45. xff, err := logEntry.Field("http_x_forwarded_for")
  46. if err != nil && xff != "" {
  47. if config.Trace {
  48. log.Printf("Using XFF: %s\n", xff)
  49. }
  50. remote = xff
  51. }
  52. if remote == "" {
  53. log.Println("remote is empty: ignoring request.")
  54. continue
  55. }
  56. // only use the first host in case there are multiple hosts in the log
  57. if cidx := strings.Index(remote, ","); cidx >= 0 {
  58. remote = remote[0:cidx]
  59. }
  60. timestampStr, err := logEntry.Field("time_local")
  61. if err != nil {
  62. log.Println(err)
  63. continue
  64. }
  65. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  66. if err != nil {
  67. log.Println(err)
  68. continue
  69. }
  70. httpRequest, err := logEntry.Field("request")
  71. if err != nil {
  72. log.Println(err)
  73. continue
  74. }
  75. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  76. if len(reqData) < 4 {
  77. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  78. continue
  79. }
  80. host, err := logEntry.Field("host")
  81. if err != nil {
  82. host = serverHost
  83. if host == "" {
  84. host = "[not available]"
  85. }
  86. }
  87. customer, err := logEntry.Field("sc_customer")
  88. if err != nil {
  89. customer = "[not set]"
  90. }
  91. request := data.Request{
  92. IpSrc: remote,
  93. Origin: remote,
  94. Source: remote,
  95. IpDst: "127.0.0.1",
  96. PortSrc: 0,
  97. PortDst: 0,
  98. TcpSeq: 0,
  99. CreatedAt: timeStamp.Unix(),
  100. Url: reqData[2],
  101. Method: reqData[1],
  102. Host: host,
  103. Protocol: reqData[3],
  104. Customer: customer,
  105. }
  106. request.Referer, _ = logEntry.Field("http_referer")
  107. request.UserAgent, _ = logEntry.Field("http_user_agent")
  108. if config.Trace {
  109. log.Printf("[%s] %s\n", request.Source, request.Url)
  110. }
  111. count++
  112. publishRequest(config.NatsQueue, &request)
  113. }
  114. }
  115. func nginxLogReplay(logfile, format string) {
  116. file, err := os.Open(logfile)
  117. if err != nil {
  118. log.Fatalf("%s: %s", logfile, err)
  119. }
  120. defer file.Close()
  121. scanner := bufio.NewScanner(file)
  122. var tOffset time.Duration
  123. // `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`
  124. p := gonx.NewParser(format)
  125. reqRegexp := regexp.MustCompile(`^([A-Z]+)\s+(.+?)\s+(HTTP/\d+\.\d+)$`)
  126. for scanner.Scan() {
  127. l := scanner.Text()
  128. if err := scanner.Err(); err != nil {
  129. log.Fatal(err)
  130. }
  131. var remote string
  132. var err error
  133. logEntry, err := p.ParseString(l)
  134. if err != nil {
  135. log.Println(err)
  136. continue
  137. }
  138. if config.Trace {
  139. pretty.Println(logEntry)
  140. }
  141. remote, err = logEntry.Field("remote_addr")
  142. if err != nil {
  143. log.Println(err)
  144. continue
  145. }
  146. xff, err := logEntry.Field("http_x_forwarded_for")
  147. if err != nil && xff != "" {
  148. if config.Trace {
  149. log.Printf("Using XFF: %s\n", xff)
  150. }
  151. remote = xff
  152. }
  153. if remote == "" {
  154. log.Println("remote is empty: ignoring request.")
  155. continue
  156. }
  157. // only use the first host in case there are multiple hosts in the log
  158. if cidx := strings.Index(remote, ","); cidx >= 0 {
  159. remote = remote[0:cidx]
  160. }
  161. timestampStr, err := logEntry.Field("time_local")
  162. if err != nil {
  163. log.Println(err)
  164. continue
  165. }
  166. timeStamp, err := time.Parse("02/Jan/2006:15:04:05 -0700", timestampStr)
  167. if err != nil {
  168. log.Println(err)
  169. continue
  170. }
  171. if tOffset == 0 {
  172. tOffset = time.Now().Sub(timeStamp)
  173. }
  174. if timeStamp.Add(tOffset).After(time.Now()) {
  175. time.Sleep(timeStamp.Add(tOffset).Sub(time.Now()))
  176. }
  177. httpRequest, err := logEntry.Field("request")
  178. if err != nil {
  179. log.Println(err)
  180. continue
  181. }
  182. reqData := reqRegexp.FindStringSubmatch(httpRequest)
  183. if len(reqData) < 4 {
  184. log.Printf("reqData is too short: %d instead of 4\n", len(reqData))
  185. continue
  186. }
  187. host := config.HostName
  188. if host == "" {
  189. host = "[not available]"
  190. }
  191. request := data.Request{
  192. IpSrc: remote,
  193. Origin: remote,
  194. Source: remote,
  195. IpDst: "127.0.0.1",
  196. PortSrc: 0,
  197. PortDst: 0,
  198. TcpSeq: 0,
  199. CreatedAt: timeStamp.Add(tOffset).UnixNano(),
  200. Url: reqData[2],
  201. Method: reqData[1],
  202. Host: host,
  203. Protocol: reqData[3],
  204. }
  205. request.Referer, _ = logEntry.Field("http_referer")
  206. request.UserAgent, _ = logEntry.Field("http_user_agent")
  207. if config.Trace {
  208. log.Printf("[%s] %s\n", request.Source, request.Url)
  209. }
  210. count++
  211. publishRequest(config.NatsQueue, &request)
  212. }
  213. }