Gevent 开发指南:http://sdiehl.github.io/gevent-tutorial

1、gevent 简介

gevent 是什么

pypi:https://pypi.org/search/?q=gevent
gevent 官网文档:http://www.gevent.org/contents.html

Gevent 指南(中文)下载地址:http://download.csdn.net/download/freeking101/9924351

gevent 是一个基于协程的 Python 网络库,它使用 greenlet 在 libev 或 libuv 事件循环之上提供高级同步 API。gevent 实现了python 标准库里面大部分的阻塞式系统调用,包括 socket、ssl、threading 和 select 等模块,可以使用 "猴子补丁" 将这些阻塞式调用变为协作式运行。

Gevent 主要特性有以下几点:

  1. 基于 libev 和 libuv 的快速事件循环。( 在1.0.x之前更早期的版本里,gevent 使用 libevent 而不是 libev)
  2. 基于 greenlets 的轻量级执行单元。
  3. API 复用 Python 标准库中的概念(例如,有事件和队列)。
  4. TCP / UDP / HTTP 服务
  5. 支持 SSL 的协作套接字
  6. 通过 threadpool、dnspython 或 c-ares 执行的协作 DNS 查询。
  7. 通过 "猴子修补" 很容易使第三方模块变成协程
  8. 子进程 支持。(通过 gevent.subprocess)
  9. Thread pools 线程池
  10. gevent 的代码风格和线程非常相似,运行出来后的效果也非常相似。

事件驱动 (事件分发)

Linux 的 epoll 机制

epoll 是 Linux 内核为处理大批量文件描述符而作了改进的 poll,是 Linux 下 "多路复用IO" select/poll 的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。epoll 的优点:支持一个进程打开大数目的 socket 描述符。select 的一个进程所打开的 FD 由FD_SETSIZE 的设置来限定,而 epoll 没有这个限制,它所支持的 FD 上限是最大可打开文件的数目,远大于2048,而且IO 效率不随 FD 数目增加而线性下降:由于 epoll 只会对 "活跃" 的 socket 进行操作,于是,只有 "活跃" 的 socket 才会主动去调用 callback 函数,其他 idle 状态的 socket 则不会。epoll 使用 mmap 加速内核与用户空间的消息传递。epoll 是通过内核于用户空间 mmap 同一块内存实现的。

事件驱动(事件分发)库

libevent、libev、libuv 都是开源的事件驱动(事件分发)库,用来处理网络和IO事件的异步编程。原理:当文件描述符事件发生时,调用回调函数的机制。

libevent、libev和 libuv 都是用于异步IO编程的开源库。有以下几点区别:

  • 底层实现:libevent 和 libev 都是使用底层系统调用来实现事件驱动,如select、poll、epoll等。而libuv则是使用自己实现的事件驱动机制。
  • 跨平台支持:libevent、libev和libuv都具有跨平台的能力,但它们的实现方式不同。libevent和libev通过提供多种事件模型来适应不同操作系统的特性;而libuv则通过自己实现的事件驱动机制来屏蔽了操作系统的差异。
  • 功能丰富程度:libevent和libev主要用于事件驱动编程,提供了事件通知、回调和监听等基础功能;而libuv则提供了更多的功能,如异步文件IO、网络套接字、DNS解析、进程管理等。
  • 适用范围:libevent、libev和libuv都适用于异步编程,但在不同场景下可能会有不同的选择。libevent和libev适用于需要高效处理大量并发连接的场景,如服务器开发;而libuv则适用于开发面向网络和文件的异步应用。

gevent 和其他协程库区别

  • greenlet :greenlet 是Python中的一个轻量级协程库,基于Python中的yield关键字实现。Greenlet 没有自己的调度过程,需要手动编写协程调度逻辑。所以一般不会直接使用。
            pypi:https://pypi.org/search/?q=greenlet
            greenlet:http://greenlet.readthedocs.org/en/latest/
  • eventlet:基于 greenlet 的一个高级协程库,Eventlet 实现了自己调度器称为 Hub,在 Hub 中有一个 event loop,根据不同的事件来切换到对应的 GreenThread 中。同时 Eventlet 还实现了一系列的补丁来使 Python 标准库中的 socket 等等module 来支持 GreenThread 的切换。Eventlet 的 Hub 可以被定制来实现自己调度过程。
  • Gevent:基于 libev 与 greenlet 实现。gevent 使用 libev 来实现一个高效的 event loop 调度循环。同时类似于 Event,Gevent 也有自己的 monkey_patch,在打了补丁后,完全可以使用 python 线程的方式来无感知的使用协程。

gevent 特点总结:事件驱动 + 协程 + 非阻塞IO

  • 事件驱动指的是 libvent 对 epool 的封装,是基于事件的方式处理 IO。
  • 协程指的是 greenlet
  • 非阻塞 IO 指的是 gevent 已经 patch 过的各种库,例如 socket 和 select 等等。

猴子补丁(Monkey Patch)

官网文档:https://www.gevent.org/intro.html#monkey-patching

猴子补丁的由来

猴子补丁的这个叫法起源于 Zope 框架,大家在修正 Zope 的 Bug 的时候经常在程序后面追加更新部分,这些被称作是 “杂牌军补丁(guerillapatch)”,后来 guerilla 就渐渐的写成了 gorllia(猩猩),再后来就写了 monkey(猴子),所以猴子补丁的叫法是这么莫名其妙的得来的。 后来在动态语言中,不改变源代码而对功能进行追加和变更,统称为 "猴子补丁"。所以猴子补丁并不是 Python 中专有的。猴子补丁这种东西充分利用了动态语言的灵活性,可以对现有的语言Api 进行追加,替换,修改 Bug,甚至性能优化等等。 gevent 通过猴子补丁的方式能够修改标准库里面大部分的阻塞式系统调用,包括 socket、ssl、threading 和 select 等模块,而变为协作式运行。

猴子补丁使用时的注意事项

