目录

python3在较复杂环境下利用tornado进行并发爬虫

1. 背景介绍

2.需求分析

3.代码实现 


python3在较复杂环境下利用tornado进行并发爬虫

1. 背景介绍

以下链接是tornado 项目客户端并发纵深爬虫demos,不过这个项目并没有提供在构造请求对象情况下的复杂爬虫,鄙人基于此改造了一篇携带请求头及认证信息的并发批量爬虫代码

https://github.com/tornadoweb/tornado/blob/master/demos/webspider/webspider.py

2.需求分析

2.1 get方式爬取某一站点的大批量的分页数据

2.2 需要进行登录认证

2.3 需要保存会话信息

2.4 将api返回的数据进行json格式的转换

2.5 批量入库

2.6 爬取完毕立刻停止爬虫

2.7 将待爬取资源放入队列,并统计最终的成功、失败的链接数量

2.8 计算耗费时间

2.9 不应使用多线程、多进程,利用性能消耗更小的tornado的异步协程

3.代码实现 

代码实现里面,数据已经脱敏,如要测试请自行找目标网站,更换相应的请求头信息和url拼接

#!/usr/bin/env python3

import time
import json
from datetime import timedelta
from tornado import gen, httpclient, ioloop, queues

page = 1
pageSize = 20

# 储备横向爬虫URI
URLS = []
for page in range(1, 100):
    session_monitor = f'https://bizapi.csdn.net/blog-console-api/v1/article/list?page={page}&pageSize={pageSize}'
    URLS.append(session_monitor)

# 并发执行限定
concurrency = 100


async def get_data_from_url(url):
    """获取当前url返回的数据
    """
    ip = '172.200.200.200'
    sessionid = 'cudjqtngdlnn76mugl6lghjo2n'
    header = {
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Connection': 'keep-alive',
        'Content-Length': '0',
        'Cookie': sessionid,
        'Host': ip,
        'Origin': 'http://' + ip,
        'Referer': 'http://' + ip + '/index.html',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36',
        # 'X-Requested-With': 'XMLHttpRequest'
    }
    request = httpclient.HTTPRequest(url=url,
                                     method='GET',
                                     headers=header,
                                     connect_timeout=2.0,
                                     request_timeout=2.0,
                                     follow_redirects=False,
                                     max_redirects=False)

    response = await httpclient.AsyncHTTPClient().fetch(request)

    print("fetched %s" % url)
    html = response.body.decode(errors="ignore")
    print(html)
    json_ret = json.loads(html)
    print(json_ret)


async def main():
    q = queues.Queue()
    start = time.time()
    fetching, fetched, dead = set(), set(), set()

    for url in URLS:
        await q.put(url)

    async def fetch_url(current_url):
        if current_url in fetching:
            return

        print("fetching %s" % current_url)
        fetching.add(current_url)
        data = await get_data_from_url(current_url)
        # print(data)
        fetched.add(current_url)

    async def worker():
        async for url in q:
            if url is None:
                return
            try:
                await fetch_url(url)
            except Exception as e:
                print("Exception: %s %s" % (e, url))
                dead.add(url)
            finally:
                q.task_done()

    workers = gen.multi([worker() for _ in range(concurrency)])
    await q.join(timeout=timedelta(seconds=300))
    assert fetching == (fetched | dead)
    print("Done in %d seconds, fetched %s URLs." % (time.time() - start, len(fetched)))
    print("Unable to fetch %s URLS." % len(dead))

    # Signal all the workers to exit.
    for _ in range(concurrency):
        await q.put(None)
    await workers


if __name__ == "__main__":
    io_loop = ioloop.IOLoop.current()
    io_loop.run_sync(main)

 

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