123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- package watch
- import (
- "log"
- "os"
- "path/filepath"
- "sync"
- "syscall"
- "github.com/hpcloud/tail/util"
- "gopkg.in/fsnotify.v1"
- )
- type InotifyTracker struct {
- mux sync.Mutex
- watcher *fsnotify.Watcher
- chans map[string]chan fsnotify.Event
- done map[string]chan bool
- watchNums map[string]int
- watch chan *watchInfo
- remove chan *watchInfo
- error chan error
- }
- type watchInfo struct {
- op fsnotify.Op
- fname string
- }
- func (this *watchInfo) isCreate() bool {
- return this.op == fsnotify.Create
- }
- var (
-
- shared *InotifyTracker
-
- once = sync.Once{}
- goRun = func() {
- shared = &InotifyTracker{
- mux: sync.Mutex{},
- chans: make(map[string]chan fsnotify.Event),
- done: make(map[string]chan bool),
- watchNums: make(map[string]int),
- watch: make(chan *watchInfo),
- remove: make(chan *watchInfo),
- error: make(chan error),
- }
- go shared.run()
- }
- logger = log.New(os.Stderr, "", log.LstdFlags)
- )
- func Watch(fname string) error {
- return watch(&watchInfo{
- fname: fname,
- })
- }
- func WatchCreate(fname string) error {
- return watch(&watchInfo{
- op: fsnotify.Create,
- fname: fname,
- })
- }
- func watch(winfo *watchInfo) error {
-
- once.Do(goRun)
- winfo.fname = filepath.Clean(winfo.fname)
- shared.watch <- winfo
- return <-shared.error
- }
- func RemoveWatch(fname string) error {
- return remove(&watchInfo{
- fname: fname,
- })
- }
- func RemoveWatchCreate(fname string) error {
- return remove(&watchInfo{
- op: fsnotify.Create,
- fname: fname,
- })
- }
- func remove(winfo *watchInfo) error {
-
- once.Do(goRun)
- winfo.fname = filepath.Clean(winfo.fname)
- shared.mux.Lock()
- done := shared.done[winfo.fname]
- if done != nil {
- delete(shared.done, winfo.fname)
- close(done)
- }
- shared.mux.Unlock()
- shared.remove <- winfo
- return <-shared.error
- }
- func Events(fname string) <-chan fsnotify.Event {
- shared.mux.Lock()
- defer shared.mux.Unlock()
- return shared.chans[fname]
- }
- func Cleanup(fname string) error {
- return RemoveWatch(fname)
- }
- func (shared *InotifyTracker) addWatch(winfo *watchInfo) error {
- shared.mux.Lock()
- defer shared.mux.Unlock()
- if shared.chans[winfo.fname] == nil {
- shared.chans[winfo.fname] = make(chan fsnotify.Event)
- }
- if shared.done[winfo.fname] == nil {
- shared.done[winfo.fname] = make(chan bool)
- }
- fname := winfo.fname
- if winfo.isCreate() {
-
- fname = filepath.Dir(fname)
- }
- var err error
-
- if shared.watchNums[fname] == 0 {
- err = shared.watcher.Add(fname)
- }
- if err == nil {
- shared.watchNums[fname]++
- }
- return err
- }
- func (shared *InotifyTracker) removeWatch(winfo *watchInfo) error {
- shared.mux.Lock()
- ch := shared.chans[winfo.fname]
- if ch != nil {
- delete(shared.chans, winfo.fname)
- close(ch)
- }
- fname := winfo.fname
- if winfo.isCreate() {
-
- fname = filepath.Dir(fname)
- }
- shared.watchNums[fname]--
- watchNum := shared.watchNums[fname]
- if watchNum == 0 {
- delete(shared.watchNums, fname)
- }
- shared.mux.Unlock()
- var err error
-
-
-
-
- if watchNum == 0 {
- err = shared.watcher.Remove(fname)
- }
- return err
- }
- func (shared *InotifyTracker) sendEvent(event fsnotify.Event) {
- name := filepath.Clean(event.Name)
- shared.mux.Lock()
- ch := shared.chans[name]
- done := shared.done[name]
- shared.mux.Unlock()
- if ch != nil && done != nil {
- select {
- case ch <- event:
- case <-done:
- }
- }
- }
- func (shared *InotifyTracker) run() {
- watcher, err := fsnotify.NewWatcher()
- if err != nil {
- util.Fatal("failed to create Watcher")
- }
- shared.watcher = watcher
- for {
- select {
- case winfo := <-shared.watch:
- shared.error <- shared.addWatch(winfo)
- case winfo := <-shared.remove:
- shared.error <- shared.removeWatch(winfo)
- case event, open := <-shared.watcher.Events:
- if !open {
- return
- }
- shared.sendEvent(event)
- case err, open := <-shared.watcher.Errors:
- if !open {
- return
- } else if err != nil {
- sysErr, ok := err.(*os.SyscallError)
- if !ok || sysErr.Err != syscall.EINTR {
- logger.Printf("Error in Watcher Error channel: %s", err)
- }
- }
- }
- }
- }
|