猴子补丁的功能很强大,但是也带来了很多的风险,尤其是像 gevent 这种直接进行 API替换的补丁,整个 Python 进程所使用的模块都会被替换,可能自己的代码能 hold 住,但是其它第三方库,有时候问题并不好排查,即使排查出来也是很棘手,所以,就像松本建议的那样,如果要使用猴子补丁,那么只是做功能追加,尽量避免大规模的 API 覆盖。 虽然猴子补丁仍然是邪恶的(evil),但在这种情况下它是 "有用的邪恶(useful evil)"

使用 gevent 注意事项

  1. gevent.spawn 启动的所有协程,都是运行在同一个线程中,所以协程不能跨线程同步数据。
  2. gevent.queue.Queue 是协程安全的。
  3. gevent 启动的并发协程,具体到 task function,不能有长时间阻塞的IO操作。因为 gevent 的协程的特点是,当前协程阻塞了才会切换到别的协程。如果当前协程长时间阻塞,则不能显示( gevent.sleep(0),或隐式,由gevent来做)切换到别的协程。导致程序出问题。
  4. 如果有长时间阻塞的 IO 操作,还是用传统的线程模型比较好。
  5. 使用 gevent 的协程,最好使用 gevent 自身的非阻塞库。如 httplib, socket, select 等等。

使用 gevent 示例

使用方法:程序的重要部分是将任务函数封装到 gevent.spawn()

  • 初始化的 greenlet 列表存放在数组 threads 中,此数组被传给 gevent.joinall 函数,
  • gevent.joinall 会阻塞当前流程,并执行所有给定的 greenlet,执行流程只会在所有 greenlet 执行完后才会继续向下走。 
import gevent
from gevent import socket

urls = ['www.baidu.com', 'www.example.com', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=2)
result = [job.value for job in jobs]
print(result)

"""
解释:
通过 gevent.spawn() 方法创建 job 
通过 gevent.joinall 将 jobs 加入到 微线程 执行队列中等待其完成,设置超时为 2 秒。
执行后的结果通过检查 gevent.Greenlet.value 值来收集。
gevent.socket.gethostbyname()是非阻塞的,同时与标准的socket.gethotbyname()有相同的接口。
"""

方法 1:继承 Gevent 的 Greenlet

import gevent
from gevent import monkey
from gevent import Greenlet


monkey.patch_all()


class Task(Greenlet):
    def __init__(self, name):
        Greenlet.__init__(self)
        self.name = name

    def _run(self):
        print("Task %s: some task..." % self.name)


t1 = Task("task1")
t2 = Task("task2")
t1.start()
t2.start()
# here we are waiting all tasks
gevent.joinall([t1, t2])

方法 2:直接使用

import gevent
from gevent import monkey;monkey.patch_all()
import random
import requests
from pprint import pprint


def func_1():
    print("func_1 结束")


def func_2():
    print("func_2 结束")


def get_url(url):
    print(f"开始 ---> {url}")
    res = requests.get(url)
    gevent.sleep(random.randint(1, 3))
    print(f"[{res.status_code}][{url}] ---> {len(res.text)}")


def main():
    url_list = [
        'https://httpbin.org/',
        'https://www.baidu.com',
        'https://www.cnblogs.com'
    ]
    greenlet_list = []
    for index in url_list:
        greenlet_list.append(gevent.spawn(get_url, index))
    greenlet_list += [gevent.spawn(func_1), gevent.spawn(func_2)]
    # timeout 设置最大等待时间,如果不设置就一直等待
    gevent.joinall(greenlet_list, timeout=None)
    # gevent.spawn(lambda: 1 / 0).join()
    print("main 结束")


if __name__ == '__main__':
    main()

示例:抓取豆瓣

利用 gevent 并发 抓取豆瓣

from gevent import monkey; monkey.patch_all()
import gevent
import requests


class Douban(object):

    def __init__(self):
        self.host = 'movie.douban.com'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:61.0) Gecko/20100101 Firefox/61.0',
            'Referer': 'https://movie.douban.com/',
        }

    def test_search_tags_movie(self):
        method = 'search_tags'
        url = f"https://{self.host}/j/{method}"
        post_data = {
            'type': 'movie',
            'source': 'index'
        }
        resp = requests.post(url=url, data=post_data, headers=self.headers)
        ret_val = f"{url} ---> {resp.status_code}"
        return ret_val


if __name__ == '__main__':
    douban = Douban()
    job_list = []
    for index in range(6):
        job_obj = gevent.spawn(douban.test_search_tags_movie)
        job_list.append(job_obj)
    gevent.joinall(job_list)
    result_list = [job_obj.value for job_obj in job_list]
    list(map(lambda item=None: print(item), result_list))
    pass

示例:生产者、消费者

from gevent import monkey; monkey.patch_all()
import gevent
from gevent.queue import Queue  # 队列 gevent中的队列
import random

task_queue = Queue(3)


def producer(index=1):
    while True:
        print(f'生产者 [{index}]', end='')
        item = random.randint(0, 99)
        task_queue.put(item)
        print(f"生产 ---> {item}")


def consumer(index=1):
    while True:
        print(f'消费者 [{index}]', end='')
        item = task_queue.get()
        print(f"消费 ---> {item}")


def main():
    job_1 = gevent.spawn(producer)
    job_2 = gevent.spawn(consumer)
    job_3 = gevent.spawn(consumer, 2)
    job_list = [job_1, job_2, job_3]
    gevent.joinall(job_list)


if __name__ == '__main__':
    main()
    pass

示例:后门程序

gevent.backdoor:https://www.gevent.org/api/gevent.backdoor.html

此后门不提供身份验证,也不尝试限制远程用户可以执行的操作。任何可以访问服务器的人都可以执行正在运行的 python 进程可以执行的任何操作。因此,虽然您可以绑定到任何接口,但出于安全考虑,建议您绑定到只能由本地计算机访问的接口,例如 127.0.0.1/localhost。

