Tornado框架——3(表单认证)

时间:Sept. 12, 2017 分类:

目录:

Tornado进行表单认证

Tornado框架本身没有提供表单验证功能,本文给出一个简单的表单验证实现。其他Web框架内置的表单验证也是基于同样的原理实现的,只是Tornado没有内置而已。

初步对form表单进行判定

对传入的host,ip,port和phone进行格式判定

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import tornado.ioloop                               #导入模块
import tornado.web
import re

class MainForm(object):
    """Form whose attributes map a field name to the regex its value must match."""

    def __init__(self):
        # Raw strings keep the pattern values unchanged while avoiding invalid
        # escape sequences such as "\d" in ordinary string literals.
        # NOTE(review): the last IP octet may match the empty string and
        # "[3|4|5|8]" also admits a literal "|"; the values are kept as they
        # appear in the article's sample output.
        self.host = r"(.*)"
        self.ip = r"(([01]?\d?\d|2[0-4]\d|25[0-5])\.){3}([01]?\d?\d?|2[0-4]\d|25[0-5])"
        self.port = r"(\d){0,5}"
        self.phone = r"1[3|4|5|8]\d{9}"

    def check_valid(self, handle):
        """
        Validate every declared field against the submitted arguments.

        :param handle: request handler exposing ``get_argument(name)``
        :return: ``(flag, value_dict)``; ``flag`` is False when any field
            fails, ``value_dict`` maps each field name to the raw input
        """
        flag = True
        value_dict = {}
        for key, regular in self.__dict__.items():
            input_value = handle.get_argument(key)
            # Anchor the pattern so the whole input must match, not only a prefix
            match = re.match(r"(?:%s)\Z" % regular, input_value)
            # re.match returns None on failure; calling .group() on it would raise
            val = match.group() if match else None
            print (key, input_value, val, regular)
            if not val:
                flag = False
            value_dict[key] = input_value
        return flag, value_dict

class IndexHandler(tornado.web.RequestHandler):
    """Serve the form page and validate its submission."""

    def get(self, *args, **kwargs):
        # Render the page that contains the form
        self.render("index.html")

    def post(self, *args, **kwargs):
        # Validate the submitted form and print the outcome to the console
        obj = MainForm()
        is_valid, value_dict = obj.check_valid(self)
        # print(x) with a single argument gives the same output on Python 2
        # and Python 3, unlike the print statement
        if is_valid:
            print(value_dict)
        else:
            print('11')

# Create the tornado.web.Application object and route "/" to IndexHandler
application = tornado.web.Application([
    (r"/", IndexHandler),
])


if __name__ == "__main__":
    # Bind the HTTP server to port 8888
    application.listen(8888)
    # Start the I/O loop; IOLoop.instance() is a deprecated alias of current()
    tornado.ioloop.IOLoop.current().start()

html代码

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<!-- Posts the host, ip, port and phone text fields to "/" -->
<form action="/" method="post">
    <input name="host" type="text">
    <input name="ip" type="text">
    <input name="port" type="text">
    <input name="phone" type="text">
    <input value="提交" type="submit">
</form>
</body>
</html>

提交表单的执行结果

('ip', u'192.168.0.1', u'192.168.0.1', '(([01]?\\d?\\d|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d?\\d?|2[0-4]\\d|25[0-5])')
('host', u'why', u'why', '(.*)')
('port', u'8888', u'8888', '(\\d){0,5}')
('phone', u'18832869630', u'18832869630', '1[3|4|5|8]\\d{9}')
{'ip': u'192.168.0.1', 'host': u'why', 'port': u'8888', 'phone': u'18832869630'}

进行封装

放入一个基类BaseForm进行封装

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import tornado.ioloop                               #导入模块
import tornado.web
import re


