2026年01月21日/ 浏览 8
✍️ 作者:茶水间Tech
️ 标签:#云计算#云原生#kubernetes#容器
Kubernetes 的模块较多、架构复杂,代码量更是庞大,直接阅读源码比较困难。因此我们从现实场景出发,以创建 POD 为例,分析请求在 Kubernetes 内部的代码流程。本系列文章围绕 POD 的创建过程,整体梳理 Kubernetes 的源码实现;本节主要分析 kube-apiserver 侧的流程实现。
本文基于 Client Version: v1.34.3 , Server Version: v1.34.2
POD创建的整体架构图:

整个 Kubernetes 技术体系由声明式 API 以及 Controller 构成,而 kube-apiserver 是 Kubernetes 的声明式 API server。它作为整个 Kubernetes 集群操作 etcd 的唯一入口,负责 Kubernetes 各资源的认证与鉴权、校验以及 CRUD 等操作,并提供 RESTful APIs 供其它组件调用。
官方地址:
https://github.com/kubernetes/apiserver
k8s API Server 提供了 k8s 各类资源对象(pod、RC、Service 等)的增删改查及 watch 等 HTTP REST 接口,是整个系统的数据总线和数据中心。
kubernetes API Server的功能:
提供了集群管理的 REST API 接口(包括认证授权、数据校验以及集群状态变更);
提供其他模块之间的数据交互和通信的枢纽(其他模块通过 API Server 查询或修改数据,只有 API Server 才直接操作 etcd);
是资源配额控制的入口;
拥有完备的集群安全机制。
整个 kube-apiserver 提供了三类 API Resource 接口:
core group:主要在 /api/v1 下;
named groups:其 path 为 /apis/$GROUP/$VERSION;
系统状态的一些 API:如 /metrics、/version 等。
而 API 的 URL 大致以 /apis/{group}/{version}/namespaces/{namespace}/resource/{name} 组成,结构如下图所示:
上文中,kubectl 最终请求 https://<apiserver-host>:<port>/apis/apps/v1/namespaces/<namespace>/deployments 创建了nginx的deployment
apiserver 启动的时候为 handler 添加了 WithAuthentication/WithAuthorization 等中间件 filter。请求经 filter 过滤后,创建 POD 的代码在 CreateResource 中(由 installer.go 中的 registerResourceHandlers 将 POST 请求转过来);实际由 createHandler() 实现完整的资源创建流程,包括参数验证、准入控制WithAdmission、对象转换、存储和响应处理等所有必要步骤。
代码路径:kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go
// CreateResource returns the HTTP handler used for resource creation.
// Parameters: the resource's create interface, the request scope, and the
// admission-control interface. It wraps r in a namedCreaterAdapter and
// delegates to createHandler with includeName set to false.
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
	return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}

