md5-server_amd64.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. //+build !noasm,!appengine,gc
  2. // Copyright (c) 2020 MinIO Inc. All rights reserved.
  3. // Use of this source code is governed by a license that can be
  4. // found in the LICENSE file.
  5. package md5simd
  6. import (
  7. "encoding/binary"
  8. "fmt"
  9. "runtime"
  10. "github.com/klauspost/cpuid"
  11. )
  12. // MD5 initialization constants
  13. const (
  14. // Lanes is the number of concurrently calculated hashes.
  15. Lanes = 16
  16. init0 = 0x67452301
  17. init1 = 0xefcdab89
  18. init2 = 0x98badcfe
  19. init3 = 0x10325476
  20. )
  21. // md5ServerUID - Does not start at 0 but next multiple of 16 so as to be able to
  22. // differentiate with default initialisation value of 0
  23. const md5ServerUID = Lanes
  24. const buffersPerLane = 3
  25. // Message to send across input channel
  26. type blockInput struct {
  27. uid uint64
  28. msg []byte
  29. sumCh chan sumResult
  30. reset bool
  31. }
  32. type sumResult struct {
  33. digest [Size]byte
  34. }
  35. type lanesInfo [Lanes]blockInput
  36. // md5Server - Type to implement parallel handling of MD5 invocations
  37. type md5Server struct {
  38. uidCounter uint64
  39. cycle chan uint64 // client with uid has update.
  40. newInput chan newClient // Add new client.
  41. digests map[uint64][Size]byte // Map of uids to (interim) digest results
  42. maskRounds16 [16]maskRounds // Pre-allocated static array for max 16 rounds
  43. maskRounds8a [8]maskRounds // Pre-allocated static array for max 8 rounds (1st AVX2 core)
  44. maskRounds8b [8]maskRounds // Pre-allocated static array for max 8 rounds (2nd AVX2 core)
  45. allBufs []byte // Preallocated buffer.
  46. buffers chan []byte // Preallocated buffers, sliced from allBufs.
  47. }
  48. // NewServer - Create new object for parallel processing handling
  49. func NewServer() Server {
  50. if !cpuid.CPU.AVX2() {
  51. return &fallbackServer{}
  52. }
  53. md5srv := &md5Server{}
  54. md5srv.digests = make(map[uint64][Size]byte)
  55. md5srv.newInput = make(chan newClient, Lanes)
  56. md5srv.cycle = make(chan uint64, Lanes*10)
  57. md5srv.uidCounter = md5ServerUID - 1
  58. md5srv.allBufs = make([]byte, 32+buffersPerLane*Lanes*internalBlockSize)
  59. md5srv.buffers = make(chan []byte, buffersPerLane*Lanes)
  60. // Fill buffers.
  61. for i := 0; i < buffersPerLane*Lanes; i++ {
  62. s := 32 + i*internalBlockSize
  63. md5srv.buffers <- md5srv.allBufs[s : s+internalBlockSize : s+internalBlockSize]
  64. }
  65. // Start a single thread for reading from the input channel
  66. go md5srv.process(md5srv.newInput)
  67. return md5srv
  68. }
  69. type newClient struct {
  70. uid uint64
  71. input chan blockInput
  72. }
  73. // process - Sole handler for reading from the input channel.
  74. func (s *md5Server) process(newClients chan newClient) {
  75. // To fill up as many lanes as possible:
  76. //
  77. // 1. Wait for a cycle id.
  78. // 2. If not already in a lane, add, otherwise leave on channel
  79. // 3. Start timer
  80. // 4. Check if lanes is full, if so, goto 10 (process).
  81. // 5. If timeout, goto 10.
  82. // 6. Wait for new id (goto 2) or timeout (goto 10).
  83. // 10. Process.
  84. // 11. Check all input if there is already input, if so add to lanes.
  85. // 12. Goto 1
  86. // lanes contains the lanes.
  87. var lanes lanesInfo
  88. // lanesFilled contains the number of filled lanes for current cycle.
  89. var lanesFilled int
  90. // clients contains active clients
  91. var clients = make(map[uint64]chan blockInput, Lanes)
  92. addToLane := func(uid uint64) {
  93. cl, ok := clients[uid]
  94. if !ok {
  95. // Unknown client. Maybe it was already removed.
  96. return
  97. }
  98. // Check if we already have it.
  99. for _, lane := range lanes[:lanesFilled] {
  100. if lane.uid == uid {
  101. return
  102. }
  103. }
  104. // Continue until we get a block or there is nothing on channel
  105. for {
  106. select {
  107. case block, ok := <-cl:
  108. if !ok {
  109. // Client disconnected
  110. delete(clients, block.uid)
  111. return
  112. }
  113. if block.uid != uid {
  114. panic(fmt.Errorf("uid mismatch, %d (block) != %d (client)", block.uid, uid))
  115. }
  116. // If reset message, reset and we're done
  117. if block.reset {
  118. delete(s.digests, uid)
  119. continue
  120. }
  121. // If requesting sum, we will need to maintain state.
  122. if block.sumCh != nil {
  123. var dig digest
  124. d, ok := s.digests[uid]
  125. if ok {
  126. dig.s[0] = binary.LittleEndian.Uint32(d[0:4])
  127. dig.s[1] = binary.LittleEndian.Uint32(d[4:8])
  128. dig.s[2] = binary.LittleEndian.Uint32(d[8:12])
  129. dig.s[3] = binary.LittleEndian.Uint32(d[12:16])
  130. } else {
  131. dig.s[0], dig.s[1], dig.s[2], dig.s[3] = init0, init1, init2, init3
  132. }
  133. sum := sumResult{}
  134. // Add end block to current digest.
  135. blockGeneric(&dig, block.msg)
  136. binary.LittleEndian.PutUint32(sum.digest[0:], dig.s[0])
  137. binary.LittleEndian.PutUint32(sum.digest[4:], dig.s[1])
  138. binary.LittleEndian.PutUint32(sum.digest[8:], dig.s[2])
  139. binary.LittleEndian.PutUint32(sum.digest[12:], dig.s[3])
  140. block.sumCh <- sum
  141. if block.msg != nil {
  142. s.buffers <- block.msg
  143. }
  144. continue
  145. }
  146. if len(block.msg) == 0 {
  147. continue
  148. }
  149. lanes[lanesFilled] = block
  150. lanesFilled++
  151. return
  152. default:
  153. return
  154. }
  155. }
  156. }
  157. addNewClient := func(cl newClient) {
  158. if _, ok := clients[cl.uid]; ok {
  159. panic("internal error: duplicate client registration")
  160. }
  161. clients[cl.uid] = cl.input
  162. }
  163. allLanesFilled := func() bool {
  164. return lanesFilled == Lanes || lanesFilled >= len(clients)
  165. }
  166. for {
  167. // Step 1.
  168. for lanesFilled == 0 {
  169. select {
  170. case cl, ok := <-newClients:
  171. if !ok {
  172. return
  173. }
  174. addNewClient(cl)
  175. // Check if it already sent a payload.
  176. addToLane(cl.uid)
  177. continue
  178. case uid := <-s.cycle:
  179. addToLane(uid)
  180. }
  181. }
  182. fillLanes:
  183. for !allLanesFilled() {
  184. select {
  185. case cl, ok := <-newClients:
  186. if !ok {
  187. return
  188. }
  189. addNewClient(cl)
  190. case uid := <-s.cycle:
  191. addToLane(uid)
  192. default:
  193. // Nothing more queued...
  194. break fillLanes
  195. }
  196. }
  197. // If we did not fill all lanes, check if there is more waiting
  198. if !allLanesFilled() {
  199. runtime.Gosched()
  200. for uid := range clients {
  201. addToLane(uid)
  202. if allLanesFilled() {
  203. break
  204. }
  205. }
  206. }
  207. if false {
  208. if !allLanesFilled() {
  209. fmt.Println("Not all lanes filled", lanesFilled, "of", len(clients))
  210. //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
  211. } else if true {
  212. fmt.Println("all lanes filled")
  213. }
  214. }
  215. // Process the lanes we could collect
  216. s.blocks(lanes[:lanesFilled])
  217. // Clear lanes...
  218. lanesFilled = 0
  219. // Add all current queued
  220. for uid := range clients {
  221. addToLane(uid)
  222. if allLanesFilled() {
  223. break
  224. }
  225. }
  226. }
  227. }
  228. func (s *md5Server) Close() {
  229. if s.newInput != nil {
  230. close(s.newInput)
  231. s.newInput = nil
  232. }
  233. }
  234. // Invoke assembly and send results back
  235. func (s *md5Server) blocks(lanes []blockInput) {
  236. inputs := [16][]byte{}
  237. for i := range lanes {
  238. inputs[i] = lanes[i].msg
  239. }
  240. // Collect active digests...
  241. state := s.getDigests(lanes)
  242. // Process all lanes...
  243. s.blockMd5_x16(&state, inputs, len(lanes) <= 8)
  244. for i, lane := range lanes {
  245. uid := lane.uid
  246. dig := [Size]byte{}
  247. binary.LittleEndian.PutUint32(dig[0:], state.v0[i])
  248. binary.LittleEndian.PutUint32(dig[4:], state.v1[i])
  249. binary.LittleEndian.PutUint32(dig[8:], state.v2[i])
  250. binary.LittleEndian.PutUint32(dig[12:], state.v3[i])
  251. s.digests[uid] = dig
  252. if lane.msg != nil {
  253. s.buffers <- lane.msg
  254. }
  255. lanes[i] = blockInput{}
  256. }
  257. }
  258. func (s *md5Server) getDigests(lanes []blockInput) (d digest16) {
  259. for i, lane := range lanes {
  260. a, ok := s.digests[lane.uid]
  261. if ok {
  262. d.v0[i] = binary.LittleEndian.Uint32(a[0:4])
  263. d.v1[i] = binary.LittleEndian.Uint32(a[4:8])
  264. d.v2[i] = binary.LittleEndian.Uint32(a[8:12])
  265. d.v3[i] = binary.LittleEndian.Uint32(a[12:16])
  266. } else {
  267. d.v0[i] = init0
  268. d.v1[i] = init1
  269. d.v2[i] = init2
  270. d.v3[i] = init3
  271. }
  272. }
  273. return
  274. }