123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- package gonx
- // Reducer interface for Entries channel redure.
- //
- // Each Reduce method should accept input channel of Entries, do it's job and
- // the result should be written to the output channel.
- //
- // It does not return values because usually it runs in a separate
- // goroutine and it is handy to use channel for reduced data retrieval.
- type Reducer interface {
- Reduce(input chan *Entry, output chan *Entry)
- }
- // ReadAll implements the Reducer interface for simple input entries redirected to
- // the output channel.
- type ReadAll struct {
- }
- // Reduce redirects input Entries channel directly to the output without any
- // modifications. It is useful when you want jast to read file fast
- // using asynchronous with mapper routines.
- func (r *ReadAll) Reduce(input chan *Entry, output chan *Entry) {
- for entry := range input {
- output <- entry
- }
- close(output)
- }
- // Count implements the Reducer interface to count entries
- type Count struct {
- }
- // Reduce simply counts entries and write a sum to the output channel
- func (r *Count) Reduce(input chan *Entry, output chan *Entry) {
- var count uint64
- for {
- _, ok := <-input
- if !ok {
- break
- }
- count++
- }
- entry := NewEmptyEntry()
- entry.SetUintField("count", count)
- output <- entry
- close(output)
- }
- // Sum implements the Reducer interface for summarize Entry values for the given fields
- type Sum struct {
- Fields []string
- }
- // Reduce summarizes given Entry fields and return a map with result for each field.
- func (r *Sum) Reduce(input chan *Entry, output chan *Entry) {
- sum := make(map[string]float64)
- for entry := range input {
- for _, name := range r.Fields {
- val, err := entry.FloatField(name)
- if err == nil {
- sum[name] += val
- }
- }
- }
- entry := NewEmptyEntry()
- for name, val := range sum {
- entry.SetFloatField(name, val)
- }
- output <- entry
- close(output)
- }
- // Avg implements the Reducer interface for average entries values calculation
- type Avg struct {
- Fields []string
- }
- // Reduce calculates the average value for input channel Entries, using configured Fields
- // of the struct. Write result to the output channel as map[string]float64
- func (r *Avg) Reduce(input chan *Entry, output chan *Entry) {
- avg := make(map[string]float64)
- count := 0.0
- for entry := range input {
- for _, name := range r.Fields {
- val, err := entry.FloatField(name)
- if err == nil {
- avg[name] = (avg[name]*count + val) / (count + 1)
- }
- }
- count++
- }
- entry := NewEmptyEntry()
- for name, val := range avg {
- entry.SetFloatField(name, val)
- }
- output <- entry
- close(output)
- }
- // Chain implements the Reducer interface for chaining other reducers
- type Chain struct {
- filters []Filter
- reducers []Reducer
- }
- // NewChain creates a new chain of Reducers
- func NewChain(reducers ...Reducer) *Chain {
- chain := new(Chain)
- for _, r := range reducers {
- if f, ok := interface{}(r).(Filter); ok {
- chain.filters = append(chain.filters, f)
- } else {
- chain.reducers = append(chain.reducers, r)
- }
- }
- return chain
- }
- // Reduce applies a chain of reducers to the input channel of entries and merge results
- func (r *Chain) Reduce(input chan *Entry, output chan *Entry) {
- // Make input and output channel for each reducer
- subInput := make([]chan *Entry, len(r.reducers))
- subOutput := make([]chan *Entry, len(r.reducers))
- for i, reducer := range r.reducers {
- subInput[i] = make(chan *Entry, cap(input))
- subOutput[i] = make(chan *Entry, cap(output))
- go reducer.Reduce(subInput[i], subOutput[i])
- }
- // Read reducer master input channel
- for entry := range input {
- for _, f := range r.filters {
- entry = f.Filter(entry)
- if entry == nil {
- break
- }
- }
- // Publish input entry for each sub-reducers to process
- if entry != nil {
- for _, sub := range subInput {
- sub <- entry
- }
- }
- }
- for _, ch := range subInput {
- close(ch)
- }
- // Merge all results
- entry := NewEmptyEntry()
- for _, result := range subOutput {
- entry.Merge(<-result)
- }
- output <- entry
- close(output)
- }
- // GroupBy implements the Reducer interface to apply other reducers and get data grouped by
- // given fields.
- type GroupBy struct {
- Fields []string
- reducers []Reducer
- }
- // NewGroupBy creates a new GroupBy Reducer
- func NewGroupBy(fields []string, reducers ...Reducer) *GroupBy {
- return &GroupBy{
- Fields: fields,
- reducers: reducers,
- }
- }
- // Reduce applies related reducers and group data by Fields.
- func (r *GroupBy) Reduce(input chan *Entry, output chan *Entry) {
- subInput := make(map[string]chan *Entry)
- subOutput := make(map[string]chan *Entry)
- // Read reducer master input channel and create discinct input chanel
- // for each entry key we group by
- for entry := range input {
- key := entry.FieldsHash(r.Fields)
- if _, ok := subInput[key]; !ok {
- subInput[key] = make(chan *Entry, cap(input))
- subOutput[key] = make(chan *Entry, cap(output)+1)
- subOutput[key] <- entry.Partial(r.Fields)
- go NewChain(r.reducers...).Reduce(subInput[key], subOutput[key])
- }
- subInput[key] <- entry
- }
- for _, ch := range subInput {
- close(ch)
- }
- for _, ch := range subOutput {
- entry := <-ch
- entry.Merge(<-ch)
- output <- entry
- }
- close(output)
- }
|