class BaseForm(object):
    """Base class for forms whose instance attributes map field names to regexes."""

    def check_valid(self, handle):
        """
        Validate every attribute-declared field against the submitted arguments.

        :param handle: request handler exposing ``get_argument(name)``
        :return: ``(flag, value_dict)``; ``flag`` is False when any field
            fails, ``value_dict`` maps each field name to the raw input
        """
        flag = True
        value_dict = {}
        for key, regular in self.__dict__.items():
            input_value = handle.get_argument(key)
            # Anchor the pattern so the whole input must match, not only a prefix
            match = re.match(r"(?:%s)\Z" % regular, input_value)
            # re.match returns None on failure; calling .group() on it would raise
            val = match.group() if match else None
            print (key, input_value, val, regular)
            if not val:
                flag = False
            value_dict[key] = input_value
        return flag, value_dict


class MainForm(BaseForm):
    """Form with host, ip, port and phone fields, each mapped to a regex."""

    def __init__(self):
        # Raw strings: "\d" and "\." are not valid escapes in plain literals
        self.host = r"(.*)"
        # The last octet needs at least one digit (it could match "" before)
        self.ip = r"(([01]?\d?\d|2[0-4]\d|25[0-5])\.){3}([01]?\d?\d|2[0-4]\d|25[0-5])"
        self.port = r"(\d){0,5}"
        # "|" inside a character class is a literal pipe, so list the digits only
        self.phone = r"1[3458]\d{9}"


class HostForm(BaseForm):
    """Form with host, ip and port fields, each mapped to a regex."""

    def __init__(self):
        # Raw strings: "\d" and "\." are not valid escapes in plain literals
        self.host = r"(.*)"
        # The last octet needs at least one digit (it could match "" before)
        self.ip = r"(([01]?\d?\d|2[0-4]\d|25[0-5])\.){3}([01]?\d?\d|2[0-4]\d|25[0-5])"
        self.port = r"(\d){0,5}"


class IndexHandler(tornado.web.RequestHandler):
    """Serve the form page and validate its submission."""

    def get(self, *args, **kwargs):
        # Render the page that contains the form
        self.render("index.html")

    def post(self, *args, **kwargs):
        # Validate the submitted form and print the outcome to the console
        obj = MainForm()
        is_valid, value_dict = obj.check_valid(self)
        # print(x) with a single argument gives the same output on Python 2
        # and Python 3, unlike the print statement
        if is_valid:
            print(value_dict)
        else:
            print('11')

# Create the tornado.web.Application object and route "/" to IndexHandler
application = tornado.web.Application([
    (r"/", IndexHandler),
])

if __name__ == "__main__":
    # Bind the HTTP server to port 8888
    application.listen(8888)
    # Start the I/O loop; IOLoop.instance() is a deprecated alias of current()
    tornado.ioloop.IOLoop.current().start()

将各项匹配封装到类

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import tornado.ioloop                               #导入模块
import tornado.web
import re


class IPFiled(object):
    """Form field that validates an IPv4 address in dotted-decimal notation."""

    # Four dot-separated octets; the last octet needs at least one digit
    ipregular = r"(([01]?\d?\d|2[0-4]\d|25[0-5])\.){3}([01]?\d?\d|2[0-4]\d|25[0-5])"

    def __init__(self, required=True, error_dict=None):
        """
        :param required: when False, any input (including empty) is accepted
        :param error_dict: optional custom messages keyed by 'required' / 'valid'
        """
        self.error_dict = {}
        if error_dict:
            self.error_dict.update(error_dict)

        self.required = required
        self.error = None       # error message
        self.value = None       # accepted value
        self.is_valid = False   # whether validation passed

    def validate(self, name, input_value):
        """
        Validate ``input_value`` and record the result on the instance.

        :param name: field name, used in the default error messages
        :param input_value: data submitted by the form
        :return: None; the outcome is stored in is_valid / value / error
        """
        # Reset so an earlier call cannot leak its result into this one
        self.is_valid = False
        self.value = None
        self.error = None

        # Optional field: accept whatever was submitted
        if not self.required:
            self.is_valid = True
            self.value = input_value
            return

        # Required but empty: use the custom message when one was given
        if not input_value or not input_value.strip():
            self.error = self.error_dict.get('required') or "%s is required" % name
            return

        # Anchor the pattern so the whole input must be an address, not a prefix
        if re.match(r"(?:%s)\Z" % self.ipregular, input_value):
            self.is_valid = True
            self.value = input_value
        else:
            self.error = self.error_dict.get('valid') or "%s is invalid" % name



