diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
| -rw-r--r-- | vendor/google.golang.org/grpc/server.go | 450 |
1 files changed, 311 insertions, 139 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 2ad9da7bf..8869cc906 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -43,8 +43,8 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/binarylog" "google.golang.org/grpc/internal/channelz" - "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/grpcutil" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" @@ -73,12 +73,14 @@ func init() { internal.DrainServerTransports = func(srv *Server, addr string) { srv.drainServerTransports(addr) } - internal.AddExtraServerOptions = func(opt ...ServerOption) { - extraServerOptions = opt + internal.AddGlobalServerOptions = func(opt ...ServerOption) { + globalServerOptions = append(globalServerOptions, opt...) } - internal.ClearExtraServerOptions = func() { - extraServerOptions = nil + internal.ClearGlobalServerOptions = func() { + globalServerOptions = nil } + internal.BinaryLogger = binaryLogger + internal.JoinServerOptions = newJoinServerOption } var statusOK = status.New(codes.OK, "") @@ -113,12 +115,6 @@ type serviceInfo struct { mdata interface{} } -type serverWorkerData struct { - st transport.ServerTransport - wg *sync.WaitGroup - stream *transport.Stream -} - // Server is a gRPC server to serve RPC requests. type Server struct { opts serverOptions @@ -143,7 +139,7 @@ type Server struct { channelzID *channelz.Identifier czData *channelzData - serverWorkerChannels []chan *serverWorkerData + serverWorkerChannel chan func() } type serverOptions struct { @@ -155,6 +151,7 @@ type serverOptions struct { streamInt StreamServerInterceptor chainUnaryInts []UnaryServerInterceptor chainStreamInts []StreamServerInterceptor + binaryLogger binarylog.Logger inTapHandle tap.ServerInHandle statsHandlers []stats.Handler maxConcurrentStreams uint32 @@ -174,13 +171,14 @@ type serverOptions struct { } var defaultServerOptions = serverOptions{ + maxConcurrentStreams: math.MaxUint32, maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, maxSendMessageSize: defaultServerMaxSendMessageSize, connectionTimeout: 120 * time.Second, writeBufferSize: defaultWriteBufSize, readBufferSize: defaultReadBufSize, } -var extraServerOptions []ServerOption +var globalServerOptions []ServerOption // A ServerOption sets options such as credentials, codec and keepalive parameters, etc. type ServerOption interface { @@ -214,10 +212,27 @@ func newFuncServerOption(f func(*serverOptions)) *funcServerOption { } } -// WriteBufferSize determines how much data can be batched before doing a write on the wire. -// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. -// The default value for this buffer is 32KB. -// Zero will disable the write buffer such that each write will be on underlying connection. +// joinServerOption provides a way to combine arbitrary number of server +// options into one. +type joinServerOption struct { + opts []ServerOption +} + +func (mdo *joinServerOption) apply(do *serverOptions) { + for _, opt := range mdo.opts { + opt.apply(do) + } +} + +func newJoinServerOption(opts ...ServerOption) ServerOption { + return &joinServerOption{opts: opts} +} + +// WriteBufferSize determines how much data can be batched before doing a write +// on the wire. The corresponding memory allocation for this buffer will be +// twice the size to keep syscalls low. The default value for this buffer is +// 32KB. Zero or negative values will disable the write buffer such that each +// write will be on underlying connection. // Note: A Send call may not directly translate to a write. func WriteBufferSize(s int) ServerOption { return newFuncServerOption(func(o *serverOptions) { @@ -225,11 +240,10 @@ func WriteBufferSize(s int) ServerOption { }) } -// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most -// for one read syscall. -// The default value for this buffer is 32KB. -// Zero will disable read buffer for a connection so data framer can access the underlying -// conn directly. +// ReadBufferSize lets you set the size of read buffer, this determines how much +// data can be read at most for one read syscall. The default value for this +// buffer is 32KB. Zero or negative values will disable read buffer for a +// connection so data framer can access the underlying conn directly. func ReadBufferSize(s int) ServerOption { return newFuncServerOption(func(o *serverOptions) { o.readBufferSize = s @@ -368,6 +382,9 @@ func MaxSendMsgSize(m int) ServerOption { // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number // of concurrent streams to each ServerTransport. func MaxConcurrentStreams(n uint32) ServerOption { + if n == 0 { + n = math.MaxUint32 + } return newFuncServerOption(func(o *serverOptions) { o.maxConcurrentStreams = n }) @@ -452,6 +469,14 @@ func StatsHandler(h stats.Handler) ServerOption { }) } +// binaryLogger returns a ServerOption that can set the binary logger for the +// server. +func binaryLogger(bl binarylog.Logger) ServerOption { + return newFuncServerOption(func(o *serverOptions) { + o.binaryLogger = bl + }) +} + // UnknownServiceHandler returns a ServerOption that allows for adding a custom // unknown service handler. The provided method is a bidi-streaming RPC service // handler that will be invoked instead of returning the "unimplemented" gRPC @@ -533,47 +558,40 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { const serverWorkerResetThreshold = 1 << 16 // serverWorkers blocks on a *transport.Stream channel forever and waits for -// data to be fed by serveStreams. This allows different requests to be +// data to be fed by serveStreams. This allows multiple requests to be // processed by the same goroutine, removing the need for expensive stack // re-allocations (see the runtime.morestack problem [1]). // // [1] https://github.com/golang/go/issues/18138 -func (s *Server) serverWorker(ch chan *serverWorkerData) { - // To make sure all server workers don't reset at the same time, choose a - // random number of iterations before resetting. - threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold) - for completed := 0; completed < threshold; completed++ { - data, ok := <-ch +func (s *Server) serverWorker() { + for completed := 0; completed < serverWorkerResetThreshold; completed++ { + f, ok := <-s.serverWorkerChannel if !ok { return } - s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream)) - data.wg.Done() + f() } - go s.serverWorker(ch) + go s.serverWorker() } -// initServerWorkers creates worker goroutines and channels to process incoming +// initServerWorkers creates worker goroutines and a channel to process incoming // connections to reduce the time spent overall on runtime.morestack. func (s *Server) initServerWorkers() { - s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers) + s.serverWorkerChannel = make(chan func()) for i := uint32(0); i < s.opts.numServerWorkers; i++ { - s.serverWorkerChannels[i] = make(chan *serverWorkerData) - go s.serverWorker(s.serverWorkerChannels[i]) + go s.serverWorker() } } func (s *Server) stopServerWorkers() { - for i := uint32(0); i < s.opts.numServerWorkers; i++ { - close(s.serverWorkerChannels[i]) - } + close(s.serverWorkerChannel) } // NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { opts := defaultServerOptions - for _, o := range extraServerOptions { + for _, o := range globalServerOptions { o.apply(&opts) } for _, o := range opt { @@ -870,7 +888,7 @@ func (s *Server) drainServerTransports(addr string) { s.mu.Lock() conns := s.conns[addr] for st := range conns { - st.Drain() + st.Drain("") } s.mu.Unlock() } @@ -915,29 +933,29 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { } func (s *Server) serveStreams(st transport.ServerTransport) { - defer st.Close() + defer st.Close(errors.New("finished serving streams for the server transport")) var wg sync.WaitGroup - var roundRobinCounter uint32 + streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams) st.HandleStreams(func(stream *transport.Stream) { wg.Add(1) + + streamQuota.acquire() + f := func() { + defer streamQuota.release() + defer wg.Done() + s.handleStream(st, stream, s.traceInfo(st, stream)) + } + if s.opts.numServerWorkers > 0 { - data := &serverWorkerData{st: st, wg: &wg, stream: stream} select { - case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data: + case s.serverWorkerChannel <- f: + return default: // If all stream workers are busy, fallback to the default code path. - go func() { - s.handleStream(st, stream, s.traceInfo(st, stream)) - wg.Done() - }() } - } else { - go func() { - defer wg.Done() - s.handleStream(st, stream, s.traceInfo(st, stream)) - }() } + go f() }, func(ctx context.Context, method string) context.Context { if !EnableTracing { return ctx @@ -981,7 +999,8 @@ var _ http.Handler = (*Server)(nil) func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + // Errors returned from transport.NewServerHandlerTransport have + // already been written to w. return } if !s.addConn(listenerAddressForServeHTTP, st) { @@ -1019,13 +1038,13 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool { s.mu.Lock() defer s.mu.Unlock() if s.conns == nil { - st.Close() + st.Close(errors.New("Server.addConn called when server has already been stopped")) return false } if s.drain { // Transport added after we drained our existing conns: drain it // immediately. - st.Drain() + st.Drain("") } if s.conns[addr] == nil { @@ -1123,21 +1142,16 @@ func chainUnaryServerInterceptors(s *Server) { func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) { - // the struct ensures the variables are allocated together, rather than separately, since we - // know they should be garbage collected together. This saves 1 allocation and decreases - // time/call by about 10% on the microbenchmark. - var state struct { - i int - next UnaryHandler - } - state.next = func(ctx context.Context, req interface{}) (interface{}, error) { - if state.i == len(interceptors)-1 { - return interceptors[state.i](ctx, req, info, handler) - } - state.i++ - return interceptors[state.i-1](ctx, req, info, state.next) - } - return state.next(ctx, req) + return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler)) + } +} + +func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler { + if curr == len(interceptors)-1 { + return finalHandler + } + return func(ctx context.Context, req interface{}) (interface{}, error) { + return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler)) } } @@ -1199,9 +1213,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } }() } - - binlog := binarylog.GetMethodLogger(stream.Method()) - if binlog != nil { + var binlogs []binarylog.MethodLogger + if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil { + binlogs = append(binlogs, ml) + } + if s.opts.binaryLogger != nil { + if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil { + binlogs = append(binlogs, ml) + } + } + if len(binlogs) != 0 { ctx := stream.Context() md, _ := metadata.FromIncomingContext(ctx) logEntry := &binarylog.ClientHeader{ @@ -1221,7 +1242,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if peer, ok := peer.FromContext(ctx); ok { logEntry.PeerAddr = peer.Addr } - binlog.Log(logEntry) + for _, binlog := range binlogs { + binlog.Log(ctx, logEntry) + } } // comp and cp are used for compression. decomp and dc are used for @@ -1231,6 +1254,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. var comp, decomp encoding.Compressor var cp Compressor var dc Decompressor + var sendCompressorName string // If dc is set and matches the stream's compression, use it. Otherwise, try // to find a matching registered compressor for decomp. @@ -1251,23 +1275,29 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. if s.opts.cp != nil { cp = s.opts.cp - stream.SetSendCompress(cp.Type()) + sendCompressorName = cp.Type() } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { // Legacy compressor not specified; attempt to respond with same encoding. comp = encoding.GetCompressor(rc) if comp != nil { - stream.SetSendCompress(rc) + sendCompressorName = comp.Name() + } + } + + if sendCompressorName != "" { + if err := stream.SetSendCompress(sendCompressorName); err != nil { + return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err) } } var payInfo *payloadInfo - if len(shs) != 0 || binlog != nil { + if len(shs) != 0 || len(binlogs) != 0 { payInfo = &payloadInfo{} } d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) if err != nil { if e := t.WriteStatus(stream, status.Convert(err)); e != nil { - channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e) + channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) } return err } @@ -1280,17 +1310,21 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } for _, sh := range shs { sh.HandleRPC(stream.Context(), &stats.InPayload{ - RecvTime: time.Now(), - Payload: v, - WireLength: payInfo.wireLength + headerLen, - Data: d, - Length: len(d), + RecvTime: time.Now(), + Payload: v, + Length: len(d), + WireLength: payInfo.compressedLength + headerLen, + CompressedLength: payInfo.compressedLength, + Data: d, }) } - if binlog != nil { - binlog.Log(&binarylog.ClientMessage{ + if len(binlogs) != 0 { + cm := &binarylog.ClientMessage{ Message: d, - }) + } + for _, binlog := range binlogs { + binlog.Log(stream.Context(), cm) + } } if trInfo != nil { trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) @@ -1314,18 +1348,24 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if e := t.WriteStatus(stream, appStatus); e != nil { channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) } - if binlog != nil { + if len(binlogs) != 0 { if h, _ := stream.Header(); h.Len() > 0 { // Only log serverHeader if there was header. Otherwise it can // be trailer only. - binlog.Log(&binarylog.ServerHeader{ + sh := &binarylog.ServerHeader{ Header: h, - }) + } + for _, binlog := range binlogs { + binlog.Log(stream.Context(), sh) + } } - binlog.Log(&binarylog.ServerTrailer{ + st := &binarylog.ServerTrailer{ Trailer: stream.Trailer(), Err: appErr, - }) + } + for _, binlog := range binlogs { + binlog.Log(stream.Context(), st) + } } return appErr } @@ -1334,6 +1374,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } opts := &transport.Options{Last: true} + // Server handler could have set new compressor by calling SetSendCompressor. + // In case it is set, we need to use it for compressing outbound message. + if stream.SendCompress() != sendCompressorName { + comp = encoding.GetCompressor(stream.SendCompress()) + } if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { if err == io.EOF { // The entire stream is done (for unary RPC only). @@ -1351,26 +1396,34 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) } } - if binlog != nil { + if len(binlogs) != 0 { h, _ := stream.Header() - binlog.Log(&binarylog.ServerHeader{ + sh := &binarylog.ServerHeader{ Header: h, - }) - binlog.Log(&binarylog.ServerTrailer{ + } + st := &binarylog.ServerTrailer{ Trailer: stream.Trailer(), Err: appErr, - }) + } + for _, binlog := range binlogs { + binlog.Log(stream.Context(), sh) + binlog.Log(stream.Context(), st) + } } return err } - if binlog != nil { + if len(binlogs) != 0 { h, _ := stream.Header() - binlog.Log(&binarylog.ServerHeader{ + sh := &binarylog.ServerHeader{ Header: h, - }) - binlog.Log(&binarylog.ServerMessage{ + } + sm := &binarylog.ServerMessage{ Message: reply, - }) + } + for _, binlog := range binlogs { + binlog.Log(stream.Context(), sh) + binlog.Log(stream.Context(), sm) + } } if channelz.IsOn() { t.IncrMsgSent() @@ -1381,14 +1434,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. // TODO: Should we be logging if writing status failed here, like above? // Should the logging be in WriteStatus? Should we ignore the WriteStatus // error or allow the stats handler to see it? - err = t.WriteStatus(stream, statusOK) - if binlog != nil { - binlog.Log(&binarylog.ServerTrailer{ + if len(binlogs) != 0 { + st := &binarylog.ServerTrailer{ Trailer: stream.Trailer(), Err: appErr, - }) + } + for _, binlog := range binlogs { + binlog.Log(stream.Context(), st) + } } - return err + return t.WriteStatus(stream, statusOK) } // chainStreamServerInterceptors chains all stream server interceptors into one. @@ -1414,21 +1469,16 @@ func chainStreamServerInterceptors(s *Server) { func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor { return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error { - // the struct ensures the variables are allocated together, rather than separately, since we - // know they should be garbage collected together. This saves 1 allocation and decreases - // time/call by about 10% on the microbenchmark. - var state struct { - i int - next StreamHandler - } - state.next = func(srv interface{}, ss ServerStream) error { - if state.i == len(interceptors)-1 { - return interceptors[state.i](srv, ss, info, handler) - } - state.i++ - return interceptors[state.i-1](srv, ss, info, state.next) - } - return state.next(srv, ss) + return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler)) + } +} + +func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler { + if curr == len(interceptors)-1 { + return finalHandler + } + return func(srv interface{}, stream ServerStream) error { + return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler)) } } @@ -1499,8 +1549,15 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp }() } - ss.binlog = binarylog.GetMethodLogger(stream.Method()) - if ss.binlog != nil { + if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil { + ss.binlogs = append(ss.binlogs, ml) + } + if s.opts.binaryLogger != nil { + if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil { + ss.binlogs = append(ss.binlogs, ml) + } + } + if len(ss.binlogs) != 0 { md, _ := metadata.FromIncomingContext(ctx) logEntry := &binarylog.ClientHeader{ Header: md, @@ -1519,7 +1576,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp if peer, ok := peer.FromContext(ss.Context()); ok { logEntry.PeerAddr = peer.Addr } - ss.binlog.Log(logEntry) + for _, binlog := range ss.binlogs { + binlog.Log(stream.Context(), logEntry) + } } // If dc is set and matches the stream's compression, use it. Otherwise, try @@ -1541,12 +1600,18 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. if s.opts.cp != nil { ss.cp = s.opts.cp - stream.SetSendCompress(s.opts.cp.Type()) + ss.sendCompressorName = s.opts.cp.Type() } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { // Legacy compressor not specified; attempt to respond with same encoding. ss.comp = encoding.GetCompressor(rc) if ss.comp != nil { - stream.SetSendCompress(rc) + ss.sendCompressorName = rc + } + } + + if ss.sendCompressorName != "" { + if err := stream.SetSendCompress(ss.sendCompressorName); err != nil { + return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err) } } @@ -1584,13 +1649,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.trInfo.tr.SetError() ss.mu.Unlock() } - t.WriteStatus(ss.s, appStatus) - if ss.binlog != nil { - ss.binlog.Log(&binarylog.ServerTrailer{ + if len(ss.binlogs) != 0 { + st := &binarylog.ServerTrailer{ Trailer: ss.s.Trailer(), Err: appErr, - }) + } + for _, binlog := range ss.binlogs { + binlog.Log(stream.Context(), st) + } } + t.WriteStatus(ss.s, appStatus) // TODO: Should we log an error from WriteStatus here and below? return appErr } @@ -1599,14 +1667,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.trInfo.tr.LazyLog(stringer("OK"), false) ss.mu.Unlock() } - err = t.WriteStatus(ss.s, statusOK) - if ss.binlog != nil { - ss.binlog.Log(&binarylog.ServerTrailer{ + if len(ss.binlogs) != 0 { + st := &binarylog.ServerTrailer{ Trailer: ss.s.Trailer(), Err: appErr, - }) + } + for _, binlog := range ss.binlogs { + binlog.Log(stream.Context(), st) + } } - return err + return t.WriteStatus(ss.s, statusOK) } func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { @@ -1748,7 +1818,7 @@ func (s *Server) Stop() { } for _, cs := range conns { for st := range cs { - st.Close() + st.Close(errors.New("Server.Stop called")) } } if s.opts.numServerWorkers > 0 { @@ -1784,7 +1854,7 @@ func (s *Server) GracefulStop() { if !s.drain { for _, conns := range s.conns { for st := range conns { - st.Drain() + st.Drain("graceful_stop") } } s.drain = true @@ -1873,6 +1943,60 @@ func SendHeader(ctx context.Context, md metadata.MD) error { return nil } +// SetSendCompressor sets a compressor for outbound messages from the server. +// It must not be called after any event that causes headers to be sent +// (see ServerStream.SetHeader for the complete list). Provided compressor is +// used when below conditions are met: +// +// - compressor is registered via encoding.RegisterCompressor +// - compressor name must exist in the client advertised compressor names +// sent in grpc-accept-encoding header. Use ClientSupportedCompressors to +// get client supported compressor names. +// +// The context provided must be the context passed to the server's handler. +// It must be noted that compressor name encoding.Identity disables the +// outbound compression. +// By default, server messages will be sent using the same compressor with +// which request messages were sent. +// +// It is not safe to call SetSendCompressor concurrently with SendHeader and +// SendMsg. +// +// # Experimental +// +// Notice: This function is EXPERIMENTAL and may be changed or removed in a +// later release. +func SetSendCompressor(ctx context.Context, name string) error { + stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream) + if !ok || stream == nil { + return fmt.Errorf("failed to fetch the stream from the given context") + } + + if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil { + return fmt.Errorf("unable to set send compressor: %w", err) + } + + return stream.SetSendCompress(name) +} + +// ClientSupportedCompressors returns compressor names advertised by the client +// via grpc-accept-encoding header. +// +// The context provided must be the context passed to the server's handler. +// +// # Experimental +// +// Notice: This function is EXPERIMENTAL and may be changed or removed in a +// later release. +func ClientSupportedCompressors(ctx context.Context) ([]string, error) { + stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream) + if !ok || stream == nil { + return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx) + } + + return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil +} + // SetTrailer sets the trailer metadata that will be sent when an RPC returns. // When called more than once, all the provided metadata will be merged. // @@ -1907,3 +2031,51 @@ type channelzServer struct { func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { return c.s.channelzMetric() } + +// validateSendCompressor returns an error when given compressor name cannot be +// handled by the server or the client based on the advertised compressors. +func validateSendCompressor(name, clientCompressors string) error { + if name == encoding.Identity { + return nil + } + + if !grpcutil.IsCompressorNameRegistered(name) { + return fmt.Errorf("compressor not registered %q", name) + } + + for _, c := range strings.Split(clientCompressors, ",") { + if c == name { + return nil // found match + } + } + return fmt.Errorf("client does not support compressor %q", name) +} + +// atomicSemaphore implements a blocking, counting semaphore. acquire should be +// called synchronously; release may be called asynchronously. +type atomicSemaphore struct { + n int64 + wait chan struct{} +} + +func (q *atomicSemaphore) acquire() { + if atomic.AddInt64(&q.n, -1) < 0 { + // We ran out of quota. Block until a release happens. + <-q.wait + } +} + +func (q *atomicSemaphore) release() { + // N.B. the "<= 0" check below should allow for this to work with multiple + // concurrent calls to acquire, but also note that with synchronous calls to + // acquire, as our system does, n will never be less than -1. There are + // fairness issues (queuing) to consider if this was to be generalized. + if atomic.AddInt64(&q.n, 1) <= 0 { + // An acquire was waiting on us. Unblock it. + q.wait <- struct{}{} + } +} + +func newHandlerQuota(n uint32) *atomicSemaphore { + return &atomicSemaphore{n: int64(n), wait: make(chan struct{}, 1)} +} |
