# User model (用户模型)
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, BadSignature, SignatureExpired
from flask import current_app
from app import db
from app import redis_conn as redis
class User(db.Model):
    """User account model stored in the ``users`` table.

    Only a password hash is persisted; the plaintext password is write-only.
    Access and refresh tokens are cached in Redis under keys built from the
    user's email (``token-<email>`` / ``rftoken-<email>``).
    """

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(255), unique=True, index=True)
    nickname = db.Column(db.String(255), nullable=True, index=True)
    # NOTE(review): 128 chars may be too short for the hash formats produced
    # by newer werkzeug releases - confirm against the installed version.
    password_hash = db.Column(db.String(128))
    join_time = db.Column(db.DateTime, default=datetime.now)
    last_seen = db.Column(db.DateTime, default=datetime.now)

    @property
    def password(self):
        """Write-only attribute; reading it always raises ``AttributeError``."""
        raise AttributeError('密码不可访问')

    @password.setter
    def password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        """Check a candidate password against the stored hash.

        :param password: plaintext password supplied by the user
        :return: True if it matches the stored hash, otherwise False
            (including when no hash has been stored for this user)
        """
        # A row without a hash cannot match anything; avoid passing None
        # into check_password_hash.
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)

    def generate_user_token(self, expiration):
        """Create a signed, time-limited token identifying this user.

        The payload carries the user's ``id`` and ``email`` and is signed with
        the application's ``SECRET_KEY``.

        NOTE(review): ``TimedJSONWebSignatureSerializer`` is not available in
        recent itsdangerous releases - confirm the pinned version.

        :param expiration: validity period in seconds (required; no default
            is applied here)
        :return: the serialized token as a ``str``
        """
        s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
        return s.dumps({'id': self.id, 'email': self.email}).decode('utf-8')

    @staticmethod
    def verify_user_token(token):
        """Decode ``token`` and return the user it identifies.

        :param token: token string produced by ``generate_user_token``
        :return: the matching ``User``, or None if the token is expired,
            has a bad signature, or no user with that id exists
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except SignatureExpired:
            return None  # valid token, but expired
        except BadSignature:
            return None  # invalid token
        return User.query.get(data['id'])

    # --- additional helpers -------------------------------------------------

    def update_last_seen(self):
        """Set ``last_seen`` to now and add the user to the session.

        This does not commit; the caller is responsible for committing.
        """
        self.last_seen = datetime.now()
        db.session.add(self)

    def update_rftoken(self, rftoken):
        """Store the refresh token in Redis with the ``RFTOKEN_EXPIRE`` TTL."""
        redis.setex(f'rftoken-{self.email}', current_app.config['RFTOKEN_EXPIRE'], rftoken)

    def update_token(self, token):
        """Store the access token in Redis with the ``TOKEN_EXPIRE`` TTL."""
        redis.setex(f'token-{self.email}', current_app.config['TOKEN_EXPIRE'], token)

    def _cached_token(self, prefix):
        """Read ``<prefix>-<email>`` from Redis and return it as ``str`` or None."""
        value = redis.get(f'{prefix}-{self.email}')
        if not value:
            return None
        # The client returns bytes by default but str when it is configured
        # to decode responses; accept both.
        if isinstance(value, bytes):
            return value.decode('utf8')
        return value

    def get_rftoken(self):
        """Return the cached refresh token, or None if absent."""
        return self._cached_token('rftoken')

    def get_token(self):
        """Return the cached access token, or None if absent."""
        return self._cached_token('token')

    def to_json(self):
        """Return the user's public information as a dict."""
        return {
            'user_id': self.id,
            'nickname': self.nickname,
            # 'email': self.email,
        }

    def __repr__(self):
        return '<User %r>' % self.nickname

    # TODO: user avatars could be added
# View functions (视图函数)
import os
from flask import Flask, current_app, make_response, jsonify, request, g
from app import db, redis_conn
from app.api_v1 import api
from app.models import User
from utils.response_code import RET, error_map
from utils import timestamp_to_format
@api.route('/signin', methods=['POST'])
def signin():
    """Register a new user from the ``email`` and ``password`` form fields.

    :return: JSON with ``ret_code`` and ``msg``: ``RET.OK`` on success,
        ``RET.PARAMERR`` when a field is missing, ``RET.DBERR`` when the
        insert fails.
    """
    form = request.form
    # nickname = request.json.get('nickname')
    email = form.get('email')
    password = form.get('password')
    # mailcode_client = request.json.get('mailcode')

    if not (email and password):
        return jsonify(ret_code=RET.PARAMERR, msg='参数不完整')

    # Disabled: fetch the verification code stored in Redis for this email
    # and compare it with the one sent by the client.
    # try:
    #     mailcode_server = redis_conn.get('EMAILCODE:'+ email).decode()
    # except Exception as e:
    #     current_app.logger.debug(e)
    #     return jsonify(ret_code=RET.DBERR, msg='查询邮箱验证码失败')
    # if mailcode_server != mailcode_client:
    #     current_app.logger.debug(mailcode_server)
    #     return jsonify(ret_code=RET.PARAMERR, msg='邮箱验证码错误')

    new_user = User()
    new_user.email = email
    # new_user.nickname = nickname
    # The model's password setter hashes the value before it is stored.
    new_user.password = password

    try:
        db.session.add(new_user)
        db.session.commit()
    except Exception as exc:
        current_app.logger.debug(exc)
        db.session.rollback()
        return jsonify(ret_code=RET.DBERR, msg='注册失败')

    return jsonify(ret_code=RET.OK, msg='注册成功')
@api.route('/login', methods=['POST'])
def login():
    """Authenticate by email/password and issue an access and refresh token.

    Reads ``email`` and ``password`` from the form body. On success both
    tokens are stored in Redis, the user's ``last_seen`` is updated, and the
    tokens are returned to the client.

    TODO: add image (captcha) verification.

    :return: JSON ``{'ret_code': RET.OK, 'msg': {'token': ..., 'rftoken': ...}}``
        on success; otherwise JSON with an error ``ret_code`` and its mapped
        message (``PARAMERR``, ``DBERR``, ``USERERR`` or ``PWDERR``).
    """
    email = request.form.get('email')
    password = request.form.get('password')
    # Disabled alternative: parse credentials from the Authorization header.
    # email, password = base64.b64decode(request.headers['Authorization'].split(' ')[-1]).decode().split(':')
    if not all([email, password]):
        return jsonify(ret_code=RET.PARAMERR, msg=error_map[RET.PARAMERR])

    try:
        user = User.query.filter(User.email == email).first()
    except Exception as e:
        current_app.logger.debug(e)
        return jsonify(ret_code=RET.DBERR, msg=error_map[RET.DBERR])

    if not user:
        return jsonify(ret_code=RET.USERERR, msg=error_map[RET.USERERR])
    if not user.verify_password(password):
        return jsonify(ret_code=RET.PWDERR, msg=error_map[RET.PWDERR])

    g.user = user
    rftoken = user.generate_user_token(expiration=current_app.config['RFTOKEN_EXPIRE'])
    token = user.generate_user_token(expiration=current_app.config['TOKEN_EXPIRE'])

    # Both tokens are cached in Redis.
    user.update_rftoken(rftoken)
    user.update_token(token)

    # Record the login time. update_last_seen() only adds the user to the
    # session, so commit here or the change is discarded at request end.
    # Done last and best-effort: a failure to record the timestamp must not
    # prevent an otherwise successful login.
    user.update_last_seen()
    try:
        db.session.commit()
    except Exception as e:
        current_app.logger.debug(e)
        db.session.rollback()

    data = {
        'ret_code': RET.OK,
        'msg': {
            'token': token,
            'rftoken': rftoken
        }
    }
    return jsonify(data)
@api.route('/refresh', methods=['GET'])
def refresh_token():
    """Exchange a valid refresh token for a new access/refresh token pair.

    The refresh token is read from the ``Authorization`` header. It must
    decode to an existing user and equal the refresh token currently cached
    for that user; otherwise ``RET.NOTUSER`` is returned.

    :return: JSON ``{'ret_code': RET.OK, 'msg': {'token': ..., 'rftoken': ...}}``
        on success, or JSON with ``RET.NOTUSER`` and its mapped message.
    """
    presented = request.headers.get('Authorization', '')
    if not presented:
        return jsonify(ret_code=RET.NOTUSER, msg=error_map[RET.NOTUSER])

    user = User.verify_user_token(presented)
    if not user:
        return jsonify(ret_code=RET.NOTUSER, msg=error_map[RET.NOTUSER])

    g.user = user
    if user.get_rftoken() != presented:
        return jsonify(ret_code=RET.NOTUSER, msg=error_map[RET.NOTUSER])

    new_token = user.generate_user_token(expiration=current_app.config['TOKEN_EXPIRE'])
    new_rftoken = user.generate_user_token(expiration=current_app.config['RFTOKEN_EXPIRE'])
    # Replace both cached tokens in Redis with the new pair.
    user.update_rftoken(new_rftoken)
    user.update_token(new_token)

    return jsonify({
        'ret_code': RET.OK,
        'msg': {
            'token': new_token,
            'rftoken': new_rftoken,
        },
    })
@api.errorhandler(404)
def page_not_found(error):
    """Return a JSON body with HTTP status 404 for errors handled on ``api``."""
    payload = jsonify({'msg': '404 error'})
    return make_response(payload, 404)
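# (page footer residue, not code) 0 评论 - "0 comments"
# (page footer residue, not code) 大哥整点话呗~ - "Leave a comment~"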