引言
在实时应用程序开发中,使用消息队列是一种常见的方式,特别是当需要处理大量异步任务或者需要解耦各个组件时。Redis 是一个流行的内存数据库,它提供了列表数据结构,可以用来实现先进先出(FIFO)队列。本文将介绍如何使用 Python 中的 aioredis 库来连接 Redis,以及如何使用它来实现一个简单的先进先出队列。
import aioredisclass RedisPool:_instances = {}def __new__(cls, redisURL):if redisURL not in cls._instances:cls._instances[redisURL] = super().__new__(cls)cls._instances[redisURL]._pool = Nonereturn cls._instances[redisURL]else:raise cls._instances[redisURL]def __init__(self, redisURL):self._pool = Noneself.redisURL = redisURLasync def get_pool(self):if self._pool is None:self._pool = await aioredis.from_url(self.redisURL)return self._pool# 先进先出队列class RedisQueue:def __init__(self, redisPool, queueName):self._pool = redisPoolself._queueName = queueNameasync def push(self, data):pool = await self._pool.get_pool()await pool.lpush(self._queueName, data)async def pop(self):pool = await self._pool.get_pool()return await pool.rpop(self._queueName)async def size(self):pool = await self._pool.get_pool()return await pool.llen(self._queueName)async def main():# 带密码的redis redis://:password@localhost:6379/0redisPool = RedisPool('redis://:password@233.233.233.233:33333/0')queue = RedisQueue(redisPool, 'testQueue')for i in range(10):print(f"push {i}")await queue.push(str(i))print(f"size {await queue.size()}")for i in range(10):print(await queue.pop())print(f"size {await queue.size()}")if __name__ == '__main__':import asyncioasyncio.run(main())