parser.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. // Copyright 2012-2020 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "fmt"
  16. )
// msgArg holds the values parsed from the argument line of a MSG or HMSG
// operation. It is filled in by processMsgArgs / processHeaderMsgArgs.
type msgArg struct {
	// subject is the first token of the argument line. It is a sub-slice of
	// the bytes that were parsed (the read buffer or the parser's argBuf).
	subject []byte
	// reply is the optional reply token; nil when the line carries none.
	reply []byte
	// sid is the second token parsed as a decimal number; a negative value
	// is rejected by the argument processors.
	sid int64
	// hdr is the header size token of an HMSG line. parse presets it to -1
	// when a MSG operation starts and to 0 when an HMSG operation starts.
	hdr int
	// size is the last token of the line: the number of payload bytes that
	// parse consumes after the argument line.
	size int
}
// MAX_CONTROL_LINE_SIZE is the size in bytes of the parser's fixed scratch
// array (parseState.scratch), which backs argument lines and small payloads
// that are split across reads.
// NOTE(review): presumably this mirrors the protocol's maximum control line
// length — confirm against the server.
const MAX_CONTROL_LINE_SIZE = 4096
// parseState is the state parse keeps between calls so that an operation
// split across successive buffers can be resumed.
type parseState struct {
	// state is the current parser state (one of the OP_*, *_ARG, MSG_*
	// constants).
	state int
	// as is the index in the current buffer where the argument line or the
	// message payload starts.
	as int
	// drop is the number of trailing bytes to strip from the argument line;
	// parse sets it to 1 when it sees '\r'.
	drop int
	// hdr is set to -1 when a MSG operation starts and 0 when an HMSG
	// operation starts; processMsgArgs treats hdr >= 0 as "header message".
	hdr int
	// ma holds the parsed arguments of the message currently being read.
	ma msgArg
	// argBuf accumulates an argument line that spans more than one buffer;
	// it is nil while the line can be sliced directly out of the buffer.
	argBuf []byte
	// msgBuf accumulates a payload that spans more than one buffer; nil
	// otherwise.
	msgBuf []byte
	// scratch is fixed storage that argBuf and, when it fits, msgBuf are
	// sliced from.
	scratch [MAX_CONTROL_LINE_SIZE]byte
}
// Parser states stored in parseState.state and switched on by parse.
//
// The OP_* states track how much of an operation name has been matched so
// far (for example OP_P -> OP_PI -> OP_PIN -> OP_PING). The *_SPC states skip
// the blanks between an operation name and its arguments, and the *_ARG
// states collect the argument line up to '\n'. MSG_PAYLOAD consumes the
// message body and MSG_END skips to the '\n' that follows it.
//
// The values come from iota, so the order of the entries is significant.
const (
	OP_START = iota
	OP_PLUS
	OP_PLUS_O
	OP_PLUS_OK
	OP_MINUS
	OP_MINUS_E
	OP_MINUS_ER
	OP_MINUS_ERR
	OP_MINUS_ERR_SPC
	MINUS_ERR_ARG
	OP_M
	OP_MS
	OP_MSG
	OP_MSG_SPC
	MSG_ARG
	MSG_PAYLOAD
	MSG_END
	OP_H
	OP_P
	OP_PI
	OP_PIN
	OP_PING
	OP_PO
	OP_PON
	OP_PONG
	OP_I
	OP_IN
	OP_INF
	OP_INFO
	OP_INFO_SPC
	INFO_ARG
)
// parse is the fast protocol parser engine.
//
// It walks buf one byte at a time, advancing the state machine kept in nc.ps,
// and calls the matching process* method each time a complete operation
// (MSG/HMSG, +OK, -ERR, PING, PONG, INFO) has been recognized. Operation
// names are matched case-insensitively.
//
// The state in nc.ps survives between calls, so an operation may be split
// across successive buffers: an unfinished argument line is copied into
// nc.ps.argBuf and an unfinished payload into nc.ps.msgBuf before returning.
//
// It returns the error from processMsgArgs when a message argument line is
// rejected, a "Parse Error" when a byte is not valid for the current state,
// and nil otherwise.
func (nc *Conn) parse(buf []byte) error {
	var i int
	var b byte

	// Move to loop instead of range syntax to allow jumping of i
	for i = 0; i < len(buf); i++ {
		b = buf[i]

		switch nc.ps.state {
		case OP_START:
			// First byte of an operation selects the branch to follow.
			switch b {
			case 'M', 'm':
				nc.ps.state = OP_M
				// hdr < 0 marks a plain MSG (see processMsgArgs).
				nc.ps.hdr = -1
				nc.ps.ma.hdr = -1
			case 'H', 'h':
				nc.ps.state = OP_H
				// hdr >= 0 marks a header message (HMSG).
				nc.ps.hdr = 0
				nc.ps.ma.hdr = 0
			case 'P', 'p':
				nc.ps.state = OP_P
			case '+':
				nc.ps.state = OP_PLUS
			case '-':
				nc.ps.state = OP_MINUS
			case 'I', 'i':
				nc.ps.state = OP_I
			default:
				goto parseErr
			}
		case OP_H:
			// After the leading 'H', HMSG shares the MSG states.
			switch b {
			case 'M', 'm':
				nc.ps.state = OP_M
			default:
				goto parseErr
			}
		case OP_M:
			switch b {
			case 'S', 's':
				nc.ps.state = OP_MS
			default:
				goto parseErr
			}
		case OP_MS:
			switch b {
			case 'G', 'g':
				nc.ps.state = OP_MSG
			default:
				goto parseErr
			}
		case OP_MSG:
			switch b {
			case ' ', '\t':
				nc.ps.state = OP_MSG_SPC
			default:
				goto parseErr
			}
		case OP_MSG_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				// First non-blank byte: remember where the arguments start.
				nc.ps.state = MSG_ARG
				nc.ps.as = i
			}
		case MSG_ARG:
			switch b {
			case '\r':
				// Exclude the '\r' when slicing the argument line below.
				nc.ps.drop = 1
			case '\n':
				var arg []byte
				// argBuf is non-nil when the line started in an earlier buffer.
				if nc.ps.argBuf != nil {
					arg = nc.ps.argBuf
				} else {
					arg = buf[nc.ps.as : i-nc.ps.drop]
				}
				if err := nc.processMsgArgs(arg); err != nil {
					return err
				}
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, MSG_PAYLOAD

				// jump ahead with the index. If this overruns
				// what is left we fall out and process a split buffer.
				// NOTE(review): ma.size comes from the argument line and is
				// not bounded in this file; a very large value could
				// overflow this sum or the allocation in the split-msg path
				// below — confirm a limit is enforced elsewhere.
				i = nc.ps.as + nc.ps.ma.size - 1
			default:
				// Only accumulate when already in split mode; otherwise the
				// line is sliced straight out of buf at '\n'.
				if nc.ps.argBuf != nil {
					nc.ps.argBuf = append(nc.ps.argBuf, b)
				}
			}
		case MSG_PAYLOAD:
			if nc.ps.msgBuf != nil {
				// Payload is being assembled across buffers.
				if len(nc.ps.msgBuf) >= nc.ps.ma.size {
					nc.processMsg(nc.ps.msgBuf)
					nc.ps.argBuf, nc.ps.msgBuf, nc.ps.state = nil, nil, MSG_END
				} else {
					// copy as much as we can to the buffer and skip ahead.
					toCopy := nc.ps.ma.size - len(nc.ps.msgBuf)
					avail := len(buf) - i

					if avail < toCopy {
						toCopy = avail
					}

					if toCopy > 0 {
						start := len(nc.ps.msgBuf)
						// This is needed for copy to work.
						nc.ps.msgBuf = nc.ps.msgBuf[:start+toCopy]
						copy(nc.ps.msgBuf[start:], buf[i:i+toCopy])
						// Update our index
						i = (i + toCopy) - 1
					} else {
						nc.ps.msgBuf = append(nc.ps.msgBuf, b)
					}
				}
			} else if i-nc.ps.as >= nc.ps.ma.size {
				// Whole payload is inside buf: hand over the slice directly.
				nc.processMsg(buf[nc.ps.as:i])
				nc.ps.argBuf, nc.ps.msgBuf, nc.ps.state = nil, nil, MSG_END
			}
		case MSG_END:
			// Skip everything up to and including the '\n' after the payload.
			switch b {
			case '\n':
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
			default:
				continue
			}
		case OP_PLUS:
			switch b {
			case 'O', 'o':
				nc.ps.state = OP_PLUS_O
			default:
				goto parseErr
			}
		case OP_PLUS_O:
			switch b {
			case 'K', 'k':
				nc.ps.state = OP_PLUS_OK
			default:
				goto parseErr
			}
		case OP_PLUS_OK:
			// Bytes other than '\n' are ignored in this state.
			switch b {
			case '\n':
				nc.processOK()
				nc.ps.drop, nc.ps.state = 0, OP_START
			}
		case OP_MINUS:
			switch b {
			case 'E', 'e':
				nc.ps.state = OP_MINUS_E
			default:
				goto parseErr
			}
		case OP_MINUS_E:
			switch b {
			case 'R', 'r':
				nc.ps.state = OP_MINUS_ER
			default:
				goto parseErr
			}
		case OP_MINUS_ER:
			switch b {
			case 'R', 'r':
				nc.ps.state = OP_MINUS_ERR
			default:
				goto parseErr
			}
		case OP_MINUS_ERR:
			switch b {
			case ' ', '\t':
				nc.ps.state = OP_MINUS_ERR_SPC
			default:
				goto parseErr
			}
		case OP_MINUS_ERR_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				nc.ps.state = MINUS_ERR_ARG
				nc.ps.as = i
			}
		case MINUS_ERR_ARG:
			switch b {
			case '\r':
				nc.ps.drop = 1
			case '\n':
				var arg []byte
				if nc.ps.argBuf != nil {
					arg = nc.ps.argBuf
					nc.ps.argBuf = nil
				} else {
					arg = buf[nc.ps.as : i-nc.ps.drop]
				}
				// Converting to string copies the bytes out of the buffer.
				nc.processErr(string(arg))
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
			default:
				if nc.ps.argBuf != nil {
					nc.ps.argBuf = append(nc.ps.argBuf, b)
				}
			}
		case OP_P:
			// 'P' is shared by PING and PONG.
			switch b {
			case 'I', 'i':
				nc.ps.state = OP_PI
			case 'O', 'o':
				nc.ps.state = OP_PO
			default:
				goto parseErr
			}
		case OP_PO:
			switch b {
			case 'N', 'n':
				nc.ps.state = OP_PON
			default:
				goto parseErr
			}
		case OP_PON:
			switch b {
			case 'G', 'g':
				nc.ps.state = OP_PONG
			default:
				goto parseErr
			}
		case OP_PONG:
			// Bytes other than '\n' are ignored in this state.
			switch b {
			case '\n':
				nc.processPong()
				nc.ps.drop, nc.ps.state = 0, OP_START
			}
		case OP_PI:
			switch b {
			case 'N', 'n':
				nc.ps.state = OP_PIN
			default:
				goto parseErr
			}
		case OP_PIN:
			switch b {
			case 'G', 'g':
				nc.ps.state = OP_PING
			default:
				goto parseErr
			}
		case OP_PING:
			// Bytes other than '\n' are ignored in this state.
			switch b {
			case '\n':
				nc.processPing()
				nc.ps.drop, nc.ps.state = 0, OP_START
			}
		case OP_I:
			switch b {
			case 'N', 'n':
				nc.ps.state = OP_IN
			default:
				goto parseErr
			}
		case OP_IN:
			switch b {
			case 'F', 'f':
				nc.ps.state = OP_INF
			default:
				goto parseErr
			}
		case OP_INF:
			switch b {
			case 'O', 'o':
				nc.ps.state = OP_INFO
			default:
				goto parseErr
			}
		case OP_INFO:
			switch b {
			case ' ', '\t':
				nc.ps.state = OP_INFO_SPC
			default:
				goto parseErr
			}
		case OP_INFO_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				nc.ps.state = INFO_ARG
				nc.ps.as = i
			}
		case INFO_ARG:
			switch b {
			case '\r':
				nc.ps.drop = 1
			case '\n':
				var arg []byte
				if nc.ps.argBuf != nil {
					arg = nc.ps.argBuf
					nc.ps.argBuf = nil
				} else {
					arg = buf[nc.ps.as : i-nc.ps.drop]
				}
				nc.processAsyncInfo(arg)
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
			default:
				if nc.ps.argBuf != nil {
					nc.ps.argBuf = append(nc.ps.argBuf, b)
				}
			}
		default:
			goto parseErr
		}
	}

	// Check for split buffer scenarios
	// buf ended in the middle of an argument line: keep what we have so far
	// in scratch-backed argBuf so the next call can continue appending.
	if (nc.ps.state == MSG_ARG || nc.ps.state == MINUS_ERR_ARG || nc.ps.state == INFO_ARG) && nc.ps.argBuf == nil {
		nc.ps.argBuf = nc.ps.scratch[:0]
		nc.ps.argBuf = append(nc.ps.argBuf, buf[nc.ps.as:i-nc.ps.drop]...)
		// FIXME, check max len
	}
	// Check for split msg
	if nc.ps.state == MSG_PAYLOAD && nc.ps.msgBuf == nil {
		// We need to clone the msgArg if it is still referencing the
		// read buffer and we are not able to process the msg.
		if nc.ps.argBuf == nil {
			nc.cloneMsgArg()
		}

		// If we will overflow the scratch buffer, just create a
		// new buffer to hold the split message.
		if nc.ps.ma.size > cap(nc.ps.scratch)-len(nc.ps.argBuf) {
			lrem := len(buf[nc.ps.as:])

			// Capacity is the full payload size so the MSG_PAYLOAD branch
			// can extend msgBuf by re-slicing.
			nc.ps.msgBuf = make([]byte, lrem, nc.ps.ma.size)
			copy(nc.ps.msgBuf, buf[nc.ps.as:])
		} else {
			// Place the payload in scratch right after the saved arguments.
			nc.ps.msgBuf = nc.ps.scratch[len(nc.ps.argBuf):len(nc.ps.argBuf)]
			nc.ps.msgBuf = append(nc.ps.msgBuf, (buf[nc.ps.as:])...)
		}
	}

	return nil

