inotify_tracker.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. // Copyright (c) 2015 HPE Software Inc. All rights reserved.
  2. // Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
  3. package watch
  4. import (
  5. "log"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "syscall"
  10. "github.com/hpcloud/tail/util"
  11. "gopkg.in/fsnotify.v1"
  12. )
  13. type InotifyTracker struct {
  14. mux sync.Mutex
  15. watcher *fsnotify.Watcher
  16. chans map[string]chan fsnotify.Event
  17. done map[string]chan bool
  18. watchNums map[string]int
  19. watch chan *watchInfo
  20. remove chan *watchInfo
  21. error chan error
  22. }
  23. type watchInfo struct {
  24. op fsnotify.Op
  25. fname string
  26. }
  27. func (this *watchInfo) isCreate() bool {
  28. return this.op == fsnotify.Create
  29. }
  30. var (
  31. // globally shared InotifyTracker; ensures only one fsnotify.Watcher is used
  32. shared *InotifyTracker
  33. // these are used to ensure the shared InotifyTracker is run exactly once
  34. once = sync.Once{}
  35. goRun = func() {
  36. shared = &InotifyTracker{
  37. mux: sync.Mutex{},
  38. chans: make(map[string]chan fsnotify.Event),
  39. done: make(map[string]chan bool),
  40. watchNums: make(map[string]int),
  41. watch: make(chan *watchInfo),
  42. remove: make(chan *watchInfo),
  43. error: make(chan error),
  44. }
  45. go shared.run()
  46. }
  47. logger = log.New(os.Stderr, "", log.LstdFlags)
  48. )
  49. // Watch signals the run goroutine to begin watching the input filename
  50. func Watch(fname string) error {
  51. return watch(&watchInfo{
  52. fname: fname,
  53. })
  54. }
  55. // Watch create signals the run goroutine to begin watching the input filename
  56. // if call the WatchCreate function, don't call the Cleanup, call the RemoveWatchCreate
  57. func WatchCreate(fname string) error {
  58. return watch(&watchInfo{
  59. op: fsnotify.Create,
  60. fname: fname,
  61. })
  62. }
  63. func watch(winfo *watchInfo) error {
  64. // start running the shared InotifyTracker if not already running
  65. once.Do(goRun)
  66. winfo.fname = filepath.Clean(winfo.fname)
  67. shared.watch <- winfo
  68. return <-shared.error
  69. }
  70. // RemoveWatch signals the run goroutine to remove the watch for the input filename
  71. func RemoveWatch(fname string) error {
  72. return remove(&watchInfo{
  73. fname: fname,
  74. })
  75. }
  76. // RemoveWatch create signals the run goroutine to remove the watch for the input filename
  77. func RemoveWatchCreate(fname string) error {
  78. return remove(&watchInfo{
  79. op: fsnotify.Create,
  80. fname: fname,
  81. })
  82. }
  83. func remove(winfo *watchInfo) error {
  84. // start running the shared InotifyTracker if not already running
  85. once.Do(goRun)
  86. winfo.fname = filepath.Clean(winfo.fname)
  87. shared.mux.Lock()
  88. done := shared.done[winfo.fname]
  89. if done != nil {
  90. delete(shared.done, winfo.fname)
  91. close(done)
  92. }
  93. shared.mux.Unlock()
  94. shared.remove <- winfo
  95. return <-shared.error
  96. }
  97. // Events returns a channel to which FileEvents corresponding to the input filename
  98. // will be sent. This channel will be closed when removeWatch is called on this
  99. // filename.
  100. func Events(fname string) <-chan fsnotify.Event {
  101. shared.mux.Lock()
  102. defer shared.mux.Unlock()
  103. return shared.chans[fname]
  104. }
  105. // Cleanup removes the watch for the input filename if necessary.
  106. func Cleanup(fname string) error {
  107. return RemoveWatch(fname)
  108. }
  109. // watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating
  110. // a new Watcher if the previous Watcher was closed.
  111. func (shared *InotifyTracker) addWatch(winfo *watchInfo) error {
  112. shared.mux.Lock()
  113. defer shared.mux.Unlock()
  114. if shared.chans[winfo.fname] == nil {
  115. shared.chans[winfo.fname] = make(chan fsnotify.Event)
  116. }
  117. if shared.done[winfo.fname] == nil {
  118. shared.done[winfo.fname] = make(chan bool)
  119. }
  120. fname := winfo.fname
  121. if winfo.isCreate() {
  122. // Watch for new files to be created in the parent directory.
  123. fname = filepath.Dir(fname)
  124. }
  125. var err error
  126. // already in inotify watch
  127. if shared.watchNums[fname] == 0 {
  128. err = shared.watcher.Add(fname)
  129. }
  130. if err == nil {
  131. shared.watchNums[fname]++
  132. }
  133. return err
  134. }
  135. // removeWatch calls fsnotify.RemoveWatch for the input filename and closes the
  136. // corresponding events channel.
  137. func (shared *InotifyTracker) removeWatch(winfo *watchInfo) error {
  138. shared.mux.Lock()
  139. ch := shared.chans[winfo.fname]
  140. if ch != nil {
  141. delete(shared.chans, winfo.fname)
  142. close(ch)
  143. }
  144. fname := winfo.fname
  145. if winfo.isCreate() {
  146. // Watch for new files to be created in the parent directory.
  147. fname = filepath.Dir(fname)
  148. }
  149. shared.watchNums[fname]--
  150. watchNum := shared.watchNums[fname]
  151. if watchNum == 0 {
  152. delete(shared.watchNums, fname)
  153. }
  154. shared.mux.Unlock()
  155. var err error
  156. // If we were the last ones to watch this file, unsubscribe from inotify.
  157. // This needs to happen after releasing the lock because fsnotify waits
  158. // synchronously for the kernel to acknowledge the removal of the watch
  159. // for this file, which causes us to deadlock if we still held the lock.
  160. if watchNum == 0 {
  161. err = shared.watcher.Remove(fname)
  162. }
  163. return err
  164. }
  165. // sendEvent sends the input event to the appropriate Tail.
  166. func (shared *InotifyTracker) sendEvent(event fsnotify.Event) {
  167. name := filepath.Clean(event.Name)
  168. shared.mux.Lock()
  169. ch := shared.chans[name]
  170. done := shared.done[name]
  171. shared.mux.Unlock()
  172. if ch != nil && done != nil {
  173. select {
  174. case ch <- event:
  175. case <-done:
  176. }
  177. }
  178. }
  179. // run starts the goroutine in which the shared struct reads events from its
  180. // Watcher's Event channel and sends the events to the appropriate Tail.
  181. func (shared *InotifyTracker) run() {
  182. watcher, err := fsnotify.NewWatcher()
  183. if err != nil {
  184. util.Fatal("failed to create Watcher")
  185. }
  186. shared.watcher = watcher
  187. for {
  188. select {
  189. case winfo := <-shared.watch:
  190. shared.error <- shared.addWatch(winfo)
  191. case winfo := <-shared.remove:
  192. shared.error <- shared.removeWatch(winfo)
  193. case event, open := <-shared.watcher.Events:
  194. if !open {
  195. return
  196. }
  197. shared.sendEvent(event)
  198. case err, open := <-shared.watcher.Errors:
  199. if !open {
  200. return
  201. } else if err != nil {
  202. sysErr, ok := err.(*os.SyscallError)
  203. if !ok || sysErr.Err != syscall.EINTR {
  204. logger.Printf("Error in Watcher Error channel: %s", err)
  205. }
  206. }
  207. }
  208. }
  209. }