inotify_tracker.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. // Copyright (c) 2015 HPE Software Inc. All rights reserved.
  2. // Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
  3. package watch
  4. import (
  5. "log"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "syscall"
  10. "github.com/hpcloud/tail/util"
  11. "gopkg.in/fsnotify.v1"
  12. )
  13. type InotifyTracker struct {
  14. mux sync.Mutex
  15. watcher *fsnotify.Watcher
  16. chans map[string]chan fsnotify.Event
  17. done map[string]chan bool
  18. watchNums map[string]int
  19. watch chan *watchInfo
  20. remove chan *watchInfo
  21. error chan error
  22. }
  23. type watchInfo struct {
  24. op fsnotify.Op
  25. fname string
  26. }
  27. func (this *watchInfo) isCreate() bool {
  28. return this.op == fsnotify.Create
  29. }
  30. var (
  31. // globally shared InotifyTracker; ensures only one fsnotify.Watcher is used
  32. shared *InotifyTracker
  33. // these are used to ensure the shared InotifyTracker is run exactly once
  34. once = sync.Once{}
  35. goRun = func() {
  36. shared = &InotifyTracker{
  37. mux: sync.Mutex{},
  38. chans: make(map[string]chan fsnotify.Event),
  39. done: make(map[string]chan bool),
  40. watchNums: make(map[string]int),
  41. watch: make(chan *watchInfo),
  42. remove: make(chan *watchInfo),
  43. error: make(chan error),
  44. }
  45. go shared.run()
  46. }
  47. logger = log.New(os.Stderr, "", log.LstdFlags)
  48. )
  49. // Watch signals the run goroutine to begin watching the input filename
  50. func Watch(fname string) error {
  51. return watch(&watchInfo{
  52. fname: fname,
  53. })
  54. }
  55. // Watch create signals the run goroutine to begin watching the input filename
  56. // if call the WatchCreate function, don't call the Cleanup, call the RemoveWatchCreate
  57. func WatchCreate(fname string) error {
  58. return watch(&watchInfo{
  59. op: fsnotify.Create,
  60. fname: fname,
  61. })
  62. }
  63. func watch(winfo *watchInfo) error {
  64. // start running the shared InotifyTracker if not already running
  65. once.Do(goRun)
  66. winfo.fname = filepath.Clean(winfo.fname)
  67. shared.watch <- winfo
  68. return <-shared.error
  69. }
  70. // RemoveWatch signals the run goroutine to remove the watch for the input filename
  71. func RemoveWatch(fname string) {
  72. remove(&watchInfo{
  73. fname: fname,
  74. })
  75. }
  76. // RemoveWatch create signals the run goroutine to remove the watch for the input filename
  77. func RemoveWatchCreate(fname string) {
  78. remove(&watchInfo{
  79. op: fsnotify.Create,
  80. fname: fname,
  81. })
  82. }
  83. func remove(winfo *watchInfo) {
  84. // start running the shared InotifyTracker if not already running
  85. once.Do(goRun)
  86. winfo.fname = filepath.Clean(winfo.fname)
  87. shared.mux.Lock()
  88. done := shared.done[winfo.fname]
  89. if done != nil {
  90. delete(shared.done, winfo.fname)
  91. close(done)
  92. }
  93. fname := winfo.fname
  94. if winfo.isCreate() {
  95. // Watch for new files to be created in the parent directory.
  96. fname = filepath.Dir(fname)
  97. }
  98. shared.watchNums[fname]--
  99. watchNum := shared.watchNums[fname]
  100. if watchNum == 0 {
  101. delete(shared.watchNums, fname)
  102. }
  103. shared.mux.Unlock()
  104. // If we were the last ones to watch this file, unsubscribe from inotify.
  105. // This needs to happen after releasing the lock because fsnotify waits
  106. // synchronously for the kernel to acknowledge the removal of the watch
  107. // for this file, which causes us to deadlock if we still held the lock.
  108. if watchNum == 0 {
  109. shared.watcher.Remove(fname)
  110. }
  111. shared.remove <- winfo
  112. }
  113. // Events returns a channel to which FileEvents corresponding to the input filename
  114. // will be sent. This channel will be closed when removeWatch is called on this
  115. // filename.
  116. func Events(fname string) <-chan fsnotify.Event {
  117. shared.mux.Lock()
  118. defer shared.mux.Unlock()
  119. return shared.chans[fname]
  120. }
  121. // Cleanup removes the watch for the input filename if necessary.
  122. func Cleanup(fname string) {
  123. RemoveWatch(fname)
  124. }
  125. // watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating
  126. // a new Watcher if the previous Watcher was closed.
  127. func (shared *InotifyTracker) addWatch(winfo *watchInfo) error {
  128. shared.mux.Lock()
  129. defer shared.mux.Unlock()
  130. if shared.chans[winfo.fname] == nil {
  131. shared.chans[winfo.fname] = make(chan fsnotify.Event)
  132. shared.done[winfo.fname] = make(chan bool)
  133. }
  134. fname := winfo.fname
  135. if winfo.isCreate() {
  136. // Watch for new files to be created in the parent directory.
  137. fname = filepath.Dir(fname)
  138. }
  139. // already in inotify watch
  140. if shared.watchNums[fname] > 0 {
  141. shared.watchNums[fname]++
  142. if winfo.isCreate() {
  143. shared.watchNums[winfo.fname]++
  144. }
  145. return nil
  146. }
  147. err := shared.watcher.Add(fname)
  148. if err == nil {
  149. shared.watchNums[fname]++
  150. if winfo.isCreate() {
  151. shared.watchNums[winfo.fname]++
  152. }
  153. }
  154. return err
  155. }
  156. // removeWatch calls fsnotify.RemoveWatch for the input filename and closes the
  157. // corresponding events channel.
  158. func (shared *InotifyTracker) removeWatch(winfo *watchInfo) {
  159. shared.mux.Lock()
  160. defer shared.mux.Unlock()
  161. ch := shared.chans[winfo.fname]
  162. if ch == nil {
  163. return
  164. }
  165. delete(shared.chans, winfo.fname)
  166. close(ch)
  167. if !winfo.isCreate() {
  168. return
  169. }
  170. shared.watchNums[winfo.fname]--
  171. if shared.watchNums[winfo.fname] == 0 {
  172. delete(shared.watchNums, winfo.fname)
  173. }
  174. }
  175. // sendEvent sends the input event to the appropriate Tail.
  176. func (shared *InotifyTracker) sendEvent(event fsnotify.Event) {
  177. name := filepath.Clean(event.Name)
  178. shared.mux.Lock()
  179. ch := shared.chans[name]
  180. done := shared.done[name]
  181. shared.mux.Unlock()
  182. if ch != nil && done != nil {
  183. select {
  184. case ch <- event:
  185. case <-done:
  186. }
  187. }
  188. }
  189. // run starts the goroutine in which the shared struct reads events from its
  190. // Watcher's Event channel and sends the events to the appropriate Tail.
  191. func (shared *InotifyTracker) run() {
  192. watcher, err := fsnotify.NewWatcher()
  193. if err != nil {
  194. util.Fatal("failed to create Watcher")
  195. }
  196. shared.watcher = watcher
  197. for {
  198. select {
  199. case winfo := <-shared.watch:
  200. shared.error <- shared.addWatch(winfo)
  201. case winfo := <-shared.remove:
  202. shared.removeWatch(winfo)
  203. case event, open := <-shared.watcher.Events:
  204. if !open {
  205. return
  206. }
  207. shared.sendEvent(event)
  208. case err, open := <-shared.watcher.Errors:
  209. if !open {
  210. return
  211. } else if err != nil {
  212. sysErr, ok := err.(*os.SyscallError)
  213. if !ok || sysErr.Err != syscall.EINTR {
  214. logger.Printf("Error in Watcher Error channel: %s", err)
  215. }
  216. }
  217. }
  218. }
  219. }