package gonx

// Reducer is the interface implemented by all reducers. A Reducer consumes
// parsed entries from the input channel, aggregates them, writes its
// result to the output channel, and closes the output channel when done.
type Reducer interface {
    Reduce(input chan *Entry, output chan *Entry)
}

// ReadAll implements the Reducer interface. It performs no aggregation and
// simply passes every input entry through to the output channel.
type ReadAll struct {
}

// Reduce copies entries from input to output until input is closed.
func (r *ReadAll) Reduce(input chan *Entry, output chan *Entry) {
    for entry := range input {
        output <- entry
    }
    close(output)
}
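// exampleReadAllUsage is a minimal usage sketch, not part of the library.
// Every Reducer is driven the same way: feed entries into the input
// channel, close it, then drain the output channel. ReadAll forwards the
// entries unchanged.
func exampleReadAllUsage() []*Entry {
    input := make(chan *Entry, 2)
    output := make(chan *Entry, 2)
    go (&ReadAll{}).Reduce(input, output)
    input <- NewEmptyEntry()
    input <- NewEmptyEntry()
    close(input)
    var entries []*Entry
    for entry := range output {
        entries = append(entries, entry)
    }
    return entries // both entries, passed through unchanged
}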
// Count implements the Reducer interface. It counts the input entries and
// emits a single entry with the total in an unsigned integer "count" field.
type Count struct {
}

// Reduce consumes all input entries and emits one entry with their count.
func (r *Count) Reduce(input chan *Entry, output chan *Entry) {
    var count uint64
    for range input {
        count++
    }
    entry := NewEmptyEntry()
    entry.SetUintField("count", count)
    output <- entry
    close(output)
}
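// exampleCountUsage is a minimal usage sketch (not part of the library):
// Count collapses any number of entries into a single result entry.
func exampleCountUsage() *Entry {
    input := make(chan *Entry, 3)
    output := make(chan *Entry, 1)
    go (&Count{}).Reduce(input, output)
    for i := 0; i < 3; i++ {
        input <- NewEmptyEntry()
    }
    close(input)
    return <-output // a single entry whose "count" field is 3
}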
// Sum implements the Reducer interface. It sums the given numeric Fields
// over all input entries and emits a single entry with the totals.
type Sum struct {
    Fields []string
}

// Reduce adds up the configured fields. Entries where a field is missing
// or cannot be parsed as a float are skipped for that field.
func (r *Sum) Reduce(input chan *Entry, output chan *Entry) {
    sum := make(map[string]float64)
    for entry := range input {
        for _, name := range r.Fields {
            val, err := entry.FloatField(name)
            if err == nil {
                sum[name] += val
            }
        }
    }
    entry := NewEmptyEntry()
    for name, val := range sum {
        entry.SetFloatField(name, val)
    }
    output <- entry
    close(output)
}
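// exampleSumUsage is a minimal usage sketch (not part of the library); the
// "bytes_sent" field name is illustrative. Sum totals each configured
// field across every entry that carries it.
func exampleSumUsage() *Entry {
    input := make(chan *Entry, 2)
    output := make(chan *Entry, 1)
    go (&Sum{Fields: []string{"bytes_sent"}}).Reduce(input, output)
    for _, v := range []float64{512, 1024} {
        entry := NewEmptyEntry()
        entry.SetFloatField("bytes_sent", v)
        input <- entry
    }
    close(input)
    return <-output // a single entry with "bytes_sent" summed to 1536
}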
// Avg implements the Reducer interface. It computes the average of the
// given numeric Fields over all input entries and emits a single entry.
type Avg struct {
    Fields []string
}

// Reduce maintains a running mean for each configured field. Note that the
// divisor counts every entry, including those where the field failed to
// parse, so entries missing a field pull its average toward zero.
func (r *Avg) Reduce(input chan *Entry, output chan *Entry) {
    avg := make(map[string]float64)
    count := 0.0
    for entry := range input {
        for _, name := range r.Fields {
            val, err := entry.FloatField(name)
            if err == nil {
                avg[name] = (avg[name]*count + val) / (count + 1)
            }
        }
        count++
    }
    entry := NewEmptyEntry()
    for name, val := range avg {
        entry.SetFloatField(name, val)
    }
    output <- entry
    close(output)
}
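// exampleAvgUsage is a minimal usage sketch (not part of the library); the
// "request_time" field name is illustrative.
func exampleAvgUsage() *Entry {
    input := make(chan *Entry, 2)
    output := make(chan *Entry, 1)
    go (&Avg{Fields: []string{"request_time"}}).Reduce(input, output)
    for _, v := range []float64{0.2, 0.4} {
        entry := NewEmptyEntry()
        entry.SetFloatField("request_time", v)
        input <- entry
    }
    close(input)
    return <-output // a single entry with "request_time" averaged to 0.3
}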
// Chain implements the Reducer interface. It passes each input entry
// through its filters, broadcasts the survivors to all of its sub-reducers
// running concurrently, and merges their results into one output entry.
type Chain struct {
    filters  []Filter
    reducers []Reducer
}

// NewChain sorts the given reducers into filters (reducers that also
// implement the Filter interface) and plain aggregating reducers.
func NewChain(reducers ...Reducer) *Chain {
    chain := new(Chain)
    for _, r := range reducers {
        if f, ok := r.(Filter); ok {
            chain.filters = append(chain.filters, f)
        } else {
            chain.reducers = append(chain.reducers, r)
        }
    }
    return chain
}

func (r *Chain) Reduce(input chan *Entry, output chan *Entry) {
    // Start a goroutine for each sub-reducer with its own channel pair.
    subInput := make([]chan *Entry, len(r.reducers))
    subOutput := make([]chan *Entry, len(r.reducers))
    for i, reducer := range r.reducers {
        subInput[i] = make(chan *Entry, cap(input))
        subOutput[i] = make(chan *Entry, cap(output))
        go reducer.Reduce(subInput[i], subOutput[i])
    }

    // Run each entry through the filter chain; a filter returning nil
    // drops the entry. Surviving entries go to every sub-reducer.
    for entry := range input {
        for _, f := range r.filters {
            entry = f.Filter(entry)
            if entry == nil {
                break
            }
        }
        if entry != nil {
            for _, sub := range subInput {
                sub <- entry
            }
        }
    }
    for _, ch := range subInput {
        close(ch)
    }

    // Merge the single result entry from every sub-reducer.
    entry := NewEmptyEntry()
    for _, result := range subOutput {
        entry.Merge(<-result)
    }
    output <- entry
    close(output)
}
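// exampleChainUsage is a minimal usage sketch (not part of the library);
// the "bytes_sent" field name is illustrative. One pass over the input
// yields both aggregates, merged into a single entry.
func exampleChainUsage() *Entry {
    input := make(chan *Entry, 2)
    output := make(chan *Entry, 1)
    chain := NewChain(&Count{}, &Sum{Fields: []string{"bytes_sent"}})
    go chain.Reduce(input, output)
    for _, v := range []float64{512, 1024} {
        entry := NewEmptyEntry()
        entry.SetFloatField("bytes_sent", v)
        input <- entry
    }
    close(input)
    return <-output // one entry with "count" == 2 and "bytes_sent" == 1536
}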
// GroupBy implements the Reducer interface. It groups input entries by the
// values of the given Fields and runs a separate Chain of the given
// reducers for each group, emitting one merged entry per group.
type GroupBy struct {
    Fields   []string
    reducers []Reducer
}

// NewGroupBy creates a GroupBy reducer over the given fields.
func NewGroupBy(fields []string, reducers ...Reducer) *GroupBy {
    return &GroupBy{
        Fields:   fields,
        reducers: reducers,
    }
}

func (r *GroupBy) Reduce(input chan *Entry, output chan *Entry) {
    subInput := make(map[string]chan *Entry)
    subOutput := make(map[string]chan *Entry)

    // Dispatch each entry to the sub-chain for its group key, spawning a
    // new chain on the first entry of a group. The output buffer is one
    // slot larger so the partial entry holding the group-key fields can be
    // pre-seeded into it without blocking.
    for entry := range input {
        key := entry.FieldsHash(r.Fields)
        if _, ok := subInput[key]; !ok {
            subInput[key] = make(chan *Entry, cap(input))
            subOutput[key] = make(chan *Entry, cap(output)+1)
            subOutput[key] <- entry.Partial(r.Fields)
            go NewChain(r.reducers...).Reduce(subInput[key], subOutput[key])
        }
        subInput[key] <- entry
    }
    for _, ch := range subInput {
        close(ch)
    }

    // For each group, take back the pre-seeded partial entry and merge the
    // chain's aggregated result into it.
    for _, ch := range subOutput {
        entry := <-ch
        entry.Merge(<-ch)
        output <- entry
    }
    close(output)
}
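// exampleGroupByUsage is a minimal usage sketch (not part of the library);
// the "host" and "bytes_sent" field names are illustrative, and the given
// entries are assumed to carry both fields. GroupBy emits one merged entry
// per distinct value of the grouping fields.
func exampleGroupByUsage(entries []*Entry) []*Entry {
    input := make(chan *Entry, len(entries))
    output := make(chan *Entry, len(entries))
    groupBy := NewGroupBy([]string{"host"}, &Sum{Fields: []string{"bytes_sent"}})
    go groupBy.Reduce(input, output)
    for _, entry := range entries {
        input <- entry
    }
    close(input)
    var results []*Entry
    for entry := range output {
        results = append(results, entry)
    }
    return results // one entry per distinct "host", with "bytes_sent" summed
}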