password
的实现如下:从user
资源中获取信息。
func (im *passwordAuthenticator) Authenticate(username, password string) (authuser.Info, error) {
//从k8s 集群中查询到user资源
user, err := im.searchUser(username)
providerOptions, ldapProvider := im.getLdapProvider()
//....
//通过ldapProvider来判断账号密码是否正确
if ldapProvider != nil && username != constants.AdminUserName {
//
authenticated, err := ldapProvider.Authenticate(username, password)
//....
if authenticated != nil {
return &authuser.DefaultInfo{
Name: authenticated.Name,
UID: string(authenticated.UID),
}, nil
}
}
//也可以通过use资源的EncryptedPassword,验证密码
if checkPasswordHash(password, user.Spec.EncryptedPassword) {
return &authuser.DefaultInfo{
Name: user.Name,
UID: string(user.UID),
}, nil
}
return nil, AuthFailedIncorrectPassword
}
token
:使用jwt验证
func (t tokenOperator) Verify(tokenStr string) (user.Info, error) {
//调用jwt包验证
authenticated, tokenType, err := t.issuer.Verify(tokenStr)
//这里应该是过期时间,0为永不过期。这个为配置设置
if t.options.OAuthOptions.AccessTokenMaxAge == 0 ||
tokenType == token.StaticToken {
return authenticated, nil
}
//redis验证
if err := t.tokenCacheValidate(authenticated.GetName(), tokenStr); err != nil {
klog.Error(err)
return nil, err
}
return authenticated, nil
}
多集群路由转发和协议升级
只在多集群下运行此函数
func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler) {
info, _ := request.RequestInfoFrom(req.Context())
//cluster 是crd 资源。获取集群所有cluster信息
cluster, err := c.clusterLister.Get(info.Cluster)
//请求集群是主机集群,不需要通过代理
if isClusterHostCluster(cluster) {
req.URL.Path = strings.Replace(req.URL.Path, fmt.Sprintf("/clusters/%s", info.Cluster), "", 1)
handler.ServeHTTP(w, req)
return
}
//查询集群
innCluster := c.getInnerCluster(cluster.Name)
transport := http.DefaultTransport
//替换url
u := *req.URL
u.Path = strings.Replace(u.Path, fmt.Sprintf("/clusters/%s", info.Cluster), "", 1)
if cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeDirect &&
len(cluster.Spec.Connection.KubeSphereAPIEndpoint) == 0 {
u.Scheme = innCluster.kubernetesURL.Scheme
u.Host = innCluster.kubernetesURL.Host
u.Path = fmt.Sprintf(proxyURLFormat, u.Path)
transport = innCluster.transport
//...
} else {
u.Host = innCluster.kubesphereURL.Host
u.Scheme = innCluster.kubesphereURL.Scheme
}
httpProxy := proxy.NewUpgradeAwareHandler(&u, transport, false, false, c)
httpProxy.ServeHTTP(w, req)
}
权限获取
权限获取是通过RBAC方式。先调用Authorize
func (r *RBACAuthorizer) Authorize(requestAttributes authorizer.Attributes) (authorizer.Decision, string, error) {
//调用visitRulesFor检查权限
r.visitRulesFor(requestAttributes, ruleCheckingVisitor.visit)
if ruleCheckingVisitor.allowed {
return authorizer.DecisionAllow, ruleCheckingVisitor.reason, nil
}
//...
//记录日志
klog.Infof("...")
//...
return authorizer.DecisionNoOpinion, reason, nil
}
在调用visitRulesFor
,
func (r *RBACAuthorizer) visitRulesFor(requestAttributes authorizer.Attributes, visitor func(source fmt.Stringer, regoPolicy string, rule *rbacv1.PolicyRule, err error) bool) {
//查看globalRole是否有对此资源的获取权限
if globalRoleBindings, err := r.am.ListGlobalRoleBindings(""); err != nil {
//...
} else {
sourceDescriber := &globalRoleBindingDescriber{}
//循环globalRoleBindings。找到该用户的globalRoleBindings。
for _, globalRoleBinding := range globalRoleBindings {
subjectIndex, applies := appliesTo(requestAttributes.GetUser(), globalRoleBinding.Subjects, "")
if !applies {
//如果不是这个找到该用户的globalRoleBindings,则continue
continue
}
//根据globalRoleBinding上的roleref,获取到role。进一步获取到regoPolicy和rules。这是两个规则,只要有一个规则符合即可
regoPolicy, rules, err := r.am.GetRoleReferenceRules(globalRoleBinding.RoleRef, "")
//...
//根据regoPolicy验证是否符合规则
if !visitor(sourceDescriber, regoPolicy, nil, nil) {
return
}
//根据rules验证是否符合规则
for i := range rules {
if !visitor(sourceDescriber, "", &rules[i], nil) {
return
}
}
}
//...
}
if requestAttributes.GetResourceScope() == request.WorkspaceScope ||
requestAttributes.GetResourceScope() == request.NamespaceScope ||
requestAttributes.GetResourceScope() == request.DevOpsScope {
//...
// 代码部分删除,这部分主要是获取workspace
workspace, err = r.am.GetNamespaceControlledWorkspace(requestAttributes.GetNamespace()); err != nil
// 通过workspace获取workspaceRoleBindings。
if workspaceRoleBindings, err := r.am.ListWorkspaceRoleBindings("", workspace); ...{
//...
} else {
//...轮训workspaceRoleBindings,找到workspaceRole,在进行权限检测。这部分和grobalrole一样
for _, workspaceRoleBinding := range workspaceRoleBindings {
subjectIndex, applies := appliesTo(requestAttributes.GetUser(), workspaceRoleBinding.Subjects, "")
if !applies {
continue
}
regoPolicy, rules, err := r.am.GetRoleReferenceRules(workspaceRoleBinding.RoleRef, "")
//...
if !visitor(sourceDescriber, regoPolicy, nil, nil) {
return
}
for i := range rules {
if !visitor(sourceDescriber, "", &rules[i], nil) {
return
}
}
}
}
}
if requestAttributes.GetResourceScope() == request.NamespaceScope ||
requestAttributes.GetResourceScope() == request.DevOpsScope {
namespace := requestAttributes.GetNamespace()
// 直接获取namespace,或者根据DevOps获取namespace
if requestAttributes.GetResourceScope() == request.DevOpsScope {
if relatedNamespace, err := r.am.GetDevOpsRelatedNamespace(requestAttributes.GetDevOps()); err != nil {
if !visitor(nil, "", nil, err) {
return
}
} else {
namespace = relatedNamespace
}
}
//根据namespace获取rolebinding
if roleBindings, err := r.am.ListRoleBindings("", namespace); err != nil {
if !visitor(nil, "", nil, err) {
return
}
} else {
sourceDescriber := &roleBindingDescriber{}
//轮训roleBindings,找到role,检查role
for _, roleBinding := range roleBindings {
subjectIndex, applies := appliesTo(requestAttributes.GetUser(), roleBinding.Subjects, namespace)
if !applies {
continue
}
regoPolicy, rules, err := r.am.GetRoleReferenceRules(roleBinding.RoleRef, namespace)
if err != nil {
visitor(nil, "", nil, err)
continue
}
sourceDescriber.binding = roleBinding
sourceDescriber.subject = &roleBinding.Subjects[subjectIndex]
if !visitor(sourceDescriber, regoPolicy, nil, nil) {
return
}
for i := range rules {
if !visitor(sourceDescriber, "", &rules[i], nil) {
return
}
}
}
}
}
//获取所有clusterRoleBindings,轮训查找到clusterRole。检查规则
if clusterRoleBindings, err := r.am.ListClusterRoleBindings(""); err != nil {
if !visitor(nil, "", nil, err) {
return
}
} else {
sourceDescriber := &clusterRoleBindingDescriber{}
for _, clusterRoleBinding := range clusterRoleBindings {
subjectIndex, applies := appliesTo(requestAttributes.GetUser(), clusterRoleBinding.Subjects, "")
if !applies {
continue
}
regoPolicy, rules, err := r.am.GetRoleReferenceRules(clusterRoleBinding.RoleRef, "")
if err != nil {
visitor(nil, "", nil, err)
continue
}
sourceDescriber.binding = clusterRoleBinding
sourceDescriber.subject = &clusterRoleBinding.Subjects[subjectIndex]
if !visitor(sourceDescriber, regoPolicy, nil, nil) {
return
}
for i := range rules {
if !visitor(sourceDescriber, "", &rules[i], nil) {
return
}
}
}
}
}
检查规则的函数是visitor
,如果regoPolicy符合就不检查rule
func (v *authorizingVisitor) visit(source fmt.Stringer, regoPolicy string, rule *rbacv1.PolicyRule, err error) bool {
//调用open-policy-agent库检查权限
if regoPolicy != "" && regoPolicyAllows(v.requestAttributes, regoPolicy) {
v.allowed = true
v.reason = fmt.Sprintf("RBAC: allowed by %s", source.String())
return false
}
//调用k8s 接口实现rbac检查权限
if rule != nil && ruleAllows(v.requestAttributes, rule) {
v.allowed = true
v.reason = fmt.Sprintf("RBAC: allowed by %s", source.String())
return false
}
if err != nil {
v.errors = append(v.errors, err)
}
return true
}
审计
主要记录请求日志, 这部分主要使用k8s的Auditing接口。
func WithAuditing(handler http.Handler, a auditing.Auditing) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
//...
//构建Event结构体
e := a.LogRequestObject(req, info)
if e != nil {
resp := auditing.NewResponseCapture(w)
handler.ServeHTTP(resp, req)
go a.LogResponseObject(e, resp)
} else {
handler.ServeHTTP(w, req)
}
})
}
k8s集群资源转发
如果这个资源是向k8s请求的,则直接向k8s请求
func WithKubeAPIServer(handler http.Handler, config *rest.Config, failed proxy.ErrorResponder) http.Handler {
//...
//这部分为初始化内容,在请求时不执行。
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
info, ok := request.RequestInfoFrom(req.Context())
if !ok {
err := errors.New("Unable to retrieve request info from request")
klog.Error(err)
responsewriters.InternalError(w, req, err)
}
// 确认是否为k8s请求
if info.IsKubernetesRequest {
s := *req.URL
s.Host = kubernetes.Host
s.Scheme = kubernetes.Scheme
// make sure we don't override kubernetes's authorization
req.Header.Del("Authorization")
//转发
httpProxy := proxy.NewUpgradeAwareHandler(&s, defaultTransport, true, false, failed)
httpProxy.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(defaultTransport, defaultTransport)
httpProxy.ServeHTTP(w, req)
return
}
handler.ServeHTTP(w, req)
})
}