基本用法:

from gevent.backdoor import BackdoorServer

server = BackdoorServer(
    ('127.0.0.1', 5001),
    banner="Hello from gevent backdoor!",
    locals={'foo': "From defined scope!"}
)
server.serve_forever()
  • locals – 如果给定,则为将在顶层提供的“内置”值字典。
  • banner – 如果为 geven,则为将打印给每个连接用户的字符串。

在另一个终端中,连接:telnet 127.0.0.1 5001  连接成功后,会进入一个交互式 Python shell

2、gevent 指南

gevent是一个基于libev的并发库。它为各种并发和网络相关的任务提供了整洁的API。

from gevent import monkey
monkey.patch_all()

这样两行,就可以使用 python 以前的 socket 之类的,因为 gevent 已经给你自动转化了。
而且安装 gevent 也是很方便,首先安装依赖 libevent 和 greenlet,再利用 pypi 安装即可

                安装 libevent:sudo apt-get install libevent-dev
                安装 python-dev:sudo apt-get install python-dev
                安装 gevent:sudo pip install gevent
                安装 greenlet:sudo pip install greenlet

示例:

from gevent import monkey; monkey.patch_socket()
import gevent
 
 
def f(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        gevent.sleep(0)
 
 
g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()

3个 greenlet 交替运行,把循环次数改为 500000,运行时间长一点,然后在操作系统的进程管理器中看,线程数只有1个。 gevent.sleep() 作用是交出控制权

示例:

import gevent
from gevent import monkey


# 切换是在 IO 操作时自动完成,所以gevent需要修改Python自带的一些标准库
# 这一过程在启动时通过monkey patch完成
monkey.patch_all()


def func_a():
    count = 10
    while count > 0:
        print(f"func_a ---> {count}")
        # 用来模拟一个耗时操作,注意不是time模块中的sleep
        # 每当碰到耗时操作,会自动跳转至其他协程
        count -= 1
        gevent.sleep(1)


def func_b():
    count = 10
    while count > 0:
        print(f"func_b ---> {count}")
        count -= 1
        gevent.sleep(0.5)


# gevent.joinall([gevent.spawn(fn)
g1 = gevent.spawn(func_a)  # 创建一个协程
g2 = gevent.spawn(func_b)
g1.join()  # 等待协程执行结束
g2.join()

select() 函数通常是对各种文件描述符进行轮询的阻塞调用。

from gevent import select
...
select.select([], [], [], 2)

gevent 池

示例代码,测试 gevent 的 任务池

from gevent import pool

gevent_pool = pool.Pool()


def func_1():
    for index in range(100):
        gevent_pool.spawn(func_2, index)


def func_2(arg=None):
    print(f'func_2 ---> {arg}')


gevent_pool.spawn(func_1)
gevent_pool.join()

示例代码。程序及注释如下:

# -*- coding: utf-8 -*-

import time
import gevent
from gevent import event  # 调用 gevent 的 event 子模块


# 三个进程需要定义三个事件 event1,event2,event3,来进行12,23,31循环机制,即进程一,进程二,进程三顺序执行

def fun1(num, event1, event2):  # 固定格式
    i = 0
    while i < 10:  # 设置循环10次
        i += 1
        time.sleep(1)  # 睡眠1秒
        print('进程一:111111111')
        event2.set()    # 将event2值设为True
        event1.clear()  # 将event1值设为False
        event1.wait()   # event1等待,其值为True时才执行


def fun2(num, event2, event3):
    i = 0
    while i < 10:
        i += 1
        time.sleep(1)
        print('进程二:222222222')
        event3.set()  # 将event3值设为True
        event2.clear()  # 将event2值设为False
        event2.wait()  # event2等待,其值为True时才执行


def fun3(num, event3, event1):
    i = 0
    while i < 10:
        i += 1
        time.sleep(1)
        print('进程三:333333333')
        event1.set()
        event3.clear()
        event3.wait()


if __name__ == "__main__":  # 执行调用格式
    act1 = gevent.event.Event()  # 调用event中的Event类,用act1表示
    act2 = gevent.event.Event()
    act3 = gevent.event.Event()

    # 三个进程,act1,act2,act3
    gevent_list = []  # 建立一个数列,用来存和管理进程

    # 调用gevent中的Greenlet子模块,用Greenlet创建进程一
    g = gevent.Greenlet(fun1, 1, act1, act2)
    g.start()
    gevent_list.append(g)  # 将进程一加入到Gevents数列
    print('进程一启动:')

    g = gevent.Greenlet(fun2, 2, act2, act3)
    g.start()
    gevent_list.append(g)
    print('进程二启动:')

    g = gevent.Greenlet(fun3, 3, act3, act1)
    g.start()
    gevent_list.append(g)
    print('进程三启动:')
    print('所有进程都已启动!')

    # 调用Greenlet中的joinall函数,将Gevents的进程收集排列
    gevent.joinall(gevent_list)

gevent API 参考

更多可以查阅:Module Listing.

高级概念

网络接口

同步原语(锁、队列、事件)

低级接口详细信息

模块 列表

核心部分

Greenlets

在 gevent 中用到的主要模式是 Greenlet,它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

在任何时刻,只有一个协程在运行。

"同步、异步" 执行

并发性的核心思想是,可以将较大的任务分解为多个子任务的集合,这些子任务计划同时或异步运行,而不是一次或同步运行。两个子任务之间的切换称为上下文切换。

gevent中的上下文切换是通过 yielding 来完成的。在本例中,我们有两个上下文,它们通过调用 gevent.sleep(0) 相互让步。

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

结果

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

下图将控制流形象化,就像在调试器中单步执行整个程序,以说明上下文切换如何发生。

当我们将 gevent 用于网络和 IO 绑定函数时,它的真正威力就来了,这些函数可以协同调度。Gevent 负责处理所有细节,以确保网络库尽可能隐式地让步出它们的 greenlet 上下文。我再怎么强调这是一个多么有力的成语也不为过。但也许可以举个例子来说明。

在这种情况下,select() 函数通常是一个阻塞调用,它轮询各种文件描述符。

import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)                 
    # 可以理解成一个 IO 阻塞的操作,阻塞了2秒,这时 Greenlet 会自动切换到 gr2() 上下文执行 
    print('Ended Polling: %s' % tic())

