mapreduce.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package gonx
  2. import (
  3. "bufio"
  4. "bytes"
  5. "io"
  6. "sync"
  7. )
  8. func handleError(err error) {
  9. //fmt.Fprintln(os.Stderr, err)
  10. }
  11. // Iterate over given file and map each it's line into Entry record using
  12. // parser and apply reducer to the Entries channel. Execution terminates
  13. // when result will be readed from reducer's output channel, but the mapper
  14. // works and fills input Entries channel until all lines will be read from
  15. // the fiven file.
  16. func MapReduce(file io.Reader, parser StringParser, reducer Reducer) chan *Entry {
  17. // Input file lines. This channel is unbuffered to publish
  18. // next line to handle only when previous is taken by mapper.
  19. var lines = make(chan string)
  20. // Host thread to spawn new mappers
  21. var entries = make(chan *Entry, 10)
  22. go func(topLoad int) {
  23. // Create semafore channel with capacity equal to the output channel
  24. // capacity. Use it to control mapper goroutines spawn.
  25. var sem = make(chan bool, topLoad)
  26. for i := 0; i < topLoad; i++ {
  27. // Ready to go!
  28. sem <- true
  29. }
  30. var wg sync.WaitGroup
  31. for {
  32. // Wait until semaphore becomes available and run a mapper
  33. if !<-sem {
  34. // Stop the host loop if false received from semaphore
  35. break
  36. }
  37. wg.Add(1)
  38. go func() {
  39. defer wg.Done()
  40. // Take next file line to map. Check is channel closed.
  41. line, ok := <-lines
  42. // Return immediately if lines channel is closed
  43. if !ok {
  44. // Send false to semaphore channel to indicate that job's done
  45. sem <- false
  46. return
  47. }
  48. entry, err := parser.ParseString(line)
  49. if err == nil {
  50. // Write result Entry to the output channel. This will
  51. // block goroutine runtime until channel is free to
  52. // accept new item.
  53. entries <- entry
  54. } else {
  55. handleError(err)
  56. }
  57. // Increment semaphore to allow new mapper workers to spawn
  58. sem <- true
  59. }()
  60. }
  61. // Wait for all mappers to complete, then send a quit signal
  62. wg.Wait()
  63. close(entries)
  64. }(cap(entries))
  65. // Run reducer routine.
  66. var output = make(chan *Entry)
  67. go reducer.Reduce(entries, output)
  68. go func() {
  69. reader := bufio.NewReader(file)
  70. line, err := readLine(reader)
  71. for err == nil {
  72. // Read next line from the file and feed mapper routines.
  73. lines <- line
  74. line, err = readLine(reader)
  75. }
  76. close(lines)
  77. if err != nil && err != io.EOF {
  78. handleError(err)
  79. }
  80. }()
  81. return output
  82. }
  83. func readLine(reader *bufio.Reader) (string, error) {
  84. line, isPrefix, err := reader.ReadLine()
  85. if err != nil {
  86. return "", err
  87. }
  88. if !isPrefix {
  89. return string(line), nil
  90. }
  91. var buffer bytes.Buffer
  92. _, err = buffer.Write(line)
  93. for isPrefix && err == nil {
  94. line, isPrefix, err = reader.ReadLine()
  95. if err == nil {
  96. _, err = buffer.Write(line)
  97. }
  98. }
  99. return buffer.String(), err
  100. }