reducer.go

package gonx

// Reducer is the interface for reducing a channel of Entries.
//
// Each Reduce method should accept an input channel of Entries, do its job,
// and write the result to the output channel.
//
// It does not return values because it usually runs in a separate
// goroutine, and it is handy to use a channel for reduced data retrieval.
type Reducer interface {
	Reduce(input chan *Entry, output chan *Entry)
}
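
// A reducer reads everything from input, writes its result to output, and
// closes output when done. A minimal sketch of a custom implementation (the
// First type is illustrative, not part of the package):
//
//	type First struct{}
//
//	func (r *First) Reduce(input chan *Entry, output chan *Entry) {
//		if entry, ok := <-input; ok {
//			output <- entry
//		}
//		for range input {
//		} // drain the remaining entries
//		close(output)
//	}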

// ReadAll implements the Reducer interface by redirecting input entries
// straight to the output channel.
type ReadAll struct {
}

// Reduce redirects the input Entries channel directly to the output without
// any modifications. It is useful when you just want to read a file fast,
// working asynchronously with the mapper goroutines.
func (r *ReadAll) Reduce(input chan *Entry, output chan *Entry) {
	for entry := range input {
		output <- entry
	}
	close(output)
}
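
// A minimal wiring sketch, assuming some producer (a parser goroutine, say)
// fills the input channel:
//
//	input := make(chan *Entry, 10)
//	output := make(chan *Entry, 10)
//	go (&ReadAll{}).Reduce(input, output)
//	input <- NewEmptyEntry()
//	close(input)
//	for entry := range output {
//		fmt.Println(entry)
//	}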

// Count implements the Reducer interface to count entries.
type Count struct {
}

// Reduce simply counts the entries and writes the total to the output
// channel as an Entry with a single "count" field.
func (r *Count) Reduce(input chan *Entry, output chan *Entry) {
	var count uint64
	for range input {
		count++
	}
	entry := NewEmptyEntry()
	entry.SetUintField("count", count)
	output <- entry
	close(output)
}
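
// A usage sketch; reading the result back via FloatField assumes the stored
// count string parses as a number, which SetUintField should guarantee:
//
//	output := make(chan *Entry, 1)
//	go (&Count{}).Reduce(input, output)
//	result := <-output
//	n, _ := result.FloatField("count")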

// Sum implements the Reducer interface to summarize Entry values for the
// given fields.
type Sum struct {
	Fields []string
}

// Reduce summarizes the given Entry fields and writes an Entry holding the
// sum for each field to the output channel.
func (r *Sum) Reduce(input chan *Entry, output chan *Entry) {
	sum := make(map[string]float64)
	for entry := range input {
		for _, name := range r.Fields {
			val, err := entry.FloatField(name)
			if err == nil {
				sum[name] += val
			}
		}
	}
	entry := NewEmptyEntry()
	for name, val := range sum {
		entry.SetFloatField(name, val)
	}
	output <- entry
	close(output)
}
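
// A usage sketch; "bytes_sent" is only an example field name, use whatever
// numeric fields your log format parses:
//
//	output := make(chan *Entry, 1)
//	go (&Sum{Fields: []string{"bytes_sent"}}).Reduce(input, output)
//	result := <-output
//	total, _ := result.FloatField("bytes_sent")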

// Avg implements the Reducer interface to calculate the average of entry
// values.
type Avg struct {
	Fields []string
}

// Reduce calculates the average value of the configured Fields over the
// input channel Entries and writes an Entry with the averages to the output
// channel. The entry count is shared across fields, so the result assumes
// every entry carries all of the configured Fields.
func (r *Avg) Reduce(input chan *Entry, output chan *Entry) {
	avg := make(map[string]float64)
	count := 0.0
	for entry := range input {
		for _, name := range r.Fields {
			val, err := entry.FloatField(name)
			if err == nil {
				avg[name] = (avg[name]*count + val) / (count + 1)
			}
		}
		count++
	}
	entry := NewEmptyEntry()
	for name, val := range avg {
		entry.SetFloatField(name, val)
	}
	output <- entry
	close(output)
}
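
// The update above is the standard incremental mean: after n entries the
// stored value is avg_n, and folding in the next value x gives
//
//	avg_{n+1} = (avg_n*n + x) / (n + 1)
//
// which equals the plain arithmetic mean of all n+1 values without keeping
// them in memory.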

// Chain implements the Reducer interface for chaining other reducers.
type Chain struct {
	filters  []Filter
	reducers []Reducer
}

// NewChain creates a new chain of Reducers. An argument that also
// implements the Filter interface is registered as a filter and applied to
// each entry before it is fanned out to the remaining reducers.
func NewChain(reducers ...Reducer) *Chain {
	chain := new(Chain)
	for _, r := range reducers {
		if f, ok := r.(Filter); ok {
			chain.filters = append(chain.filters, f)
		} else {
			chain.reducers = append(chain.reducers, r)
		}
	}
	return chain
}
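
// A usage sketch: anything passed to NewChain must satisfy Reducer, and if
// it additionally has a Filter(*Entry) *Entry method it acts as a filter.
// The myFilter type here is hypothetical:
//
//	chain := NewChain(&myFilter{}, &Count{})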

// Reduce applies the chain of reducers to the input channel of entries and
// merges the results.
func (r *Chain) Reduce(input chan *Entry, output chan *Entry) {
	// Make an input and an output channel for each reducer
	subInput := make([]chan *Entry, len(r.reducers))
	subOutput := make([]chan *Entry, len(r.reducers))
	for i, reducer := range r.reducers {
		subInput[i] = make(chan *Entry, cap(input))
		subOutput[i] = make(chan *Entry, cap(output))
		go reducer.Reduce(subInput[i], subOutput[i])
	}

	// Read the master input channel, applying the filters first
	for entry := range input {
		for _, f := range r.filters {
			entry = f.Filter(entry)
			if entry == nil {
				break
			}
		}
		// Publish the input entry for each sub-reducer to process
		if entry != nil {
			for _, sub := range subInput {
				sub <- entry
			}
		}
	}
	for _, ch := range subInput {
		close(ch)
	}

	// Merge all results
	entry := NewEmptyEntry()
	for _, result := range subOutput {
		entry.Merge(<-result)
	}
	output <- entry
	close(output)
}
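
// A usage sketch: run Count and Sum over the same stream in one pass; the
// merged entry carries both results ("bytes_sent" is an example field name):
//
//	output := make(chan *Entry, 1)
//	go NewChain(&Count{}, &Sum{Fields: []string{"bytes_sent"}}).Reduce(input, output)
//	result := <-output
//	n, _ := result.FloatField("count")
//	total, _ := result.FloatField("bytes_sent")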

// GroupBy implements the Reducer interface to apply other reducers and get
// data grouped by the given fields.
type GroupBy struct {
	Fields   []string
	reducers []Reducer
}

// NewGroupBy creates a new GroupBy Reducer
func NewGroupBy(fields []string, reducers ...Reducer) *GroupBy {
	return &GroupBy{
		Fields:   fields,
		reducers: reducers,
	}
}

// Reduce applies the related reducers and groups the data by Fields.
func (r *GroupBy) Reduce(input chan *Entry, output chan *Entry) {
	subInput := make(map[string]chan *Entry)
	subOutput := make(map[string]chan *Entry)

	// Read the master input channel and create a distinct input channel
	// for each entry key we group by
	for entry := range input {
		key := entry.FieldsHash(r.Fields)
		if _, ok := subInput[key]; !ok {
			subInput[key] = make(chan *Entry, cap(input))
			subOutput[key] = make(chan *Entry, cap(output)+1)
			subOutput[key] <- entry.Partial(r.Fields)
			go NewChain(r.reducers...).Reduce(subInput[key], subOutput[key])
		}
		subInput[key] <- entry
	}
	for _, ch := range subInput {
		close(ch)
	}

	// For each group, merge the grouping fields (queued first on each
	// output channel) with the reduced results
	for _, ch := range subOutput {
		entry := <-ch
		entry.Merge(<-ch)
		output <- entry
	}
	close(output)
}
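
// A usage sketch: count entries per remote address in one pass
// ("remote_addr" is an example field name; Field is the string accessor
// defined on Entry elsewhere in this package):
//
//	output := make(chan *Entry, 10)
//	go NewGroupBy([]string{"remote_addr"}, &Count{}).Reduce(input, output)
//	for group := range output {
//		addr, _ := group.Field("remote_addr")
//		n, _ := group.FloatField("count")
//		fmt.Println(addr, n)
//	}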