parser.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. // Copyright 2012-2018 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "fmt"
  16. )
  // msgArg holds the arguments parsed from a MSG control line.
type msgArg struct {
	// subject and reply alias the bytes they were parsed from (the read
	// buffer or parseState.argBuf); reply is nil when the MSG line has no
	// reply subject.
	subject, reply []byte
	sid            int64 // subscription id from the MSG line
	size           int   // payload length in bytes
}
  // MAX_CONTROL_LINE_SIZE is the length of parseState.scratch, the reusable
// buffer that holds control-line arguments and small payloads split across
// reads. NOTE(review): it may also be referenced elsewhere in the package.
const MAX_CONTROL_LINE_SIZE = 1 << 10
  24. type parseState struct {
  25. state int
  26. as int
  27. drop int
  28. ma msgArg
  29. argBuf []byte
  30. msgBuf []byte
  31. scratch [MAX_CONTROL_LINE_SIZE]byte
  32. }
  // Parser states. OP_* states record how much of a protocol verb has been
// matched (verbs are matched case-insensitively), *_SPC states skip the
// whitespace before arguments, *_ARG states collect arguments up to the end
// of the line, and MSG_PAYLOAD / MSG_END consume a message body and the line
// terminator that follows it.
const (
	OP_START = iota

	// "+OK"
	OP_PLUS
	OP_PLUS_O
	OP_PLUS_OK

	// "-ERR <message>"
	OP_MINUS
	OP_MINUS_E
	OP_MINUS_ER
	OP_MINUS_ERR
	OP_MINUS_ERR_SPC
	MINUS_ERR_ARG

	// "MSG <subject> <sid> [reply] <size>" followed by the payload
	OP_M
	OP_MS
	OP_MSG
	OP_MSG_SPC
	MSG_ARG
	MSG_PAYLOAD
	MSG_END

	// "PING" (OP_P is shared with PONG)
	OP_P
	OP_PI
	OP_PIN
	OP_PING

	// "PONG"
	OP_PO
	OP_PON
	OP_PONG

	// "INFO <args>"
	OP_I
	OP_IN
	OP_INF
	OP_INFO
	OP_INFO_SPC
	INFO_ARG
)
  // parse is the fast protocol parser engine.