class BaseForm(object):
    """Base class for forms whose instance attributes are field objects."""

    def check_valid(self, handle):
        """
        Run every field's own validation against the submitted arguments.

        :param handle: request handler exposing ``get_argument``
        :return: ``(flag, success_value_dict, error_message_dict)``; ``flag``
            is False when at least one field is invalid
        """
        flag = True
        error_message_dict = {}
        success_value_dict = {}
        # key is the field name (e.g. "ip"); regular is the field object, any
        # object with validate() plus is_valid / value / error works
        for key, regular in self.__dict__.items():
            # Default to "" so a missing argument is reported by the field
            # itself instead of aborting the request with HTTP 400
            input_value = handle.get_argument(key, "")
            # The field object performs the validation
            regular.validate(key, input_value)
            if regular.is_valid:
                success_value_dict[key] = regular.value
            else:
                error_message_dict[key] = regular.error
                flag = False
        return flag, success_value_dict, error_message_dict


class MainForm(BaseForm):
    """
    Form with host, ip, port and phone fields, each mapped to a regex string.

    NOTE(review): BaseForm.check_valid in this listing calls ``validate()`` on
    every attribute, which plain strings do not provide; IndexHandler uses
    HostForm instead.
    """

    def __init__(self):
        # Raw strings: "\d" and "\." are not valid escapes in plain literals
        self.host = r"(.*)"
        # The last octet needs at least one digit (it could match "" before)
        self.ip = r"(([01]?\d?\d|2[0-4]\d|25[0-5])\.){3}([01]?\d?\d|2[0-4]\d|25[0-5])"
        self.port = r"(\d){0,5}"
        # "|" inside a character class is a literal pipe, so list the digits only
        self.phone = r"1[3458]\d{9}"


class HostForm(BaseForm):
    """Form with a single required ``ip`` field."""

    def __init__(self):
        # Custom message shown when the field is left empty
        self.ip = IPFiled(error_dict=dict(required="骚年,没填"), required=True)


class IndexHandler(tornado.web.RequestHandler):
    """Serves the form page and prints the validation result of a submission."""

    def get(self, *args, **kwargs):
        # Show the page that contains the form
        self.render("index.html")

    def post(self, *args, **kwargs):
        # Validate the submission with HostForm and report on the console
        form = HostForm()
        passed, accepted, rejected = form.check_valid(self)
        if not passed:
            print ('error', rejected)
            return
        print ('success', accepted)

# Create the tornado.web.Application object and route "/" to IndexHandler
application = tornado.web.Application([
    (r"/", IndexHandler),
])

if __name__ == "__main__":
    # Bind the HTTP server to port 8888
    application.listen(8888)
    # Start the I/O loop; IOLoop.instance() is a deprecated alias of current()
    tornado.ioloop.IOLoop.current().start()

对于checkbox

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import tornado.ioloop                               #导入模块
import tornado.web
import re

class CheckBoxFiled(object):
    """Form field for a checkbox group; when required, a non-empty value is needed."""

    def __init__(self, required=True, error_dict=None):
        """
        :param required: when False, any input (including empty) is accepted
        :param error_dict: optional custom message keyed by 'required'
        """
        self.error_dict = {}
        if error_dict:
            self.error_dict.update(error_dict)

        self.required = required
        self.error = None       # error message
        self.value = None       # accepted value
        self.is_valid = False   # whether validation passed

    def validate(self, name, input_value):
        """
        Validate ``input_value`` and record the result on the instance.

        :param name: field name, used in the default error message
        :param input_value: submitted values for the checkbox group
        :return: None; the outcome is stored in is_valid / value / error
        """
        # Reset so an earlier call cannot leak its result into this one
        self.is_valid = False
        self.value = None
        self.error = None

        # Valid when the field is optional or something was submitted
        if not self.required or input_value:
            self.is_valid = True
            self.value = input_value
        else:
            self.error = self.error_dict.get('required') or "%s is required" % name

