一、安装和配置flask-redis

  • 安装flask-redis
    pip install flask-redis
    
  • 在flask的配置文件中指定redis的连接信息
    # config.py
    REDIS_URL = "redis://:password@localhost:6379/0"
    #如果redis没设置密码 REDIS_URL = "redis://localhost:6379/0"
    
  • 在flask的应用对象中初始化flask-redis
    # app.py
    from flask import Flask
    from flask_redis import FlaskRedis
    
    app = Flask(__name__)
    app.config.from_pyfile('config.py')
    redis_client = FlaskRedis(app)
    

二、用flask-redis操作redis

  • flask-redis提供的redis客户端对象,实际上是对redis-py库的封装,它提供了与redis-py相同的API,可以让我们方便地使用redis的各种数据结构和命令。
  • 我们可以使用set和get方法来存储和获取字符串类型的数据,如下所示:
    @app.route('/set', methods=['POST'])
    def set_data():
    	# 获取请求参数
    	key = request.form.get('key')
    	value = request.form.get('value')
    	# 存储数据到redis
    	redis_client.set(key, value)
    	# 返回响应
    	return jsonify({'message': 'Data saved successfully'})
    
    @app.route('/get', methods=['GET'])
    def get_data():
    	# 获取请求参数
    	key = request.args.get('key')
    	# 从redis获取数据
    	value = redis_client.get(key)
    	# 返回响应
    	return jsonify({'value': value})
    
  • 删除redis中的数据,我们可以使用delete方法,它接受一个或多个键作为参数,返回删除的键的数量,例如:
    @app.route('/delete', methods=['POST'])
    def delete_data():
    	# 获取请求参数
    	keys = request.form.getlist('key')
    	# 从redis删除数据
    	count = redis_client.delete(*keys)
    	# 返回响应
    	return jsonify({'message': f'{count} keys deleted'})
    
  • 我们还可以调用pipeline方法,得到一个pipeline对象。然后在这个对象上执行多个Redis命令,最后调用execute方法来执行所有的命令,并返回一个结果列表。例如:
    @app.route("/pipeline")
    def pipeline():
    	# 创建一个pipeline对象
    	with redis_client.pipeline() as pipe:
        	# 执行一些Redis命令
        	pipe.incr("times")
        	pipe.set("key1", "v1")
        	pipe.get("key2")
        	# 获取命令的结果列表
        	results = pipe.execute()
        	# 返回结果列表的JSON格式
    		return jsonify(results)
    

三、把redis的操作封装到独立的py文件(可选)

  • 可以新建一个redis_utils.py,把比较复杂的操作放进去,先看主程序:
    # app.py
    from flask import Flask
    from redis_utils import RedisUtils # 导入redis_utils模块
    
    app = Flask(__name__)
    app.config.from_pyfile('config.py')
    redis_utils = RedisUtils(app) # 创建RedisUtils实例
    
    @app.route('/set/<key>/<value>')
    def set_key(key, value):
    	# 调用RedisUtils的set方法设置键值对
    	redis_utils.set(key, value)
    	return f'成功设置{key}{value}'
    
    @app.route('/get/<key>')
    def get_key(key):
    	# 调用RedisUtils的get方法获取键对应的值
    	value = redis_utils.get(key)
    	if value:
        	return f'{key}的值是{value}'
    	else:
        	return f'没有找到{key}'
    
    @app.route('/delete/<key>')
    def delete_key(key):
    	# 调用RedisUtils的delete方法删除键
    	result = redis_utils.delete(key)
    	if result:
        	return f'成功删除{key}'
    	else:
        	return f'没有找到{key}'
    
    if __name__ == '__main__':
    	app.run()
    
  • redis_utils.py的简单示例内容如下,实际情况一般会更复杂
    # redis_utils.py
    from flask_redis import FlaskRedis # 使用flask-redis库
    
    class RedisUtils:
    	# 初始化方法,接收一个Flask应用对象作为参数
    	def __init__(self, app):
        	self.redis_client = FlaskRedis() # 创建redis客户端
        	self.redis_client.init_app(app) # 初始化redis客户端
    
    	# 封装set方法,接收键和值作为参数
    	def set(self, key, value):
        	self.redis_client.set(key, value) # 使用redis客户端设置键值对
    
    	# 封装get方法,接收键作为参数,返回值或者None
    	def get(self, key):
        	value = self.redis_client.get(key) # 使用redis客户端获取键对应的值
        	if value:
            	return value.decode() # 如果值存在,返回解码后的字符串
        	else:
            	return None # 如果值不存在,返回None
    
    	# 封装delete方法,接收键作为参数,返回删除结果
    	def delete(self, key):
        	result = self.redis_client.delete(key) # 使用redis客户端删除键
        	return result # 返回删除结果,0表示失败,1表示成功
    
    
Logo

欢迎加入西安开发者社区!我们致力于为西安地区的开发者提供学习、合作和成长的机会。参与我们的活动,与专家分享最新技术趋势,解决挑战,探索创新。加入我们,共同打造技术社区!

更多推荐