def gr2():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr3():
    print("Hey lets do some stuff while the greenlets poll, %s" % tic())
    gevent.sleep(1)
    # 让当前 Greenlet 休眠1秒,上面 gr1() gr2() 阻塞操作完成后,再切换到 gr1() gr2() 的上下文执行

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])

Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 2.0 seconds

另一个比较综合的例子定义了一个不确定的任务函数 (它的输出不能保证对相同的输入给出相同的结果) 。在这种情况下,运行该函数的副作用是任务暂停执行的时间是随机的。

import gevent
import random


def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(random.randint(0, 2) * 0.001)
    print('Task %s done' % pid)


def synchronous():
    for i in range(1, 10):
        task(i)


def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)


print('同步执行:')
synchronous()

print('异步执行:')
asynchronous()

在同步情况下,所有任务都是按顺序运行的,这会导致在执行每个任务时主程序阻塞(即暂停主程序的执行)。

程序的重要部分是 gevent.spawn,它将给定的函数封装在一个 Greenlet 线程中。初始化的 greenlet 列表存储在传递给 gevent 的数组线程中。gevent.joinall 函数,它会阻塞当前程序,来运行所有给定的 greenlet。只有当所有 greenlet 终止时,执行才会继续进行。异步情况下的执行顺序本质上是随机的。

确定性

如前所述,greenlet 是确定的。给定相同的 greenlet 配置和相同的输入集,它们总是产生相同的输出。例如,让我们将一个任务分散到一个多进程(multiprocessing)池中,并将其结果与一个gevent池的结果进行比较。

import time


def echo(i):
    time.sleep(0.001)
    return i


if __name__ == '__main__':
    # Non Deterministic Process Pool
    from multiprocessing.pool import Pool

    p = Pool(10)
    # imap_unordered 返回一个顺序随机的 iterable 对象
    run1 = [a for a in p.imap_unordered(echo, range(10))]
    run2 = [a for a in p.imap_unordered(echo, range(10))]
    run3 = [a for a in p.imap_unordered(echo, range(10))]
    run4 = [a for a in p.imap_unordered(echo, range(10))]
    print(run1 == run2 == run3 == run4)

    # Deterministic Gevent Pool
    from gevent.pool import Pool

    p = Pool(10)
    run1 = [a for a in p.imap_unordered(echo, range(10))]  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    run2 = [a for a in p.imap_unordered(echo, range(10))]  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    run3 = [a for a in p.imap_unordered(echo, range(10))]  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    run4 = [a for a in p.imap_unordered(echo, range(10))]  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    print(run1 == run2 == run3 == run4)

尽管 gevent 通常是确定的,但当您开始与外部服务(如 socket 和文件)进行交互时,非确定性的来源可能会潜入您的程序。因此,即使 green 线程是确定性并发的一种形式,它们仍然会遇到POSIX线程和进程遇到的一些相同的问题。

与并发有关的长期问题称为竞争条件。简而言之,当两个并发线程/进程依赖于某些共享资源但还试图修改该值时,就会发生竞争状态。这将导致资源的值变得依赖于执行顺序。这是一个问题,一般来说,应该尽量避免竞态条件,因为它们会导致全局的不确定程序行为。

最好的方法是在任何时候都避免所有全局状态。

直接使用、创建 Greenlet类的子类

gevent 提供了一些关于Greenlet初始化的包装器。一些最常见的模式是:

import gevent
from gevent import Greenlet


def foo(message, n):
    """
    Each thread will be passed the message, and n arguments
    in its initialization.
    """
    gevent.sleep(n)
    print(message)


# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)

# Wrapper for creating and running a new Greenlet from the named
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)

# Lambda expressions
thread3 = gevent.spawn(lambda x: (x + 1), 2)

threads = [thread1, thread2, thread3]

# Block until all threads complete.
gevent.joinall(threads)

除使用基本的 Greenlet类之外,你也可以 子类化Greenlet类,重载它的 _run 方法。

import gevent
from gevent import Greenlet


class MyGreenlet(Greenlet):

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):
        print(self.message)
        gevent.sleep(self.n)


g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

Greenlet 状态

与代码的其他部分一样,greenlet可能以各种方式失败。greenlet可能无法抛出异常、无法停止或消耗太多系统资源。greenlet 的内部状态通常是一个与时间相关的参数。在greenlets上有许多标志,它们允许您监视线程的状态:

  • started -- 布尔值,指示Greenlet是否已启动
  • ready() -- 布尔值,指示Greenlet是否已停止
  • successful() -- 布尔值,指示Greenlet是否已停止且没有抛出异常
  • value -- Greenlet返回的值
  • exception -- 异常,在greenlet中抛出的未捕获异常实例
import gevent


def win():
    return 'You win!'


def fail():
    raise Exception('You fail at failing.')


winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started)  # True
print(loser.started)  # True

# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
    gevent.joinall([winner, loser])
except Exception as e:
    print('This will never be reached')

print(winner.value)  # 'You win!'
print(loser.value)  # None

print(winner.ready())  # True
print(loser.ready())  # True

print(winner.successful())  # True
print(loser.successful())  # False

# The exception raised in fail, will not propagate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.

print(loser.exception)

# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()

程序停止

当主程序接收到SIGQUIT时,无法生成(yield)的 greenlet 可能会使程序的执行时间比预期的更长。这将导致所谓的“僵尸进程”,需要从 Python 解释器外部杀死这些进程。

一种常见的模式是监听主程序上的SIGQUIT事件并在退出前调用 gevent.shutdown 。

