funcs.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package channelz defines APIs for enabling channelz service, entry
  19. // registration/deletion, and accessing channelz data. It also defines channelz
  20. // metric struct formats.
  21. //
  22. // All APIs in this package are experimental.
  23. package channelz
  24. import (
  25. "sort"
  26. "sync"
  27. "sync/atomic"
  28. "google.golang.org/grpc/grpclog"
  29. )
  30. var (
  31. db dbWrapper
  32. idGen idGenerator
  33. // EntryPerPage defines the number of channelz entries to be shown on a web page.
  34. EntryPerPage = 50
  35. curState int32
  36. )
  37. // TurnOn turns on channelz data collection.
  38. func TurnOn() {
  39. if !IsOn() {
  40. NewChannelzStorage()
  41. atomic.StoreInt32(&curState, 1)
  42. }
  43. }
  44. // IsOn returns whether channelz data collection is on.
  45. func IsOn() bool {
  46. return atomic.CompareAndSwapInt32(&curState, 1, 1)
  47. }
  48. // dbWarpper wraps around a reference to internal channelz data storage, and
  49. // provide synchronized functionality to set and get the reference.
  50. type dbWrapper struct {
  51. mu sync.RWMutex
  52. DB *channelMap
  53. }
  54. func (d *dbWrapper) set(db *channelMap) {
  55. d.mu.Lock()
  56. d.DB = db
  57. d.mu.Unlock()
  58. }
  59. func (d *dbWrapper) get() *channelMap {
  60. d.mu.RLock()
  61. defer d.mu.RUnlock()
  62. return d.DB
  63. }
  64. // NewChannelzStorage initializes channelz data storage and id generator.
  65. //
  66. // Note: This function is exported for testing purpose only. User should not call
  67. // it in most cases.
  68. func NewChannelzStorage() {
  69. db.set(&channelMap{
  70. topLevelChannels: make(map[int64]struct{}),
  71. channels: make(map[int64]*channel),
  72. listenSockets: make(map[int64]*listenSocket),
  73. normalSockets: make(map[int64]*normalSocket),
  74. servers: make(map[int64]*server),
  75. subChannels: make(map[int64]*subChannel),
  76. })
  77. idGen.reset()
  78. }
  79. // GetTopChannels returns a slice of top channel's ChannelMetric, along with a
  80. // boolean indicating whether there's more top channels to be queried for.
  81. //
  82. // The arg id specifies that only top channel with id at or above it will be included
  83. // in the result. The returned slice is up to a length of EntryPerPage, and is
  84. // sorted in ascending id order.
  85. func GetTopChannels(id int64) ([]*ChannelMetric, bool) {
  86. return db.get().GetTopChannels(id)
  87. }
  88. // GetServers returns a slice of server's ServerMetric, along with a
  89. // boolean indicating whether there's more servers to be queried for.
  90. //
  91. // The arg id specifies that only server with id at or above it will be included
  92. // in the result. The returned slice is up to a length of EntryPerPage, and is
  93. // sorted in ascending id order.
  94. func GetServers(id int64) ([]*ServerMetric, bool) {
  95. return db.get().GetServers(id)
  96. }
  97. // GetServerSockets returns a slice of server's (identified by id) normal socket's
  98. // SocketMetric, along with a boolean indicating whether there's more sockets to
  99. // be queried for.
  100. //
  101. // The arg startID specifies that only sockets with id at or above it will be
  102. // included in the result. The returned slice is up to a length of EntryPerPage,
  103. // and is sorted in ascending id order.
  104. func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
  105. return db.get().GetServerSockets(id, startID)
  106. }
  107. // GetChannel returns the ChannelMetric for the channel (identified by id).
  108. func GetChannel(id int64) *ChannelMetric {
  109. return db.get().GetChannel(id)
  110. }
  111. // GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
  112. func GetSubChannel(id int64) *SubChannelMetric {
  113. return db.get().GetSubChannel(id)
  114. }
  115. // GetSocket returns the SocketInternalMetric for the socket (identified by id).
  116. func GetSocket(id int64) *SocketMetric {
  117. return db.get().GetSocket(id)
  118. }
  119. // RegisterChannel registers the given channel c in channelz database with ref
  120. // as its reference name, and add it to the child list of its parent (identified
  121. // by pid). pid = 0 means no parent. It returns the unique channelz tracking id
  122. // assigned to this channel.
  123. func RegisterChannel(c Channel, pid int64, ref string) int64 {
  124. id := idGen.genID()
  125. cn := &channel{
  126. refName: ref,
  127. c: c,
  128. subChans: make(map[int64]string),
  129. nestedChans: make(map[int64]string),
  130. id: id,
  131. pid: pid,
  132. }
  133. if pid == 0 {
  134. db.get().addChannel(id, cn, true, pid, ref)
  135. } else {
  136. db.get().addChannel(id, cn, false, pid, ref)
  137. }
  138. return id
  139. }
  140. // RegisterSubChannel registers the given channel c in channelz database with ref
  141. // as its reference name, and add it to the child list of its parent (identified
  142. // by pid). It returns the unique channelz tracking id assigned to this subchannel.
  143. func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
  144. if pid == 0 {
  145. grpclog.Error("a SubChannel's parent id cannot be 0")
  146. return 0
  147. }
  148. id := idGen.genID()
  149. sc := &subChannel{
  150. refName: ref,
  151. c: c,
  152. sockets: make(map[int64]string),
  153. id: id,
  154. pid: pid,
  155. }
  156. db.get().addSubChannel(id, sc, pid, ref)
  157. return id
  158. }
  159. // RegisterServer registers the given server s in channelz database. It returns
  160. // the unique channelz tracking id assigned to this server.
  161. func RegisterServer(s Server, ref string) int64 {
  162. id := idGen.genID()
  163. svr := &server{
  164. refName: ref,
  165. s: s,
  166. sockets: make(map[int64]string),
  167. listenSockets: make(map[int64]string),
  168. id: id,
  169. }
  170. db.get().addServer(id, svr)
  171. return id
  172. }
  173. // RegisterListenSocket registers the given listen socket s in channelz database
  174. // with ref as its reference name, and add it to the child list of its parent
  175. // (identified by pid). It returns the unique channelz tracking id assigned to
  176. // this listen socket.
  177. func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
  178. if pid == 0 {
  179. grpclog.Error("a ListenSocket's parent id cannot be 0")
  180. return 0
  181. }
  182. id := idGen.genID()
  183. ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
  184. db.get().addListenSocket(id, ls, pid, ref)
  185. return id
  186. }
  187. // RegisterNormalSocket registers the given normal socket s in channelz database
  188. // with ref as its reference name, and add it to the child list of its parent
  189. // (identified by pid). It returns the unique channelz tracking id assigned to
  190. // this normal socket.
  191. func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
  192. if pid == 0 {
  193. grpclog.Error("a NormalSocket's parent id cannot be 0")
  194. return 0
  195. }
  196. id := idGen.genID()
  197. ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
  198. db.get().addNormalSocket(id, ns, pid, ref)
  199. return id
  200. }
  201. // RemoveEntry removes an entry with unique channelz trakcing id to be id from
  202. // channelz database.
  203. func RemoveEntry(id int64) {
  204. db.get().removeEntry(id)
  205. }
  206. // channelMap is the storage data structure for channelz.
  207. // Methods of channelMap can be divided in two two categories with respect to locking.
  208. // 1. Methods acquire the global lock.
  209. // 2. Methods that can only be called when global lock is held.
  210. // A second type of method need always to be called inside a first type of method.
  211. type channelMap struct {
  212. mu sync.RWMutex
  213. topLevelChannels map[int64]struct{}
  214. servers map[int64]*server
  215. channels map[int64]*channel
  216. subChannels map[int64]*subChannel
  217. listenSockets map[int64]*listenSocket
  218. normalSockets map[int64]*normalSocket
  219. }
  220. func (c *channelMap) addServer(id int64, s *server) {
  221. c.mu.Lock()
  222. s.cm = c
  223. c.servers[id] = s
  224. c.mu.Unlock()
  225. }
  226. func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
  227. c.mu.Lock()
  228. cn.cm = c
  229. c.channels[id] = cn
  230. if isTopChannel {
  231. c.topLevelChannels[id] = struct{}{}
  232. } else {
  233. c.findEntry(pid).addChild(id, cn)
  234. }
  235. c.mu.Unlock()
  236. }
  237. func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
  238. c.mu.Lock()
  239. sc.cm = c
  240. c.subChannels[id] = sc
  241. c.findEntry(pid).addChild(id, sc)
  242. c.mu.Unlock()
  243. }
  244. func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
  245. c.mu.Lock()
  246. ls.cm = c
  247. c.listenSockets[id] = ls
  248. c.findEntry(pid).addChild(id, ls)
  249. c.mu.Unlock()
  250. }
  251. func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
  252. c.mu.Lock()
  253. ns.cm = c
  254. c.normalSockets[id] = ns
  255. c.findEntry(pid).addChild(id, ns)
  256. c.mu.Unlock()
  257. }
  258. // removeEntry triggers the removal of an entry, which may not indeed delete the
  259. // entry, if it has to wait on the deletion of its children, or may lead to a chain
  260. // of entry deletion. For example, deleting the last socket of a gracefully shutting
  261. // down server will lead to the server being also deleted.
  262. func (c *channelMap) removeEntry(id int64) {
  263. c.mu.Lock()
  264. c.findEntry(id).triggerDelete()
  265. c.mu.Unlock()
  266. }
  267. // c.mu must be held by the caller.
  268. func (c *channelMap) findEntry(id int64) entry {
  269. var v entry
  270. var ok bool
  271. if v, ok = c.channels[id]; ok {
  272. return v
  273. }
  274. if v, ok = c.subChannels[id]; ok {
  275. return v
  276. }
  277. if v, ok = c.servers[id]; ok {
  278. return v
  279. }
  280. if v, ok = c.listenSockets[id]; ok {
  281. return v
  282. }
  283. if v, ok = c.normalSockets[id]; ok {
  284. return v
  285. }
  286. return &dummyEntry{idNotFound: id}
  287. }
  288. // c.mu must be held by the caller
  289. // deleteEntry simply deletes an entry from the channelMap. Before calling this
  290. // method, caller must check this entry is ready to be deleted, i.e removeEntry()
  291. // has been called on it, and no children still exist.
  292. // Conditionals are ordered by the expected frequency of deletion of each entity
  293. // type, in order to optimize performance.
  294. func (c *channelMap) deleteEntry(id int64) {
  295. var ok bool
  296. if _, ok = c.normalSockets[id]; ok {
  297. delete(c.normalSockets, id)
  298. return
  299. }
  300. if _, ok = c.subChannels[id]; ok {
  301. delete(c.subChannels, id)
  302. return
  303. }
  304. if _, ok = c.channels[id]; ok {
  305. delete(c.channels, id)
  306. delete(c.topLevelChannels, id)
  307. return
  308. }
  309. if _, ok = c.listenSockets[id]; ok {
  310. delete(c.listenSockets, id)
  311. return
  312. }
  313. if _, ok = c.servers[id]; ok {
  314. delete(c.servers, id)
  315. return
  316. }
  317. }
  318. type int64Slice []int64
  319. func (s int64Slice) Len() int { return len(s) }
  320. func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  321. func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
  322. func copyMap(m map[int64]string) map[int64]string {
  323. n := make(map[int64]string)
  324. for k, v := range m {
  325. n[k] = v
  326. }
  327. return n
  328. }
  329. func min(a, b int) int {
  330. if a < b {
  331. return a
  332. }
  333. return b
  334. }
  335. func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
  336. c.mu.RLock()
  337. l := len(c.topLevelChannels)
  338. ids := make([]int64, 0, l)
  339. cns := make([]*channel, 0, min(l, EntryPerPage))
  340. for k := range c.topLevelChannels {
  341. ids = append(ids, k)
  342. }
  343. sort.Sort(int64Slice(ids))
  344. idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
  345. count := 0
  346. var end bool
  347. var t []*ChannelMetric
  348. for i, v := range ids[idx:] {
  349. if count == EntryPerPage {
  350. break
  351. }
  352. if cn, ok := c.channels[v]; ok {
  353. cns = append(cns, cn)
  354. t = append(t, &ChannelMetric{
  355. NestedChans: copyMap(cn.nestedChans),
  356. SubChans: copyMap(cn.subChans),
  357. })
  358. count++
  359. }
  360. if i == len(ids[idx:])-1 {
  361. end = true
  362. break
  363. }
  364. }
  365. c.mu.RUnlock()
  366. if count == 0 {
  367. end = true
  368. }
  369. for i, cn := range cns {
  370. t[i].ChannelData = cn.c.ChannelzMetric()
  371. t[i].ID = cn.id
  372. t[i].RefName = cn.refName
  373. }
  374. return t, end
  375. }
  376. func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
  377. c.mu.RLock()
  378. l := len(c.servers)
  379. ids := make([]int64, 0, l)
  380. ss := make([]*server, 0, min(l, EntryPerPage))
  381. for k := range c.servers {
  382. ids = append(ids, k)
  383. }
  384. sort.Sort(int64Slice(ids))
  385. idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
  386. count := 0
  387. var end bool
  388. var s []*ServerMetric
  389. for i, v := range ids[idx:] {
  390. if count == EntryPerPage {
  391. break
  392. }
  393. if svr, ok := c.servers[v]; ok {
  394. ss = append(ss, svr)
  395. s = append(s, &ServerMetric{
  396. ListenSockets: copyMap(svr.listenSockets),
  397. })
  398. count++
  399. }
  400. if i == len(ids[idx:])-1 {
  401. end = true
  402. break
  403. }
  404. }
  405. c.mu.RUnlock()
  406. if count == 0 {
  407. end = true
  408. }
  409. for i, svr := range ss {
  410. s[i].ServerData = svr.s.ChannelzMetric()
  411. s[i].ID = svr.id
  412. s[i].RefName = svr.refName
  413. }
  414. return s, end
  415. }
  416. func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
  417. var svr *server
  418. var ok bool
  419. c.mu.RLock()
  420. if svr, ok = c.servers[id]; !ok {
  421. // server with id doesn't exist.
  422. c.mu.RUnlock()
  423. return nil, true
  424. }
  425. svrskts := svr.sockets
  426. l := len(svrskts)
  427. ids := make([]int64, 0, l)
  428. sks := make([]*normalSocket, 0, min(l, EntryPerPage))
  429. for k := range svrskts {
  430. ids = append(ids, k)
  431. }
  432. sort.Sort((int64Slice(ids)))
  433. idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
  434. count := 0
  435. var end bool
  436. for i, v := range ids[idx:] {
  437. if count == EntryPerPage {
  438. break
  439. }
  440. if ns, ok := c.normalSockets[v]; ok {
  441. sks = append(sks, ns)
  442. count++
  443. }
  444. if i == len(ids[idx:])-1 {
  445. end = true
  446. break
  447. }
  448. }
  449. c.mu.RUnlock()
  450. if count == 0 {
  451. end = true
  452. }
  453. var s []*SocketMetric
  454. for _, ns := range sks {
  455. sm := &SocketMetric{}
  456. sm.SocketData = ns.s.ChannelzMetric()
  457. sm.ID = ns.id
  458. sm.RefName = ns.refName
  459. s = append(s, sm)
  460. }
  461. return s, end
  462. }
  463. func (c *channelMap) GetChannel(id int64) *ChannelMetric {
  464. cm := &ChannelMetric{}
  465. var cn *channel
  466. var ok bool
  467. c.mu.RLock()
  468. if cn, ok = c.channels[id]; !ok {
  469. // channel with id doesn't exist.
  470. c.mu.RUnlock()
  471. return nil
  472. }
  473. cm.NestedChans = copyMap(cn.nestedChans)
  474. cm.SubChans = copyMap(cn.subChans)
  475. c.mu.RUnlock()
  476. cm.ChannelData = cn.c.ChannelzMetric()
  477. cm.ID = cn.id
  478. cm.RefName = cn.refName
  479. return cm
  480. }
  481. func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
  482. cm := &SubChannelMetric{}
  483. var sc *subChannel
  484. var ok bool
  485. c.mu.RLock()
  486. if sc, ok = c.subChannels[id]; !ok {
  487. // subchannel with id doesn't exist.
  488. c.mu.RUnlock()
  489. return nil
  490. }
  491. cm.Sockets = copyMap(sc.sockets)
  492. c.mu.RUnlock()
  493. cm.ChannelData = sc.c.ChannelzMetric()
  494. cm.ID = sc.id
  495. cm.RefName = sc.refName
  496. return cm
  497. }
  498. func (c *channelMap) GetSocket(id int64) *SocketMetric {
  499. sm := &SocketMetric{}
  500. c.mu.RLock()
  501. if ls, ok := c.listenSockets[id]; ok {
  502. c.mu.RUnlock()
  503. sm.SocketData = ls.s.ChannelzMetric()
  504. sm.ID = ls.id
  505. sm.RefName = ls.refName
  506. return sm
  507. }
  508. if ns, ok := c.normalSockets[id]; ok {
  509. c.mu.RUnlock()
  510. sm.SocketData = ns.s.ChannelzMetric()
  511. sm.ID = ns.id
  512. sm.RefName = ns.refName
  513. return sm
  514. }
  515. c.mu.RUnlock()
  516. return nil
  517. }
  518. type idGenerator struct {
  519. id int64
  520. }
  521. func (i *idGenerator) reset() {
  522. atomic.StoreInt64(&i.id, 0)
  523. }
  524. func (i *idGenerator) genID() int64 {
  525. return atomic.AddInt64(&i.id, 1)
  526. }