如何让一个httphttp同步请求异步请求非阻塞

> 博客详情
摘要: asynchronous coroutine
非堵塞和异步有什么区别?
在tornado的框架中非堵塞一般指得是网络I/O层面的socket数据接收模式(select或者epoll),不论用哪个模式,最终程序都会收到数据并处理数据(这个数据要么被转发、要么被解析和处理)。
非堵塞的弊端:&& 如果处理一个密集计算的请求需要花费10秒钟(就是堵塞了10秒钟),当两个或多个请求同时到达时,只要第一个被接受处理没结束,其他全部请求都要等,并且挨个挨个等到被轮询结束。这就是单线程事件还回机制(非堵塞机制), 对堵塞零容忍, 任何一个地方堵住了还回线程,其他全部请求都被堵住。
也就是说采用了非堵塞模式之后,最好不要用堵塞(常规解析数据的函数)的代码块来解析数据。
异步的作用是将堵塞代码错开来,不放在当前接受数据的线程中处理,
要么丢到rabbitmq/zeromq/activemq中交给另外一个进程去处理,要么用其他线程或进程来处理。
让监听数据的这个socket收到数据后直接抛给其他程序来处理,然后立马保持监听状态,这样子程序的循环能力就非常强。
再就是要提的一点,tornado本身的ioloop就采用epool/select/kqueue来完成非堵塞动作,咱们使用tornado只要把异步的代码写好就可以很好的发挥出tornado的优势了。
堵塞模式编程流程:
传统的I/O(socket)堵塞编程模式流程:
while True:
&&& 1. socket accept (等待)
&&& 2. socket receive (接受数据)
&&& 3. handle data (处理数据)
&&& 4. socket send (返回结果)
非堵塞模式编程流程:
while True:
& && 1. events = epoll poll (主动拉取列表)
&&&& 2. for file_descriptor, event in events: (查找是否有新的请求)
&&&& 3. async handle data
&&&&&&&&&&& 3.1 标注状态(running)
&&&&&&&&&&& 3.2 异步丢给其他函数通过线程的方式执行.
&&&&&&&&&&& 3.3 线程执行完毕后修改状态为(finish), 并且通过回掉的方式注册进 ioloop中(ioloop.add_done_callback或者ioloop.add_future)
&&&& 4. socket send (返回结果)
Python环境准备
&&& 1. python& &= 2.7& & 3.x
&&& 2. pip install requests tornado futures
Server环境准备
blockingServer.py
192.168.1.100
构建用于测试的堵塞环节
blockingClient.py nbAsync.py nbFuture.py nbCoroutine.py nbGenTask.py
192.168.1.101
验证常用异步非堵塞写法
192.168.1.102
启动Server
在192.168.1.100服务器上运行用于测试的堵塞服务器(实际上是非堵塞模式,只不过是每个连接都要等待5秒钟).
# 目的是提供一个堵塞的环境用来证明tornado结合常用的异步写法都是非堵塞高效模式.
python blockingServer.py&&&
# -.- coding:utf-8 -.-
import tornado.web
import tornado.gen
import tornado.ioloop
import tornado.options
import tornado.httpserver
class BlockingHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self, *args, **kwargs):
# 如果这条命令没看懂的话,请参考这个链接: http://www.tornadoweb.org/en/stable/faq.html
yield tornado.gen.sleep(5)
self.write('ok')
class Application(tornado.web.Application):
def __init__(self):
handlers = [
('/blocking', BlockingHandler),
super(Application, self).__init__(handlers)
if __name__ == "__main__":
tornado.options.define("port", default=88, help="run on the given port", type=int)
tornado.options.parse_command_line()
http_server = tornado.httpserver.HTTPServer(Application())
http_server.listen(tornado.options.options.port)
tornado.ioloop.IOLoop.current().start()
1. tornado + 非异步代码(堵塞的代码)
# 文件名: blockingClient.py
# -.- coding:utf-8 -.-
# __author__ = 'zhengtong'
import tornado.ioloop
import tornado.web
import tornado.options
import tornado.httpserver
import requests
class Application(tornado.web.Application):
def __init__(self):
handlers = [
('/blocking', BlockHandler),
('/non_blocking', NonBlockHandler),
super(Application, self).__init__(handlers)
class BlockHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
response = requests.get('http://192.168.1.100:88/blocking')
# blocked here.
result = dict(response.headers)
result.update({'content': response.content})
self.write(result)
class NonBlockHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
self.write('non_blocking')
if __name__ == "__main__":
tornado.options.define("port", default=80, help="run on the given port", type=int)
tornado.options.parse_command_line()
http_server = tornado.httpserver.HTTPServer(Application())
http_server.listen(tornado.options.options.port)
tornado.ioloop.IOLoop.current().start()
&&& 1. 在192.168.1.102压力测试服务器上运行如下并发测试命令.
# 发起10个并发,持续60秒钟.
[root@localhost ~]# siege http://192.168.1.101/blocking -c10 -t60s
&&& 2. 在 windows 8 (192.168.1.101)上用浏览器来访问如下链接.
http://192.168.1.101/non_blocking
** SIEGE 4.0.2
** Preparing 10 concurrent users for battle.
The server is now under siege...
HTTP/1.1 200
5.07 secs:
212 bytes ==& GET
HTTP/1.1 200
10.15 secs:
212 bytes ==& GET
HTTP/1.1 200
15.23 secs:
212 bytes ==& GET
HTTP/1.1 200
20.31 secs:
212 bytes ==& GET
HTTP/1.1 200
25.38 secs:
212 bytes ==& GET
HTTP/1.1 200
30.47 secs:
212 bytes ==& GET
HTTP/1.1 200
35.55 secs:
212 bytes ==& GET
HTTP/1.1 200
40.63 secs:
212 bytes ==& GET
HTTP/1.1 200
45.71 secs:
212 bytes ==& GET
HTTP/1.1 200
45.53 secs:
212 bytes ==& GET
HTTP/1.1 200
45.51 secs:
212 bytes ==& GET
Lifting the server siege...
Transactions:
Availability:
Elapsed time:
59.65 secs
Data transferred:
Response time:
29.05 secs
Transaction rate:
0.18 trans/sec
Throughput:
0.00 MB/sec
Concurrency:
Successful transactions:
Failed transactions:
Longest transaction:
Shortest transaction:
&&&&&&&&non_block也是等待状态,必须要等block执行完成后,才会执行non_block.
&&& siege在60秒钟内,只得到了11个结果,证明堵塞非常严重, 并且浏览器也是出于一直等待的状态.
&&& 也就是说在tornado中如果写堵塞代码,只有单线程在运行的tornado,会死的很难看,刚接触tornado的同学甚至都不知道为什么会这样,根本没有像听说那样tornado是一个极其高效的web框架。
&&& 通过结果可以看出,不采用异步的方式就无法发挥出它的能力。
2. tornado.web.asynchronous
# 文件名: nbAysnc.py
# -.- coding:utf-8 -.-
# __author__ = 'zhengtong'
import tornado.ioloop
import tornado.web
import tornado.options
import tornado.httpserver
# import requests
# 不用requests, 后面再讨论用requests也能异步非堵塞.
import tornado.httpclient
# 采用tornado自带的异步httpclient客户端
class Application(tornado.web.Application):
def __init__(self):
handlers = [
('/blocking', BlockHandler),
('/non_blocking', NonBlockHandler),
super(Application, self).__init__(handlers)
class BlockHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self, *args, **kwargs):
client = tornado.httpclient.AsyncHTTPClient()
client.fetch('http://192.168.1.100:88/blocking', callback=self.on_response)
def on_response(self, content):
result = dict(content.headers)
result.update({'content': content.body})
self.write(result)
self.finish()
class NonBlockHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
self.write('non_blocking')
if __name__ == "__main__":
tornado.options.define("port", default=80, help="run on the given port", type=int)
tornado.options.parse_command_line()
http_server = tornado.httpserver.HTTPServer(Application())
http_server.listen(tornado.options.options.port)
tornado.ioloop.IOLoop.current().start()
# 这里提供一个不采用任何装饰器的写法, 比较raw ioloop, 运行结果是一致的,效率也是一致的.
# 文件名: nbAsync_NoAsyncDecorator.py
# -.- coding:utf-8 -.-
# __author__ = 'zhengtong'
import tornado.ioloop
import tornado.web
import tornado.options
import tornado.httpserver
import tornado.concurrent
# import requests
# 不只用requests
import tornado.httpclient
# 采用tornado自带的异步httpclient客户端
class Application(tornado.web.Application):
def __init__(self):
handlers = [
('/blocking', BlockHandler),
('/non_blocking', NonBlockHandler),
super(Application, self).__init__(handlers)
class BlockHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
# def get上方移除了tornado.web.asynchonous装饰器
self._auto_finish = False
client = tornado.httpclient.AsyncHTTPClient()
future = client.fetch('http://192.168.1.100:88/blocking')
# 在这里添加callback也行
tornado.ioloop.IOLoop.current().add_future(future, callback=self.on_response)
def on_response(self, content):
result = dict(content.headers)
result.update({'content': content.body})
self.write(result)
self.finish()
class NonBlockHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
self.write('non_blocking')
if __name__ == "__main__":
# 通过define 可以为options增加变量.
tornado.options.define("port", default=80, help="run on the given port", type=int)
tornado.options.parse_command_line()
http_server = tornado.httpserver.HTTPServer(Application())
http_server.listen(tornado.options.options.port)
tornado.ioloop.IOLoop.current().start()
&&&&&&& 参考&1. tornado + 非异步代码(堵塞的代码) &章节的测试方法.
Lifting the server siege...
Transactions:
Availability:
Elapsed time:
59.13 secs
Data transferred:
Response time:
Transaction rate:
1.69 trans/sec
Throughput:
0.00 MB/sec
Concurrency:
Successful transactions:
Failed transactions:
Longest transaction:
Shortest transaction:
&&&&&&& 访问non_blocking页面正常而且响应很快。
&&&&&&& siege一直在持续并发请求的同时用浏览器来访问non_blocking和blocking页面都能够得到响应,也就证明tornado已经开始发挥它的功效了。
&&&&&&& 采用了异步非堵塞模式后,被命中只有110次,落差很大,心理非常不平衡。其实这并不是问题,这里面有多重限制所以才会导致这个结果。
&&&&&&& 1. AsyncHttpClient本身的限制(默认情况下只允许同时发起10个客户端).& 详情请参考tornado源码的 simple_httpclient.py文件
&&&&&&& 2. ioloop本身的限制(为了保证线程的稳定性,默认只开启了10个线程来支持并发). 详情请参考tornado源码的 netutil.py文件
&&&&&&& 可以通过设定参数来提高并发能力(将 tornado.httpclient.AsyncHTTPClient() 改为 tornado.httpclient.AsyncHTTPClient(max_clients=100)).
&&&&&&& max_clients由默认的10改为100后,测试结果的hits也随之增加了十倍.
Lifting the server siege...
Transactions:
Availability:
Elapsed time:
59.71 secs
Data transferred:
Response time:
Transaction rate:
18.41 trans/sec
Throughput:
0.00 MB/sec
Concurrency:
Successful transactions:
Failed transactions:
Longest transaction:
Shortest transaction:
3. tornado.concurrent.futures
# 文件名: nbFuture.py
# 备注: 在第二章节中的移除tornado.web.asynchonous装饰器的写法同样适合futures. 详情请参考源码文件: nbFuture_NoAsyncDecorator.py
# -.- coding:utf-8 -.-
# __author__ = 'zhengtong'
import tornado.ioloop
import tornado.web
import tornado.options
import tornado.httpserver
import tornado.concurrent
# import requests
# 不只用requests
import tornado.httpclient
# 采用tornado自带的异步httpclient客户端
class Application(tornado.web.Application):
def __init__(self):
handlers = [
('/blocking', BlockHandler),
('/non_blocking', NonBlockHandler),
super(Application, self).__init__(handlers)
class BlockHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self, *args, **kwargs):
client = tornado.httpclient.AsyncHTTPClient()
future = tornado.concurrent.Future()
fetch_future = client.fetch('http://192.168.1.100:88/blocking', callback=self.on_response)
fetch_future.add_done_callback(lambda x: future.set_result(x.result()))
def on_response(self, content):
result = dict(content.headers)
result.update({'content': content.body})
self.write(result)
self.finish()
class NonBlockHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
self.write('non_blocking')
if __name__ == "__main__":
# 通过define 可以为options增加变量.
tornado.options.define("port", default=80, help="run on the given port", type=int)
tornado.options.parse_command_line()
http_server = tornado.httpserver.HTTPServer(Application())
http_server.listen(tornado.options.options.port)
tornado.ioloop.IOLoop.current().start()
&&&&&&&& 参考&1. tornado + 非异步代码(堵塞的代码) &章节的测试方法.
&&&&&&& 于&2. tornado.web.asynchronous &的测试结果基本一致.
&&&&&&& future是官方特别推荐用来练习的一种编码方式,因为这样会比较深入的了解tornado的运作原理。
&&&&&&& future的add_done_callback方法,是告诉ioloop当future的状态变更为完成的时候,就调用包裹在add_done_callback中的函数(或匿名函数).
&&&&&&& future还提供了一组produce方法和consumer方法, 用于管理future的状态.
4. tornado.gen.Task
# 文件名: nbGenTask.py
# -.- coding:utf-8 -.-
# __author__ = 'zhengtong'
import tornado.ioloop
import tornado.web
import tornado.options
import tornado.httpserver
import tornado.concurrent
import tornado.gen
# 导入tornado.gen模块
# import requests
# 不只用requests
import tornado.httpclient
# 采用tornado自带的异步httpclient客户端
class Application(tornado.web.Application):
def __init__(self):
handlers = [
('/blocking', BlockHandler),
('/non_blocking', NonBlockHandler),
super(Application, self).__init__(handlers)
class BlockHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self, *args, **kwargs):
client = tornado.httpclient.AsyncHTTPClient()
content = yield tornado.gen.Task(client.fetch, ('http://192.168.1.100:88/blocking'))
result = dict(content.headers)
result.update({'content': content.body})
self.write(result)
self.finish()
class NonBlockHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
self.write('non_blocking')
if __name__ == "__main__":
tornado.options.define("port", default=80, help="run on the given port", type=int)
tornado.options.parse_command_line()
http_server = tornado.httpserver.HTTPServer(Application())
http_server.listen(tornado.options.options.port)
tornado.ioloop.IOLoop.current().start()
&&&&&&&& 参考&1. tornado + 非异步代码(堵塞的代码) &章节的测试方法.
&&&&&&&& 于&2. tornado.web.asynchronous &的测试结果基本一致.
&&&&&&& tornado.gen.Task需要配合tornado.gen.coroutine装饰器来完成代码的运行,因为Task利用了yield,它的隐藏方法run()利用了gen.send()方法,所以gen模块必须要用coroutine装饰器.
&&&&&&& 利用coroutine的方式比较明显的一个地方是,代码不用再分开了, 这个是Python语言的一个特性,yield关键字可以赋值给一个变量, 因此就不需要callback了.
&&&&&&& 这样有什么好处? 本地变量和全局变量不用传递了,默认就是共享的,这个算不算很爽?
5. tornado.gen.coroutine + ThreadPool/ProcessPool
# 文件名: nbFuture.py
# -.- coding:utf-8 -.-
# __author__ = 'zhengtong'
import tornado.ioloop
import tornado.web
import tornado.options
import tornado.httpserver
import tornado.concurrent
import tornado.gen
import requests
import tornado.concurrent
# 导入 tornado.concurrent 并发模块
class Application(tornado.web.Application):
def __init__(self):
handlers = [
('/blocking', BlockHandler),
('/non_blocking', NonBlockHandler),
super(Application, self).__init__(handlers)
# 建议设定为CPU核心数量 * 4或8或16也是可以接受的, 取决于计算量,计算量越大设定的值应该越小.
self.executor = tornado.concurrent.futures.ThreadPoolExecutor(16)
class BlockHandler(tornado.web.RequestHandler):
def executor(self):
return self.application.executor
@tornado.gen.coroutine
def get(self, *args, **kwargs):
print dir(self)
content = yield self.executor.submit(requests.get, ('http://192.168.1.100:88/blocking'))
result = dict(content.headers)
result.update({'content': content.content})
self.write(result)
class NonBlockHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
self.write('non_blocking')
if __name__ == "__main__":
# 通过define 可以为options增加变量.
tornado.options.define("port", default=80, help="run on the given port", type=int)
tornado.options.parse_command_line()
http_server = tornado.httpserver.HTTPServer(Application())
http_server.listen(tornado.options.options.port)
tornado.ioloop.IOLoop.current().start()
&&&&&&&& 参考&1. tornado + 非异步代码(堵塞的代码) &章节的测试方法.
&&&&&&&& 于&2. tornado.web.asynchronous &的测试结果基本一致.
&&&&&&& 自从了coroutine、threadpool、processpool之后,tornado算是一个里程碑式的解放了对异步的要求, 原因是tornado的异步库只针对httpclient, 没有针对mysql或者其他数据库的异步库(自己写一个异步库难度太高,因为辗转十几个源码文件的重度调用以及每个类中的状态控制)。
&&&&&&& coroutine结合threadpool让编写异步代码不再拆成多个函数,变量能够共享,堵塞的代码(例如 requests、mysql.connect、密集计算)可以不影响ioloop,形成真正的闭合.
&&&&&&&&&& /tornadoweb/tornado/wiki
&&&&&&&&&& /python-epoll-howto.html
&&&&&&&&&& tornado源码
人打赏支持
码字总数 14739
谢谢你提出来,把asynchronous装饰器去掉之后,需要在get方法的下面声明一个内置变量 self._auto_finish = Falseclass BlockHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
# def get上方移除了tornado.web.asynchonous装饰器
self._auto_finish = False
client = tornado.httpclient.AsyncHTTPClient()
支付宝支付
微信扫码支付
打赏金额: ¥
已支付成功
打赏金额: ¥异步和非阻塞I/O
实时web应用需要保持对每一个用户的长连接.在传统的同步web服务器中,通过给每个用户分发一个线程来实现,这样代价很昂贵.
为了最小化并发连接的成本,Tornado使用单线程事件循环.这意味着所有的应有代码应该是异步和非阻塞的,因为在一个时刻只能有一个活跃操作.
异步和非阻塞的情况非常相似,而且内部经常关联,但是他们不是同样的一件事情.
函数在它返回结果前等待其他事情时阻塞.函数可能由其他原因阻塞:网络I/O,磁盘I/O,并发锁,等.事实上,每个函数都阻塞,多少有些阻塞,因为它运行时会使用CPU(极端的例子是演示CPU阻塞应该比其他种类的阻塞更引起重视,比如像bcrypt的密码hash函数会消耗许多CPU时间,比典型的网络和磁盘消耗更多时间)
有些情况下函数可能会阻塞,有些情况下函数不会阻塞.比如,tornado.httpclient默认配置下会在DNS解析中阻塞,但是在其他网络过程中不会阻塞(使用ThreadedResolver或使用libcurl正确配置的tornado.curl_httpclient来缓解这个).在Tornado上下文中我们通常讨论的是网络I/O阻塞,虽然所有种类的阻塞都应该最小化.
异步函数在它完成之前就已经返回,通常引起一些在后台发生的工作,然后在应用中触发一些将来的动作(和通常的同步函数相反,处理完成后才返回).有许多种类的异步接口:
返回占位符(Future,Promise,Deferred)
分发到一个对列中
注册回调(比如POSIX信号)
不管使用哪种接口,通过定义的异步函数和调用它们不同步;没有方式使得同步函数在对它的调用者透明的情况下异步(像gevent的系统使用轻量级的线程来提供与异步相似的性能,但是它们没有真正的使得事物异步)
下面是同步函数的例子:
from tornado.httpclient import HTTPClient
def synchronous_fetch(url):
http_client=HTTPClient()
response=http_client.fetch(url)
return response.body
下面是使用回调参数来将函数重写为异步执行:
from tornado.httpclient impot AsyncHTTPClient
def asynchronous_fetch(url,callback):
http_client=AsyncHTTPClient
def handle_response(response):
callback(response.body)
http_client.fetch(url,callback=handle_response)
使用Future代替回调:
from tornado.concurrent import Future
def async_fetch_future(url):
http_client=AsyncHTTPClient()
my_future=Future()
fetch_future=http_client.fetch(url)
fetch_future.add_done_callback(
lambda f:my_future.set_result(f.result()))
return my_future
原生的Future版本更复杂,但是Futures在Tornado中更加推荐使用,因为它的两个优点.因为Future.result方法可以简单的抛出错误(比起面向回调接口的点对点的错误处理)错误处理更连续,并且Future使得他们使用协同也方便.协同将在这个章节的下一部分重点讨论.下面是我们的函数使用协同的例子,和先前的同步版本非常相似:
from tornado import gen
@gen.coroutine
def fetch_coroutine(url):
http_client=AsyncHTTPClient()
response=yield http_client.fetch(url)
raise gen.Return(response.body)
raise gen.Return(response.body)是Python 2的写法,因为Python 2 生成器中不允许返回值.为了避免此事,Tornado 协程抛出特殊的称作Return的意外.协程可以捕获这个错误把它当做返回值对待.Python 3.3和以后版本中,return response.body可以达到同样效果.
阅读(...) 评论() &}

我要回帖

更多关于 lua http请求会阻塞吗 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信