import gevent
import signal


def run_forever():
    gevent.sleep(1000)


if __name__ == '__main__':
    gevent.signal(signal.SIGQUIT, gevent.kill)
    thread = gevent.spawn(run_forever)
    thread.join()

超时

超时是对代码块或Greenlet的运行时的约束。

import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(10)

try:
    gevent.spawn(wait).join()
except Timeout:
    print('Could not complete')

在with语句中,它们还可以与上下文管理器一起使用。

import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)

此外,gevent 还为各种 Greenlet 和数据结构相关的调用提供超时参数。例如:

import gevent
from gevent import Timeout

def wait():
    gevent.sleep(2)

timer = Timeout(1).start()
thread1 = gevent.spawn(wait)

try:
    thread1.join(timeout=timer)
except Timeout:
    print('Thread 1 timed out')

# --

timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)

try:
    thread2.get(timeout=timer)
except Timeout:
    print('Thread 2 timed out')

# --

try:
    gevent.with_timeout(1, wait)
except Timeout:
    print('Thread 3 timed out')

猴子补丁(Monkey patching)

我们来到了Gevent的黑暗角落。到目前为止,我一直避免提到monkey patching,以尝试和激发强大的协同模式,但是现在是讨论monkey patching的黑魔法的时候了。 如果您注意到上面我们调用了命令 monkey.patch_socket(),这是一个纯粹用于修改标准库套接字库(socket)的副作用命令。

import socket
print(socket.socket)

print("After monkey patch")
from gevent import monkey
monkey.patch_socket()
print(socket.socket)

import select
print(select.select)
monkey.patch_select()
print("After monkey patch")
print(select.select)

结果:

class 'socket.socket'
After monkey patch
class 'gevent.socket.socket'

built-in function select
After monkey patch
function select at 0x1924de8

Python 允许在运行时修改大多数对象,包括模块、类甚至函数。这通常是一个令人震惊的坏主意,因为它创建了一个“隐式副作用”,如果出现问题,通常非常难以调试,然而在极端情况下,库需要改变Python本身的基本行为,可以使用monkey补丁。在这种情况下,gevent能够修补标准库中的大多数阻塞系统调用,包括 socket、ssl、threading 和 select 模块中的调用。

例如,Redis-python 的绑定通常使用常规tcp socket与Redis-server实例通信。只需调用gevent.monkey.patch_all(),我们可以让redis绑定协同调度请求,并与gevent堆栈的其他部分一起工作。

这让我们可以在不编写任何代码的情况下集成通常无法与gevent一起工作的库。(尽管猴子补丁仍然是邪恶的,但在这种情况下,它是“有用的邪恶”。)

数据结构

事件

事件(event) 是 greenlet 之间异步通信的一种形式。

import gevent
from gevent.event import Event

'''
Illustrates the use of events
'''

evt = Event()


def setter():
    """After 3 seconds, wake all threads waiting on the value of evt"""
    print('A: Hey wait for me, I have to do something')
    gevent.sleep(3)
    print("Ok, I'm done")
    evt.set()  # 运行到evt.set()会将flag设置为True,然后另外两个被阻塞的waitter的evt.wait()方法在看到flag已经为True之后不再继续阻塞运行并且结束。


def waiter():
    """After 3 seconds the get call will unblock"""
    print("I'll wait for you")
    evt.wait()  # blocking
    print("It's about time")


def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter)
    ])


if __name__ == '__main__': 
    main()

事件对象的扩展是 AsyncResult,它允许您在唤醒调用时发送一个值。这有时被称为future或deferred,因为它有对 future 值的引用,可以在任意的时间设置该值。

import gevent
from gevent.event import AsyncResult

a = AsyncResult()


def setter():
    """
    After 3 seconds set the result of a.
    """
    gevent.sleep(3)
    a.set('Hello!')


def waiter():
    """
    After 3 seconds the get call will unblock after the setter
    puts a value into the AsyncResult.
    """
    print(a.get())


gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])

队列

队列是按顺序排列的数据集,它们具有通常的 put / get 操作,但可以在Greenlets上安全操作的方式编写。例如,如果一个Greenlet从队列中获取一个项目(item),则同一项目(item)不会被同时执行的另一个Greenlet获取。

import gevent
from gevent.queue import Queue

tasks = Queue()


def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)

    print('Quitting time!')


def boss():
    for i in range(1, 25):
        tasks.put_nowait(i)


gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])

根据需要,队列还可以在put或get上阻塞。

每个put和get操作都有一个非阻塞的对应操作,put_nowait 和 get_nowait 不会阻塞。如果操作是不可能的,会引发 gevent.queue.Empty 或 gevent.queue.Full

在这个例子中,我们让boss同时和workers运行,并且对队列有一个限制,防止它包含三个以上的元素。这个限制意味着put操作将阻塞,直到队列上有空间为止。相反,如果队列上没有要获取的元素,get操作就会阻塞,它还会接受一个超时参数,如果在超时的时间范围内找不到工作(work),则允许队列以异常 gevent.queue.Empty 退出。

import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)


def worker(name):
    try:
        while True:
            task = tasks.get(timeout=1)  # decrements queue size by 1
            print('Worker %s got task %s' % (name, task))
            gevent.sleep(0)
    except Empty:
        print('Quitting time!')


def boss():
    """
    Boss will wait to hand out work until a individual worker is
    free since the maxsize of the task queue is 3.
    """

    for i in range(1, 10):
        # 输入1,2,3,到4时,队列达到最大值,put方法阻塞,gevent 切换到下一个协程worker(steve)
        tasks.put(i)  
    print('Assigned all work in iteration 1')

    for i in range(10, 20):
        tasks.put(i)
    print('Assigned all work in iteration 2')


gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'bob'),
])

组和池

组是运行中的greenlet的集合,这些greenlet作为组一起管理和调度。它还兼做并行调度程序,借鉴 Python multiprocessing 库。

import gevent
from gevent.pool import Group


