diff options
| author | liufen90 <564795204@qq.com> | 2021-06-11 09:56:44 +0800 |
|---|---|---|
| committer | liufen90 <564795204@qq.com> | 2021-06-11 11:20:10 +0800 |
| commit | 8d0c3ec2754d102afbaa1575d962cccb113aa0a2 (patch) | |
| tree | f87d7a463b46a82945bfd296cd727540193fb125 /edgesite | |
| parent | add v1.7.1 changelog (#2890) (diff) | |
| download | kubeedge-8d0c3ec2754d102afbaa1575d962cccb113aa0a2.tar.gz | |
refine edgesite
Signed-off-by: liufen90 <564795204@qq.com>
Diffstat (limited to 'edgesite')
| -rw-r--r-- | edgesite/cmd/edgesite-agent/main.go | 280 | ||||
| -rw-r--r-- | edgesite/cmd/edgesite-server/app/server.go | 390 | ||||
| -rw-r--r-- | edgesite/cmd/edgesite-server/main.go | 658 |
3 files changed, 401 insertions, 927 deletions
diff --git a/edgesite/cmd/edgesite-agent/main.go b/edgesite/cmd/edgesite-agent/main.go index 984e2f822..56464ca78 100644 --- a/edgesite/cmd/edgesite-agent/main.go +++ b/edgesite/cmd/edgesite-agent/main.go @@ -17,30 +17,20 @@ limitations under the License. package main import ( - "crypto/tls" "flag" "fmt" - "net" - "net/http" - "net/url" "os" - "time" - "github.com/google/uuid" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/spf13/cobra" - "github.com/spf13/pflag" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" "k8s.io/klog/v2" - "sigs.k8s.io/apiserver-network-proxy/pkg/agent" + "sigs.k8s.io/apiserver-network-proxy/cmd/agent/app" + "sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options" "sigs.k8s.io/apiserver-network-proxy/pkg/util" ) func main() { - agent := &Agent{} - o := newGrpcProxyAgentOptions() - command := newAgentCommand(agent, o) + agent := &app.Agent{} + o := options.NewGrpcProxyAgentOptions() + command := app.NewAgentCommand(agent, o) flags := command.Flags() flags.AddFlagSet(o.Flags()) local := flag.NewFlagSet(os.Args[0], flag.ExitOnError) @@ -59,263 +49,3 @@ func main() { os.Exit(1) } } - -type GrpcProxyAgentOptions struct { - // Configuration for authenticating with the proxy-server - agentCert string - agentKey string - caCert string - - // Configuration for connecting to the proxy-server - proxyServerHost string - proxyServerPort int - alpnProtos []string - - // Ports for the health and admin server - healthServerPort int - adminServerPort int - - agentID string - agentIdentifiers string - syncInterval time.Duration - probeInterval time.Duration - - // file contains service account authorization token for enabling proxy-server token based authorization - serviceAccountTokenPath string -} - -func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig { - return &agent.ClientSetConfig{ - Address: fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort), - AgentID: o.agentID, - AgentIdentifiers: o.agentIdentifiers, - SyncInterval: o.syncInterval, - ProbeInterval: o.probeInterval, - DialOptions: dialOptions, - ServiceAccountTokenPath: o.serviceAccountTokenPath, - } -} - -func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet { - flags := pflag.NewFlagSet("proxy-agent", pflag.ContinueOnError) - flags.StringVar(&o.agentCert, "agent-cert", o.agentCert, "If non-empty secure communication with this cert.") - flags.StringVar(&o.agentKey, "agent-key", o.agentKey, "If non-empty secure communication with this key.") - flags.StringVar(&o.caCert, "ca-cert", o.caCert, "If non-empty the CAs we use to validate clients.") - flags.StringVar(&o.proxyServerHost, "proxy-server-host", o.proxyServerHost, "The hostname to use to connect to the proxy-server.") - flags.IntVar(&o.proxyServerPort, "proxy-server-port", o.proxyServerPort, "The port the proxy server is listening on.") - flags.StringSliceVar(&o.alpnProtos, "alpn-proto", o.alpnProtos, "Additional ALPN protocols to be presented when connecting to the server. Useful to distinguish between network proxy and apiserver connections that share the same destination address.") - flags.IntVar(&o.healthServerPort, "health-server-port", o.healthServerPort, "The port the health server is listening on.") - flags.IntVar(&o.adminServerPort, "admin-server-port", o.adminServerPort, "The port the admin server is listening on.") - flags.StringVar(&o.agentID, "agent-id", o.agentID, "The unique ID of this agent. Default to a generated uuid if not set.") - flags.DurationVar(&o.syncInterval, "sync-interval", o.syncInterval, "The initial interval by which the agent periodically checks if it has connections to all instances of the proxy server.") - flags.DurationVar(&o.probeInterval, "probe-interval", o.probeInterval, "The interval by which the agent periodically checks if its connections to the proxy server are ready.") - flags.StringVar(&o.serviceAccountTokenPath, "service-account-token-path", o.serviceAccountTokenPath, "If non-empty proxy agent uses this token to prove its identity to the proxy server.") - flags.StringVar(&o.agentIdentifiers, "agent-identifiers", o.agentIdentifiers, "Identifiers of the agent that will be used by the server when choosing agent. N.B. the list of identifiers must be in URL encoded format. e.g.,host=localhost&host=node1.mydomain.com&cidr=127.0.0.1/16&ipv4=1.2.3.4&ipv4=5.6.7.8&ipv6=:::::") - return flags -} - -func (o *GrpcProxyAgentOptions) Print() { - klog.V(1).Infof("AgentCert set to %q.\n", o.agentCert) - klog.V(1).Infof("AgentKey set to %q.\n", o.agentKey) - klog.V(1).Infof("CACert set to %q.\n", o.caCert) - klog.V(1).Infof("ProxyServerHost set to %q.\n", o.proxyServerHost) - klog.V(1).Infof("ProxyServerPort set to %d.\n", o.proxyServerPort) - klog.V(1).Infof("ALPNProtos set to %+s.\n", o.alpnProtos) - klog.V(1).Infof("HealthServerPort set to %d.\n", o.healthServerPort) - klog.V(1).Infof("AdminServerPort set to %d.\n", o.adminServerPort) - klog.V(1).Infof("AgentID set to %s.\n", o.agentID) - klog.V(1).Infof("SyncInterval set to %v.\n", o.syncInterval) - klog.V(1).Infof("ProbeInterval set to %v.\n", o.probeInterval) - klog.V(1).Infof("ServiceAccountTokenPath set to %q.\n", o.serviceAccountTokenPath) - klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.agentIdentifiers)) -} - -func (o *GrpcProxyAgentOptions) Validate() error { - if o.agentKey != "" { - if _, err := os.Stat(o.agentKey); os.IsNotExist(err) { - return fmt.Errorf("error checking agent key %s, got %v", o.agentKey, err) - } - if o.agentCert == "" { - return fmt.Errorf("cannot have agent cert empty when agent key is set to \"%s\"", o.agentKey) - } - } - if o.agentCert != "" { - if _, err := os.Stat(o.agentCert); os.IsNotExist(err) { - return fmt.Errorf("error checking agent cert %s, got %v", o.agentCert, err) - } - if o.agentKey == "" { - return fmt.Errorf("cannot have agent key empty when agent cert is set to \"%s\"", o.agentCert) - } - } - if o.caCert != "" { - if _, err := os.Stat(o.caCert); os.IsNotExist(err) { - return fmt.Errorf("error checking agent CA cert %s, got %v", o.caCert, err) - } - } - if o.proxyServerPort <= 0 { - return fmt.Errorf("proxy server port %d must be greater than 0", o.proxyServerPort) - } - if o.healthServerPort <= 0 { - return fmt.Errorf("health server port %d must be greater than 0", o.healthServerPort) - } - if o.adminServerPort <= 0 { - return fmt.Errorf("admin server port %d must be greater than 0", o.adminServerPort) - } - - if o.serviceAccountTokenPath != "" { - if _, err := os.Stat(o.serviceAccountTokenPath); os.IsNotExist(err) { - return fmt.Errorf("error checking service account token path %s, got %v", o.serviceAccountTokenPath, err) - } - } - if err := validateAgentIdentifiers(o.agentIdentifiers); err != nil { - return fmt.Errorf("agent address is invalid: %v", err) - } - return nil -} - -func validateAgentIdentifiers(agentIdentifiers string) error { - decoded, err := url.ParseQuery(agentIdentifiers) - if err != nil { - return err - } - for idType := range decoded { - switch agent.IdentifierType(idType) { - case agent.IPv4: - case agent.IPv6: - case agent.CIDR: - case agent.Host: - default: - return fmt.Errorf("unknown address type: %s", idType) - } - } - return nil -} - -func newGrpcProxyAgentOptions() *GrpcProxyAgentOptions { - o := GrpcProxyAgentOptions{ - agentCert: "", - agentKey: "", - caCert: "", - proxyServerHost: "127.0.0.1", - proxyServerPort: 8091, - healthServerPort: 8093, - adminServerPort: 8094, - agentID: uuid.New().String(), - agentIdentifiers: "", - syncInterval: 1 * time.Second, - probeInterval: 1 * time.Second, - serviceAccountTokenPath: "", - } - return &o -} - -func newAgentCommand(a *Agent, o *GrpcProxyAgentOptions) *cobra.Command { - cmd := &cobra.Command{ - Use: "agent", - Long: `A gRPC agent, Connects to the proxy and then allows traffic to be forwarded to it.`, - RunE: func(cmd *cobra.Command, args []string) error { - return a.run(o) - }, - } - - return cmd -} - -type Agent struct { -} - -func (a *Agent) run(o *GrpcProxyAgentOptions) error { - o.Print() - if err := o.Validate(); err != nil { - return fmt.Errorf("failed to validate agent options with %v", err) - } - - stopCh := make(chan struct{}) - if err := a.runProxyConnection(o, stopCh); err != nil { - return fmt.Errorf("failed to run proxy connection with %v", err) - } - - if err := a.runHealthServer(o); err != nil { - return fmt.Errorf("failed to run health server with %v", err) - } - - if err := a.runAdminServer(o); err != nil { - return fmt.Errorf("failed to run admin server with %v", err) - } - - <-stopCh - - return nil -} - -func (a *Agent) runProxyConnection(o *GrpcProxyAgentOptions, stopCh <-chan struct{}) error { - var tlsConfig *tls.Config - var err error - if tlsConfig, err = util.GetClientTLSConfig(o.caCert, o.agentCert, o.agentKey, o.proxyServerHost, o.alpnProtos); err != nil { - return err - } - dialOption := grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) - cc := o.ClientSetConfig(dialOption) - cs := cc.NewAgentClientSet(stopCh) - cs.Serve() - - return nil -} - -func (a *Agent) runHealthServer(o *GrpcProxyAgentOptions) error { - livenessHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "ok") - }) - readinessHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "ok") - }) - - muxHandler := http.NewServeMux() - muxHandler.Handle("/metrics", promhttp.Handler()) - muxHandler.HandleFunc("/healthz", livenessHandler) - muxHandler.HandleFunc("/ready", readinessHandler) - healthServer := &http.Server{ - Addr: fmt.Sprintf(":%d", o.healthServerPort), - Handler: muxHandler, - MaxHeaderBytes: 1 << 20, - } - - go func() { - err := healthServer.ListenAndServe() - if err != nil { - klog.ErrorS(err, "health server could not listen") - } - klog.V(0).Infoln("Health server stopped listening") - }() - - return nil -} - -func (a *Agent) runAdminServer(o *GrpcProxyAgentOptions) error { - muxHandler := http.NewServeMux() - muxHandler.Handle("/metrics", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - host, _, err := net.SplitHostPort(r.Host) - // The port number may be omitted if the admin server is running on port - // 80, the default port for HTTP - if err != nil { - host = r.Host - } - http.Redirect(w, r, fmt.Sprintf("%s:%d%s", host, o.healthServerPort, r.URL.Path), http.StatusMovedPermanently) - })) - - adminServer := &http.Server{ - Addr: fmt.Sprintf("127.0.0.1:%d", o.adminServerPort), - Handler: muxHandler, - MaxHeaderBytes: 1 << 20, - } - - go func() { - err := adminServer.ListenAndServe() - if err != nil { - klog.ErrorS(err, "admin server could not listen") - } - klog.V(0).Infoln("Admin server stopped listening") - }() - - return nil -} diff --git a/edgesite/cmd/edgesite-server/app/server.go b/edgesite/cmd/edgesite-server/app/server.go new file mode 100644 index 000000000..ed674b1c5 --- /dev/null +++ b/edgesite/cmd/edgesite-server/app/server.go @@ -0,0 +1,390 @@ +package app + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/http/pprof" + "os" + "os/signal" + "path/filepath" + "runtime" + "sync" + "syscall" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spf13/cobra" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" + "sigs.k8s.io/apiserver-network-proxy/cmd/server/app/options" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" + "sigs.k8s.io/apiserver-network-proxy/pkg/server" + "sigs.k8s.io/apiserver-network-proxy/pkg/util" + "sigs.k8s.io/apiserver-network-proxy/proto/agent" +) + +const grpcMode = "grpc" + +var udsListenerLock sync.Mutex + +func NewProxyCommand(p *Proxy, o *options.ProxyRunOptions) *cobra.Command { + cmd := &cobra.Command{ + Use: "proxy", + Long: `A gRPC proxy server, receives requests from the API server and forwards to the agent.`, + RunE: func(cmd *cobra.Command, args []string) error { + return p.run(o) + }, + } + + return cmd +} + +type Proxy struct { +} + +type StopFunc func() + +func (p *Proxy) run(o *options.ProxyRunOptions) error { + o.Print() + if err := o.Validate(); err != nil { + return fmt.Errorf("failed to validate server options with %v", err) + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var k8sClient *kubernetes.Clientset + if o.AgentNamespace != "" { + config, err := clientcmd.BuildConfigFromFlags("", o.KubeconfigPath) + if err != nil { + return fmt.Errorf("failed to load kubernetes client config: %v", err) + } + + if o.KubeconfigQPS != 0 { + klog.V(1).Infof("Setting k8s client QPS: %v", o.KubeconfigQPS) + config.QPS = o.KubeconfigQPS + } + if o.KubeconfigBurst != 0 { + klog.V(1).Infof("Setting k8s client Burst: %v", o.KubeconfigBurst) + config.Burst = o.KubeconfigBurst + } + k8sClient, err = kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("failed to create kubernetes clientset: %v", err) + } + } + + authOpt := &server.AgentTokenAuthenticationOptions{ + Enabled: o.AgentNamespace != "", + AgentNamespace: o.AgentNamespace, + AgentServiceAccount: o.AgentServiceAccount, + KubernetesClient: k8sClient, + AuthenticationAudience: o.AuthenticationAudience, + } + klog.V(1).Infoln("Starting master server for client connections.") + ps, err := server.GenProxyStrategiesFromStr(o.ProxyStrategies) + if err != nil { + return err + } + server := server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt) + + masterStop, err := p.runMasterServer(ctx, o, server) + if err != nil { + return fmt.Errorf("failed to run the master server: %v", err) + } + + klog.V(1).Infoln("Starting agent server for tunnel connections.") + err = p.runAgentServer(o, server) + if err != nil { + return fmt.Errorf("failed to run the agent server: %v", err) + } + klog.V(1).Infoln("Starting admin server for debug connections.") + err = p.runAdminServer(o, server) + if err != nil { + return fmt.Errorf("failed to run the admin server: %v", err) + } + klog.V(1).Infoln("Starting health server for healthchecks.") + err = p.runHealthServer(o, server) + if err != nil { + return fmt.Errorf("failed to run the health server: %v", err) + } + + stopCh := SetupSignalHandler() + <-stopCh + klog.V(1).Infoln("Shutting down server.") + + if masterStop != nil { + masterStop() + } + + return nil +} + +var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} + +func SetupSignalHandler() (stopCh <-chan struct{}) { + stop := make(chan struct{}) + c := make(chan os.Signal, 2) + signal.Notify(c, shutdownSignals...) + go func() { + <-c + close(stop) + <-c + os.Exit(1) // second signal. Exit directly. + }() + + return stop +} + +func getUDSListener(ctx context.Context, udsName string) (net.Listener, error) { + udsListenerLock.Lock() + defer udsListenerLock.Unlock() + oldUmask := syscall.Umask(0007) + defer syscall.Umask(oldUmask) + var lc net.ListenConfig + lis, err := lc.Listen(ctx, "unix", udsName) + if err != nil { + return nil, fmt.Errorf("failed to listen(unix) name %s: %v", udsName, err) + } + return lis, nil +} + +func (p *Proxy) runMasterServer(ctx context.Context, o *options.ProxyRunOptions, server *server.ProxyServer) (StopFunc, error) { + if o.UdsName != "" { + return p.runUDSMasterServer(ctx, o, server) + } + return p.runMTLSMasterServer(ctx, o, server) +} + +func (p *Proxy) runUDSMasterServer(ctx context.Context, o *options.ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) { + if o.DeleteUDSFile { + if err := os.Remove(o.UdsName); err != nil && !os.IsNotExist(err) { + klog.ErrorS(err, "failed to delete file", "file", o.UdsName) + } + } + var stop StopFunc + if o.Mode == "grpc" { + grpcServer := grpc.NewServer() + client.RegisterProxyServiceServer(grpcServer, s) + lis, err := getUDSListener(ctx, o.UdsName) + if err != nil { + return nil, fmt.Errorf("failed to get uds listener: %v", err) + } + go grpcServer.Serve(lis) + stop = grpcServer.GracefulStop + } else { + // http-connect + server := &http.Server{ + Handler: &server.Tunnel{ + Server: s, + }, + } + stop = func() { + err := server.Shutdown(ctx) + klog.ErrorS(err, "error shutting down server") + } + go func() { + udsListener, err := getUDSListener(ctx, o.UdsName) + if err != nil { + klog.ErrorS(err, "failed to get uds listener") + } + defer func() { + err := udsListener.Close() + klog.ErrorS(err, "failed to close uds listener") + }() + err = server.Serve(udsListener) + if err != nil { + klog.ErrorS(err, "failed to serve uds requests") + } + }() + } + + return stop, nil +} + +func (p *Proxy) getTLSConfig(caFile, certFile, keyFile string) (*tls.Config, error) { + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, fmt.Errorf("failed to load X509 key pair %s and %s: %v", certFile, keyFile, err) + } + + if caFile == "" { + return &tls.Config{Certificates: []tls.Certificate{cert}, MinVersion: tls.VersionTLS12}, nil + } + + certPool := x509.NewCertPool() + caCert, err := ioutil.ReadFile(filepath.Clean(caFile)) + if err != nil { + return nil, fmt.Errorf("failed to read cluster CA cert %s: %v", caFile, err) + } + ok := certPool.AppendCertsFromPEM(caCert) + if !ok { + return nil, fmt.Errorf("failed to append cluster CA cert to the cert pool") + } + tlsConfig := &tls.Config{ + ClientAuth: tls.RequireAndVerifyClientCert, + Certificates: []tls.Certificate{cert}, + ClientCAs: certPool, + MinVersion: tls.VersionTLS12, + } + + return tlsConfig, nil +} + +func (p *Proxy) runMTLSMasterServer(ctx context.Context, o *options.ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) { + var stop StopFunc + + var tlsConfig *tls.Config + var err error + if tlsConfig, err = p.getTLSConfig(o.ServerCaCert, o.ServerCert, o.ServerKey); err != nil { + return nil, err + } + + addr := fmt.Sprintf(":%d", o.ServerPort) + + if o.Mode == grpcMode { + serverOption := grpc.Creds(credentials.NewTLS(tlsConfig)) + grpcServer := grpc.NewServer(serverOption) + client.RegisterProxyServiceServer(grpcServer, s) + lis, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to listen on %s: %v", addr, err) + } + go grpcServer.Serve(lis) + stop = grpcServer.GracefulStop + } else { + // http-connect with no tls + httpServer := &http.Server{ + Addr: ":8088", + Handler: &server.Tunnel{ + Server: s, + }, + } + // http-connect + server := &http.Server{ + Addr: addr, + TLSConfig: tlsConfig, + Handler: &server.Tunnel{ + Server: s, + }, + TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), + } + + stop = func() { + err := server.Shutdown(ctx) + if err != nil { + klog.ErrorS(err, "failed to shutdown server") + } + err = httpServer.Shutdown(ctx) + if err != nil { + klog.ErrorS(err, "failed to shutdown httpServer") + } + } + go func() { + err := server.ListenAndServeTLS("", "") // empty files defaults to tlsConfig + if err != nil { + klog.ErrorS(err, "failed to listen on master port") + } + }() + go func() { + err := httpServer.ListenAndServe() + if err != nil { + klog.ErrorS(err, "failed to listen on http master port") + } + }() + } + + return stop, nil +} + +func (p *Proxy) runAgentServer(o *options.ProxyRunOptions, server *server.ProxyServer) error { + var tlsConfig *tls.Config + var err error + if tlsConfig, err = p.getTLSConfig(o.ClusterCaCert, o.ClusterCert, o.ClusterKey); err != nil { + return err + } + + addr := fmt.Sprintf(":%d", o.AgentPort) + serverOptions := []grpc.ServerOption{ + grpc.Creds(credentials.NewTLS(tlsConfig)), + grpc.KeepaliveParams(keepalive.ServerParameters{Time: o.KeepaliveTime}), + } + grpcServer := grpc.NewServer(serverOptions...) + agent.RegisterAgentServiceServer(grpcServer, server) + lis, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("failed to listen on %s: %v", addr, err) + } + go grpcServer.Serve(lis) + + return nil +} + +func (p *Proxy) runAdminServer(o *options.ProxyRunOptions, server *server.ProxyServer) error { + muxHandler := http.NewServeMux() + muxHandler.Handle("/metrics", promhttp.Handler()) + if o.EnableProfiling { + muxHandler.HandleFunc("/debug/pprof", util.RedirectTo("/debug/pprof/")) + muxHandler.HandleFunc("/debug/pprof/", pprof.Index) + if o.EnableContentionProfiling { + runtime.SetBlockProfileRate(1) + } + } + adminServer := &http.Server{ + Addr: fmt.Sprintf("127.0.0.1:%d", o.AdminPort), + Handler: muxHandler, + MaxHeaderBytes: 1 << 20, + } + + go func() { + err := adminServer.ListenAndServe() + if err != nil { + klog.ErrorS(err, "admin server could not listen") + } + klog.V(1).Infoln("Admin server stopped listening") + }() + + return nil +} + +func (p *Proxy) runHealthServer(o *options.ProxyRunOptions, server *server.ProxyServer) error { + livenessHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "ok") + }) + readinessHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ready, msg := server.Readiness.Ready() + if ready { + w.WriteHeader(200) + fmt.Fprintf(w, "ok") + return + } + w.WriteHeader(500) + fmt.Fprintf(w, msg) + }) + + muxHandler := http.NewServeMux() + muxHandler.HandleFunc("/healthz", livenessHandler) + muxHandler.HandleFunc("/ready", readinessHandler) + healthServer := &http.Server{ + Addr: fmt.Sprintf(":%d", o.HealthPort), + Handler: muxHandler, + MaxHeaderBytes: 1 << 20, + } + + go func() { + err := healthServer.ListenAndServe() + if err != nil { + klog.ErrorS(err, "health server could not listen") + } + klog.V(1).Infoln("Health server stopped listening") + }() + + return nil +} diff --git a/edgesite/cmd/edgesite-server/main.go b/edgesite/cmd/edgesite-server/main.go index 8725c758a..881eb0354 100644 --- a/edgesite/cmd/edgesite-server/main.go +++ b/edgesite/cmd/edgesite-server/main.go @@ -17,49 +17,22 @@ limitations under the License. package main import ( - "context" - "crypto/tls" - "crypto/x509" "flag" "fmt" - "io/ioutil" - "net" - "net/http" - "net/http/pprof" "os" - "os/signal" - "path/filepath" - "runtime" - "strings" - "sync" - "syscall" - "time" - "github.com/google/uuid" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/spf13/cobra" - "github.com/spf13/pflag" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/keepalive" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" - "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" - "sigs.k8s.io/apiserver-network-proxy/pkg/server" + "sigs.k8s.io/apiserver-network-proxy/cmd/server/app/options" "sigs.k8s.io/apiserver-network-proxy/pkg/util" - "sigs.k8s.io/apiserver-network-proxy/proto/agent" -) - -var udsListenerLock sync.Mutex -const grpcMode = "grpc" + "github.com/kubeedge/kubeedge/edgesite/cmd/edgesite-server/app" +) func main() { // flag.CommandLine.Parse(os.Args[1:]) - proxy := &Proxy{} - o := newProxyRunOptions() - command := newProxyCommand(proxy, o) + proxy := &app.Proxy{} + o := options.NewProxyRunOptions() + command := app.NewProxyCommand(proxy, o) flags := command.Flags() flags.AddFlagSet(o.Flags()) local := flag.NewFlagSet(os.Args[0], flag.ExitOnError) @@ -78,622 +51,3 @@ func main() { os.Exit(1) } } - -type ProxyRunOptions struct { - // Certificate setup for securing communication to the "client" i.e. the Kube API Server. - serverCert string - serverKey string - serverCaCert string - // Certificate setup for securing communication to the "agent" i.e. the managed cluster. - clusterCert string - clusterKey string - clusterCaCert string - // Flag to switch between gRPC and HTTP Connect - mode string - // Location for use by the "unix" network. Setting enables UDS for server connections. - udsName string - // If file udsName already exists, delete the file before listen on that UDS file. - deleteUDSFile bool - // Port we listen for server connections on. - serverPort uint - // Port we listen for agent connections on. - agentPort uint - // Port we listen for admin connections on. - adminPort uint - // Port we listen for health connections on. - healthPort uint - // After a duration of this time if the server doesn't see any activity it - // pings the client to see if the transport is still alive. - keepaliveTime time.Duration - // Enables pprof at host:adminPort/debug/pprof. - enableProfiling bool - // If enableProfiling is true, this enables the lock contention - // profiling at host:adminPort/debug/pprof/block. - enableContentionProfiling bool - - // ID of this proxy server. - serverID string - // Number of proxy server instances, should be 1 unless it is a HA proxy server. - serverCount uint - // Agent pod's namespace for token-based agent authentication - agentNamespace string - // Agent pod's service account for token-based agent authentication - agentServiceAccount string - // Token's audience for token-based agent authentication - authenticationAudience string - // Path to kubeconfig (used by kubernetes client) - kubeconfigPath string - - // Proxy strategies used by the server. - // NOTE the order of the strategies matters. e.g., for list - // "destHost,destCIDR", the server will try to find a backend associating - // to the destination host first, if not found, it will try to find a - // backend within the destCIDR. if it still can't find any backend, - // it will use the default backend manager to choose a random backend. - proxyStrategies string -} - -func (o *ProxyRunOptions) Flags() *pflag.FlagSet { - flags := pflag.NewFlagSet("proxy-server", pflag.ContinueOnError) - flags.StringVar(&o.serverCert, "server-cert", o.serverCert, "If non-empty secure communication with this cert.") - flags.StringVar(&o.serverKey, "server-key", o.serverKey, "If non-empty secure communication with this key.") - flags.StringVar(&o.serverCaCert, "server-ca-cert", o.serverCaCert, "If non-empty the CA we use to validate KAS clients.") - flags.StringVar(&o.clusterCert, "cluster-cert", o.clusterCert, "If non-empty secure communication with this cert.") - flags.StringVar(&o.clusterKey, "cluster-key", o.clusterKey, "If non-empty secure communication with this key.") - flags.StringVar(&o.clusterCaCert, "cluster-ca-cert", o.clusterCaCert, "If non-empty the CA we use to validate Agent clients.") - flags.StringVar(&o.mode, "mode", o.mode, "Mode can be either 'grpc' or 'http-connect'.") - flags.StringVar(&o.udsName, "uds-name", o.udsName, "uds-name should be empty for TCP traffic. For UDS set to its name.") - flags.BoolVar(&o.deleteUDSFile, "delete-existing-uds-file", o.deleteUDSFile, "If true and if file udsName already exists, delete the file before listen on that UDS file") - flags.UintVar(&o.serverPort, "server-port", o.serverPort, "Port we listen for server connections on. Set to 0 for UDS.") - flags.UintVar(&o.agentPort, "agent-port", o.agentPort, "Port we listen for agent connections on.") - flags.UintVar(&o.adminPort, "admin-port", o.adminPort, "Port we listen for admin connections on.") - flags.UintVar(&o.healthPort, "health-port", o.healthPort, "Port we listen for health connections on.") - flags.DurationVar(&o.keepaliveTime, "keepalive-time", o.keepaliveTime, "Time for gRPC server keepalive.") - flags.BoolVar(&o.enableProfiling, "enable-profiling", o.enableProfiling, "enable pprof at host:admin-port/debug/pprof") - flags.BoolVar(&o.enableContentionProfiling, "enable-contention-profiling", o.enableContentionProfiling, "enable contention profiling at host:admin-port/debug/pprof/block. \"--enable-profiling\" must also be set.") - flags.StringVar(&o.serverID, "server-id", o.serverID, "The unique ID of this server.") - flags.UintVar(&o.serverCount, "server-count", o.serverCount, "The number of proxy server instances, should be 1 unless it is an HA server.") - flags.StringVar(&o.agentNamespace, "agent-namespace", o.agentNamespace, "Expected agent's namespace during agent authentication (used with agent-service-account, authentication-audience, kubeconfig).") - flags.StringVar(&o.agentServiceAccount, "agent-service-account", o.agentServiceAccount, "Expected agent's service account during agent authentication (used with agent-namespace, authentication-audience, kubeconfig).") - flags.StringVar(&o.kubeconfigPath, "kubeconfig", o.kubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).") - flags.StringVar(&o.authenticationAudience, "authentication-audience", o.authenticationAudience, "Expected agent's token authentication audience (used with agent-namespace, agent-service-account, kubeconfig).") - flags.StringVar(&o.proxyStrategies, "proxy-strategies", o.proxyStrategies, "The list of proxy strategies used by the server to pick a backend/tunnel, available strategies are: default, destHost.") - return flags -} - -func (o *ProxyRunOptions) Print() { - klog.V(1).Infof("ServerCert set to %q.\n", o.serverCert) - klog.V(1).Infof("ServerKey set to %q.\n", o.serverKey) - klog.V(1).Infof("ServerCACert set to %q.\n", o.serverCaCert) - klog.V(1).Infof("ClusterCert set to %q.\n", o.clusterCert) - klog.V(1).Infof("ClusterKey set to %q.\n", o.clusterKey) - klog.V(1).Infof("ClusterCACert set to %q.\n", o.clusterCaCert) - klog.V(1).Infof("Mode set to %q.\n", o.mode) - klog.V(1).Infof("UDSName set to %q.\n", o.udsName) - klog.V(1).Infof("DeleteUDSFile set to %v.\n", o.deleteUDSFile) - klog.V(1).Infof("Server port set to %d.\n", o.serverPort) - klog.V(1).Infof("Agent port set to %d.\n", o.agentPort) - klog.V(1).Infof("Admin port set to %d.\n", o.adminPort) - klog.V(1).Infof("Health port set to %d.\n", o.healthPort) - klog.V(1).Infof("Keepalive time set to %v.\n", o.keepaliveTime) - klog.V(1).Infof("EnableProfiling set to %v.\n", o.enableProfiling) - klog.V(1).Infof("EnableContentionProfiling set to %v.\n", o.enableContentionProfiling) - klog.V(1).Infof("ServerID set to %s.\n", o.serverID) - klog.V(1).Infof("ServerCount set to %d.\n", o.serverCount) - klog.V(1).Infof("AgentNamespace set to %q.\n", o.agentNamespace) - klog.V(1).Infof("AgentServiceAccount set to %q.\n", o.agentServiceAccount) - klog.V(1).Infof("AuthenticationAudience set to %q.\n", o.authenticationAudience) - klog.V(1).Infof("KubeconfigPath set to %q.\n", o.kubeconfigPath) - klog.V(1).Infof("ProxyStrategies set to %q.\n", o.proxyStrategies) -} - -func (o *ProxyRunOptions) Validate() error { - if o.serverKey != "" { - if _, err := os.Stat(o.serverKey); os.IsNotExist(err) { - return fmt.Errorf("error checking server key %s, got %v", o.serverKey, err) - } - if o.serverCert == "" { - return fmt.Errorf("cannot have server cert empty when server key is set to %q", o.serverKey) - } - } - if o.serverCert != "" { - if _, err := os.Stat(o.serverCert); os.IsNotExist(err) { - return fmt.Errorf("error checking server cert %s, got %v", o.serverCert, err) - } - if o.serverKey == "" { - return fmt.Errorf("cannot have server key empty when server cert is set to %q", o.serverCert) - } - } - if o.serverCaCert != "" { - if _, err := os.Stat(o.serverCaCert); os.IsNotExist(err) { - return fmt.Errorf("error checking server CA cert %s, got %v", o.serverCaCert, err) - } - } - if o.clusterKey != "" { - if _, err := os.Stat(o.clusterKey); os.IsNotExist(err) { - return fmt.Errorf("error checking cluster key %s, got %v", o.clusterKey, err) - } - if o.clusterCert == "" { - return fmt.Errorf("cannot have cluster cert empty when cluster key is set to %q", o.clusterKey) - } - } - if o.clusterCert != "" { - if _, err := os.Stat(o.clusterCert); os.IsNotExist(err) { - return fmt.Errorf("error checking cluster cert %s, got %v", o.clusterCert, err) - } - if o.clusterKey == "" { - return fmt.Errorf("cannot have cluster key empty when cluster cert is set to %q", o.clusterCert) - } - } - if o.clusterCaCert != "" { - if _, err := os.Stat(o.clusterCaCert); os.IsNotExist(err) { - return fmt.Errorf("error checking cluster CA cert %s, got %v", o.clusterCaCert, err) - } - } - if o.mode != grpcMode && o.mode != "http-connect" { - return fmt.Errorf("mode must be set to either 'grpc' or 'http-connect' not %q", o.mode) - } - if o.udsName != "" { - if o.serverPort != 0 { - return fmt.Errorf("server port should be set to 0 not %d for UDS", o.serverPort) - } - if o.serverKey != "" { - return fmt.Errorf("server key should not be set for UDS") - } - if o.serverCert != "" { - return fmt.Errorf("server cert should not be set for UDS") - } - if o.serverCaCert != "" { - return fmt.Errorf("server ca cert should not be set for UDS") - } - } - if o.serverPort > 49151 { - return fmt.Errorf("please do not try to use ephemeral port %d for the server port", o.serverPort) - } - if o.agentPort > 49151 { - return fmt.Errorf("please do not try to use ephemeral port %d for the agent port", o.agentPort) - } - if o.adminPort > 49151 { - return fmt.Errorf("please do not try to use ephemeral port %d for the admin port", o.adminPort) - } - if o.healthPort > 49151 { - return fmt.Errorf("please do not try to use ephemeral port %d for the health port", o.healthPort) - } - - if o.serverPort < 1024 { - if o.udsName == "" { - return fmt.Errorf("please do not try to use reserved port %d for the server port", o.serverPort) - } - } - if o.agentPort < 1024 { - return fmt.Errorf("please do not try to use reserved port %d for the agent port", o.agentPort) - } - if o.adminPort < 1024 { - return fmt.Errorf("please do not try to use reserved port %d for the admin port", o.adminPort) - } - if o.healthPort < 1024 { - return fmt.Errorf("please do not try to use reserved port %d for the health port", o.healthPort) - } - if o.enableContentionProfiling && !o.enableProfiling { - return fmt.Errorf("if --enable-contention-profiling is set, --enable-profiling must also be set") - } - - // validate agent authentication params - // all 4 parametes must be empty or must have value (except kubeconfigPath that might be empty) - if o.agentNamespace != "" || o.agentServiceAccount != "" || o.authenticationAudience != "" || o.kubeconfigPath != "" { - if o.clusterCaCert != "" { - return fmt.Errorf("clusterCaCert can not be used when service account authentication is enabled") - } - if o.agentNamespace == "" { - return fmt.Errorf("agentNamespace cannot be empty when agent authentication is enabled") - } - if o.agentServiceAccount == "" { - return fmt.Errorf("agentServiceAccount cannot be empty when agent authentication is enabled") - } - if o.authenticationAudience == "" { - return fmt.Errorf("authenticationAudience cannot be empty when agent authentication is enabled") - } - if o.kubeconfigPath != "" { - if _, err := os.Stat(o.kubeconfigPath); os.IsNotExist(err) { - return fmt.Errorf("error checking kubeconfigPath %q, got %v", o.kubeconfigPath, err) - } - } - } - - // validate the proxy strategies - if o.proxyStrategies != "" { - pss := strings.Split(o.proxyStrategies, ",") - for _, ps := range pss { - switch ps { - case string(server.ProxyStrategyDestHost): - case string(server.ProxyStrategyDefault): - default: - return fmt.Errorf("unknown proxy strategy: %s, available strategy are: default, destHost", ps) - } - } - } - - return nil -} - -func newProxyRunOptions() *ProxyRunOptions { - o := ProxyRunOptions{ - serverCert: "", - serverKey: "", - serverCaCert: "", - clusterCert: "", - clusterKey: "", - clusterCaCert: "", - mode: grpcMode, - udsName: "", - deleteUDSFile: false, - serverPort: 8090, - agentPort: 8091, - healthPort: 8092, - adminPort: 8095, - keepaliveTime: 1 * time.Hour, - enableProfiling: false, - enableContentionProfiling: false, - serverID: uuid.New().String(), - serverCount: 1, - agentNamespace: "", - agentServiceAccount: "", - kubeconfigPath: "", - authenticationAudience: "", - proxyStrategies: "default", - } - return &o -} - -func newProxyCommand(p *Proxy, o *ProxyRunOptions) *cobra.Command { - cmd := &cobra.Command{ - Use: "proxy", - Long: `A gRPC proxy server, receives requests from the API server and forwards to the agent.`, - RunE: func(cmd *cobra.Command, args []string) error { - return p.run(o) - }, - } - - return cmd -} - -type Proxy struct { -} - -type StopFunc func() - -func (p *Proxy) run(o *ProxyRunOptions) error { - o.Print() - if err := o.Validate(); err != nil { - return fmt.Errorf("failed to validate server options with %v", err) - } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var k8sClient *kubernetes.Clientset - if o.agentNamespace != "" { - config, err := clientcmd.BuildConfigFromFlags("", o.kubeconfigPath) - if err != nil { - return fmt.Errorf("failed to load kubernetes client config: %v", err) - } - - k8sClient, err = kubernetes.NewForConfig(config) - if err != nil { - return fmt.Errorf("failed to create kubernetes clientset: %v", err) - } - } - - authOpt := &server.AgentTokenAuthenticationOptions{ - Enabled: o.agentNamespace != "", - AgentNamespace: o.agentNamespace, - AgentServiceAccount: o.agentServiceAccount, - KubernetesClient: k8sClient, - AuthenticationAudience: o.authenticationAudience, - } - klog.V(1).Infoln("Starting master server for client connections.") - ps, err := server.GenProxyStrategiesFromStr(o.proxyStrategies) - if err != nil { - return err - } - server := server.NewProxyServer(o.serverID, ps, int(o.serverCount), authOpt) - - masterStop, err := p.runMasterServer(ctx, o, server) - if err != nil { - return fmt.Errorf("failed to run the master server: %v", err) - } - - klog.V(1).Infoln("Starting agent server for tunnel connections.") - err = p.runAgentServer(o, server) - if err != nil { - return fmt.Errorf("failed to run the agent server: %v", err) - } - klog.V(1).Infoln("Starting admin server for debug connections.") - err = p.runAdminServer(o, server) - if err != nil { - return fmt.Errorf("failed to run the admin server: %v", err) - } - klog.V(1).Infoln("Starting health server for healthchecks.") - err = p.runHealthServer(o, server) - if err != nil { - return fmt.Errorf("failed to run the health server: %v", err) - } - - stopCh := SetupSignalHandler() - <-stopCh - klog.V(1).Infoln("Shutting down server.") - - if masterStop != nil { - masterStop() - } - - return nil -} - -var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} - -func SetupSignalHandler() (stopCh <-chan struct{}) { - stop := make(chan struct{}) - c := make(chan os.Signal, 2) - signal.Notify(c, shutdownSignals...) - go func() { - <-c - close(stop) - <-c - os.Exit(1) // second signal. Exit directly. - }() - - return stop -} - -func getUDSListener(ctx context.Context, udsName string) (net.Listener, error) { - udsListenerLock.Lock() - defer udsListenerLock.Unlock() - oldUmask := syscall.Umask(0007) - defer syscall.Umask(oldUmask) - var lc net.ListenConfig - lis, err := lc.Listen(ctx, "unix", udsName) - if err != nil { - return nil, fmt.Errorf("failed to listen(unix) name %s: %v", udsName, err) - } - return lis, nil -} - -func (p *Proxy) runMasterServer(ctx context.Context, o *ProxyRunOptions, server *server.ProxyServer) (StopFunc, error) { - if o.udsName != "" { - return p.runUDSMasterServer(ctx, o, server) - } - return p.runMTLSMasterServer(ctx, o, server) -} - -func (p *Proxy) runUDSMasterServer(ctx context.Context, o *ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) { - if o.deleteUDSFile { - if err := os.Remove(o.udsName); err != nil && !os.IsNotExist(err) { - klog.ErrorS(err, "failed to delete file", "file", o.udsName) - } - } - var stop StopFunc - if o.mode == grpcMode { - grpcServer := grpc.NewServer() - client.RegisterProxyServiceServer(grpcServer, s) - lis, err := getUDSListener(ctx, o.udsName) - if err != nil { - return nil, fmt.Errorf("failed to get uds listener: %v", err) - } - go grpcServer.Serve(lis) - stop = grpcServer.GracefulStop - } else { - // http-connect - server := &http.Server{ - Handler: &server.Tunnel{ - Server: s, - }, - } - stop = func() { - err := server.Shutdown(ctx) - klog.ErrorS(err, "error shutting down server") - } - go func() { - udsListener, err := getUDSListener(ctx, o.udsName) - if err != nil { - klog.ErrorS(err, "failed to get uds listener") - } - defer func() { - err := udsListener.Close() - klog.ErrorS(err, "failed to close uds listener") - }() - err = server.Serve(udsListener) - if err != nil { - klog.ErrorS(err, "failed to serve uds requests") - } - }() - } - - return stop, nil -} - -func (p *Proxy) getTLSConfig(caFile, certFile, keyFile string) (*tls.Config, error) { - cert, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return nil, fmt.Errorf("failed to load X509 key pair %s and %s: %v", certFile, keyFile, err) - } - - if caFile == "" { - return &tls.Config{Certificates: []tls.Certificate{cert}, MinVersion: tls.VersionTLS12}, nil - } - - certPool := x509.NewCertPool() - caCert, err := ioutil.ReadFile(filepath.Clean(caFile)) - if err != nil { - return nil, fmt.Errorf("failed to read cluster CA cert %s: %v", caFile, err) - } - ok := certPool.AppendCertsFromPEM(caCert) - if !ok { - return nil, fmt.Errorf("failed to append cluster CA cert to the cert pool") - } - tlsConfig := &tls.Config{ - ClientAuth: tls.RequireAndVerifyClientCert, - Certificates: []tls.Certificate{cert}, - ClientCAs: certPool, - MinVersion: tls.VersionTLS12, - } - - return tlsConfig, nil -} - -func (p *Proxy) runMTLSMasterServer(ctx context.Context, o *ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) { - var stop StopFunc - - var tlsConfig *tls.Config - var err error - if tlsConfig, err = p.getTLSConfig(o.serverCaCert, o.serverCert, o.serverKey); err != nil { - return nil, err - } - - addr := fmt.Sprintf(":%d", o.serverPort) - - if o.mode == grpcMode { - serverOption := grpc.Creds(credentials.NewTLS(tlsConfig)) - grpcServer := grpc.NewServer(serverOption) - client.RegisterProxyServiceServer(grpcServer, s) - lis, err := net.Listen("tcp", addr) - if err != nil { - return nil, fmt.Errorf("failed to listen on %s: %v", addr, err) - } - go grpcServer.Serve(lis) - stop = grpcServer.GracefulStop - } else { - // http-connect with no tls - httpServer := &http.Server{ - Addr: ":8088", - Handler: &server.Tunnel{ - Server: s, - }, - } - // http-connect - server := &http.Server{ - Addr: addr, - TLSConfig: tlsConfig, - Handler: &server.Tunnel{ - Server: s, - }, - TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), - } - - stop = func() { - err := server.Shutdown(ctx) - if err != nil { - klog.ErrorS(err, "failed to shutdown server") - } - err = httpServer.Shutdown(ctx) - if err != nil { - klog.ErrorS(err, "failed to shutdown httpServer") - } - } - go func() { - err := server.ListenAndServeTLS("", "") // empty files defaults to tlsConfig - if err != nil { - klog.ErrorS(err, "failed to listen on master port") - } - }() - go func() { - err := httpServer.ListenAndServe() - if err != nil { - klog.ErrorS(err, "failed to listen on http master port") - } - }() - } - - return stop, nil -} - -func (p *Proxy) runAgentServer(o *ProxyRunOptions, server *server.ProxyServer) error { - var tlsConfig *tls.Config - var err error - if tlsConfig, err = p.getTLSConfig(o.clusterCaCert, o.clusterCert, o.clusterKey); err != nil { - return err - } - - addr := fmt.Sprintf(":%d", o.agentPort) - serverOptions := []grpc.ServerOption{ - grpc.Creds(credentials.NewTLS(tlsConfig)), - grpc.KeepaliveParams(keepalive.ServerParameters{Time: o.keepaliveTime}), - } - grpcServer := grpc.NewServer(serverOptions...) - agent.RegisterAgentServiceServer(grpcServer, server) - lis, err := net.Listen("tcp", addr) - if err != nil { - return fmt.Errorf("failed to listen on %s: %v", addr, err) - } - go grpcServer.Serve(lis) - - return nil -} - -// redirectTo redirects request to a certain destination. -func redirectTo(to string) func(http.ResponseWriter, *http.Request) { - return func(rw http.ResponseWriter, req *http.Request) { - http.Redirect(rw, req, to, http.StatusMovedPermanently) - } -} - -func (p *Proxy) runAdminServer(o *ProxyRunOptions, server *server.ProxyServer) error { - muxHandler := http.NewServeMux() - muxHandler.Handle("/metrics", promhttp.Handler()) - if o.enableProfiling { - muxHandler.HandleFunc("/debug/pprof", redirectTo("/debug/pprof/")) - muxHandler.HandleFunc("/debug/pprof/", pprof.Index) - if o.enableContentionProfiling { - runtime.SetBlockProfileRate(1) - } - } - adminServer := &http.Server{ - Addr: fmt.Sprintf("127.0.0.1:%d", o.adminPort), - Handler: muxHandler, - MaxHeaderBytes: 1 << 20, - } - - go func() { - err := adminServer.ListenAndServe() - if err != nil { - klog.ErrorS(err, "admin server could not listen") - } - klog.V(1).Infoln("Admin server stopped listening") - }() - - return nil -} - -func (p *Proxy) runHealthServer(o *ProxyRunOptions, server *server.ProxyServer) error { - livenessHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "ok") - }) - readinessHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ready, msg := server.Readiness.Ready() - if ready { - w.WriteHeader(200) - fmt.Fprintf(w, "ok") - return - } - w.WriteHeader(500) - fmt.Fprintf(w, msg) - }) - - muxHandler := http.NewServeMux() - muxHandler.HandleFunc("/healthz", livenessHandler) - muxHandler.HandleFunc("/ready", readinessHandler) - healthServer := &http.Server{ - Addr: fmt.Sprintf(":%d", o.healthPort), - Handler: muxHandler, - MaxHeaderBytes: 1 << 20, - } - - go func() { - err := healthServer.ListenAndServe() - if err != nil { - klog.ErrorS(err, "health server could not listen") - } - klog.V(1).Infoln("Health server stopped listening") - }() - - return nil -} |
