123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438 |
- package tail
- import (
- "bufio"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "os"
- "strings"
- "sync"
- "time"
- "github.com/hpcloud/tail/ratelimiter"
- "github.com/hpcloud/tail/util"
- "github.com/hpcloud/tail/watch"
- "gopkg.in/tomb.v1"
- )
- var (
- ErrStop = fmt.Errorf("tail should now stop")
- )
- type Line struct {
- Text string
- Time time.Time
- Err error
- }
- func NewLine(text string) *Line {
- return &Line{text, time.Now(), nil}
- }
- type SeekInfo struct {
- Offset int64
- Whence int
- }
- type logger interface {
- Fatal(v ...interface{})
- Fatalf(format string, v ...interface{})
- Fatalln(v ...interface{})
- Panic(v ...interface{})
- Panicf(format string, v ...interface{})
- Panicln(v ...interface{})
- Print(v ...interface{})
- Printf(format string, v ...interface{})
- Println(v ...interface{})
- }
- type Config struct {
-
- Location *SeekInfo
- ReOpen bool
- MustExist bool
- Poll bool
- Pipe bool
- RateLimiter *ratelimiter.LeakyBucket
-
- Follow bool
- MaxLineSize int
-
-
- Logger logger
- }
- type Tail struct {
- Filename string
- Lines chan *Line
- Config
- file *os.File
- reader *bufio.Reader
- watcher watch.FileWatcher
- changes *watch.FileChanges
- tomb.Tomb
- lk sync.Mutex
- }
- var (
-
- DefaultLogger = log.New(os.Stderr, "", log.LstdFlags)
-
- DiscardingLogger = log.New(ioutil.Discard, "", 0)
- )
- func TailFile(filename string, config Config) (*Tail, error) {
- if config.ReOpen && !config.Follow {
- util.Fatal("cannot set ReOpen without Follow.")
- }
- t := &Tail{
- Filename: filename,
- Lines: make(chan *Line),
- Config: config,
- }
-
- if t.Logger == nil {
- t.Logger = log.New(os.Stderr, "", log.LstdFlags)
- }
- if t.Poll {
- t.watcher = watch.NewPollingFileWatcher(filename)
- } else {
- t.watcher = watch.NewInotifyFileWatcher(filename)
- }
- if t.MustExist {
- var err error
- t.file, err = OpenFile(t.Filename)
- if err != nil {
- return nil, err
- }
- }
- go t.tailFileSync()
- return t, nil
- }
- func (tail *Tail) Tell() (offset int64, err error) {
- if tail.file == nil {
- return
- }
- offset, err = tail.file.Seek(0, os.SEEK_CUR)
- if err != nil {
- return
- }
- tail.lk.Lock()
- defer tail.lk.Unlock()
- if tail.reader == nil {
- return
- }
- offset -= int64(tail.reader.Buffered())
- return
- }
- func (tail *Tail) Stop() error {
- tail.Kill(nil)
- return tail.Wait()
- }
- func (tail *Tail) StopAtEOF() error {
- tail.Kill(errStopAtEOF)
- return tail.Wait()
- }
- var errStopAtEOF = errors.New("tail: stop at eof")
- func (tail *Tail) close() {
- close(tail.Lines)
- tail.closeFile()
- }
- func (tail *Tail) closeFile() {
- if tail.file != nil {
- tail.file.Close()
- tail.file = nil
- }
- }
- func (tail *Tail) reopen() error {
- tail.closeFile()
- for {
- var err error
- tail.file, err = OpenFile(tail.Filename)
- if err != nil {
- if os.IsNotExist(err) {
- tail.Logger.Printf("Waiting for %s to appear...", tail.Filename)
- if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil {
- if err == tomb.ErrDying {
- return err
- }
- return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
- }
- continue
- }
- return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err)
- }
- break
- }
- return nil
- }
- func (tail *Tail) readLine() (string, error) {
- tail.lk.Lock()
- line, err := tail.reader.ReadString('\n')
- tail.lk.Unlock()
- if err != nil {
-
-
-
- return line, err
- }
- line = strings.TrimRight(line, "\n")
- return line, err
- }
- func (tail *Tail) tailFileSync() {
- defer tail.Done()
- defer tail.close()
- if !tail.MustExist {
-
- err := tail.reopen()
- if err != nil {
- if err != tomb.ErrDying {
- tail.Kill(err)
- }
- return
- }
- }
-
- if tail.Location != nil {
- _, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence)
- tail.Logger.Printf("Seeked %s - %+v\n", tail.Filename, tail.Location)
- if err != nil {
- tail.Killf("Seek error on %s: %s", tail.Filename, err)
- return
- }
- }
- tail.openReader()
- var offset int64 = 0
- var err error
-
- for {
-
- if !tail.Pipe {
-
- offset, err = tail.Tell()
- if err != nil {
- tail.Kill(err)
- return
- }
- }
- line, err := tail.readLine()
-
- if err == nil {
- cooloff := !tail.sendLine(line)
- if cooloff {
-
-
- msg := fmt.Sprintf(
- "Too much log activity; waiting a second " +
- "before resuming tailing")
- tail.Lines <- &Line{msg, time.Now(), fmt.Errorf(msg)}
- select {
- case <-time.After(time.Second):
- case <-tail.Dying():
- return
- }
- if err := tail.seekEnd(); err != nil {
- tail.Kill(err)
- return
- }
- }
- } else if err == io.EOF {
- if !tail.Follow {
- if line != "" {
- tail.sendLine(line)
- }
- return
- }
- if tail.Follow && line != "" {
-
-
- err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0})
- if err != nil {
- tail.Kill(err)
- return
- }
- }
-
-
-
- err := tail.waitForChanges()
- if err != nil {
- if err != ErrStop {
- tail.Kill(err)
- }
- return
- }
- } else {
-
- tail.Killf("Error reading %s: %s", tail.Filename, err)
- return
- }
- select {
- case <-tail.Dying():
- if tail.Err() == errStopAtEOF {
- continue
- }
- return
- default:
- }
- }
- }
- func (tail *Tail) waitForChanges() error {
- if tail.changes == nil {
- pos, err := tail.file.Seek(0, os.SEEK_CUR)
- if err != nil {
- return err
- }
- tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
- if err != nil {
- return err
- }
- }
- select {
- case <-tail.changes.Modified:
- return nil
- case <-tail.changes.Deleted:
- tail.changes = nil
- if tail.ReOpen {
-
- tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
- if err := tail.reopen(); err != nil {
- return err
- }
- tail.Logger.Printf("Successfully reopened %s", tail.Filename)
- tail.openReader()
- return nil
- } else {
- tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
- return ErrStop
- }
- case <-tail.changes.Truncated:
-
- tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename)
- if err := tail.reopen(); err != nil {
- return err
- }
- tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename)
- tail.openReader()
- return nil
- case <-tail.Dying():
- return ErrStop
- }
- panic("unreachable")
- }
- func (tail *Tail) openReader() {
- if tail.MaxLineSize > 0 {
-
- tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2)
- } else {
- tail.reader = bufio.NewReader(tail.file)
- }
- }
- func (tail *Tail) seekEnd() error {
- return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END})
- }
- func (tail *Tail) seekTo(pos SeekInfo) error {
- _, err := tail.file.Seek(pos.Offset, pos.Whence)
- if err != nil {
- return fmt.Errorf("Seek error on %s: %s", tail.Filename, err)
- }
-
- tail.reader.Reset(tail.file)
- return nil
- }
- func (tail *Tail) sendLine(line string) bool {
- now := time.Now()
- lines := []string{line}
-
- if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
- lines = util.PartitionString(line, tail.MaxLineSize)
- }
- for _, line := range lines {
- tail.Lines <- &Line{line, now, nil}
- }
- if tail.Config.RateLimiter != nil {
- ok := tail.Config.RateLimiter.Pour(uint16(len(lines)))
- if !ok {
- tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.\n",
- tail.Filename)
- return false
- }
- }
- return true
- }
- func (tail *Tail) Cleanup() {
- watch.Cleanup(tail.Filename)
- }
|