def talk(msg):
    for i in range(3):
        print(msg)


g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()
group.add(g1)
group.join()  # 这里join只会让当前线程等待g1,但g2和g3已经被启动,会被继续安排执行

这对于管理异步任务组非常有用。Group还提供了一个API,用于将作业分派给分组的greenlet并以各种方式收集它们的结果。

import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()


def hello_from(n):
    print('Size of group %s' % len(group))
    print('Hello from Greenlet %s' % id(getcurrent()))


group.map(hello_from, range(3))


def intensive(n):
    gevent.sleep(3 - n)
    return 'task', n


print('Ordered')

ogroup = Group()
for i in ogroup.imap(intensive, range(3)):
    print(i)

print('Unordered')

igroup = Group()
for i in igroup.imap_unordered(intensive, range(3)):
    print(i)

池(pool) 是一种结构,用于处理需要限制并发的动态数量的greenlets。在需要并行执行许多网络或IO绑定任务的情况下,这通常是可取的。

import gevent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
    print('Size of pool %s' % len(pool))

pool.map(hello_from, range(3))

通常在构建 gevent 驱动的服务时,会将整个服务围绕一个池结构进行中心处理。一个例子可能是在各种套接字(socket)上轮询的类。

from gevent.pool import Pool

class SocketPool(object):

    def __init__(self):
        self.pool = Pool(1000)
        self.pool.start()

    def listen(self, socket):
        while True:
            socket.recv()

    def add_handler(self, socket):
        if self.pool.full():
            raise Exception("At maximum pool size")
        else:
            self.pool.spawn(self.listen, socket)

    def shutdown(self):
        self.pool.kill()

锁和信号量 (locks and semaphores)

信号量是一个允许 greenlet 相互合作,限制并发访问或运行的低层次的同步原语。 信号量有两个方法,acquire 和 release。在信号量是否已经被 acquire或release,和拥有资源的数量之间不同,被称为此信号量的范围 (the bound of the semaphore)。如果一个信号量的范围已经降低到0,它会 阻塞 acquire 操作直到另一个已经获得信号量的 greenlet 作出释放。

信号量是一种低级同步原语,它允许greenlet协调和限制并发访问或执行。信号量有两个方法:acquire 和 release。被获取和释放的次数之差称为信号量的界限。如果信号量范围达到0,它就会阻塞,直到另一个greenlet释放它的捕获。

from gevent import sleep
from gevent.pool import Pool
from gevent.lock import BoundedSemaphore

sem = BoundedSemaphore(2)


def worker1(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)


def worker2(n):
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)


pool = Pool()
pool.map(worker1, range(0, 2))
pool.map(worker2, range(3, 6))

Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore

范围为1的信号量也称为锁(lock)。它向单个greenlet提供了互斥访问。 信号量和锁常常用来保证资源只在程序上下文被单次使用。

如果需要在使用 gevent 的时候加锁,也是非常方便的:

# -*- coding: utf-8 -*-
 
import gevent
from gevent.lock import Semaphore
 
sem = Semaphore(1)
 
 
def f1():
    for i in range(5):
        sem.acquire()
        print('run f1, this is ', i)
        sem.release()
        gevent.sleep(1)
 
 
def f2():
    for i in range(5):
        sem.acquire()
        print('run f2, that is ', i)
        sem.release()
        gevent.sleep(0.3)
 
 
t1 = gevent.spawn(f1)
t2 = gevent.spawn(f2)
gevent.joinall([t1, t2])

Thread Locals 线程局部变量

Gevent 还允许您指定 greenlet 上下文的本地数据。在内部,这是作为全局查找实现的,它寻址一个由 greenlet getcurrent() 的值键控的私有命名空间。

import gevent
from gevent.local import local

stash = local()


def f1():
    stash.x = 1
    print(stash.x)


def f2():
    stash.y = 2
    print(stash.y)

    try:
        stash.x
    except AttributeError:
        print("x is not local to f2")


g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)

gevent.joinall([g1, g2])

许多使用 gevent 的 Web 框架将 HTTP 会话对象存储在 gevent 线程局部变量中。例如,使用 Werkzeug 实用程序库及其代理对象,我们可以创建 Flask 样式的请求对象。

from gevent.local import local
from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager
from gevent.pywsgi import WSGIServer

_requests = local()
request = LocalProxy(lambda: _requests.request)


@contextmanager
def sessionmanager(environ):
    _requests.request = Request(environ)
    yield
    _requests.request = None


def logic():
    return f"Hello {request.remote_addr}".encode('utf-8')


def application_1(environ, start_response):
    status = '200 OK'
    headers = [('Content-type', 'text/plain')]
    start_response(status, headers)
    return [b"Hello, World!"]


def application_2(environ, start_response):
    status = '200 OK'

    with sessionmanager(environ):
        body = logic()

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]


# http_server = WSGIServer(('0.0.0.0', 8000), application_1, log=None)
http_server = WSGIServer(('0.0.0.0', 8000), application_2, log=None)
http_server.serve_forever()

Flask 系统比这个例子复杂一点,然而使用线程局部变量作为局部的会话存储, 这个思想是相同的。

Subprocess  子进程

自 gevent 1.0 起,gevent.subprocess 添加了 Python subprocess 模块的修补版本。它支持对子进程进行协作等待。

import gevent
from gevent.subprocess import Popen, PIPE


def cron():
    while True:
        print("cron")
        gevent.sleep(0.2)


g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())

cron
cron
cron
cron
cron
Linux

很多人也想将 gevent 和 multiprocessing 一起使用。最明显的挑战之一 就是 multiprocessing 提供的进程间通信默认不是协作式的。由于基于 multiprocessing.Connection 的对象(例如Pipe)暴露了它们下面的 文件描述符(file descriptor),gevent.socket.wait_read 和 wait_write 可以用来在直接读写之前协作式的等待 ready-to-read/ready-to-write 事件。

import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write

