123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- // Copyright (c) 2015 HPE Software Inc. All rights reserved.
- // Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
- package watch
- import (
- "log"
- "os"
- "path/filepath"
- "sync"
- "syscall"
- "github.com/hpcloud/tail/util"
- "gopkg.in/fsnotify.v1"
- )
- type InotifyTracker struct {
- mux sync.Mutex
- watcher *fsnotify.Watcher
- chans map[string]chan fsnotify.Event
- done map[string]chan bool
- watchNums map[string]int
- watch chan *watchInfo
- remove chan *watchInfo
- error chan error
- }
- type watchInfo struct {
- op fsnotify.Op
- fname string
- }
- func (this *watchInfo) isCreate() bool {
- return this.op == fsnotify.Create
- }
- var (
- // globally shared InotifyTracker; ensures only one fsnotify.Watcher is used
- shared *InotifyTracker
- // these are used to ensure the shared InotifyTracker is run exactly once
- once = sync.Once{}
- goRun = func() {
- shared = &InotifyTracker{
- mux: sync.Mutex{},
- chans: make(map[string]chan fsnotify.Event),
- done: make(map[string]chan bool),
- watchNums: make(map[string]int),
- watch: make(chan *watchInfo),
- remove: make(chan *watchInfo),
- error: make(chan error),
- }
- go shared.run()
- }
- logger = log.New(os.Stderr, "", log.LstdFlags)
- )
- // Watch signals the run goroutine to begin watching the input filename
- func Watch(fname string) error {
- return watch(&watchInfo{
- fname: fname,
- })
- }
- // Watch create signals the run goroutine to begin watching the input filename
- // if call the WatchCreate function, don't call the Cleanup, call the RemoveWatchCreate
- func WatchCreate(fname string) error {
- return watch(&watchInfo{
- op: fsnotify.Create,
- fname: fname,
- })
- }
- func watch(winfo *watchInfo) error {
- // start running the shared InotifyTracker if not already running
- once.Do(goRun)
- winfo.fname = filepath.Clean(winfo.fname)
- shared.watch <- winfo
- return <-shared.error
- }
- // RemoveWatch signals the run goroutine to remove the watch for the input filename
- func RemoveWatch(fname string) error {
- return remove(&watchInfo{
- fname: fname,
- })
- }
- // RemoveWatch create signals the run goroutine to remove the watch for the input filename
- func RemoveWatchCreate(fname string) error {
- return remove(&watchInfo{
- op: fsnotify.Create,
- fname: fname,
- })
- }
- func remove(winfo *watchInfo) error {
- // start running the shared InotifyTracker if not already running
- once.Do(goRun)
- winfo.fname = filepath.Clean(winfo.fname)
- shared.mux.Lock()
- done := shared.done[winfo.fname]
- if done != nil {
- delete(shared.done, winfo.fname)
- close(done)
- }
- shared.mux.Unlock()
- shared.remove <- winfo
- return <-shared.error
- }
- // Events returns a channel to which FileEvents corresponding to the input filename
- // will be sent. This channel will be closed when removeWatch is called on this
- // filename.
- func Events(fname string) <-chan fsnotify.Event {
- shared.mux.Lock()
- defer shared.mux.Unlock()
- return shared.chans[fname]
- }
- // Cleanup removes the watch for the input filename if necessary.
- func Cleanup(fname string) error {
- return RemoveWatch(fname)
- }
- // watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating
- // a new Watcher if the previous Watcher was closed.
- func (shared *InotifyTracker) addWatch(winfo *watchInfo) error {
- shared.mux.Lock()
- defer shared.mux.Unlock()
- if shared.chans[winfo.fname] == nil {
- shared.chans[winfo.fname] = make(chan fsnotify.Event)
- }
- if shared.done[winfo.fname] == nil {
- shared.done[winfo.fname] = make(chan bool)
- }
- fname := winfo.fname
- if winfo.isCreate() {
- // Watch for new files to be created in the parent directory.
- fname = filepath.Dir(fname)
- }
- var err error
- // already in inotify watch
- if shared.watchNums[fname] == 0 {
- err = shared.watcher.Add(fname)
- }
- if err == nil {
- shared.watchNums[fname]++
- }
- return err
- }
- // removeWatch calls fsnotify.RemoveWatch for the input filename and closes the
- // corresponding events channel.
- func (shared *InotifyTracker) removeWatch(winfo *watchInfo) error {
- shared.mux.Lock()
- ch := shared.chans[winfo.fname]
- if ch != nil {
- delete(shared.chans, winfo.fname)
- close(ch)
- }
- fname := winfo.fname
- if winfo.isCreate() {
- // Watch for new files to be created in the parent directory.
- fname = filepath.Dir(fname)
- }
- shared.watchNums[fname]--
- watchNum := shared.watchNums[fname]
- if watchNum == 0 {
- delete(shared.watchNums, fname)
- }
- shared.mux.Unlock()
- var err error
- // If we were the last ones to watch this file, unsubscribe from inotify.
- // This needs to happen after releasing the lock because fsnotify waits
- // synchronously for the kernel to acknowledge the removal of the watch
- // for this file, which causes us to deadlock if we still held the lock.
- if watchNum == 0 {
- err = shared.watcher.Remove(fname)
- }
- return err
- }
- // sendEvent sends the input event to the appropriate Tail.
- func (shared *InotifyTracker) sendEvent(event fsnotify.Event) {
- name := filepath.Clean(event.Name)
- shared.mux.Lock()
- ch := shared.chans[name]
- done := shared.done[name]
- shared.mux.Unlock()
- if ch != nil && done != nil {
- select {
- case ch <- event:
- case <-done:
- }
- }
- }
- // run starts the goroutine in which the shared struct reads events from its
- // Watcher's Event channel and sends the events to the appropriate Tail.
- func (shared *InotifyTracker) run() {
- watcher, err := fsnotify.NewWatcher()
- if err != nil {
- util.Fatal("failed to create Watcher")
- }
- shared.watcher = watcher
- for {
- select {
- case winfo := <-shared.watch:
- shared.error <- shared.addWatch(winfo)
- case winfo := <-shared.remove:
- shared.error <- shared.removeWatch(winfo)
- case event, open := <-shared.watcher.Events:
- if !open {
- return
- }
- shared.sendEvent(event)
- case err, open := <-shared.watcher.Errors:
- if !open {
- return
- } else if err != nil {
- sysErr, ok := err.(*os.SyscallError)
- if !ok || sysErr.Err != syscall.EINTR {
- logger.Printf("Error in Watcher Error channel: %s", err)
- }
- }
- }
- }
- }
|