Django permissions in a CMDB backend

Date: June 25, 2019


Preparing the environment

System environment

docker run -it -d -v /root/cmdb/:/cmdb -p 9000:9000 --name why python:3.5
pip install django
pip install mysqlclient

On the MySQL side it is best to change the character set to utf8:

set global character_set_database=utf8;
set global character_set_server=utf8;

Create the project and the account app that handles authentication

$ django-admin startproject cmdb
$ cd cmdb
$ python manage.py startapp account

Project configuration

Configure the database and register the app

cmdb/settings.py

# -*- coding: utf-8 -*-

SYSTEM_URL = 'https://cmdb.whysdomain.com'
LOGIN_URL = '/account/login/'
AUTH_USER_MODEL = 'account.User'

INSTALLED_APPS = [
...
'account',
]


MIDDLEWARE = [
...
#'django.middleware.csrf.CsrfViewMiddleware',
...
]

DATABASES = {
    'default': {
    'ENGINE': 'django.db.backends.mysql',
    'NAME':'dbname',
    'USER': 'root',
    'PASSWORD': '123456',
    'HOST': '172.16.32.32',
    'PORT': '3306',
    }
}

STATIC_ROOT = os.path.join(BASE_DIR, 'static')

# Email Settings
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'  # email backend
EMAIL_USE_TLS = False    # whether to use TLS
EMAIL_USE_SSL = False    # whether to use SSL
EMAIL_HOST = 'mail.whysdomain.com'    # SMTP server of the sending mailbox
EMAIL_PORT = 12345    # SMTP server port
EMAIL_HOST_USER = 'notice@whysdomain.com'    # address that sends the mail
EMAIL_HOST_PASSWORD = '123456'     # password of the sending mailbox
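
The settings above hardcode the database and mailbox passwords. As a minimal sketch, they could be read from the environment instead. The helper name `env_setting` and the variable names `CMDB_DB_PASSWORD` and `CMDB_EMAIL_PASSWORD` are assumptions made for this illustration and are not part of the original project.

```python
import os


def env_setting(name, default=None, environ=os.environ):
    """Return the value of an environment variable, or a default.

    Raises KeyError when the variable is unset and no default is given,
    so a missing secret fails at startup rather than at first use.
    """
    value = environ.get(name, default)
    if value is None:
        raise KeyError("missing required setting: " + name)
    return value


# Hypothetical variable names, chosen for this illustration only.
# In settings.py these could replace the literal passwords, e.g.
#   DATABASES['default']['PASSWORD'] = env_setting('CMDB_DB_PASSWORD')
#   EMAIL_HOST_PASSWORD = env_setting('CMDB_EMAIL_PASSWORD')
```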

Code

URL routing rules

cmdb/urls.py

from django.urls import path, include
from django.contrib import admin

urlpatterns = [
    path('account/', include('account.urls')),
    path('admin/', admin.site.urls),
]

Authentication model

account/models.py

# -*- coding: utf-8 -*-
from django.db import models
from django.contrib.auth.models import (
    BaseUserManager, AbstractBaseUser, PermissionsMixin
)
from django.utils import timezone

# Create your models here.
class UserManager(BaseUserManager):
    def create_user(self, email, password, is_superuser, **extra_fields):
        if not email:
            raise ValueError('Users must have an email address')
        user = self.model(
            email=self.normalize_email(email),
            is_active=True,
            is_superuser=is_superuser,
            **extra_fields
        )

        user.set_password(password)
        user.save(using=self._db)
        return user

    def create_superuser(self, email, password, **extra_fields):
        return self.create_user(email, password, True, **extra_fields)


class User(AbstractBaseUser, PermissionsMixin):
    username = models.CharField(max_length=50, unique=True)
    fullname = models.CharField(max_length=100)
    mobile = models.CharField(max_length=30, default='', blank=True)
    email = models.EmailField(max_length=255, unique=True)
    # Whether the user is enabled
    is_active = models.BooleanField(default=True)
    # Whether the user is an administrator
    is_superuser = models.BooleanField(default=False)
    date_joined = models.DateTimeField(auto_now_add=True)

    class Meta:
        permissions = (
            ("view_user", ("查看用户")),
            ("add_user", ("添加用户")),
            ("change_user", ("编辑用户")),
            ("delete_user", ("删除用户")),
        )
        default_permissions = ()

    objects = UserManager()

    USERNAME_FIELD = 'username'
    REQUIRED_FIELDS = ['email']

    def __str__(self):
        return self.username

    @property
    def is_staff(self):
        return self.is_superuser

Register the account model in the admin

account/admin.py

from django.contrib import admin
from account.models import User

# Register your models here.
admin.site.register(User)

URL configuration for account

account/urls.py

from django.urls import path
from account import views

urlpatterns = [
    path('login/', views.login),
    path('logout/', views.logout),
    path('getUsers/', views.get_users),
    path('getUserList/', views.get_user_list),
    path('addUser/', views.add_user),
    path('changeUser/', views.change_user),
    path('deleteUser/', views.delete_user),
    path('updatePassword/', views.update_password),
    path('getGroups/', views.get_groups),
    path('getUsersByGroupId/', views.get_users_by_group_id),
    path('addGroup/', views.add_group),
    path('changeGroup/', views.change_group),
    path('deleteGroup/', views.delete_group),
    path('getPermissions/', views.get_permissions),
    path('getPermissionsByGroupId/', views.get_permissions_by_group_id),
    path('addPermission/', views.add_permission),
    path('changePermission/', views.change_permission),
    path('deletePermission/', views.delete_permission),
]

Business logic

Each operation requires a login and checks its own permission

account/views.py

from django.contrib.auth.decorators import login_required, permission_required

from account import user, group, permission

# Create your views here.
def login(request):
    return user.login(request)


def logout(request):
    return user.logout(request)

# Get user details
@login_required
@permission_required('account.view_user', raise_exception=True)
def get_users(request):
    return user.get_users(request)

# Get the user list
@login_required
@permission_required('account.view_user', raise_exception=True)
def get_user_list(request):
    return user.get_user_list(request)

# Add a user
@login_required
@permission_required('account.add_user', raise_exception=True)
def add_user(request):
    return user.add_user(request)

# Change a user
@login_required
@permission_required('account.change_user', raise_exception=True)
def change_user(request):
    return user.change_user(request)

# Delete users
@login_required
@permission_required('account.delete_user', raise_exception=True)
def delete_user(request):
    return user.delete_user(request)

# Change a user's password
@login_required
def update_password(request):
    return user.update_password(request)

# Group and Permission are models of django.contrib.auth, so their
# permission strings use the "auth" app label rather than "account".

# Get group information
@login_required
@permission_required('auth.change_group', raise_exception=True)
def get_groups(request):
    return group.get_groups(request)

# Get the users that belong to a group
@login_required
@permission_required('auth.change_group', raise_exception=True)
def get_users_by_group_id(request):
    return group.get_users_by_group_id(request)

# Add a group
@login_required
@permission_required('auth.add_group', raise_exception=True)
def add_group(request):
    return group.add_group(request)

# Change a group
@login_required
@permission_required('auth.change_group', raise_exception=True)
def change_group(request):
    return group.change_group(request)

# Delete groups
@login_required
@permission_required('auth.delete_group', raise_exception=True)
def delete_group(request):
    return group.delete_group(request)

# Get the permission list
@login_required
@permission_required('auth.change_permission', raise_exception=True)
def get_permissions(request):
    return permission.get_permissions(request)

# Get the permissions of a group
@login_required
@permission_required('auth.change_permission', raise_exception=True)
def get_permissions_by_group_id(request):
    return permission.get_permissions_by_group_id(request)

# Add a permission
@login_required
@permission_required('auth.add_permission', raise_exception=True)
def add_permission(request):
    return permission.add_permission(request)

# Change the permissions of a group
@login_required
@permission_required('auth.change_permission', raise_exception=True)
def change_permission(request):
    return permission.change_permission(request)

# Delete permissions
@login_required
@permission_required('auth.delete_permission', raise_exception=True)
def delete_permission(request):
    return permission.delete_permission(request)
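
Every view above stacks a login check on top of a permission check. The sketch below is a simplified, framework-free model of what such a permission decorator does. It is not Django's implementation, and `Forbidden`, `FakeUser` and `require_perm` are hypothetical names introduced only for this illustration.

```python
import functools


class Forbidden(Exception):
    """Stands in for the 403 response produced when a check fails."""


class FakeUser:
    """Hypothetical user holding a set of 'app_label.codename' strings."""

    def __init__(self, perms=(), is_superuser=False):
        self.perms = set(perms)
        self.is_superuser = is_superuser

    def has_perm(self, perm):
        # A superuser passes every check; others need the exact string.
        return self.is_superuser or perm in self.perms


def require_perm(perm):
    """Decorator factory: reject the call unless user.has_perm(perm)."""
    def decorator(view):
        @functools.wraps(view)
        def wrapper(user, *args, **kwargs):
            if not user.has_perm(perm):
                raise Forbidden(perm)
            return view(user, *args, **kwargs)
        return wrapper
    return decorator


@require_perm('account.add_user')
def add_user(user):
    return 'created'
```

Because the check compares whole strings, the argument passed to the decorator has to match the app label and codename stored for the permission exactly.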

User logic

account/user.py

from django.http import HttpResponseRedirect, HttpResponse, HttpResponseBadRequest
from django.shortcuts import get_object_or_404
from django.contrib import auth
from django.contrib.auth.models import Group
from django.contrib.auth.decorators import login_required, permission_required
from django.core.mail import send_mail
from django.db import transaction
from django.db.models import F
from django.db.models import Q
from account.models import User
from cmdb.settings import SYSTEM_URL
from cmdb.settings import EMAIL_HOST_USER

import json
import random
import string


# Create your views here.
def login(request):
    # GET returns the current user's info and permissions
    if request.method == 'GET':
        if request.user.username:
            user_info = {
                'id': request.user.id, 
                'username': request.user.username,
                'fullname': request.user.fullname, 
                'isSuperuser': request.user.is_superuser
            }
            permissions = list(User.objects.get(username=request.user.username).get_all_permissions())
            return HttpResponse(json.dumps({'loggedIn': True, 'user': user_info,
                'permissions': permissions}))
        else:
            return HttpResponse(json.dumps({'loggedIn': False}), status=401)
    # POST authenticates the username and password and returns the user's permissions
    if request.method == "POST":
        user_data = json.loads(request.body)
        username = user_data.get('username')
        password = user_data.get('password')
        # Django's authenticate() checks the username and password against the database
        user = auth.authenticate(request, username=username, password=password)
        if user is not None and user.is_active:
            # Django's login() stores the authenticated user in the session
            auth.login(request, user)
            user_info = {'id': request.user.id, 
                         'username': request.user.username,
                         'fullname': request.user.fullname, 
                         'isSuperuser': request.user.is_superuser
            }
            permissions = list(User.objects.get(username=request.user.username).get_all_permissions())
            return HttpResponse(json.dumps({'loggedIn': True, 'user': user_info,
                'permissions': permissions}))
        else:
            # Wrong username or password
            return HttpResponse(json.dumps({'loggedIn': False}), status=401)


def logout(request):
    # logout() clears the current user's data from the session
    auth.logout(request)
    return HttpResponse(json.dumps({'loggedIn': False}))

def get_users(request):
    # Search keyword
    search = request.GET.get('search')
    # Field to sort by
    order_by = request.GET.get('orderBy')
    # Sort direction
    order = request.GET.get('order')
    size = int(request.GET.get('size'))
    page = int(request.GET.get('page'))
    # Superusers can query all users; other users can only query themselves
    if request.user.is_superuser:
        users = User.objects.all()
    else:
        users = User.objects.filter(id=request.user.id)
    if search:
        users = users.filter(Q(username__contains=search) | Q(fullname__contains=search) | Q(email__contains=search) | Q(mobile__contains=search) | Q(groups__name__contains=search)).distinct()
    # Map the front end's sort keys to model field names
    # (mobile is a field on User itself, so it needs no mapping)
    if order_by and order:
        if order_by == 'isActive':
            order_by = 'is_active'
        elif order_by == 'isSuperuser':
            order_by = 'is_superuser'
        elif order_by == 'userGroups':
            order_by = 'groups'
        elif order_by == 'dateJoined':
            order_by = 'date_joined'
        elif order_by == 'lastLogin':
            order_by = 'last_login'
        if order == 'ascending':
            users = users.order_by(order_by)
        if order == 'descending':
            users = users.order_by('-' + order_by)
    else:
        users = users.order_by('id')
    # Slice out the requested page
    start = (page - 1) * size
    end = size * page
    total = users.count()
    users = users[start : end]
    table_data = []
    for user in users:
        id = user.id
        username = user.username
        fullname = user.fullname
        mobile = user.mobile
        email = user.email
        is_active = user.is_active
        is_superuser = user.is_superuser
        user_groups = []
        for group in user.groups.all():
            user_groups.append({'groupId': group.id, 'groupName': group.name})
        date_joined = user.date_joined.strftime("%Y-%m-%d %H:%M:%S")
        if user.last_login is not None:
            last_login = user.last_login.strftime("%Y-%m-%d %H:%M:%S")
        else:
            last_login = ''
        table_data.append({'id': id, 'username': username, 'fullname': fullname, 'mobile': mobile, 'email': email, 'isActive': is_active, 'isSuperuser': is_superuser, 'userGroups': user_groups, 'dateJoined': date_joined, 'lastLogin': last_login})
    group_options = list(Group.objects.all().values('id', 'name'))
    return HttpResponse(json.dumps({'groupOptions': group_options, 'tableData': table_data,
        'total': total}))

def get_user_list(request):
    user_list = list(User.objects.filter(is_active=True).values('id', 'username', 'fullname'))
    return HttpResponse(json.dumps(user_list))

def send_email(username, password, subject, email):
    message = '地址: ' + SYSTEM_URL + '\n用户名: ' + username + '\n密码:' + password
    send_mail(subject, message, EMAIL_HOST_USER, ['why@whysdomain.com', email],
        fail_silently=False)

# Run inside a transaction so that any error rolls back the changes
@transaction.atomic
def add_user(request):
    user_data = json.loads(request.body)
    username = user_data.get('username')
    fullname = user_data.get('fullname')
    mobile = user_data.get('mobile')
    email = user_data.get('email')
    is_superuser = user_data.get('isSuperuser')
    group_id_list = user_data.get('groupIdList')
    user, status = User.objects.get_or_create(username=username, defaults={
        'fullname': fullname, 'mobile': mobile, 'email': email, 'is_superuser': is_superuser})
    if status:
        for group in Group.objects.filter(id__in=group_id_list):
            user.groups.add(group)
        password = ''.join(random.sample(string.ascii_letters + string.digits, 8))
        user.set_password(password)
        user.save()
        send_email(username, password, 'CMDB初始化密码', email)
        return HttpResponse(status=201)
    else:
        return HttpResponseBadRequest()

@transaction.atomic
def change_user(request):
    user_data = json.loads(request.body)
    id = user_data.get('id')
    username = user_data.get('username')
    fullname = user_data.get('fullname')
    mobile = user_data.get('mobile')
    email = user_data.get('email')
    is_active = user_data.get('isActive')
    is_superuser = user_data.get('isSuperuser')
    group_id_list = user_data.get('groupIdList')
    count = User.objects.filter(id=id).update(username=username, fullname=fullname, mobile=mobile, email=email, is_active=is_active, is_superuser=is_superuser)
    # Reset the user's group membership
    if count > 0:
        user = User.objects.get(id=id)
        user.groups.set(Group.objects.filter(id__in=group_id_list))
        return HttpResponse(status=202)
    else:
        return HttpResponseBadRequest()

@transaction.atomic
def delete_user(request):
    id_list = json.loads(request.body).get('idList')
    result = User.objects.filter(id__in=id_list).delete()
    if result[0] > 0:
        return HttpResponse(status=204)
    else:
        return HttpResponseBadRequest()

def update_password(request):
    password_data = json.loads(request.body)
    user_id = password_data.get('userId')
    old_password = password_data.get('oldPassword')
    new_password = password_data.get('newPassword')
    if not user_id or user_id == request.user.id:
        user = auth.authenticate(username=request.user.username, password=old_password)
        if user is not None and user.is_active:
            request.user.set_password(new_password)
            request.user.save()
            return HttpResponse(status=200)
        else:
            return HttpResponseBadRequest()
    else:
        if not request.user.is_superuser:
            return HttpResponseBadRequest()
        user = get_object_or_404(User.objects.all(), pk=user_id)
        if user is not None and user.is_active:
            user.set_password(new_password)
            user.save()
            send_email(user.username, new_password, 'CMDB重置密码', user.email)
            return HttpResponse(status=200)
        else:
            return HttpResponseBadRequest()
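
add_user builds the initial password with random.sample, which relies on a non-cryptographic generator and never repeats a character. A minimal sketch of an alternative based on the standard-library secrets module follows. The function name `generate_password` is hypothetical and does not appear in the original code.

```python
import secrets
import string

ALPHABET = string.ascii_letters + string.digits


def generate_password(length=8):
    """Return a random password drawn from letters and digits.

    secrets.choice uses the operating system's secure random source, and
    each character is drawn independently, so repeats are allowed.
    """
    if length < 1:
        raise ValueError("length must be positive")
    return ''.join(secrets.choice(ALPHABET) for _ in range(length))
```

In add_user this could stand in for the random.sample expression, with the rest of the function unchanged.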

account/group.py

from django.http import HttpResponse, HttpResponseBadRequest
from django.contrib.auth.models import Group
from django.db import transaction
from django.db.models import F
from django.db.models import Q
from account.models import User
import json

# Create your views here.

def get_groups(request):
    search = request.GET.get('search')
    order_by = request.GET.get('orderBy')
    order = request.GET.get('order')
    size = int(request.GET.get('size'))
    page = int(request.GET.get('page'))
    groups = Group.objects.annotate(userId=F('user__id'), fullname=F('user__fullname'), username=F('user__username'))
    if search:
        groups = groups.filter(Q(name__contains=search) | Q(user__username__contains=search) | Q(user__fullname__contains=search)).distinct()
    if order_by and order:
        if order == 'ascending':
            groups = groups.order_by(order_by)
        if order == 'descending':
            groups = groups.order_by('-' + order_by)
    else:
        groups = groups.order_by('id')
    start = (page - 1) * size
    end = size * page
    total = groups.count()
    user_options = list(User.objects.all().values('id', 'username', 'fullname'))
    groups = groups[start : end].values('id', 'name', 'userId', 'username', 'fullname')
    return HttpResponse(json.dumps({'userOptions': user_options, 'tableData': list(groups),
        'total': total}))


def get_users_by_group_id(request):
    id = request.GET.get('id')
    user_id_list = list(Group.objects.filter(id=id).values_list('user__id', flat=True))
    if None in user_id_list:
        user_id_list.remove(None)
    return HttpResponse(json.dumps({'userIdList': user_id_list}))

@transaction.atomic
def add_group(request):
    group_data = json.loads(request.body)
    name = group_data.get('name')
    user_id_list = group_data.get('userIdList')
    group, status = Group.objects.get_or_create(name=name)
    if status:
        for user in User.objects.filter(id__in=user_id_list):
            group.user_set.add(user)
        return HttpResponse(status=201)
    else:
        return HttpResponseBadRequest()

@transaction.atomic
def change_group(request):
    group_data = json.loads(request.body)
    id = group_data.get('id')
    name = group_data.get('name')
    user_id_list = group_data.get('userIdList')
    count = Group.objects.filter(id=id).update(name=name)
    if count > 0:
        Group.objects.get(id=id).user_set.clear()
        for user in User.objects.filter(id__in=user_id_list):
            Group.objects.get(id=id).user_set.add(user)
        return HttpResponse(status=202)
    else:
        return HttpResponseBadRequest()

@transaction.atomic
def delete_group(request):
    id_list = json.loads(request.body).get('idList')
    result = Group.objects.filter(id__in=id_list).delete()
    if result[0] > 0:
        return HttpResponse(status=204)
    else:
        return HttpResponseBadRequest()
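
get_groups passes the orderBy query parameter straight to order_by() and computes the page slice inline. As a sketch under stated assumptions, both steps could be pulled into small helpers that only accept known sort keys. The whitelist contents and the names `ALLOWED_ORDER_FIELDS`, `resolve_ordering` and `page_bounds` are hypothetical.

```python
# Hypothetical whitelist mapping the front end's sort keys to lookups
# that are valid on Group (illustration only).
ALLOWED_ORDER_FIELDS = {
    'id': 'id',
    'name': 'name',
    'username': 'user__username',
    'fullname': 'user__fullname',
}


def resolve_ordering(order_by, order, default='id'):
    """Translate (orderBy, order) parameters into an order_by() argument."""
    field = ALLOWED_ORDER_FIELDS.get(order_by)
    if field is None or order not in ('ascending', 'descending'):
        return default
    return field if order == 'ascending' else '-' + field


def page_bounds(page, size):
    """Return the (start, end) slice indexes for a 1-based page number."""
    if page < 1 or size < 1:
        raise ValueError("page and size must be positive")
    start = (page - 1) * size
    return start, start + size
```

An unknown sort key or direction then falls back to ordering by id instead of reaching the database layer.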

Permission logic

account/permission.py

from django.http import HttpResponse, HttpResponseBadRequest
from django.contrib.auth.models import Group, Permission
from django.db import DatabaseError, transaction
from django.db.models import F
from django.db.models import Q
from account.models import User

import json

# Create your views here.

def get_permissions(request):
    search = request.GET.get('search')
    order_by = request.GET.get('orderBy')
    order = request.GET.get('order')
    size = int(request.GET.get('size'))
    page = int(request.GET.get('page'))
    permissions = Group.objects.annotate(permissionId=F('permissions__id'), permissionName=F('permissions__name'))
    if search:
        permissions = permissions.filter(Q(name__contains=search) | Q(permissions__name__contains=search)).distinct()
    if order_by and order:
        if order == 'ascending':
            permissions = permissions.order_by(order_by)
        if order == 'descending':
            permissions = permissions.order_by('-' + order_by)
    else:
        permissions = permissions.order_by('id')
    start = (page - 1) * size
    end = size * page
    total = permissions.count()
    permission_options = list(Permission.objects.exclude(name__startswith='Can').values(
        'id', 'name'))
    permissions = permissions[start : end].values('id', 'name', 'permissionName')
    return HttpResponse(json.dumps({'permissionOptions': permission_options,
        'tableData': list(permissions), 'total': total}))

def get_permissions_by_group_id(request):
    id = request.GET.get('id')
    permission_list = list(Group.objects.filter(id=id).values_list('permissions__id', flat=True))
    if None in permission_list:
        permission_list.remove(None)
    return HttpResponse(json.dumps({'permissionList': permission_list}))

@transaction.atomic
def add_permission(request):
    permission_data = json.loads(request.body)
    name = permission_data.get('name')
    user_id_list = permission_data.get('permissionIdList')
    group, status = Permission.objects.get_or_create(name=name)
    if status:
        for user in User.objects.filter(id__in=user_id_list):
            group.user_set.add(user)
        return HttpResponse(status=201)
    else:
        return HttpResponseBadRequest()

@transaction.atomic
def change_permission(request):
    permission_data = json.loads(request.body)
    id = permission_data.get('id')
    try:
        permission_list = permission_data.get('permissionList')
        Group.objects.get(id=id).permissions.clear()
        Group.objects.get(id=id).permissions.set(permission_list)
    except DatabaseError:
        return HttpResponseBadRequest()
    return HttpResponse(status=202)

@transaction.atomic
def delete_permission(request):
    id_list = json.loads(request.body).get('idList')
    result = Permission.objects.filter(id__in=id_list).delete()
    if result[0] > 0:
        return HttpResponse(status=204)
    else:
        return HttpResponseBadRequest()
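
The delete handlers read idList directly from the JSON body, so a malformed request surfaces as a server error rather than a 400. A minimal sketch of a defensive parser follows. The helper name `parse_id_list` is hypothetical, and a handler could return HttpResponseBadRequest() whenever it yields None.

```python
import json


def parse_id_list(body, key='idList'):
    """Extract a list of integer ids from a JSON request body.

    Returns None when the body is not valid JSON, is not an object,
    or the key does not hold a non-empty list of integers.
    """
    try:
        data = json.loads(body)
    except (TypeError, ValueError):
        return None
    if not isinstance(data, dict):
        return None
    ids = data.get(key)
    if not isinstance(ids, list) or not ids:
        return None
    # bool is a subclass of int, so exclude it explicitly.
    if not all(isinstance(i, int) and not isinstance(i, bool) for i in ids):
        return None
    return ids
```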

Sync the database and create a superuser

python manage.py makemigrations
python manage.py migrate
python manage.py createsuperuser

django_content_type stores the content types (app and model) that permissions attach to

select * from django_content_type;
+----+--------------+-------------+
| id | app_label    | model       |
+----+--------------+-------------+
|  6 | account      | user        |
|  1 | admin        | logentry    |
|  3 | auth         | group       |
|  2 | auth         | permission  |
|  4 | contenttypes | contenttype |
|  5 | sessions     | session     |
+----+--------------+-------------+

auth_permission stores the individual permissions

select * from auth_permission;
+----+-------------------------+-----------------+--------------------+
| id | name                    | content_type_id | codename           |
+----+-------------------------+-----------------+--------------------+
|  1 | Can add log entry       |               1 | add_logentry       |
|  2 | Can change log entry    |               1 | change_logentry    |
|  3 | Can delete log entry    |               1 | delete_logentry    |
|  4 | Can add permission      |               2 | add_permission     |
|  5 | Can change permission   |               2 | change_permission  |
|  6 | Can delete permission   |               2 | delete_permission  |
|  7 | Can add group           |               3 | add_group          |
|  8 | Can change group        |               3 | change_group       |
|  9 | Can delete group        |               3 | delete_group       |
| 10 | Can add content type    |               4 | add_contenttype    |
| 11 | Can change content type |               4 | change_contenttype |
| 12 | Can delete content type |               4 | delete_contenttype |
| 13 | Can add session         |               5 | add_session        |
| 14 | Can change session      |               5 | change_session     |
| 15 | Can delete session      |               5 | delete_session     |
| 16 | 查看用户                |               6 | view_user          |
| 17 | 添加用户                |               6 | add_user           |
| 18 | 编辑用户                |               6 | change_user        |
| 19 | 删除用户                |               6 | delete_user        |
+----+-------------------------+-----------------+--------------------+
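
The two tables above together determine the permission string used in code: the app_label of the content type, a dot, and the codename. A small sketch using a few rows copied from the output above (the variable and function names are made up for this illustration):

```python
# Rows copied from the django_content_type and auth_permission output above.
CONTENT_TYPES = {
    2: ('auth', 'permission'),
    3: ('auth', 'group'),
    6: ('account', 'user'),
}

PERMISSIONS = [
    # (id, content_type_id, codename)
    (5, 2, 'change_permission'),
    (8, 3, 'change_group'),
    (17, 6, 'add_user'),
    (19, 6, 'delete_user'),
]


def permission_string(content_type_id, codename):
    """Build the 'app_label.codename' string that a permission check uses."""
    app_label, _model = CONTENT_TYPES[content_type_id]
    return app_label + '.' + codename


PERMISSION_STRINGS = {
    perm_id: permission_string(ct_id, codename)
    for perm_id, ct_id, codename in PERMISSIONS
}
```

Permission 17, for example, resolves to account.add_user, while the group and permission rows resolve to strings that start with auth.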

auth_group_permissions stores the mapping between group_id and permission_id

select * from auth_group_permissions;
+----+----------+---------------+
| id | group_id | permission_id |
+----+----------+---------------+
| 17 |        1 |            17 |
+----+----------+---------------+
1 row in set (0.01 sec)

Testing the permission system

Create a test user

Create a user group named op that holds the account.add_user permission.

Create a user named test that belongs to the op group.

Create test pages to verify the permissions

Add the routes to urls

    path('testAddPermissions/', views.test_add_permission),
    path('testDelPermissions/', views.test_del_permission),

Add the code to views

from django.http import HttpResponse

@login_required
@permission_required('account.add_user', raise_exception=True)
def test_add_permission(request):
    return HttpResponse('add ok')

@login_required
@permission_required('account.delete_user', raise_exception=True)
def test_del_permission(request):
    return HttpResponse('del ok')

Test

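Log in as the superuser and as the test user in two different browsers.

  1. As the superuser, make the test user a superuser (so that it can log in through the admin and obtain a session).
  2. Log in as the test user through the /admin/ URL.
  3. As test, visit /account/testAddPermissions/: the response is add ok.
  4. As test, visit /account/testDelPermissions/: the response is del ok.
  5. As the superuser, remove the superuser flag from test (a superuser has every permission by default, which would hide the group permissions).
  6. As test, visit /account/testAddPermissions/: the response is add ok.
  7. As test, visit /account/testDelPermissions/: the response is 403 Forbidden.
  8. As the superuser, remove test from the op group (the group was its only source of account.add_user).
  9. As test, visit /account/testAddPermissions/: the response is 403 Forbidden.
  10. As test, visit /account/testDelPermissions/: the response is 403 Forbidden.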
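
The outcome of the ten steps above can be reproduced with a small, framework-free model of the check: a superuser passes everything, and any other user needs the permission through a group. This is a simplified sketch, not Django's backend. It ignores permissions assigned directly to a user, and `SimUser`, `GROUP_PERMS`, `has_perm` and `access` are hypothetical names.

```python
class SimUser:
    """Hypothetical stand-in for a user with a superuser flag and groups."""

    def __init__(self, is_superuser=False, groups=()):
        self.is_superuser = is_superuser
        self.groups = set(groups)


# Group membership grants permissions; 'op' holds only account.add_user.
GROUP_PERMS = {'op': {'account.add_user'}}


def has_perm(user, perm):
    """Superusers pass every check; others need the permission via a group."""
    if user.is_superuser:
        return True
    return any(perm in GROUP_PERMS.get(g, set()) for g in user.groups)


def access(user):
    """Return (add_allowed, delete_allowed) for the two test views."""
    return (has_perm(user, 'account.add_user'),
            has_perm(user, 'account.delete_user'))


test = SimUser(is_superuser=True, groups=['op'])
as_superuser = access(test)      # steps 3-4: both views allowed
test.is_superuser = False
as_op_member = access(test)      # steps 6-7: add allowed, delete refused
test.groups.discard('op')
without_group = access(test)     # steps 9-10: both views refused
```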