# To Process
a, b = Pipe()

# From Process
c, d = Pipe()


def relay():
    for i in range(10):
        msg = b.recv()
        c.send(msg + " in " + str(i))


def put_msg():
    for i in range(10):
        wait_write(a.fileno())
        a.send('hi')


def get_msg():
    for i in range(10):
        wait_read(d.fileno())
        print(d.recv())


if __name__ == '__main__':
    proc = Process(target=relay)
    proc.start()

    g1 = gevent.spawn(get_msg)
    g2 = gevent.spawn(put_msg)
    gevent.joinall([g1, g2], timeout=1)

然而要注意,组合multiprocessing和gevent必定带来 依赖于操作系统(os-dependent)的缺陷,其中有:

  • 在兼容POSIX的系统创建子进程(forking)之后, 在子进程的gevent的状态是不适定的(ill-posed)。一个副作用就是, multiprocessing.Process创建之前的greenlet创建动作,会在父进程和子进程两 方都运行。

  • 上例的put_msg()中的a.send()可能依然非协作式地阻塞调用的线程:一个 ready-to-write事件只保证写了一个byte。在尝试写完成之前底下的buffer可能是满的。

  • 上面表示的基于wait_write()/wait_read()的方法在Windows上不工作 (IOError: 3 is not a socket (files are not supported)),因为Windows不能监视 pipe事件。

Python包gipc以大体上透明的方式在 兼容POSIX系统和Windows上克服了这些挑战。它提供了gevent感知的基于multiprocessing.Process的子进程和gevent基于pipe的协作式进程间通信。

Actors 并发模型

actor 模型是一个由于Erlang变得普及的更高层的并发模型。 简单的说它的主要思想就是许多个独立的Actor,每个Actor有一个可以从 其它Actor接收消息的收件箱。Actor内部的主循环遍历它收到的消息,并 根据它期望的行为来采取行动。

Gevent 没有原生的 Actor 类型,但在一个子类化的 Greenlet 内使用队列, 我们可以定义一个非常简单的。

import gevent
from gevent.queue import Queue


class Actor(gevent.Greenlet):

    def __init__(self):
        super(Actor, self).__init__()
        self.inbox = Queue()

    def receive(self, message):
        """
        Define in your subclass.
        """
        raise NotImplemented()

    def _run(self):
        self.running = True

        while self.running:
            message = self.inbox.get()
            self.receive(message)

下面是一个使用的例子:

import gevent
from gevent.queue import Queue


class Actor(gevent.Greenlet):

    def __init__(self):
        super(Actor, self).__init__()
        self.inbox = Queue()

    def receive(self, message):
        """
        Define in your subclass.
        """
        raise NotImplemented()

    def _run(self):
        self.running = True

        while self.running:
            message = self.inbox.get()
            self.receive(message)


class Pinger(Actor):
    def receive(self, message):
        print(message)
        pong.inbox.put('ping')
        gevent.sleep(0)


class Ponger(Actor):
    def receive(self, message):
        print(message)
        ping.inbox.put('pong')
        gevent.sleep(0)


ping = Pinger()
pong = Ponger()

ping.start()
pong.start()

ping.inbox.put('start')
gevent.joinall([ping, pong])

实际应用

Gevent ZeroMQ

ZeroMQ 被它的作者描述为 “一个表现得像一个并发框架的socket库”。 它是一个非常强大的,为构建并发和分布式应用的消息传递层。ZeroMQ提供了各种各样的socket原语。最简单的是请求-应答socket对 (Request-Response socket pair)。一个socket有两个方法sendrecv, 两者一般都是阻塞操作。但是Travis Cline 的一个杰出的库弥补了这一点,这个库使用gevent.socket来以非阻塞的方式 轮询ZereMQ socket。安装 gevent-zeremq 命令:pip install gevent-zeromq

import gevent
import zmq.green as zmq

# Global Context
context = zmq.Context()


def server():
    server_socket = context.socket(zmq.REQ)
    server_socket.bind("tcp://127.0.0.1:5000")

    for request in range(1, 10):
        server_socket.send("Hello")
        print('Switched to Server for %s' % request)
        # Implicit context switch occurs here
        server_socket.recv()


def client():
    client_socket = context.socket(zmq.REP)
    client_socket.connect("tcp://127.0.0.1:5000")

    for request in range(1, 10):
        client_socket.recv()
        print('Switched to Client for %s' % request)
        # Implicit context switch occurs here
        client_socket.send("World")


publisher = gevent.spawn(server)
client = gevent.spawn(client)

gevent.joinall([publisher, client])

执行结果

Switched to Server for 1
Switched to Client for 1
Switched to Server for 2
Switched to Client for 2
Switched to Server for 3
Switched to Client for 3
Switched to Server for 4
Switched to Client for 4
Switched to Server for 5
Switched to Client for 5
Switched to Server for 6
Switched to Client for 6
Switched to Server for 7
Switched to Client for 7
Switched to Server for 8
Switched to Client for 8
Switched to Server for 9
Switched to Client for 9

简单 server

# On Unix: Access with ``$ nc 127.0.0.1 5000``
# On Window: Access with ``$ telnet 127.0.0.1 5000``

from gevent.server import StreamServer


def handle(socket, address):
    socket.send("Hello from a telnet!\n")
    for i in range(5):
        socket.send(str(i) + '\n')
    socket.close()


server = StreamServer(('127.0.0.1', 5000), handle)
server.serve_forever()

WSGI 服务器 ( 多进程 )

Gevent 为 HTTP 内容服务提供了两种 WSGI server:

  • gevent.pywsgi.WSGIServer

windows 运行报错,linux 可以正常运行

from gevent import monkey
from gevent.pywsgi import WSGIServer

monkey.patch_all()

import datetime
import os
from multiprocessing import cpu_count, Process
from flask import Flask, jsonify

app = Flask(__name__)


