cmdb后端django权限问题
目录:
准备环境
系统环境
docker run -it -d -v /root/cmdb/:/cmdb -p 9000:9000 --name why python:3.5
pip install django
pip install mysqlclient
mysql需要最好修改一下字符集
set global character_set_database=utf8;
set global character_set_server=utf8;
创建项目和认证相关的account
$ django-admin startproject cmdb
$ cd cmdb
$ python manage.py startapp account
项目配置
配置数据库和引入app相关
cmdb/setting.py
# -*- coding: utf-8 -*-
SYSTEM_URL = 'https://cmdb.whysdomain.com'
LOGIN_URL = '/account/login/'
AUTH_USER_MODEL = 'account.User'
MIDDLEWARE = [
...
#'django.middleware.csrf.CsrfViewMiddleware',
...
]
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME':'dbname',
'USER': 'root',
'PASSWORD': '123456',
'HOST': '172.16.32.32',
'PORT': '3306',
}
}
STATIC_ROOT = os.path.join(BASE_DIR, 'static')
# Email Settings
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend' #email后端
EMAIL_USE_TLS = False #是否使用TLS安全传输协议
EMAIL_USE_SSL = False #是否使用SSL加密
EMAIL_HOST = 'mail.whysdomain.com' #发送邮件的邮箱的SMTP服务器
EMAIL_PORT = 12345 #发件箱的SMTP服务器端口
EMAIL_HOST_USER = 'notice@whysdomain.com' #发送邮件的邮箱地址
EMAIL_HOST_PASSWORD = '123456' #发送邮件的邮箱密码
相关代码
路由规则url
cmdb/urls.py
from django.urls import path, include
from django.contrib import admin
urlpatterns = [
path('account/', include('account.urls')),
path('admin/', admin.site.urls),
]
认证相关model
account/models.py
# -*- coding: utf-8 -*-
from django.db import models
from django.contrib.auth.models import (
BaseUserManager, AbstractBaseUser, PermissionsMixin
)
from django.utils import timezone
# Create your models here.
class UserManager(BaseUserManager):
def create_user(self, email, password, is_superuser, **extra_fields):
if not email:
raise ValueError('Users must have an email address')
user = self.model(
email=self.normalize_email(email),
is_active=True,
is_superuser=is_superuser,
**extra_fields
)
user.set_password(password)
user.save(using=self._db)
return user
def create_superuser(self, email, password, **extra_fields):
return self.create_user(email, password, True, **extra_fields)
class User(AbstractBaseUser, PermissionsMixin):
username = models.CharField(max_length=50, unique=True)
fullname = models.CharField(max_length=100)
mobile = models.CharField(max_length=30, default='', blank=True)
email = models.EmailField(max_length=255, unique=True)
# 是否为启用用户
is_active = models.BooleanField(default=True)
# 是否为管理员用户
is_superuser = models.BooleanField(default=False)
date_joined = models.DateTimeField(auto_now_add=True)
class Meta:
permissions = (
("view_user", ("查看用户")),
("add_user", ("添加用户")),
("change_user", ("编辑用户")),
("delete_user", ("删除用户")),
)
default_permissions = ()
objects = UserManager()
USERNAME_FIELD = 'username'
REQUIRED_FIELDS = ['email']
def __str__(self):
return self.username
@property
def is_staff(self):
return self.is_superuser
account配置admin
account/admin.py
from django.contrib import admin
from account.models import User
# Register your models here.
admin.site.register(User)
account配置url
account/urls.py
from django.urls import path
from account import views
urlpatterns = [
path('login/', views.login),
path('logout/', views.logout),
path('getUsers/', views.get_users),
path('getUserList/', views.get_user_list),
path('addUser/', views.add_user),
path('changeUser/', views.change_user),
path('deleteUser/', views.delete_user),
path('updatePassword/', views.update_password),
path('getGroups/', views.get_groups),
path('getUsersByGroupId/', views.get_users_by_group_id),
path('addGroup/', views.add_group),
path('changeGroup/', views.change_group),
path('deleteGroup/', views.delete_group),
path('getPermissions/', views.get_permissions),
path('getPermissionsByGroupId/', views.get_permissions_by_group_id),
path('addPermission/', views.add_permission),
path('changePermission/', views.change_permission),
]
业务逻辑
对不同的操作进行认证
account/views.py
from django.contrib.auth.decorators import login_required, permission_required
from account import user, group, permission
# Create your views here.
def login(request):
return user.login(request)
def logout(request):
return user.logout(request)
# 获取用户信息
@login_required
@permission_required('account.view_user', raise_exception=True)
def get_users(request):
return user.get_users(request)
# 获取用户列表
@login_required
@permission_required('account.view_user', raise_exception=True)
def get_user_list(request):
return user.get_user_list(request)
# 添加用户
@login_required
@permission_required('account.add_user', raise_exception=True)
def add_user(request):
return user.add_user(request)
# 修改用户
@login_required
@permission_required('account.change_user', raise_exception=True)
def change_user(request):
return user.change_user(request)
# 删除用户
@login_required
@permission_required('account.delete_user', raise_exception=True)
def delete_user(request):
return user.delete_user(request)
# 更改用户密码
@login_required
def update_password(request):
return user.update_password(request)
# 获取用户组信息
@login_required
@permission_required('account.change_group', raise_exception=True)
def get_groups(request):
return group.get_groups(request)
# 获取用户所属组
@login_required
@permission_required('account.change_group', raise_exception=True)
def get_users_by_group_id(request):
return group.get_users_by_group_id(request)
# 添加用户组
@login_required
@permission_required('account.add_group', raise_exception=True)
def add_group(request):
return group.add_group(request)
# 修改用户组
@login_required
@permission_required('account.change_group', raise_exception=True)
def change_group(request):
return group.change_group(request)
# 删除用户组
@login_required
@permission_required('account.delete_group', raise_exception=True)
def delete_group(request):
return group.delete_group(request)
# 获取用户权限
@login_required
@permission_required('account.change_permission', raise_exception=True)
def get_permissions(request):
return permission.get_permissions(request)
# 获取用户组权限
@login_required
@permission_required('account.change_permission', raise_exception=True)
def get_permissions_by_group_id(request):
return permission.get_permissions_by_group_id(request)
# 添加权限
@login_required
@permission_required('account.add_permission', raise_exception=True)
def add_permission(request):
return permission.add_permission(request)
# 修改权限
@login_required
@permission_required('account.change_permission', raise_exception=True)
def change_permission(request):
return permission.change_permission(request)
# 删除权限
@login_required
@permission_required('account.delete_permission', raise_exception=True)
def delete_permission(request):
return permission.delete_permission(request)
用户相关逻辑
account/user.py
from django.http import HttpResponseRedirect, HttpResponse, HttpResponseBadRequest
from django.shortcuts import get_object_or_404
from django.contrib import auth
from django.contrib.auth.models import Group
from django.contrib.auth.decorators import login_required, permission_required
from django.core.mail import send_mail
from django.db import transaction
from django.db.models import F
from django.db.models import Q
from account.models import User
from cmdb.settings import SYSTEM_URL
from cmdb.settings import EMAIL_HOST_USER
import json
import random
import string
# Create your views here.
def login(request):
# GET方法用于获取用户权限
if request.method == 'GET':
if request.user.username:
user_info = {
'id': request.user.id,
'username': request.user.username,
'fullname': request.user.fullname,
'isSuperuser': request.user.is_superuser
}
permissions = list(User.objects.get(username=request.user.username).get_all_permissions())
return HttpResponse(json.dumps({'loggedIn': True, 'user': user_info,
'permissions': permissions}))
else:
return HttpResponse(json.dumps({'loggedIn': False}), status=401)
# POST方法用于认证用户名和密码返回用户名权限
if request.method == "POST":
user_data = json.loads(request.body)
username = user_data.get('username')
password = user_data.get('password')
# Django提供的authenticate函数,验证用户名和密码是否在数据库中匹配
user = auth.authenticate(request, username=username, password=password)
if user is not None and user.is_active:
# Django提供的login函数,将当前登录用户信息保存到会话key中
auth.login(request, user)
user_info = {'id': request.user.id,
'username': request.user.username,
'fullname': request.user.fullname,
'isSuperuser': request.user.is_superuser
}
permissions = list(User.objects.get(username=request.user.username).get_all_permissions())
return HttpResponse(json.dumps({'loggedIn': True, 'user': user_info,
'permissions': permissions}))
else:
# 返回用户名和密码错误信息
return HttpResponse(json.dumps({'loggedIn': False}), status=401)
def logout(request):
# logout函数会清除当前用户保存在会话中的信息
auth.logout(request)
return HttpResponse(json.dumps({'loggedIn': False}))
def get_users(request):
# 搜索关键词
search = request.GET.get('search')
# 排序依据
order_by = request.GET.get('orderBy')
# 排序方式
order = request.GET.get('order')
size = int(request.GET.get('size'))
page = int(request.GET.get('page'))
# 超级用户可以查询所用用户,非超级用户只能查询自己
if request.user.is_superuser:
users = User.objects
else:
users = User.objects.filter(id=request.user.id)
#.annotate(firstName=F('first_name'), lastName=F('last_name'), mobile=F('userprofile__mobile'), isActive=F('is_active'), isSuperuser=F('is_superuser'), dateJoined=F('date_joined'), lastLogin=F('last_login'), groupId=F('groups__id'), groupName=F('groups__name'))
if search:
users = users.filter(Q(username__contains=search) | Q(fullname__contains=search) | Q(email__contains=search) | Q(mobile__contains=search) | Q(groups__name__contains=search)).distinct()
#users = users.extra(select={'mobile': 'userprofile.mobile'})
# 排序方式
if order_by and order:
if order_by == 'mobile':
order_by = 'userprofile__mobile'
elif order_by == 'isActive':
order_by = 'is_active'
elif order_by == 'isSuperuser':
order_by = 'is_superuser'
elif order_by == 'userGroups':
order_by = 'groups'
elif order_by == 'dateJoined':
order_by = 'date_joined'
elif order_by == 'lastLogin':
order_by = 'last_login'
else:
pass
if order == 'ascending':
users = users.order_by(order_by)
if order == 'descending':
users = users.order_by('-' + order_by)
else:
users = users.order_by('id')
# 获取分页数据
start = (page - 1) * size
end = size * page
total = len(users)
users = users[start : end]
table_data = []
for user in users:
id = user.id
username = user.username
fullname = user.fullname
mobile = user.mobile
email = user.email
is_active = user.is_active
is_superuser = user.is_superuser
user_groups = []
for group in user.groups.all():
user_groups.append({'groupId': group.id, 'groupName': group.name})
date_joined = user.date_joined.strftime("%Y-%m-%d %H:%M:%S")
if user.last_login is not None:
last_login = user.last_login.strftime("%Y-%m-%d %H:%M:%S")
else:
last_login = ''
table_data.append({'id': id, 'username': username, 'fullname': fullname, 'mobile': mobile, 'email': email, 'isActive': is_active, 'isSuperuser': is_superuser, 'userGroups': user_groups, 'dateJoined': date_joined, 'lastLogin': last_login})
group_options = list(Group.objects.all().values('id', 'name'))
return HttpResponse(json.dumps({'groupOptions': group_options, 'tableData': table_data,
'total': total}))
def get_user_list(request):
user_list = list(User.objects.filter(is_active=True).values('id', 'username', 'fullname'))
return HttpResponse(json.dumps(user_list))
def send_email(username, password, subject, email):
message = '地址: ' + SYSTEM_URL + '\n用户名: ' + username + '\n密码:' + password
send_mail(subject, message, EMAIL_HOST_USER, ['why@whysdomain.com', email],
fail_silently=False)
# 类似数据库回滚操作
@transaction.atomic
def add_user(request):
user_data = json.loads(request.body)
username = user_data.get('username')
fullname = user_data.get('fullname')
mobile = user_data.get('mobile')
email = user_data.get('email')
is_superuser = user_data.get('isSuperuser')
group_id_list = user_data.get('groupIdList')
user, status = User.objects.get_or_create(username=username, defaults={
'fullname': fullname, 'mobile': mobile, 'email': email, 'is_superuser': is_superuser})
if status:
for group in Group.objects.filter(id__in=group_id_list):
user.groups.add(group)
password = ''.join(random.sample(string.ascii_letters + string.digits, 8))
user.set_password(password)
user.save()
send_email(username, password, 'CMDB初始化密码', email)
return HttpResponse(status=201)
else:
return HttpResponseBadRequest()
@transaction.atomic
def change_user(request):
user_data = json.loads(request.body)
id = user_data.get('id')
username = user_data.get('username')
fullname = user_data.get('fullname')
mobile = user_data.get('mobile')
email = user_data.get('email')
is_active = user_data.get('isActive')
is_superuser = user_data.get('isSuperuser')
group_id_list = user_data.get('groupIdList')
count = User.objects.filter(id=id).update(username=username, fullname=fullname, mobile=mobile, email=email, is_active=is_active, is_superuser=is_superuser)
# 重新加入组
if count > 0:
User.objects.get(id=id).groups.clear()
for group in Group.objects.filter(id__in=group_id_list):
User.objects.get(id=id).groups.add(group)
return HttpResponse(status=202)
else:
return HttpResponseBadRequest()
@transaction.atomic
def delete_user(request):
id_list = json.loads(request.body).get('idList')
result = User.objects.filter(id__in=id_list).delete()
if result[0] > 0:
return HttpResponse(status=204)
else:
return HttpResponseBadRequest()
def update_password(request):
password_data = json.loads(request.body)
user_id = password_data.get('userId')
old_password = password_data.get('oldPassword')
new_password = password_data.get('newPassword')
if not user_id or user_id == request.user.id:
user = auth.authenticate(username=request.user.username, password=old_password)
if user is not None and user.is_active:
request.user.set_password(new_password)
request.user.save()
return HttpResponse(status=200)
else:
return HttpResponseBadRequest()
else:
if not request.user.is_superuser:
return HttpResponseBadRequest()
user = get_object_or_404(User.objects.all(), pk=user_id)
if user is not None and user.is_active:
user.set_password(new_password)
user.save()
send_email(user.username, new_password, 'CMDB重置密码', user.email)
return HttpResponse(status=200)
else:
return HttpResponseBadRequest()
account/group.py
from django.http import HttpResponse, HttpResponseBadRequest
from django.contrib.auth.models import Group
from django.db import transaction
from django.db.models import F
from django.db.models import Q
from account.models import User
import json
# Create your views here.
def get_groups(request):
search = request.GET.get('search')
order_by = request.GET.get('orderBy')
order = request.GET.get('order')
size = int(request.GET.get('size'))
page = int(request.GET.get('page'))
groups = Group.objects.annotate(userId=F('user__id'), fullname=F('user__fullname'), username=F('user__username'))
if search:
groups = groups.filter(Q(name__contains=search) | Q(user__username__contains=search) | Q(user__fullname__contains=search)).distinct()
if order_by and order:
if order == 'ascending':
groups = groups.order_by(order_by)
if order == 'descending':
groups = groups.order_by('-' + order_by)
else:
groups = groups.order_by('id')
start = (page - 1) * size
end = size * page
total = groups.count()
user_options = list(User.objects.all().values('id', 'username', 'fullname'))
groups = groups[start : end].values('id', 'name', 'userId', 'username', 'fullname')
return HttpResponse(json.dumps({'userOptions': user_options, 'tableData': list(groups),
'total': total}))
def get_users_by_group_id(request):
id = request.GET.get('id')
user_id_list = list(Group.objects.filter(id=id).values_list('user__id', flat=True))
if None in user_id_list:
user_id_list.remove(None)
return HttpResponse(json.dumps({'userIdList': user_id_list}))
@transaction.atomic
def add_group(request):
group_data = json.loads(request.body)
name = group_data.get('name')
user_id_list = group_data.get('userIdList')
group, status = Group.objects.get_or_create(name=name)
if status:
for user in User.objects.filter(id__in=user_id_list):
group.user_set.add(user)
return HttpResponse(status=201)
else:
return HttpResponseBadRequest()
@transaction.atomic
def change_group(request):
group_data = json.loads(request.body)
id = group_data.get('id')
name = group_data.get('name')
user_id_list = group_data.get('userIdList')
count = Group.objects.filter(id=id).update(name=name)
if count > 0:
Group.objects.get(id=id).user_set.clear()
for user in User.objects.filter(id__in=user_id_list):
Group.objects.get(id=id).user_set.add(user)
return HttpResponse(status=202)
else:
return HttpResponseBadRequest()
@transaction.atomic
def delete_group(request):
id_list = json.loads(request.body).get('idList')
result = Group.objects.filter(id__in=id_list).delete()
if result[0] > 0:
return HttpResponse(status=204)
else:
return HttpResponseBadRequest()
权限相关
account/permission.py
from django.http import HttpResponse, HttpResponseBadRequest
from django.contrib.auth.models import Group, Permission
from django.db import DatabaseError, transaction
from django.db.models import F
from django.db.models import Q
import json
# Create your views here.
def get_permissions(request):
search = request.GET.get('search')
order_by = request.GET.get('orderBy')
order = request.GET.get('order')
size = int(request.GET.get('size'))
page = int(request.GET.get('page'))
permissions = Group.objects.annotate(permissionId=F('permissions__id'), permissionName=F('permissions__name'))
if search:
permissions = permissions.filter(Q(name__contains=search) | Q(permissions__name__contains=search)).distinct()
if order_by and order:
if order == 'ascending':
permissions = permissions.order_by(order_by)
if order == 'descending':
permissions = permissions.order_by('-' + order_by)
else:
permissions = permissions.order_by('id')
start = (page - 1) * size
end = size * page
total = permissions.count()
permission_options = list(Permission.objects.exclude(name__startswith='Can').values(
'id', 'name'))
permissions = permissions[start : end].values('id', 'name', 'permissionName')
return HttpResponse(json.dumps({'permissionOptions': permission_options,
'tableData': list(permissions), 'total': total}))
def get_permissions_by_group_id(request):
id = request.GET.get('id')
permission_list = list(Group.objects.filter(id=id).values_list('permissions__id', flat=True))
if None in permission_list:
permission_list.remove(None)
return HttpResponse(json.dumps({'permissionList': permission_list}))
@transaction.atomic
def add_permission(request):
permission_data = json.loads(request.body)
name = permission_data.get('name')
user_id_list = permission_data.get('permissionIdList')
group, status = Permission.objects.get_or_create(name=name)
if status:
for user in User.objects.filter(id__in=user_id_list):
group.user_set.add(user)
return HttpResponse(status=201)
else:
return HttpResponseBadRequest()
@transaction.atomic
def change_permission(request):
permission_data = json.loads(request.body)
id = permission_data.get('id')
try:
permission_list = permission_data.get('permissionList')
Group.objects.get(id=id).permissions.clear()
Group.objects.get(id=id).permissions.set(permission_list)
except DatabaseError:
return HttpResponseBadRequest()
return HttpResponse(status=202)
@transaction.atomic
def delete_permission(request):
id_list = json.loads(request.body).get('idList')
result = Permission.objects.filter(id__in=id_list).delete()
if result[0] > 0:
return HttpResponse(json.dumps({'status': True}), status=204)
else:
return HttpResponseBadRequest()
同步数据库创建超级用户
python manage.py makemigrations
python manage.py migrate
python manage.py createsuperuser
django_content_type
存储权限类型
select * from django_content_type;
+----+--------------+-------------+
| id | app_label | model |
+----+--------------+-------------+
| 6 | account | user |
| 1 | admin | logentry |
| 3 | auth | group |
| 2 | auth | permission |
| 4 | contenttypes | contenttype |
| 5 | sessions | session |
+----+--------------+-------------+
auth_permission 存储详细的权限
select * from auth_permission;
+----+-------------------------+-----------------+--------------------+
| id | name | content_type_id | codename |
+----+-------------------------+-----------------+--------------------+
| 1 | Can add log entry | 1 | add_logentry |
| 2 | Can change log entry | 1 | change_logentry |
| 3 | Can delete log entry | 1 | delete_logentry |
| 4 | Can add permission | 2 | add_permission |
| 5 | Can change permission | 2 | change_permission |
| 6 | Can delete permission | 2 | delete_permission |
| 7 | Can add group | 3 | add_group |
| 8 | Can change group | 3 | change_group |
| 9 | Can delete group | 3 | delete_group |
| 10 | Can add content type | 4 | add_contenttype |
| 11 | Can change content type | 4 | change_contenttype |
| 12 | Can delete content type | 4 | delete_contenttype |
| 13 | Can add session | 5 | add_session |
| 14 | Can change session | 5 | change_session |
| 15 | Can delete session | 5 | delete_session |
| 16 | 查看用户 | 6 | view_user |
| 17 | 添加用户 | 6 | add_user |
| 18 | 编辑用户 | 6 | change_user |
| 19 | 删除用户 | 6 | delete_user |
+----+-------------------------+-----------------+--------------------+
auth_group_permissions
存储group_id
和permission_id
的对应关系
select * from auth_group_permissions;
+----+----------+---------------+
| id | group_id | permission_id |
+----+----------+---------------+
| 17 | 1 | 17 |
+----+----------+---------------+
1 row in set (0.01 sec)
测试权限系统
创建测试用户
创建op用户组,组内有account.add_user权限
创建test用户,隶属于op用户组
创建测试页面用于验证权限
urls部分添加路由
path('testAddPermissions/', views.test_add_permission),
path('testDelPermissions/', views.test_del_permission),
views部分添加代码
from django.shortcuts import HttpResponse
@login_required
@permission_required('account.add_user', raise_exception=True)
def test_add_permission(request):
return HttpResponse('ok')
@login_required
@permission_required('account.delete_user', raise_exception=True)
def test_del_permission(request):
return HttpResponse('ok')
测试
这边superuser和test用户分别采用不同浏览器登录
- 使用superuser将test用户设置superuser(目的是使用admin的登录获取session)
- 使用/admin/的url登录test用户
- 测试test用户访问/account/testAddPermissions/返回是
add ok
- 测试test用户访问/account/testDelPermissions/返回是
del ok
- 使用superuser将test用户设置的superuser权限去掉(目的是superuser默认就有所有的权限)
- 测试test用户访问/account/testAddPermissions/返回是
add ok
- 测试test用户访问/account/testDelPermissions/返回是
403 Forbidden
- 使用superuser将test用户的op组去掉(目的是superuser默认就有所有的权限)
- 测试test用户访问/account/testAddPermissions/返回是
403 Forbidden
- 测试test用户访问/account/testDelPermissions/返回是
403 Forbidden