class IPFiled(object):
    """Form field that validates an IPv4 address in dotted-decimal notation."""

    # Four dot-separated octets; the last octet needs at least one digit
    ipregular = r"(([01]?\d?\d|2[0-4]\d|25[0-5])\.){3}([01]?\d?\d|2[0-4]\d|25[0-5])"

    def __init__(self, required=True, error_dict=None):
        """
        :param required: when False, any input (including empty) is accepted
        :param error_dict: optional custom messages keyed by 'required' / 'valid'
        """
        self.error_dict = {}
        if error_dict:
            self.error_dict.update(error_dict)

        self.required = required
        self.error = None       # error message
        self.value = None       # accepted value
        self.is_valid = False   # whether validation passed

    def validate(self, name, input_value):
        """
        Validate ``input_value`` and record the result on the instance.

        :param name: field name, used in the default error messages
        :param input_value: data submitted by the form
        :return: None; the outcome is stored in is_valid / value / error
        """
        # Reset so an earlier call cannot leak its result into this one
        self.is_valid = False
        self.value = None
        self.error = None

        # Optional field: accept whatever was submitted
        if not self.required:
            self.is_valid = True
            self.value = input_value
            return

        # Required but empty: use the custom message when one was given
        if not input_value or not input_value.strip():
            self.error = self.error_dict.get('required') or "%s is required" % name
            return

        # Anchor the pattern so the whole input must be an address, not a prefix
        if re.match(r"(?:%s)\Z" % self.ipregular, input_value):
            self.is_valid = True
            self.value = input_value
        else:
            self.error = self.error_dict.get('valid') or "%s is invalid" % name



class BaseForm(object):
    """Base class for forms whose instance attributes are field objects."""

    def check_valid(self, handle):
        """
        Run every field's own validation against the submitted arguments.

        :param handle: request handler exposing ``get_argument`` / ``get_arguments``
        :return: ``(flag, success_value_dict, error_message_dict)``; ``flag``
            is False when at least one field is invalid
        """
        flag = True
        error_message_dict = {}
        success_value_dict = {}
        # key is the field name (e.g. "ip"); regular is the field object, any
        # object with validate() plus is_valid / value / error works
        for key, regular in self.__dict__.items():
            # A checkbox group may submit several values, so fetch them all;
            # isinstance also covers subclasses of CheckBoxFiled
            if isinstance(regular, CheckBoxFiled):
                input_value = handle.get_arguments(key)
            else:
                # Default to "" so a missing argument is reported by the field
                # itself instead of aborting the request with HTTP 400
                input_value = handle.get_argument(key, "")
            # The field object performs the validation
            regular.validate(key, input_value)
            if regular.is_valid:
                success_value_dict[key] = regular.value
            else:
                error_message_dict[key] = regular.error
                flag = False
        return flag, success_value_dict, error_message_dict


class MainForm(BaseForm):
    """
    Form with host, ip, port and phone fields, each mapped to a regex string.

    NOTE(review): BaseForm.check_valid in this listing calls ``validate()`` on
    every attribute, which plain strings do not provide; IndexHandler uses
    HostForm instead.
    """

    def __init__(self):
        # Raw strings: "\d" and "\." are not valid escapes in plain literals
        self.host = r"(.*)"
        # The last octet needs at least one digit (it could match "" before)
        self.ip = r"(([01]?\d?\d|2[0-4]\d|25[0-5])\.){3}([01]?\d?\d|2[0-4]\d|25[0-5])"
        self.port = r"(\d){0,5}"
        # "|" inside a character class is a literal pipe, so list the digits only
        self.phone = r"1[3458]\d{9}"


