123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- package http2
- import "fmt"
- type WriteScheduler interface {
-
-
-
- OpenStream(streamID uint32, options OpenStreamOptions)
-
-
-
- CloseStream(streamID uint32)
-
-
-
-
- AdjustStream(streamID uint32, priority PriorityParam)
-
-
-
- Push(wr FrameWriteRequest)
-
-
-
- Pop() (wr FrameWriteRequest, ok bool)
- }
- type OpenStreamOptions struct {
-
-
- PusherID uint32
- }
- type FrameWriteRequest struct {
-
-
-
- write writeFramer
-
-
- stream *stream
-
-
-
- done chan error
- }
- func (wr FrameWriteRequest) StreamID() uint32 {
- if wr.stream == nil {
- if se, ok := wr.write.(StreamError); ok {
-
-
-
-
- return se.StreamID
- }
- return 0
- }
- return wr.stream.id
- }
- func (wr FrameWriteRequest) DataSize() int {
- if wd, ok := wr.write.(*writeData); ok {
- return len(wd.p)
- }
- return 0
- }
- func (wr FrameWriteRequest) Consume(n int32) (FrameWriteRequest, FrameWriteRequest, int) {
- var empty FrameWriteRequest
-
- wd, ok := wr.write.(*writeData)
- if !ok || len(wd.p) == 0 {
- return wr, empty, 1
- }
-
- allowed := wr.stream.flow.available()
- if n < allowed {
- allowed = n
- }
- if wr.stream.sc.maxFrameSize < allowed {
- allowed = wr.stream.sc.maxFrameSize
- }
- if allowed <= 0 {
- return empty, empty, 0
- }
- if len(wd.p) > int(allowed) {
- wr.stream.flow.take(allowed)
- consumed := FrameWriteRequest{
- stream: wr.stream,
- write: &writeData{
- streamID: wd.streamID,
- p: wd.p[:allowed],
-
-
-
- endStream: false,
- },
-
-
- done: nil,
- }
- rest := FrameWriteRequest{
- stream: wr.stream,
- write: &writeData{
- streamID: wd.streamID,
- p: wd.p[allowed:],
- endStream: wd.endStream,
- },
- done: wr.done,
- }
- return consumed, rest, 2
- }
-
-
- wr.stream.flow.take(int32(len(wd.p)))
- return wr, empty, 1
- }
- func (wr FrameWriteRequest) String() string {
- var des string
- if s, ok := wr.write.(fmt.Stringer); ok {
- des = s.String()
- } else {
- des = fmt.Sprintf("%T", wr.write)
- }
- return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des)
- }
- func (wr *FrameWriteRequest) replyToWriter(err error) {
- if wr.done == nil {
- return
- }
- select {
- case wr.done <- err:
- default:
- panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
- }
- wr.write = nil
- }
- type writeQueue struct {
- s []FrameWriteRequest
- }
- func (q *writeQueue) empty() bool { return len(q.s) == 0 }
- func (q *writeQueue) push(wr FrameWriteRequest) {
- q.s = append(q.s, wr)
- }
- func (q *writeQueue) shift() FrameWriteRequest {
- if len(q.s) == 0 {
- panic("invalid use of queue")
- }
- wr := q.s[0]
-
- copy(q.s, q.s[1:])
- q.s[len(q.s)-1] = FrameWriteRequest{}
- q.s = q.s[:len(q.s)-1]
- return wr
- }
- func (q *writeQueue) consume(n int32) (FrameWriteRequest, bool) {
- if len(q.s) == 0 {
- return FrameWriteRequest{}, false
- }
- consumed, rest, numresult := q.s[0].Consume(n)
- switch numresult {
- case 0:
- return FrameWriteRequest{}, false
- case 1:
- q.shift()
- case 2:
- q.s[0] = rest
- }
- return consumed, true
- }
- type writeQueuePool []*writeQueue
- func (p *writeQueuePool) put(q *writeQueue) {
- for i := range q.s {
- q.s[i] = FrameWriteRequest{}
- }
- q.s = q.s[:0]
- *p = append(*p, q)
- }
- func (p *writeQueuePool) get() *writeQueue {
- ln := len(*p)
- if ln == 0 {
- return new(writeQueue)
- }
- x := ln - 1
- q := (*p)[x]
- (*p)[x] = nil
- *p = (*p)[:x]
- return q
- }
|