@app.route("/cppla", methods=['GET'])
def function_benchmark():
    return jsonify(
        {
            "status": "ok",
            "time": datetime.datetime.now().strftime('%Y-%m-%d %H:%M'),
            "pid": os.getpid()
        }
    ), 200


def run(multi_process=None):
    if not multi_process:
        WSGIServer(('0.0.0.0', 8080), app).serve_forever()
    else:
        multi_server = WSGIServer(('0.0.0.0', 8080), app)
        multi_server.start()

        def server_forever():
            multi_server.start_accepting()
            multi_server._stop_event.wait()

        for i in range(cpu_count()):
            p = Process(target=server_forever)
            p.start()


if __name__ == "__main__":
    # 单进程 + 协程
    run(False)
    # 多进程 + 协程
    # run(True)

Streaming 服务器

from gevent.pywsgi import WSGIServer


def application(environ, start_response):
    status = '200 OK'
    body = b'<p>Hello World</p>'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]


WSGIServer(('', 8000), application).serve_forever()

然而使用pywsgi我们可以将handler写成generator,并以块的形式yield出结果。

from gevent.pywsgi import WSGIServer

def application(environ, start_response):
    status = '200 OK'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    yield "<p>Hello"
    yield "World</p>"

WSGIServer(('', 8000), application).serve_forever()

Long Polling 长轮询

import gevent
from gevent.queue import Queue, Empty
from gevent.pywsgi import WSGIServer
import simplejson as json

data_source = Queue()


def producer():
    while True:
        data_source.put_nowait('Hello World')
        gevent.sleep(1)


def ajax_endpoint(environ, start_response):
    status = '200 OK'
    headers = [
        ('Content-Type', 'application/json')
    ]

    start_response(status, headers)

    while True:
        try:
            datum = data_source.get(timeout=5)
            yield json.dumps(datum) + '\n'
        except Empty:
            pass


gevent.spawn(producer)
WSGIServer(('', 8000), ajax_endpoint).serve_forever()

Websockets 网络套接字

运行Websocket的例子需要gevent-websocket包。

# Simple gevent-websocket server
import json
import random

from gevent import pywsgi, sleep
from geventwebsocket.handler import WebSocketHandler


class WebSocketApp(object):
    '''Send random data to the websocket'''

    def __call__(self, environ, start_response):
        ws = environ['wsgi.websocket']
        x = 0
        while True:
            data = json.dumps({'x': x, 'y': random.randint(1, 5)})
            ws.send(data)
            x += 1
            sleep(0.5)


server = pywsgi.WSGIServer(
    ("", 10000), WebSocketApp(),
    handler_class=WebSocketHandler
)
server.serve_forever()

HTML Page:

<html>
    <head>
        <title>Minimal websocket application</title>
        <script type="text/javascript" src="jquery.min.js"></script>
        <script type="text/javascript">
        $(function() {
            // Open up a connection to our server
            var ws = new WebSocket("ws://localhost:10000/");

            // What do we do when we get a message?
            ws.onmessage = function(evt) {
                $("#placeholder").append('<p>' + evt.data + '</p>')
            }
            // Just update our conn_status field with the connection status
            ws.onopen = function(evt) {
                $('#conn_status').html('<b>Connected</b>');
            }
            ws.onerror = function(evt) {
                $('#conn_status').html('<b>Error</b>');
            }
            ws.onclose = function(evt) {
                $('#conn_status').html('<b>Closed</b>');
            }
        });
    </script>
    </head>
    <body>
        <h1>WebSocket Example</h1>
        <div id="conn_status">Not Connected</div>
        <div id="placeholder" style="width:600px;height:300px;"></div>
    </body>
</html>

Chat Server 聊天服务器

实现一个实时聊天室, 运行这个例子需要 Flask (你可以使用Django, Pyramid等,但不是必须的)。 对应的Javascript和HTML文件可以在 这里找到。

from flask import Flask, render_template, request
from gevent import queue
from gevent.pywsgi import WSGIServer

import simplejson as json

app = Flask(__name__)
app.debug = True


class Room(object):

    def __init__(self):
        self.users = set()
        self.messages = []

    def backlog(self, size=25):
        return self.messages[-size:]

    def subscribe(self, user):
        self.users.add(user)

    def add(self, message):
        for user in self.users:
            print(user)
            user.queue.put_nowait(message)
        self.messages.append(message)


class User(object):

    def __init__(self):
        self.queue = queue.Queue()


rooms = {
    'topic1': Room(),
    'topic2': Room(),
}
users = {}


@app.route('/')
def choose_name():
    return render_template('choose.html')


@app.route('/<uid>')
def main(uid):
    return render_template(
        'main.html',
        uid=uid,
        rooms=rooms.keys()
    )


@app.route('/<room>/<uid>')
def join(room, uid):
    user = users.get(uid, None)

    if not user:
        users[uid] = user = User()

    active_room = rooms[room]
    active_room.subscribe(user)
    print('subscribe %s %s' % (active_room, user))

    messages = active_room.backlog()

    return render_template(
        'room.html',
        room=room, uid=uid, messages=messages
    )


@app.route("/put/<room>/<uid>", methods=["POST"])
def put(room, uid):
    user = users[uid]
    room = rooms[room]

    message = request.form['message']
    room.add(':'.join([uid, message]))

    return ''


@app.route("/poll/<uid>", methods=["POST"])
def poll(uid):
    try:
        msg = users[uid].queue.get(timeout=10)
    except queue.Empty:
        msg = []
    return json.dumps(msg)


if __name__ == "__main__":
    http = WSGIServer(('', 5000), app)
    http.serve_forever()

gevent 示例

concurrent_download.py

dns_mass_resolve.py

echoserver.py

geventsendfile.py

portforwarder.py

processes.py

psycopg2_pool.py

threadpool.py

udp_client.py

udp_server.py

unixsocket_client.py

unixsocket_server.py

webproxy.py

webpy.py

wsgiserver.py

wsgiserver_ssl.py

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