class HostForm(BaseForm):
    """Form with a required ``ip`` field and a required ``favor`` checkbox group."""

    def __init__(self):
        # Both fields report the same custom message when left empty
        self.ip = IPFiled(error_dict=dict(required="骚年,没填"), required=True)
        self.favor = CheckBoxFiled(error_dict=dict(required="骚年,没填"), required=True)


class IndexHandler(tornado.web.RequestHandler):
    """Serves the form page and prints the validation result of a submission."""

    def get(self, *args, **kwargs):
        # Show the page that contains the form
        self.render("index.html")

    def post(self, *args, **kwargs):
        # Validate the submission with HostForm and report on the console
        form = HostForm()
        passed, accepted, rejected = form.check_valid(self)
        if not passed:
            print ('error', rejected)
            return
        print ('success', accepted)

# Create the tornado.web.Application object and route "/" to IndexHandler
application = tornado.web.Application([
    (r"/", IndexHandler),
])

if __name__ == "__main__":
    # Bind the HTTP server to port 8888
    application.listen(8888)
    # Start the I/O loop; IOLoop.instance() is a deprecated alias of current()
    tornado.ioloop.IOLoop.current().start()

html代码进行修改

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<!-- Posts the text fields and the "favor" checkbox group to "/" -->
<form action="/" method="post">
    <input name="host" type="text">
    <input name="ip" type="text">
    <input name="port" type="text">
    <input name="phone" type="text">
    <!-- Both checkboxes share the name "favor", so several values may be sent -->
    <input name="favor" type="checkbox" value="1">Linux
    <input name="favor" type="checkbox" value="2">python
    <input value="提交" type="submit">

</form>
</body>
</html>

需要注意的是,在Python 2中,各个字段类(如CheckBoxFiled、IPFiled)一定要继承object(即新式类)才行,否则对其实例调用type()得到的一概是instance,无法据此区分字段类型

参考知乎

对于文件File

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import tornado.ioloop                               #导入模块
import tornado.web
import re

class CheckBoxFiled(object):
    """Form field for a checkbox group; when required, a non-empty value is needed."""

    def __init__(self, required=True, error_dict=None):
        """
        :param required: when False, any input (including empty) is accepted
        :param error_dict: optional custom message keyed by 'required'
        """
        self.error_dict = {}
        if error_dict:
            self.error_dict.update(error_dict)

        self.required = required
        self.error = None       # error message
        self.value = None       # accepted value
        self.is_valid = False   # whether validation passed

    def validate(self, name, input_value):
        """
        Validate ``input_value`` and record the result on the instance.

        :param name: field name, used in the default error message
        :param input_value: submitted values for the checkbox group
        :return: None; the outcome is stored in is_valid / value / error
        """
        # Reset so an earlier call cannot leak its result into this one
        self.is_valid = False
        self.value = None
        self.error = None

        # Valid when the field is optional or something was submitted
        if not self.required or input_value:
            self.is_valid = True
            self.value = input_value
        else:
            self.error = self.error_dict.get('required') or "%s is required" % name