//
// It drives buf through a byte-at-a-time state machine whose state is kept
// in nc.ps, so a protocol line or message payload may be split across any
// number of calls. Completed operations are dispatched to processMsg,
// processOK, processErr, processPing, processPong and processAsyncInfo.
// If buf ends in the middle of an argument or payload, the bytes seen so far
// are saved in nc.ps.argBuf / nc.ps.msgBuf (starting from nc.ps.scratch when
// they fit) so the next call can continue.
//
// It returns any error from processMsgArgs unchanged, or a parse error that
// names the current state and the unparsed remainder of buf when an
// unexpected byte is encountered.
func (nc *Conn) parse(buf []byte) error {
	var i int
	var b byte
	// Move to loop instead of range syntax to allow jumping of i
	for i = 0; i < len(buf); i++ {
		b = buf[i]
		switch nc.ps.state {
		case OP_START:
			// Dispatch on the first letter of the protocol verb.
			switch b {
			case 'M', 'm':
				nc.ps.state = OP_M
			case 'P', 'p':
				nc.ps.state = OP_P
			case '+':
				nc.ps.state = OP_PLUS
			case '-':
				nc.ps.state = OP_MINUS
			case 'I', 'i':
				nc.ps.state = OP_I
			default:
				goto parseErr
			}
		case OP_M:
			switch b {
			case 'S', 's':
				nc.ps.state = OP_MS
			default:
				goto parseErr
			}
		case OP_MS:
			switch b {
			case 'G', 'g':
				nc.ps.state = OP_MSG
			default:
				goto parseErr
			}
		case OP_MSG:
			switch b {
			case ' ', '\t':
				nc.ps.state = OP_MSG_SPC
			default:
				goto parseErr
			}
		case OP_MSG_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				// First argument byte: remember where the arguments start.
				nc.ps.state = MSG_ARG
				nc.ps.as = i
			}
		case MSG_ARG:
			switch b {
			case '\r':
				// Exclude the '\r' from the argument slice taken at '\n'.
				nc.ps.drop = 1
			case '\n':
				// Use argBuf if the line was split across reads,
				// otherwise slice the arguments straight out of buf.
				var arg []byte
				if nc.ps.argBuf != nil {
					arg = nc.ps.argBuf
				} else {
					arg = buf[nc.ps.as : i-nc.ps.drop]
				}
				if err := nc.processMsgArgs(arg); err != nil {
					return err
				}
				// argBuf is deliberately not cleared: nc.ps.ma now points
				// into it until the payload has been delivered.
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, MSG_PAYLOAD
				// jump ahead with the index. If this overruns
				// what is left we fall out and process split
				// buffer.
				// After the loop's i++, i is the first byte past the payload.
				i = nc.ps.as + nc.ps.ma.size - 1
			default:
				// Bytes are only accumulated once the line has been split;
				// otherwise the argument is sliced from buf at '\n'.
				if nc.ps.argBuf != nil {
					nc.ps.argBuf = append(nc.ps.argBuf, b)
				}
			}
		case MSG_PAYLOAD:
			if nc.ps.msgBuf != nil {
				// The payload is being reassembled across reads.
				if len(nc.ps.msgBuf) >= nc.ps.ma.size {
					// Payload complete. The current byte is consumed
					// without inspection; MSG_END then skips to the '\n'.
					nc.processMsg(nc.ps.msgBuf)
					nc.ps.argBuf, nc.ps.msgBuf, nc.ps.state = nil, nil, MSG_END
				} else {
					// copy as much as we can to the buffer and skip ahead.
					toCopy := nc.ps.ma.size - len(nc.ps.msgBuf)
					avail := len(buf) - i
					if avail < toCopy {
						toCopy = avail
					}
					if toCopy > 0 {
						start := len(nc.ps.msgBuf)
						// This is needed for copy to work.
						// The reslice relies on cap(msgBuf) >= ma.size, which
						// the split-message setup after the loop guarantees.
						nc.ps.msgBuf = nc.ps.msgBuf[:start+toCopy]
						copy(nc.ps.msgBuf[start:], buf[i:i+toCopy])
						// Update our index
						i = (i + toCopy) - 1
					} else {
						// NOTE(review): len(msgBuf) < ma.size and i < len(buf)
						// here, so toCopy appears to always be > 0 and this
						// branch looks unreachable.
						nc.ps.msgBuf = append(nc.ps.msgBuf, b)
					}
				}
			} else if i-nc.ps.as >= nc.ps.ma.size {
				// The whole payload is in buf: deliver it without copying.
				nc.processMsg(buf[nc.ps.as:i])
				nc.ps.argBuf, nc.ps.msgBuf, nc.ps.state = nil, nil, MSG_END
			}
		case MSG_END:
			// Skip the payload's line terminator up to and including '\n'.
			switch b {
			case '\n':
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
			default:
				continue
			}
		case OP_PLUS:
			switch b {
			case 'O', 'o':
				nc.ps.state = OP_PLUS_O
			default:
				goto parseErr
			}
		case OP_PLUS_O:
			switch b {
			case 'K', 'k':
				nc.ps.state = OP_PLUS_OK
			default:
				goto parseErr
			}
		case OP_PLUS_OK:
			// Any bytes before '\n' (such as '\r') are ignored.
			switch b {
			case '\n':
				nc.processOK()
				nc.ps.drop, nc.ps.state = 0, OP_START
			}
		case OP_MINUS:
			switch b {
			case 'E', 'e':
				nc.ps.state = OP_MINUS_E
			default:
				goto parseErr
			}
		case OP_MINUS_E:
			switch b {
			case 'R', 'r':
				nc.ps.state = OP_MINUS_ER
			default:
				goto parseErr
			}
		case OP_MINUS_ER:
			switch b {
			case 'R', 'r':
				nc.ps.state = OP_MINUS_ERR
			default:
				goto parseErr
			}
		case OP_MINUS_ERR:
			switch b {
			case ' ', '\t':
				nc.ps.state = OP_MINUS_ERR_SPC
			default:
				goto parseErr
			}
		case OP_MINUS_ERR_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				nc.ps.state = MINUS_ERR_ARG
				nc.ps.as = i
			}
		case MINUS_ERR_ARG:
			switch b {
			case '\r':
				nc.ps.drop = 1
			case '\n':
				// Take the error text from argBuf (split line) or buf, and
				// release argBuf since nothing references it afterwards.
				var arg []byte
				if nc.ps.argBuf != nil {
					arg = nc.ps.argBuf
					nc.ps.argBuf = nil
				} else {
					arg = buf[nc.ps.as : i-nc.ps.drop]
				}
				nc.processErr(string(arg))
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
			default:
				if nc.ps.argBuf != nil {
					nc.ps.argBuf = append(nc.ps.argBuf, b)
				}
			}
		case OP_P:
			// 'P' may start either PING or PONG.
			switch b {
			case 'I', 'i':
				nc.ps.state = OP_PI
			case 'O', 'o':
				nc.ps.state = OP_PO
			default:
				goto parseErr
			}
		case OP_PO:
			switch b {
			case 'N', 'n':
				nc.ps.state = OP_PON
			default:
				goto parseErr
			}
		case OP_PON:
			switch b {
			case 'G', 'g':
				nc.ps.state = OP_PONG
			default:
				goto parseErr
			}
		case OP_PONG:
			// Any bytes before '\n' are ignored.
			switch b {
			case '\n':
				nc.processPong()
				nc.ps.drop, nc.ps.state = 0, OP_START
			}
		case OP_PI:
			switch b {
			case 'N', 'n':
				nc.ps.state = OP_PIN
			default:
				goto parseErr
			}
		case OP_PIN:
			switch b {
			case 'G', 'g':
				nc.ps.state = OP_PING
			default:
				goto parseErr
			}
		case OP_PING:
			// Any bytes before '\n' are ignored.
			switch b {
			case '\n':
				nc.processPing()
				nc.ps.drop, nc.ps.state = 0, OP_START
			}
		case OP_I:
			switch b {
			case 'N', 'n':
				nc.ps.state = OP_IN
			default:
				goto parseErr
			}
		case OP_IN:
			switch b {
			case 'F', 'f':
				nc.ps.state = OP_INF
			default:
				goto parseErr
			}
		case OP_INF:
			switch b {
			case 'O', 'o':
				nc.ps.state = OP_INFO
			default:
				goto parseErr
			}
		case OP_INFO:
			switch b {
			case ' ', '\t':
				nc.ps.state = OP_INFO_SPC
			default:
				goto parseErr
			}
		case OP_INFO_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				nc.ps.state = INFO_ARG
				nc.ps.as = i
			}
		case INFO_ARG:
			switch b {
			case '\r':
				nc.ps.drop = 1
			case '\n':
				// Same handling as MINUS_ERR_ARG: take the argument from
				// argBuf or buf and release argBuf.
				var arg []byte
				if nc.ps.argBuf != nil {
					arg = nc.ps.argBuf
					nc.ps.argBuf = nil
				} else {
					arg = buf[nc.ps.as : i-nc.ps.drop]
				}
				nc.processAsyncInfo(arg)
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
			default:
				if nc.ps.argBuf != nil {
					nc.ps.argBuf = append(nc.ps.argBuf, b)
				}
			}
		default:
			goto parseErr
		}
	}
	// Check for split buffer scenarios
	// buf ended inside a control-line argument: save what we have so far
	// (excluding a trailing '\r') so later bytes can be appended to it.
	if (nc.ps.state == MSG_ARG || nc.ps.state == MINUS_ERR_ARG || nc.ps.state == INFO_ARG) && nc.ps.argBuf == nil {
		nc.ps.argBuf = nc.ps.scratch[:0]
		nc.ps.argBuf = append(nc.ps.argBuf, buf[nc.ps.as:i-nc.ps.drop]...)
		// FIXME, check max len
		// NOTE(review): nothing bounds the argument length; past
		// MAX_CONTROL_LINE_SIZE bytes append moves argBuf off scratch onto
		// the heap.
	}
	// Check for split msg
	if nc.ps.state == MSG_PAYLOAD && nc.ps.msgBuf == nil {
		// We need to clone the msgArg if it is still referencing the
		// read buffer and we are not able to process the msg.
		if nc.ps.argBuf == nil {
			nc.cloneMsgArg()
		}
		// If we will overflow the scratch buffer, just create a
		// new buffer to hold the split message.
		// Either way msgBuf ends up with capacity >= ma.size, which the
		// MSG_PAYLOAD reslice depends on.
		if nc.ps.ma.size > cap(nc.ps.scratch)-len(nc.ps.argBuf) {
			lrem := len(buf[nc.ps.as:])
			nc.ps.msgBuf = make([]byte, lrem, nc.ps.ma.size)
			copy(nc.ps.msgBuf, buf[nc.ps.as:])
		} else {
			// Place the payload in scratch directly after the saved arguments.
			nc.ps.msgBuf = nc.ps.scratch[len(nc.ps.argBuf):len(nc.ps.argBuf)]
			nc.ps.msgBuf = append(nc.ps.msgBuf, (buf[nc.ps.as:])...)
		}
	}
	return nil