parseErr:
	return fmt.Errorf("nats: Parse Error [%d]: '%s'", nc.ps.state, buf[i:])
}
  401. // cloneMsgArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
  402. // we need to hold onto it into the next read.
  403. func (nc *Conn) cloneMsgArg() {
  404. nc.ps.argBuf = nc.ps.scratch[:0]
  405. nc.ps.argBuf = append(nc.ps.argBuf, nc.ps.ma.subject...)
  406. nc.ps.argBuf = append(nc.ps.argBuf, nc.ps.ma.reply...)
  407. nc.ps.ma.subject = nc.ps.argBuf[:len(nc.ps.ma.subject)]
  408. if nc.ps.ma.reply != nil {
  409. nc.ps.ma.reply = nc.ps.argBuf[len(nc.ps.ma.subject):]
  410. }
  411. }
// argsLenMax is the length of the fixed array used as the initial backing
// store when an argument line is split into tokens; a plain MSG line has at
// most four (subject, sid, reply, size).
const argsLenMax = 4
  413. func (nc *Conn) processMsgArgs(arg []byte) error {
  414. // Use separate function for header based messages.
  415. if nc.ps.hdr >= 0 {
  416. return nc.processHeaderMsgArgs(arg)
  417. }
  418. // Unroll splitArgs to avoid runtime/heap issues
  419. a := [argsLenMax][]byte{}
  420. args := a[:0]
  421. start := -1
  422. for i, b := range arg {
  423. switch b {
  424. case ' ', '\t', '\r', '\n':
  425. if start >= 0 {
  426. args = append(args, arg[start:i])
  427. start = -1
  428. }
  429. default:
  430. if start < 0 {
  431. start = i
  432. }
  433. }
  434. }
  435. if start >= 0 {
  436. args = append(args, arg[start:])
  437. }
  438. switch len(args) {
  439. case 3:
  440. nc.ps.ma.subject = args[0]
  441. nc.ps.ma.sid = parseInt64(args[1])
  442. nc.ps.ma.reply = nil
  443. nc.ps.ma.size = int(parseInt64(args[2]))
  444. case 4:
  445. nc.ps.ma.subject = args[0]
  446. nc.ps.ma.sid = parseInt64(args[1])
  447. nc.ps.ma.reply = args[2]
  448. nc.ps.ma.size = int(parseInt64(args[3]))
  449. default:
  450. return fmt.Errorf("nats: processMsgArgs Parse Error: '%s'", arg)
  451. }
  452. if nc.ps.ma.sid < 0 {
  453. return fmt.Errorf("nats: processMsgArgs Bad or Missing Sid: '%s'", arg)
  454. }
  455. if nc.ps.ma.size < 0 {
  456. return fmt.Errorf("nats: processMsgArgs Bad or Missing Size: '%s'", arg)
  457. }
  458. return nil
  459. }
  460. // processHeaderMsgArgs is for a header based message.
  461. func (nc *Conn) processHeaderMsgArgs(arg []byte) error {
  462. // Unroll splitArgs to avoid runtime/heap issues
  463. a := [argsLenMax][]byte{}
  464. args := a[:0]
  465. start := -1
  466. for i, b := range arg {
  467. switch b {
  468. case ' ', '\t', '\r', '\n':
  469. if start >= 0 {
  470. args = append(args, arg[start:i])
  471. start = -1
  472. }
  473. default:
  474. if start < 0 {
  475. start = i
  476. }
  477. }
  478. }
  479. if start >= 0 {
  480. args = append(args, arg[start:])
  481. }
  482. switch len(args) {
  483. case 4:
  484. nc.ps.ma.subject = args[0]
  485. nc.ps.ma.sid = parseInt64(args[1])
  486. nc.ps.ma.reply = nil
  487. nc.ps.ma.hdr = int(parseInt64(args[2]))
  488. nc.ps.ma.size = int(parseInt64(args[3]))
  489. case 5:
  490. nc.ps.ma.subject = args[0]
  491. nc.ps.ma.sid = parseInt64(args[1])
  492. nc.ps.ma.reply = args[2]
  493. nc.ps.ma.hdr = int(parseInt64(args[3]))
  494. nc.ps.ma.size = int(parseInt64(args[4]))
  495. default:
  496. return fmt.Errorf("nats: processHeaderMsgArgs Parse Error: '%s'", arg)
  497. }
  498. if nc.ps.ma.sid < 0 {
  499. return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Sid: '%s'", arg)
  500. }
  501. if nc.ps.ma.hdr < 0 || nc.ps.ma.hdr > nc.ps.ma.size {
  502. return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
  503. }
  504. if nc.ps.ma.size < 0 {
  505. return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Size: '%s'", arg)
  506. }
  507. return nil
  508. }
  509. // Ascii numbers 0-9
  510. const (
  511. ascii_0 = 48
  512. ascii_9 = 57
  513. )
  514. // parseInt64 expects decimal positive numbers. We
  515. // return -1 to signal error
  516. func parseInt64(d []byte) (n int64) {
  517. if len(d) == 0 {
  518. return -1
  519. }
  520. for _, dec := range d {
  521. if dec < ascii_0 || dec > ascii_9 {
  522. return -1
  523. }
  524. n = n*10 + (int64(dec) - ascii_0)
  525. }
  526. return n
  527. }