class IPFiled(object):
    """Form field that validates an IPv4 address in dotted-decimal notation."""

    # Four dot-separated octets; the last octet needs at least one digit
    ipregular = r"(([01]?\d?\d|2[0-4]\d|25[0-5])\.){3}([01]?\d?\d|2[0-4]\d|25[0-5])"

    def __init__(self, required=True, error_dict=None):
        """
        :param required: when False, any input (including empty) is accepted
        :param error_dict: optional custom messages keyed by 'required' / 'valid'
        """
        self.error_dict = {}
        if error_dict:
            self.error_dict.update(error_dict)

        self.required = required
        self.error = None       # error message
        self.value = None       # accepted value
        self.is_valid = False   # whether validation passed

    def validate(self, name, input_value):
        """
        Validate ``input_value`` and record the result on the instance.

        :param name: field name, used in the default error messages
        :param input_value: data submitted by the form
        :return: None; the outcome is stored in is_valid / value / error
        """
        # Reset so an earlier call cannot leak its result into this one
        self.is_valid = False
        self.value = None
        self.error = None

        # Optional field: accept whatever was submitted
        if not self.required:
            self.is_valid = True
            self.value = input_value
            return

        # Required but empty: use the custom message when one was given
        if not input_value or not input_value.strip():
            self.error = self.error_dict.get('required') or "%s is required" % name
            return

        # Anchor the pattern so the whole input must be an address, not a prefix
        if re.match(r"(?:%s)\Z" % self.ipregular, input_value):
            self.is_valid = True
            self.value = input_value
        else:
            self.error = self.error_dict.get('valid') or "%s is invalid" % name

class FileFiled(object):
    """Form field for uploaded files; accepts names ending in .pdf, .mp3 or .py."""

    # Accepted names: word characters followed by one of the three extensions
    fileregular = r'(\w+)\.pdf|(\w+)\.mp3|(\w+\.py)'

    def __init__(self, required=True, error_dict=None):
        """
        :param required: when False, any list of file names is accepted
        :param error_dict: optional custom messages keyed by 'required' / 'valid'
        """
        self.error_dict = {}
        if error_dict:
            self.error_dict.update(error_dict)

        self.required = required
        self.name = None  # field name, set by validate() and read by save()
        self.error = None  # error message
        self.value = []  # accepted file names
        self.is_valid = False  # whether validation passed
        self.success_file_name_list = []

    def validate(self, name, all_file_name_list):
        """
        Validate the uploaded file names and record the result on the instance.

        :param name: field name
        :param all_file_name_list: names of the files submitted by the form
        :return: None; the outcome is stored in is_valid / value / error
        """
        self.name = name
        # Reset so names from an earlier call do not pile up in self.value
        self.is_valid = False
        self.error = None
        self.value = []

        # Optional field: accept whatever was submitted
        if not self.required:
            self.is_valid = True
            self.value = all_file_name_list
            return

        # Required but nothing uploaded
        if not all_file_name_list:
            self.error = self.error_dict.get('required') or '%s is required' % name
            return

        # Anchor the pattern so a name such as "a.py/../../x" is rejected
        anchored = r"(?:%s)\Z" % self.fileregular
        self.value = [n for n in all_file_name_list if re.match(anchored, n)]
        # Every file has to pass; previously only the last one decided
        if len(self.value) == len(all_file_name_list):
            self.is_valid = True
        else:
            self.error = self.error_dict.get('valid') or "%s is invalid" % name

    def save(self, request):
        """
        Write every accepted upload of this field to the current directory.

        :param request: request object exposing ``files`` (field name -> list
            of dicts with 'filename' and 'body')
        :return: None; saved names are appended to success_file_name_list
        """
        # Local import keeps the block self-contained
        import os

        # No upload for this field yields None, so fall back to an empty list
        file_metas = request.files.get(self.name) or []
        accepted = self.value or []
        for meta in file_metas:
            file_name = meta['filename']
            if not file_name or file_name not in accepted:
                continue
            # Drop any directory part so a crafted name cannot leave the cwd
            target = os.path.basename(file_name)
            if not target:
                continue
            with open(target, 'wb') as f:
                f.write(meta['body'])
            self.success_file_name_list.append(target)


