tail.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. // Copyright (c) 2015 HPE Software Inc. All rights reserved.
  2. // Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
  3. package tail
  4. import (
  5. "bufio"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "log"
  11. "os"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/hpcloud/tail/ratelimiter"
  16. "github.com/hpcloud/tail/util"
  17. "github.com/hpcloud/tail/watch"
  18. "gopkg.in/tomb.v1"
  19. )
  20. var (
  21. ErrStop = fmt.Errorf("tail should now stop")
  22. )
// Line is one item delivered on the Tail.Lines channel.
//
// NOTE: Line values are built with positional literals elsewhere in this
// file, so the field order below must not change.
type Line struct {
	Text string    // Content of the line
	Time time.Time // Taken from time.Now() when the Line is created
	Err  error     // Error from tail
}
  28. // NewLine returns a Line with present time.
  29. func NewLine(text string) *Line {
  30. return &Line{text, time.Now(), nil}
  31. }
// SeekInfo is an offset/whence pair in the form accepted by
// (*os.File).Seek; this package passes both fields straight through to it.
type SeekInfo struct {
	Offset int64 // Byte offset, interpreted relative to Whence
	Whence int   // os.SEEK_*
}
// logger is the set of logging methods required of Config.Logger.
// A *log.Logger from the standard library is assigned to it in this
// package, so that type satisfies the interface.
type logger interface {
	Fatal(v ...interface{})
	Fatalf(format string, v ...interface{})
	Fatalln(v ...interface{})
	Panic(v ...interface{})
	Panicf(format string, v ...interface{})
	Panicln(v ...interface{})
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
// Config is used to specify how a file must be tailed.
type Config struct {
	// File-specific
	Location  *SeekInfo // Seek to this location before tailing
	ReOpen    bool      // Reopen recreated files (tail -F)
	MustExist bool      // Fail early if the file does not exist
	Poll      bool      // Poll for file changes instead of using inotify
	Pipe      bool      // Is a named pipe (mkfifo)
	// RateLimiter, when non-nil, is charged once per emitted line; when it
	// reports that the bucket is full, tailing pauses for a second and then
	// skips to the end of the file.
	RateLimiter *ratelimiter.LeakyBucket

	// Generic IO
	Follow      bool // Continue looking for new lines (tail -f)
	MaxLineSize int  // If non-zero, split longer lines into multiple lines

	// Logger, when nil, is set to tail.DefaultLogger
	// To disable logging: set field to tail.DiscardingLogger
	Logger logger
}
// Tail is the handle for one tailed file. Lines read from the file are
// delivered on Lines; the embedded Config holds the options the tail was
// started with and the embedded tomb.Tomb tracks its lifecycle.
type Tail struct {
	Filename string     // Path of the file being tailed
	Lines    chan *Line // Output stream; closed when tailing ends
	Config

	file    *os.File           // Currently open file, nil when closed
	reader  *bufio.Reader      // Buffered reader over file
	watcher watch.FileWatcher  // Polling or inotify watcher, chosen by Config.Poll
	changes *watch.FileChanges // Active change subscription, nil until first wait

	tomb.Tomb // provides: Done, Kill, Dying

	lk sync.Mutex // Held while reading from or inspecting reader
}
  75. var (
  76. // DefaultLogger is used when Config.Logger == nil
  77. DefaultLogger = log.New(os.Stderr, "", log.LstdFlags)
  78. // DiscardingLogger can be used to disable logging output
  79. DiscardingLogger = log.New(ioutil.Discard, "", 0)
  80. )
  81. // TailFile begins tailing the file. Output stream is made available
  82. // via the `Tail.Lines` channel. To handle errors during tailing,
  83. // invoke the `Wait` or `Err` method after finishing reading from the
  84. // `Lines` channel.
  85. func TailFile(filename string, config Config) (*Tail, error) {
  86. if config.ReOpen && !config.Follow {
  87. util.Fatal("cannot set ReOpen without Follow.")
  88. }
  89. t := &Tail{
  90. Filename: filename,
  91. Lines: make(chan *Line),
  92. Config: config,
  93. }
  94. // when Logger was not specified in config, use default logger
  95. if t.Logger == nil {
  96. t.Logger = log.New(os.Stderr, "", log.LstdFlags)
  97. }
  98. if t.Poll {
  99. t.watcher = watch.NewPollingFileWatcher(filename)
  100. } else {
  101. t.watcher = watch.NewInotifyFileWatcher(filename)
  102. }
  103. if t.MustExist {
  104. var err error
  105. t.file, err = OpenFile(t.Filename)
  106. if err != nil {
  107. return nil, err
  108. }
  109. }
  110. go t.tailFileSync()
  111. return t, nil
  112. }
  113. // Return the file's current position, like stdio's ftell().
  114. // But this value is not very accurate.
  115. // it may readed one line in the chan(tail.Lines),
  116. // so it may lost one line.
  117. func (tail *Tail) Tell() (offset int64, err error) {
  118. if tail.file == nil {
  119. return
  120. }
  121. offset, err = tail.file.Seek(0, os.SEEK_CUR)
  122. if err != nil {
  123. return
  124. }
  125. tail.lk.Lock()
  126. defer tail.lk.Unlock()
  127. if tail.reader == nil {
  128. return
  129. }
  130. offset -= int64(tail.reader.Buffered())
  131. return
  132. }
  133. // Stop stops the tailing activity.
  134. func (tail *Tail) Stop() error {
  135. tail.Kill(nil)
  136. return tail.Wait()
  137. }
  138. // StopAtEOF stops tailing as soon as the end of the file is reached.
  139. func (tail *Tail) StopAtEOF() error {
  140. tail.Kill(errStopAtEOF)
  141. return tail.Wait()
  142. }
// errStopAtEOF is the kill reason used by StopAtEOF. The read loop checks
// for it so that a dying tail keeps reading instead of returning at once.
var errStopAtEOF = errors.New("tail: stop at eof")
  144. func (tail *Tail) close() {
  145. close(tail.Lines)
  146. tail.closeFile()
  147. }
  148. func (tail *Tail) closeFile() {
  149. if tail.file != nil {
  150. tail.file.Close()
  151. tail.file = nil
  152. }
  153. }
  154. func (tail *Tail) reopen() error {
  155. tail.closeFile()
  156. for {
  157. var err error
  158. tail.file, err = OpenFile(tail.Filename)
  159. if err != nil {
  160. if os.IsNotExist(err) {
  161. tail.Logger.Printf("Waiting for %s to appear...", tail.Filename)
  162. if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil {
  163. if err == tomb.ErrDying {
  164. return err
  165. }
  166. return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
  167. }
  168. continue
  169. }
  170. return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err)
  171. }
  172. break
  173. }
  174. return nil
  175. }
  176. func (tail *Tail) readLine() (string, error) {
  177. tail.lk.Lock()
  178. line, err := tail.reader.ReadString('\n')
  179. tail.lk.Unlock()
  180. if err != nil {
  181. // Note ReadString "returns the data read before the error" in
  182. // case of an error, including EOF, so we return it as is. The
  183. // caller is expected to process it if err is EOF.
  184. return line, err
  185. }
  186. line = strings.TrimRight(line, "\n")
  187. return line, err
  188. }
  189. func (tail *Tail) tailFileSync() {
  190. defer tail.Done()
  191. defer tail.close()
  192. if !tail.MustExist {
  193. // deferred first open.
  194. err := tail.reopen()
  195. if err != nil {
  196. if err != tomb.ErrDying {
  197. tail.Kill(err)
  198. }
  199. return
  200. }
  201. }
  202. // Seek to requested location on first open of the file.
  203. if tail.Location != nil {
  204. _, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence)
  205. tail.Logger.Printf("Seeked %s - %+v\n", tail.Filename, tail.Location)
  206. if err != nil {
  207. tail.Killf("Seek error on %s: %s", tail.Filename, err)
  208. return
  209. }
  210. }
  211. tail.openReader()
  212. var offset int64 = 0
  213. var err error
  214. // Read line by line.
  215. for {
  216. // do not seek in named pipes
  217. if !tail.Pipe {
  218. // grab the position in case we need to back up in the event of a half-line
  219. offset, err = tail.Tell()
  220. if err != nil {
  221. tail.Kill(err)
  222. return
  223. }
  224. }
  225. line, err := tail.readLine()
  226. // Process `line` even if err is EOF.
  227. if err == nil {
  228. cooloff := !tail.sendLine(line)
  229. if cooloff {
  230. // Wait a second before seeking till the end of
  231. // file when rate limit is reached.
  232. msg := fmt.Sprintf(
  233. "Too much log activity; waiting a second " +
  234. "before resuming tailing")
  235. tail.Lines <- &Line{msg, time.Now(), fmt.Errorf(msg)}
  236. select {
  237. case <-time.After(time.Second):
  238. case <-tail.Dying():
  239. return
  240. }
  241. if err := tail.seekEnd(); err != nil {
  242. tail.Kill(err)
  243. return
  244. }
  245. }
  246. } else if err == io.EOF {
  247. if !tail.Follow {
  248. if line != "" {
  249. tail.sendLine(line)
  250. }
  251. return
  252. }
  253. if tail.Follow && line != "" {
  254. // this has the potential to never return the last line if
  255. // it's not followed by a newline; seems a fair trade here
  256. err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0})
  257. if err != nil {
  258. tail.Kill(err)
  259. return
  260. }
  261. }
  262. // When EOF is reached, wait for more data to become
  263. // available. Wait strategy is based on the `tail.watcher`
  264. // implementation (inotify or polling).
  265. err := tail.waitForChanges()
  266. if err != nil {
  267. if err != ErrStop {
  268. tail.Kill(err)
  269. }
  270. return
  271. }
  272. } else {
  273. // non-EOF error
  274. tail.Killf("Error reading %s: %s", tail.Filename, err)
  275. return
  276. }
  277. select {
  278. case <-tail.Dying():
  279. if tail.Err() == errStopAtEOF {
  280. continue
  281. }
  282. return
  283. default:
  284. }
  285. }
  286. }
  287. // waitForChanges waits until the file has been appended, deleted,
  288. // moved or truncated. When moved or deleted - the file will be
  289. // reopened if ReOpen is true. Truncated files are always reopened.
  290. func (tail *Tail) waitForChanges() error {
  291. if tail.changes == nil {
  292. pos, err := tail.file.Seek(0, os.SEEK_CUR)
  293. if err != nil {
  294. return err
  295. }
  296. tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
  297. if err != nil {
  298. return err
  299. }
  300. }
  301. select {
  302. case <-tail.changes.Modified:
  303. return nil
  304. case <-tail.changes.Deleted:
  305. tail.changes = nil
  306. if tail.ReOpen {
  307. // XXX: we must not log from a library.
  308. tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
  309. if err := tail.reopen(); err != nil {
  310. return err
  311. }
  312. tail.Logger.Printf("Successfully reopened %s", tail.Filename)
  313. tail.openReader()
  314. return nil
  315. } else {
  316. tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
  317. return ErrStop
  318. }
  319. case <-tail.changes.Truncated:
  320. // Always reopen truncated files (Follow is true)
  321. tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename)
  322. if err := tail.reopen(); err != nil {
  323. return err
  324. }
  325. tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename)
  326. tail.openReader()
  327. return nil
  328. case <-tail.Dying():
  329. return ErrStop
  330. }
  331. panic("unreachable")
  332. }
  333. func (tail *Tail) openReader() {
  334. if tail.MaxLineSize > 0 {
  335. // add 2 to account for newline characters
  336. tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2)
  337. } else {
  338. tail.reader = bufio.NewReader(tail.file)
  339. }
  340. }
  341. func (tail *Tail) seekEnd() error {
  342. return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END})
  343. }
  344. func (tail *Tail) seekTo(pos SeekInfo) error {
  345. _, err := tail.file.Seek(pos.Offset, pos.Whence)
  346. if err != nil {
  347. return fmt.Errorf("Seek error on %s: %s", tail.Filename, err)
  348. }
  349. // Reset the read buffer whenever the file is re-seek'ed
  350. tail.reader.Reset(tail.file)
  351. return nil
  352. }
  353. // sendLine sends the line(s) to Lines channel, splitting longer lines
  354. // if necessary. Return false if rate limit is reached.
  355. func (tail *Tail) sendLine(line string) bool {
  356. now := time.Now()
  357. lines := []string{line}
  358. // Split longer lines
  359. if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
  360. lines = util.PartitionString(line, tail.MaxLineSize)
  361. }
  362. for _, line := range lines {
  363. tail.Lines <- &Line{line, now, nil}
  364. }
  365. if tail.Config.RateLimiter != nil {
  366. ok := tail.Config.RateLimiter.Pour(uint16(len(lines)))
  367. if !ok {
  368. tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.\n",
  369. tail.Filename)
  370. return false
  371. }
  372. }
  373. return true
  374. }
  375. // Cleanup removes inotify watches added by the tail package. This function is
  376. // meant to be invoked from a process's exit handler. Linux kernel may not
  377. // automatically remove inotify watches after the process exits.
  378. func (tail *Tail) Cleanup() {
  379. watch.Cleanup(tail.Filename)
  380. }