package gonx

// Reducer is an interface for reducing a channel of Entries.
//
// Each Reduce method should accept an input channel of Entries, do its job,
// and write the result to the output channel.
//
// It does not return values because it usually runs in a separate goroutine,
// and a channel is a handy way to retrieve the reduced data.
type Reducer interface {
	Reduce(input chan *Entry, output chan *Entry)
}
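
// A custom Reducer only needs to drain the input channel and close the output
// channel when done. A minimal sketch (the First type below is a hypothetical
// example, not part of this package):
//
//	type First struct{}
//
//	func (r *First) Reduce(input chan *Entry, output chan *Entry) {
//		if entry, ok := <-input; ok {
//			output <- entry
//		}
//		// Drain the rest so upstream senders do not block
//		for range input {
//		}
//		close(output)
//	}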

// ReadAll implements the Reducer interface by redirecting input entries
// straight to the output channel.
type ReadAll struct{}

// Reduce redirects the input Entries channel directly to the output without
// any modifications. It is useful when you just want to read a file fast,
// asynchronously, with the mapper routines.
func (r *ReadAll) Reduce(input chan *Entry, output chan *Entry) {
	for entry := range input {
		output <- entry
	}
	close(output)
}
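
// A minimal usage sketch for ReadAll; the channel sizes are arbitrary
// assumptions:
//
//	input := make(chan *Entry, 10)
//	output := make(chan *Entry, 10)
//	go new(ReadAll).Reduce(input, output)
//	// ... feed parsed entries into input, then close(input)
//	for entry := range output {
//		_ = entry // each Entry passes through untouched
//	}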

// Count implements the Reducer interface to count entries.
type Count struct{}

// Reduce simply counts the input entries and writes the total to the output
// channel.
func (r *Count) Reduce(input chan *Entry, output chan *Entry) {
	var count uint64
	for range input {
		count++
	}
	entry := NewEmptyEntry()
	entry.SetUintField("count", count)
	output <- entry
	close(output)
}
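
// A minimal usage sketch for Count; the single result Entry carries the total
// in its "count" field, as set via SetUintField above:
//
//	input := make(chan *Entry, 10)
//	output := make(chan *Entry, 1)
//	go new(Count).Reduce(input, output)
//	input <- NewEmptyEntry()
//	input <- NewEmptyEntry()
//	close(input)
//	result := <-output // result's "count" field holds 2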

// Sum implements the Reducer interface to sum up Entry values for the given
// fields.
type Sum struct {
	Fields []string
}

// Reduce sums up the given Entry fields and writes a single Entry holding the
// total for each field to the output channel.
func (r *Sum) Reduce(input chan *Entry, output chan *Entry) {
	sum := make(map[string]float64)
	for entry := range input {
		for _, name := range r.Fields {
			val, err := entry.FloatField(name)
			if err == nil {
				sum[name] += val
			}
		}
	}
	entry := NewEmptyEntry()
	for name, val := range sum {
		entry.SetFloatField(name, val)
	}
	output <- entry
	close(output)
}
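
// A minimal usage sketch for Sum; "bytes_sent" is an arbitrary example field
// name, and the total is read back with FloatField:
//
//	sum := &Sum{Fields: []string{"bytes_sent"}}
//	input := make(chan *Entry, 10)
//	output := make(chan *Entry, 1)
//	go sum.Reduce(input, output)
//	// ... feed parsed entries into input, then close(input)
//	result := <-output
//	total, err := result.FloatField("bytes_sent")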

// Avg implements the Reducer interface to calculate the average of Entry
// values for the given fields.
type Avg struct {
	Fields []string
}

// Reduce calculates the average value of the configured Fields over all input
// Entries and writes a single Entry with the results to the output channel.
func (r *Avg) Reduce(input chan *Entry, output chan *Entry) {
	avg := make(map[string]float64)
	count := 0.0
	for entry := range input {
		for _, name := range r.Fields {
			val, err := entry.FloatField(name)
			if err == nil {
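				// Incremental mean: newAvg = (oldAvg*n + value) / (n + 1)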
				avg[name] = (avg[name]*count + val) / (count + 1)
			}
		}
		count++
	}
	entry := NewEmptyEntry()
	for name, val := range avg {
		entry.SetFloatField(name, val)
	}
	output <- entry
	close(output)
}

// Chain implements the Reducer interface for chaining other reducers
type Chain struct {
	filters  []Filter
	reducers []Reducer
}

// NewChain creates a new chain of Reducers
func NewChain(reducers ...Reducer) *Chain {
	chain := new(Chain)
	for _, r := range reducers {
		if f, ok := r.(Filter); ok {
			chain.filters = append(chain.filters, f)
		} else {
			chain.reducers = append(chain.reducers, r)
		}
	}
	return chain
}

// Reduce applies a chain of reducers to the input channel of entries and
// merges their results.
func (r *Chain) Reduce(input chan *Entry, output chan *Entry) {
	// Make an input and an output channel for each reducer
	subInput := make([]chan *Entry, len(r.reducers))
	subOutput := make([]chan *Entry, len(r.reducers))
	for i, reducer := range r.reducers {
		subInput[i] = make(chan *Entry, cap(input))
		subOutput[i] = make(chan *Entry, cap(output))
		go reducer.Reduce(subInput[i], subOutput[i])
	}

	// Read the reducer's master input channel
	for entry := range input {
		for _, f := range r.filters {
			entry = f.Filter(entry)
			if entry == nil {
				break
			}
		}
		// Publish the input entry for each sub-reducer to process
		if entry != nil {
			for _, sub := range subInput {
				sub <- entry
			}
		}
	}
	for _, ch := range subInput {
		close(ch)
	}

	// Merge all results
	entry := NewEmptyEntry()
	for _, result := range subOutput {
		entry.Merge(<-result)
	}

	output <- entry
	close(output)
}
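
// A minimal usage sketch for Chain, combining two reducers; their results are
// merged into a single output Entry ("bytes_sent" is an arbitrary example
// field name):
//
//	chain := NewChain(&Count{}, &Sum{Fields: []string{"bytes_sent"}})
//	input := make(chan *Entry, 10)
//	output := make(chan *Entry, 1)
//	go chain.Reduce(input, output)
//	// ... feed parsed entries into input, then close(input)
//	result := <-output // holds both "count" and the "bytes_sent" total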

// GroupBy implements the Reducer interface to apply other reducers and get
// data grouped by the given fields.
type GroupBy struct {
	Fields   []string
	reducers []Reducer
}

// NewGroupBy creates a new GroupBy Reducer
func NewGroupBy(fields []string, reducers ...Reducer) *GroupBy {
	return &GroupBy{
		Fields:   fields,
		reducers: reducers,
	}
}

// Reduce applies the related reducers and groups the data by Fields.
func (r *GroupBy) Reduce(input chan *Entry, output chan *Entry) {
	subInput := make(map[string]chan *Entry)
	subOutput := make(map[string]chan *Entry)

	// Read the reducer's master input channel and create a distinct input
	// channel for each entry key we group by
	for entry := range input {
		key := entry.FieldsHash(r.Fields)
		if _, ok := subInput[key]; !ok {
			subInput[key] = make(chan *Entry, cap(input))
			subOutput[key] = make(chan *Entry, cap(output)+1)
			subOutput[key] <- entry.Partial(r.Fields)
			go NewChain(r.reducers...).Reduce(subInput[key], subOutput[key])
		}
		subInput[key] <- entry
	}
	for _, ch := range subInput {
		close(ch)
	}
	for _, ch := range subOutput {
		entry := <-ch
		entry.Merge(<-ch)
		output <- entry
	}
	close(output)
}
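
// A minimal usage sketch for GroupBy; "host" and "request_time" are arbitrary
// example field names, and one Entry per distinct group is written to output:
//
//	groupBy := NewGroupBy(
//		[]string{"host"},
//		&Count{},
//		&Avg{Fields: []string{"request_time"}},
//	)
//	input := make(chan *Entry, 10)
//	output := make(chan *Entry, 10)
//	go groupBy.Reduce(input, output)
//	// ... feed parsed entries into input, then close(input)
//	for entry := range output {
//		_ = entry // grouping fields plus reduced values for that group
//	}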