// createHandler builds the handler that carries a create request through the
// steps visible in this excerpt: resolve namespace/name, read the size-limited
// body, decode and validate CreateOptions, decode the body into an object of
// the expected version, check the API version, normalize object metadata, run
// mutating admission, call r.Create, and write the response.
//
// NOTE(review): this is an abridged excerpt; identifiers such as s, span,
// objGV, gv and outputMediaType are declared in the omitted sections.
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		// ... (omitted)
		// 1. Resolve the namespace and name from the request.
		namespace, name, err := scope.Namer.Name(req)
		// ... (omitted)
		// Apply the upper-bound timeout (requestTimeoutUpperBound; 34 seconds
		// according to the article's original note).
		ctx, cancel := context.WithTimeout(ctx, requestTimeoutUpperBound)
		// ... (omitted)
		// Read the POST request body, bounded by scope.MaxRequestBodyBytes.
		body, err := limitedReadBodyWithRecordMetric(ctx, req, scope.MaxRequestBodyBytes, scope.Resource.GroupResource(), requestmetrics.Create)
		if err != nil {
			span.AddEvent("limitedReadBody failed", attribute.Int("len", len(body)), attribute.String("err", err.Error()))
			scope.err(err, w, req)
			return
		}
		span.AddEvent("limitedReadBody succeeded", attribute.Int("len", len(body)))
		// Decode the create options from the URL query parameters and validate them.
		options := &metav1.CreateOptions{}
		values := req.URL.Query()
		if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
			err = errors.NewBadRequest(err.Error())
			scope.err(err, w, req)
			return
		}
		if errs := validation.ValidateCreateOptions(options); len(errs) > 0 {
			err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "CreateOptions"}, "", errs)
			scope.err(err, w, req)
			return
		}
		options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("CreateOptions"))
		defaultGVK := scope.Kind
		original := r.New()
		// Field validation: use the strict serializer when the directive is
		// Warn or Strict, otherwise the regular serializer.
		validationDirective := fieldValidation(options.FieldValidation)
		decodeSerializer := s.Serializer
		if validationDirective == metav1.FieldValidationWarn || validationDirective == metav1.FieldValidationStrict {
			decodeSerializer = s.StrictSerializer
		}
		// Decode the body into an object of the expected version (scope.HubGroupVersion).
		decoder := scope.Serializer.DecoderToVersion(decodeSerializer, scope.HubGroupVersion)
		span.AddEvent("About to convert to expected version")
		obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
		// ... (omitted)
		// Reject the request when the scope does not accept the object's API version.
		if !scope.AcceptsGroupVersion(objGV) {
			err = errors.NewBadRequest(fmt.Sprintf("the API version in the data (%s) does not match the expected API version (%v)", objGV.String(), gv.String()))
			scope.err(err, w, req)
			return
		}
		span.AddEvent("Conversion done")
		// Settle the name and namespace used for the create: fall back to the
		// object's own name, and for the namespace resource itself use the
		// name as the namespace.
		if len(name) == 0 {
			_, name, _ = scope.Namer.ObjectName(obj)
		}
		if len(namespace) == 0 && scope.Resource == namespaceGVR {
			namespace = name
		}
		ctx = request.WithNamespace(ctx, namespace)
		admit = admission.WithAudit(admit)
		audit.LogRequestObject(req.Context(), obj, objGV, scope.Resource, scope.Subresource, scope.Serializer)
		userInfo, _ := request.UserFrom(ctx)
		if objectMeta, err := meta.Accessor(obj); err == nil {
			preserveObjectMetaSystemFields := false
			if c, ok := r.(rest.SubresourceObjectMetaPreserver); ok && len(scope.Subresource) > 0 {
				preserveObjectMetaSystemFields = c.PreserveRequestObjectMetaSystemFieldsOnSubresourceCreate()
			}
			// Unless the storage asks to preserve them, wipe the system fields
			// from the submitted object's metadata.
			if !preserveObjectMetaSystemFields {
				rest.WipeObjectMetaSystemFields(objectMeta)
			}
			// ensure namespace on the object is correct, or error if a conflicting namespace was set in the object
			if err := rest.EnsureObjectNamespaceMatchesRequestNamespace(rest.ExpectedNamespaceForResource(namespace, scope.Resource), objectMeta); err != nil {
				scope.err(err, w, req)
				return
			}
		}
		span.AddEvent("About to store object in database")
		// Build the attributes record that is handed to admission control.
		admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
		// Request function: calls r.Create, passing the admission chain as the
		// object-validation callback.
		requestFunc := func() (runtime.Object, error) {
			return r.Create(
				ctx,
				name,
				obj,
				rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
				options,
			)
		}
		// Dedup owner references before updating managed fields
		dedupOwnerReferencesAndAddWarning(obj, req.Context(), false)
		// Run the create through finisher.FinishRequest.
		result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
			// Create a new empty object of the scope's kind.
			liveObj, err := scope.Creater.New(scope.Kind)
			if err != nil {
				return nil, fmt.Errorf("failed to create new object (Create for %v): %v", scope.Kind, err)
			}
			// Update managed fields via the field manager.
			obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
			admit = fieldmanager.NewManagedFieldsValidatingAdmissionController(admit)
			// Run mutating admission (admission.MutationInterface.Admit). The
			// validating side is passed to r.Create through requestFunc above.
			if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
				if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
					return nil, err
				}
			}
			// Dedup owner references again after mutating admission happens
			dedupOwnerReferencesAndAddWarning(obj, req.Context(), true)
			// Perform the actual create.
			result, err := requestFunc()
			// If the object wasn't committed to storage because its serialized size was too large,
			// it is safe to remove managedFields (which can be large) and try again.
			if isTooLargeError(err) {
				if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
					accessor.SetManagedFields(nil)
					result, err = requestFunc()
				}
			}
			return result, err
		})
		if err != nil {
			span.AddEvent("Write to database call failed", attribute.Int("len", len(body)), attribute.String("err", err.Error()))
			scope.err(err, w, req)
			return
		}
		span.AddEvent("Write to database call succeeded", attribute.Int("len", len(body)))
		// Set the status code: 201 Created, also copied into a returned
		// *metav1.Status whose Code is still zero.
		code := http.StatusCreated
		status, ok := result.(*metav1.Status)
		if ok && status.Code == 0 {
			status.Code = int32(code)
		}
		span.AddEvent("About to write a response")
		defer span.AddEvent("Writing http response done")
		// Transform the response object and write it back to the client.
		transformResponseObject(ctx, scope, req, w, code, outputMediaType, result)
	}
}
requestFunc 调用Store.create() 准备deployment对象,再调用e.Storage.Create() 往etcd中写入deployment序列化后的数据
代码路径:kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
// create prepares obj and persists it through e.Storage.
//
// Steps, in order: fill system metadata and generate a name from GenerateName
// when no name is set; call the optional BeginCreate hook; run
// rest.BeforeCreate with the create strategy; run createValidation on a deep
// copy of the object; derive the object name, storage key and TTL; call
// e.Storage.Create. On success it calls the finish function with true, then
// the optional AfterCreate and Decorator hooks, and returns the stored object.
// On failure it returns nil and the interpreted error.
func (e *Store) create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
	var finishCreate FinishFunc = finishNothing
	// Init metadata as early as possible.
	if objectMeta, err := meta.Accessor(obj); err != nil {
		return nil, err
	} else {
		rest.FillObjectMetaSystemFields(objectMeta)
		// Only generate a name when GenerateName is set and Name is empty.
		if len(objectMeta.GetGenerateName()) > 0 && len(objectMeta.GetName()) == 0 {
			objectMeta.SetName(e.CreateStrategy.GenerateName(objectMeta.GetGenerateName()))
		}
	}
	if e.BeginCreate != nil {
		fn, err := e.BeginCreate(ctx, obj, options)
		if err != nil {
			return nil, err
		}
		finishCreate = fn
		// On any early return the finish function is called with false; the
		// success path below swaps finishCreate for finishNothing first.
		defer func() {
			finishCreate(ctx, false)
		}()
	}
	if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
		return nil, err
	}
	// at this point we have a fully formed object. It is time to call the validators that the apiserver
	// handling chain wants to enforce.
	if createValidation != nil {
		if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
			return nil, err
		}
	}
	name, err := e.ObjectNameFunc(obj)
	if err != nil {
		return nil, err
	}
	// Build the storage key.
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, err
	}
	qualifiedResource := e.qualifiedResourceFromContext(ctx)
	// Prepare for storage: compute the TTL.
	ttl, err := e.calculateTTL(obj, 0, false)
	if err != nil {
		return nil, err
	}
	// Key step: call the underlying storage interface (etcd, per the article)
	// to write the object.
	out := e.NewFunc()
	if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
		err = storeerr.InterpretCreateError(err, qualifiedResource, name)
		err = rest.CheckGeneratedNameError(ctx, e.CreateStrategy, err, obj)
		if !apierrors.IsAlreadyExists(err) {
			return nil, err
		}
		// AlreadyExists: fetch the existing object; if that lookup or the
		// metadata access fails, the original create error is returned.
		if errGet := e.Storage.Get(ctx, key, storage.GetOptions{}, out); errGet != nil {
			return nil, err
		}
		accessor, errGetAcc := meta.Accessor(out)
		if errGetAcc != nil {
			return nil, err
		}
		// If the existing object has a deletion timestamp, prefix the error
		// message to say it is being deleted.
		if accessor.GetDeletionTimestamp() != nil {
			msg := &err.(*apierrors.StatusError).ErrStatus.Message
			*msg = fmt.Sprintf("object is being deleted: %s", *msg)
		}
		return nil, err
	}
	// The operation has succeeded. Call the finish function if there is one,
	// and then make sure the defer doesn't call it again.
	fn := finishCreate
	finishCreate = finishNothing
	fn(ctx, true)
	if e.AfterCreate != nil {
		e.AfterCreate(out, options)
	}
	if e.Decorator != nil {
		e.Decorator(out)
	}
	return out, nil
}
kube-apiserver 接收http请求后,先后对请求进行认证(Authentication)、鉴权(Authorization)以及准入控制(Admission Control)后,调用CreateResource() 创建deployment资源对象,然后调用通用Store处理Create()将body反序列化成资源创建对象,最终序列化后存储到etcd,整个流程完成。