http://%77%77%77%2E%66网站制作%6F学习网%72%61%73%70%2E%63%6E
python直接有内存队列,以前用的很少,在看代码时看到了。它是异步队列asyncio.Queue() 不同版本的python 可能有不同属性。
下面是python 3.12 队列测试案例。 
 
案例有个前提,生产者和消费者必须在一个进程中
 
   # -*- coding: utf-8 -*-
import asyncio
import random


async def worker(queue, worker_id):
while True:
item = await queue.get()

# 停止信号
if item is None:
queue.task_done()
break

print(
f"[Worker {worker_id}] 开始处理: {item} | "
f"队列剩余: {queue.qsize()} | "
f"未完成总数: {queue.unfinished_tasks}"
)

# 模拟处理时间
await asyncio.sleep(random.uniform(0.5, 1.5))

# 标记任务完成 -> unfinished_tasks 减 1
queue.task_done()

print(
f"[Worker {worker_id}] 完成处理: {item} | "
f"队列剩余: {queue.qsize()} | "
f"未完成总数: {queue.unfinished_tasks}"
)


async def main():
q = asyncio.Queue()

# 启动 2 个消费者
workers = [asyncio.create_task(worker(q, i)) for i in range(2)]

# 生产 5 个任务
total_produced = 5
for i in range(total_produced):
item = f"Task-{i}"
await q.put(item)
print(
f">>> 放入: {item} | 当前队列大小 (qsize): {q.qsize()} | 未完成总数: {q.unfinished_tasks}"
)
await asyncio.sleep(0.2) # 稍微慢点放,方便观察

# 等待所有任务处理完毕
print("\n--- 等待所有任务完成 (q.join) ---\n")
await q.join()
# 此时 q.unfinished_tasks 应该为 0

# 发送停止信号
for _ in workers:
await q.put(None)

await asyncio.gather(*workers)

print("\n=== 最终统计 ===")
print(f"队列当前大小 (qsize): {q.qsize()}")
print(f"未完成的任务数 (unfinished_tasks): {q.unfinished_tasks}")
# 如果需要计算完成的数量:
# 假设我们没有中途丢弃任务,完成数 = total_produced
print(f"估算完成任务数: {total_produced} (因为 unfinished_tasks 归零了)")


if __name__ == "__main__":
asyncio.run(main())
   

forasp.cn