123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- package gonx
- import (
- "bufio"
- "bytes"
- "io"
- "sync"
- )
// handleError receives the errors MapReduce does not return to its caller:
// ParseString failures for individual lines and read errors other than
// io.EOF. It currently discards them, so offending lines are silently
// skipped.
func handleError(error) {}
- func MapReduce(file io.Reader, parser StringParser, reducer Reducer) chan *Entry {
-
-
- var lines = make(chan string)
-
- var entries = make(chan *Entry, 10)
- go func(topLoad int) {
-
-
- var sem = make(chan bool, topLoad)
- for i := 0; i < topLoad; i++ {
-
- sem <- true
- }
- var wg sync.WaitGroup
- for {
-
- if !<-sem {
-
- break
- }
- wg.Add(1)
- go func() {
- defer wg.Done()
-
- line, ok := <-lines
-
- if !ok {
-
- sem <- false
- return
- }
- entry, err := parser.ParseString(line)
- if err == nil {
-
-
-
- entries <- entry
- } else {
- handleError(err)
- }
-
- sem <- true
- }()
- }
-
- wg.Wait()
- close(entries)
- }(cap(entries))
-
- var output = make(chan *Entry)
- go reducer.Reduce(entries, output)
- go func() {
- reader := bufio.NewReader(file)
- line, err := readLine(reader)
- for err == nil {
-
- lines <- line
- line, err = readLine(reader)
- }
- close(lines)
- if err != nil && err != io.EOF {
- handleError(err)
- }
- }()
- return output
- }
// readLine returns the next line from reader without its line terminator
// ("\n" or "\r\n"). It reassembles lines longer than reader's buffer from the
// fragments bufio.Reader.ReadLine hands back.
//
// io.EOF is returned (with an empty string) only when no data is left, so an
// unterminated final line is first returned with a nil error. Any other read
// error is returned together with the part of the line read so far.
func readLine(reader *bufio.Reader) (string, error) {
	line, isPrefix, err := reader.ReadLine()
	if err != nil {
		return "", err
	}
	if !isPrefix {
		return string(line), nil
	}

	// The line did not fit in reader's buffer, so collect the remaining
	// fragments. line aliases reader's internal buffer, so it must be copied
	// before the next ReadLine call.
	var buffer bytes.Buffer
	buffer.Write(line) // bytes.Buffer.Write always returns a nil error
	for isPrefix {
		line, isPrefix, err = reader.ReadLine()
		if err == io.EOF {
			// The input ended exactly where the previous (buffer-filling)
			// fragment ended. The line is complete, just unterminated;
			// report it now and let the next call return io.EOF.
			return buffer.String(), nil
		}
		if err != nil {
			return buffer.String(), err
		}
		buffer.Write(line)
	}
	return buffer.String(), nil
}
|