parseErr:
	return fmt.Errorf("nats: Parse Error [%d]: '%s'", nc.ps.state, buf[i:])
}
  386. // cloneMsgArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
  387. // we need to hold onto it into the next read.
  388. func (nc *Conn) cloneMsgArg() {
  389. nc.ps.argBuf = nc.ps.scratch[:0]
  390. nc.ps.argBuf = append(nc.ps.argBuf, nc.ps.ma.subject...)
  391. nc.ps.argBuf = append(nc.ps.argBuf, nc.ps.ma.reply...)
  392. nc.ps.ma.subject = nc.ps.argBuf[:len(nc.ps.ma.subject)]
  393. if nc.ps.ma.reply != nil {
  394. nc.ps.ma.reply = nc.ps.argBuf[len(nc.ps.ma.subject):]
  395. }
  396. }
  // argsLenMax is the largest number of fields a MSG control line may carry:
// <subject> <sid> [reply] <size>. processMsgArgs sizes its stack array with it.
const (
	argsLenMax = 4
)
  398. func (nc *Conn) processMsgArgs(arg []byte) error {
  399. // Unroll splitArgs to avoid runtime/heap issues
  400. a := [argsLenMax][]byte{}
  401. args := a[:0]
  402. start := -1
  403. for i, b := range arg {
  404. switch b {
  405. case ' ', '\t', '\r', '\n':
  406. if start >= 0 {
  407. args = append(args, arg[start:i])
  408. start = -1
  409. }
  410. default:
  411. if start < 0 {
  412. start = i
  413. }
  414. }
  415. }
  416. if start >= 0 {
  417. args = append(args, arg[start:])
  418. }
  419. switch len(args) {
  420. case 3:
  421. nc.ps.ma.subject = args[0]
  422. nc.ps.ma.sid = parseInt64(args[1])
  423. nc.ps.ma.reply = nil
  424. nc.ps.ma.size = int(parseInt64(args[2]))
  425. case 4:
  426. nc.ps.ma.subject = args[0]
  427. nc.ps.ma.sid = parseInt64(args[1])
  428. nc.ps.ma.reply = args[2]
  429. nc.ps.ma.size = int(parseInt64(args[3]))
  430. default:
  431. return fmt.Errorf("nats: processMsgArgs Parse Error: '%s'", arg)
  432. }
  433. if nc.ps.ma.sid < 0 {
  434. return fmt.Errorf("nats: processMsgArgs Bad or Missing Sid: '%s'", arg)
  435. }
  436. if nc.ps.ma.size < 0 {
  437. return fmt.Errorf("nats: processMsgArgs Bad or Missing Size: '%s'", arg)
  438. }
  439. return nil
  440. }
  // ascii_0 and ascii_9 are the byte values of the ASCII digits '0' and '9'.
const (
	ascii_0 = 48
	ascii_9 = ascii_0 + 9
)
  // parseInt64 parses d as a non-negative decimal integer. It returns -1 to
// signal an error: d is empty, contains a byte other than '0'-'9' (no sign
// is accepted), or the value does not fit in an int64.
func parseInt64(d []byte) (n int64) {
	// maxInt64 mirrors math.MaxInt64 without importing math.
	const maxInt64 = 1<<63 - 1
	if len(d) == 0 {
		return -1
	}
	for _, dec := range d {
		if dec < '0' || dec > '9' {
			return -1
		}
		digit := int64(dec - '0')
		// Reject overflow instead of silently wrapping; without this check
		// "18446744073709551616" (2^64) would parse as 0.
		if n > (maxInt64-digit)/10 {
			return -1
		}
		n = n*10 + digit
	}
	return n
}