class BaseForm(object):
    """Base class for forms whose instance attributes are field objects."""

    def check_valid(self, handle):
        """
        Run every field's own validation against the submitted data.

        :param handle: request handler exposing ``get_argument``,
            ``get_arguments`` and ``request.files``
        :return: ``(flag, success_value_dict, error_message_dict)``; ``flag``
            is False when at least one field is invalid
        """
        flag = True
        error_message_dict = {}
        success_value_dict = {}
        # key is the field name (e.g. "ip"); regular is the field object, any
        # object with validate() plus is_valid / value / error works
        for key, regular in self.__dict__.items():
            # isinstance also covers subclasses of the field classes
            if isinstance(regular, CheckBoxFiled):
                # A checkbox group may submit several values, so fetch them all
                input_value = handle.get_arguments(key)
            elif isinstance(regular, FileFiled):
                # No upload yields None; use [] so the field reports "required"
                # instead of the loop raising TypeError
                file_list = handle.request.files.get(key) or []
                input_value = [meta['filename'] for meta in file_list]
            else:
                # Default to "" so a missing argument is reported by the field
                # itself instead of aborting the request with HTTP 400
                input_value = handle.get_argument(key, "")
            # The field object performs the validation
            regular.validate(key, input_value)
            if regular.is_valid:
                success_value_dict[key] = regular.value
            else:
                error_message_dict[key] = regular.error
                flag = False
        return flag, success_value_dict, error_message_dict


class MainForm(BaseForm):
    """
    Form with host, ip, port and phone fields, each mapped to a regex string.

    NOTE(review): BaseForm.check_valid in this listing calls ``validate()`` on
    every attribute, which plain strings do not provide; IndexHandler uses
    HostForm instead.
    """

    def __init__(self):
        # Raw strings: "\d" and "\." are not valid escapes in plain literals
        self.host = r"(.*)"
        # The last octet needs at least one digit (it could match "" before)
        self.ip = r"(([01]?\d?\d|2[0-4]\d|25[0-5])\.){3}([01]?\d?\d|2[0-4]\d|25[0-5])"
        self.port = r"(\d){0,5}"
        # "|" inside a character class is a literal pipe, so list the digits only
        self.phone = r"1[3458]\d{9}"


class HostForm(BaseForm):
    """Form with required ``ip``, ``favor`` (checkbox) and ``fafafa`` (file) fields."""

    def __init__(self):
        # All three fields report the same custom message when left empty
        self.ip = IPFiled(error_dict=dict(required="骚年,没填"), required=True)
        self.favor = CheckBoxFiled(error_dict=dict(required="骚年,没填"), required=True)
        self.fafafa = FileFiled(error_dict=dict(required="骚年,没填"), required=True)


class IndexHandler(tornado.web.RequestHandler):
    """Serves the form page; on a valid submission also saves the uploaded files."""

    def get(self, *args, **kwargs):
        # Show the page that contains the form
        self.render("index.html")

    def post(self, *args, **kwargs):
        # Validate the submission with HostForm and report on the console
        form = HostForm()
        passed, accepted, rejected = form.check_valid(self)
        if not passed:
            print ('error', rejected)
            return
        print ('success', accepted)
        # Persist the uploads only after the whole form passed validation
        form.fafafa.save(self.request)

# Create the tornado.web.Application object and route "/" to IndexHandler
application = tornado.web.Application([
    (r"/", IndexHandler),
])

if __name__ == "__main__":
    # Bind the HTTP server to port 8888
    application.listen(8888)
    # Start the I/O loop; IOLoop.instance() is a deprecated alias of current()
    tornado.ioloop.IOLoop.current().start()

html代码

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<!-- enctype="multipart/form-data" is set because the form contains a file input -->
<form action="/" method="post" enctype="multipart/form-data">
    <input name="host" type="text">
    <input name="ip" type="text">
    <input name="port" type="text">
    <input name="phone" type="text">
    <!-- Both checkboxes share the name "favor", so several values may be sent -->
    <input name="favor" type="checkbox" value="1">Linux
    <input name="favor" type="checkbox" value="2">python
    <!-- File upload field named "fafafa" -->
    <input name="fafafa" type="file">
    <input value="提交" type="submit">
</form>
</body>
</html>