[Python 爬虫] 使用 Scrapy 爬取新浪微博用户信息(四) —— 应对反爬技术(选取 User-Agent、添加 IP代理池以及Cookies池 )
上一篇:[Python 爬虫] 使用 Scrapy 爬取新浪微博用户信息(三) —— 数据的持久化——使用MongoDB存储爬取的数据最近项目有些忙,很多需求紧急上行,所以一直没能完善《使用 Scrapy 爬取新浪微博用户信息》这一系列的博客,今天好不容易闲下来,就完成这一系列最后一节:选取 User-Agent、添加 IP代理池以及Cookies池。在上一篇博客中,我们介绍了如何对爬取的用...
上一篇:[Python 爬虫] 使用 Scrapy 爬取新浪微博用户信息(三) —— 数据的持久化——使用MongoDB存储爬取的数据
最近项目有些忙,很多需求紧急上线,所以一直没能完善《 使用 Scrapy 爬取新浪微博用户信息》这一系列的博客,今天好不容易闲下来,就完成这一系列最后一节:选取 User-Agent、添加 IP代理池以及Cookies池。在上一篇博客中,我们介绍了如何对爬取的用户信息进行持久化处理,存入了 MongDB,但是并没有限制爬取速度,导致爬虫程序频繁出现 418 响应码,这是微博反爬的一种策略,这一篇博客我们就来介绍如何应对目标网页的反爬程序,需要注意的是,微博反爬策略是针对用户的,在只用单用户的情况下,只能降低爬取频率,当然,如果手里有一批账号,可以采用多账号的Cookies池,当出现 418 请求时,就切换 Cookies,由于我目前只有一个账号,因此我只能通过降低爬取频率来应对新浪微博的反爬策略,但是我会用单个账号获取多个 Cookies 来模拟多账户情况下的Cookies池。
选取 User-Agent
User Agent 中文名为用户代理,简称 UA,它是一个特殊字符串头,使得服务器能够识别客户使用的操作系统及版本、CPU 类型、浏览器及版本、浏览器渲染引擎、浏览器语言、浏览器插件等。(数据来自:百度百科 User-Agent)
User-Agent 可以通过浏览器调试模式,然后选择 Network,任意查看一个连接,就能找到,如下图:
在这里我们通过上网查找了几个 User-Agent,添加到 settings.py 文件内,代码如下:
# User-Agent 列表,提供随机 User-Agent
USER_AGENT_LIST = [
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
"Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
"Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
"Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 1.0.3705; .NET CLR 1.1.4322)",
"Mozilla/4.0 (compatible; MSIE 7.0b; Windows NT 5.2; .NET CLR 1.1.4322; .NET CLR 2.0.50727; InfoPath.2; .NET CLR 3.0.04506.30)",
"Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN) AppleWebKit/523.15 (KHTML, like Gecko, Safari/419.3) Arora/0.3 (Change: 287 c9dfb30)",
"Mozilla/5.0 (X11; U; Linux; en-US) AppleWebKit/527+ (KHTML, like Gecko, Safari/419.3) Arora/0.6",
"Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.2pre) Gecko/20070215 K-Ninja/2.1.1",
"Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9) Gecko/20080705 Firefox/3.0 Kapiko/3.0",
"Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5",
"Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.8) Gecko Fedora/1.9.0.8-1.fc10 Kazehakase/0.5.6",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.20 (KHTML, like Gecko) Chrome/19.0.1036.7 Safari/535.20",
"Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; fr) Presto/2.9.168 Version/11.52",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.11 TaoBrowser/2.0 Safari/536.11",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.71 Safari/537.1 LBBROWSER",
"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER)",
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 732; .NET4.0C; .NET4.0E; LBBROWSER)",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.84 Safari/535.11 LBBROWSER",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E)",
"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; QQBrowser/7.0.3698.400)",
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 732; .NET4.0C; .NET4.0E)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SV1; QQDownload 732; .NET4.0C; .NET4.0E; 360SE)",
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 732; .NET4.0C; .NET4.0E)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E)",
"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1",
"Mozilla/5.0 (iPad; U; CPU OS 4_2_1 like Mac OS X; zh-cn) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8C148 Safari/6533.18.5",
"Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:2.0b13pre) Gecko/20110307 Firefox/4.0b13pre",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:16.0) Gecko/20100101 Firefox/16.0",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11",
"Mozilla/5.0 (X11; U; Linux x86_64; zh-CN; rv:1.9.2.10) Gecko/20100922 Ubuntu/10.10 (maverick) Firefox/3.6.10",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36",
]
添加了 User-Agent 列表之后,我们在 middleware.py 中使用它,我们在 middleware.py 定义一个名为 RandomUserAgentMiddleware 的类,该类继承了 UserAgentMiddleware 类,在开始构造一个爬虫请求前,会调用 RandomUserAgentMiddleware 类的 from_crawler() 方法,构造请求后,发送请求前,将会执行 RandomUserAgentMiddleware 类 的 from_crawler() 方法,代码如下:
from scrapy.http.headers import Headers
from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware
class RandomUserAgentMiddleware(UserAgentMiddleware):
"""
随机选取 代理(User-Agent)
"""
def __init__(self, user_agent):
self.user_agent = user_agent
self.headers = Headers()
@classmethod
def from_crawler(cls, crawler):
"""
开始构造请求前执行的方法\n
:param crawler:整个爬虫的全局对象\n
:return:
"""
# 从配置里获取 用户代理(User-Agent) 列表
return cls(user_agent=crawler.settings.get('USER_AGENT_LIST'))
def process_request(self, request, spider):
"""
发送请求前执行的方法\n
:param request:请求\n
:param spider:爬虫应用\n
:return:
"""
# 从 代理 列表中随机选取一个 代理
agent = random.choice(self.user_agent)
print('当前 User-Agent :', agent)
self.headers['User-Agent'] = agent
request.headers = self.headers
目前,我们定义了在中间件(middleware)中定义了随机选取 User-Agent 的类,但是如果要使用该类,还得在 settings.py 中启用该中间件,并设置优先级,代码如下:
DOWNLOADER_MIDDLEWARES = {
'sina_scrapy.middlewares.RandomUserAgentMiddleware': 10
}
好了,现在打开控制台(Console)使用 scrapy crawl sina_user 命令启动爬虫,可以看到输出如下的日志信息,每次爬虫请求使用的 User-Agent 都是从 USER_AGENT_LIST 中随机获取的,如下图所示:
添加 IP代理池
在反爬技术中,很多目标网页会记录访问者的 ip,并通过计算单位时间内同一 ip 访问网站的次数,如果次数过高,则被视为爬虫程序,然后服务器将拦截该请求。对此,不难想到,采取使用多个 ip 来访问目标网站,可以有效的应对这种反爬机制。对于 ip 资源的来源,目前网上有很多代理,提供了很多可用的 ip,例如:西刺代理、快代理等。需要注意的是,代理网站提供的 ip 是有时效的,因此,我们需要动态地获取代理 ip,在这里,我只是采用最简单地爬虫爬取代理网站的 ip,我们并不能保证所有获取的 ip 的都是可用的,因此还需要增加校验机制,确定获取的 ip 是否有效,最直接的办法就是利用 ping 命令,去 ping ip,看该 ip 是否能够 ping 通,为此,我们新建一个包,名为 utils,然后在该包下新建 crawl_proxy.py 脚本,脚本如下:
# -*-* encoding:UTF-8 -*-
# author : mengy
# date : 2019/9/1
# python-version : Python 3.7.0
# description :
import re, subprocess as sp, time, json
from urllib import request
from bs4 import BeautifulSoup
from sina_scrapy.utils.cache_utils import Cache
from sina_scrapy.utils.thread_pool import ThreadPool
executor = ThreadPool()
cache = Cache()
# 西刺代理 URL
PROXY_IP_XICI_URL = 'https://www.xicidaili.com/nn/%s'
# 快代理 URL
PROXY_IP_QUICK_URL = 'https://www.kuaidaili.com/free/inha/%s/'
# 模拟请求头
PROXY_IP_XICI_HEADERS = {
'Host': 'www.xicidaili.com',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36',
'Accept - Encoding': 'gzip, deflate, br',
'Accept - Language': 'zh - CN, zh;q = 0.9, en;q = 0.8',
'Cookie': '_free_proxy_session = BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJTBhMGNlZjVlYjdjNDU5NjY3ZDNlOGU0YmQ4NTU0OTBhBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMVZpMzIrOVV3aFp5cnJXR3hTVUtFRy9ud0MxMGtyY2R3WjJzMjltSFNSeEE9BjsARg % 3D % 3D - -55779e702f4e95b04fa84eafbb70ccb4006cd839;Hm_lvt_0cf76c77469e965d2957f0553e6ecf59 = 1558427855, 1558427893, 1558427898, 1558427901;Hm_lpvt_0cf76c77469e965d2957f0553e6ecf59 = 1558428119'
}
PROXY_IP_QUICK_HEADERS = {
'Host': 'www.kuaidaili.com',
'Connection': ' keep-alive',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
'Cookie': 'channelid=bdtg_a10_a10a1; sid=1559283308913843; _ga=GA1.2.594886518.1559283655; _gid=GA1.2.578719903.1559283655; Hm_lvt_7ed65b1cc4b810e9fd37959c9bb51b31=1559283656; Hm_lpvt_7ed65b1cc4b810e9fd37959c9bb51b31=1559283719'
}
# 代理ip列表在缓存中的命名
PROXY_IP_NAMESPACE = 'POOL_PROXY_IPS'
# 缓存中代理 ip 失效时间(s)
PROXY_IP_EXPIRE = 15 * 60
# ping ip 最高丢包率(%)
MAX_LOST = 75
# ping ip 最大延迟时间(ms)
MAX_TIMEOUT = 1000
def get_ips(pages=1, refresh=False):
"""
获取代理ip,优先从缓存取,如果缓存为空,则爬取新的代理 ip,并更新缓存\n
:param refresh: 是否强制爬取\n
:return:
"""
if refresh:
return crawl_quick(pages)
else:
# 从缓存中查询代理ip
data = cache.lrange(name=PROXY_IP_NAMESPACE, start=0, end=100)
if not data:
print(u'缓存数据为空!开始爬取高匿代理ip')
return crawl_quick(pages)
else:
return data
def sub_thread(ip_info):
"""
校验 ip 是否连通\n
:param ip_info:
:return:
"""
if check_ip(ip_info.get('ip')):
# 将可用的 ip 放入缓存
cache.lpush(PROXY_IP_NAMESPACE, json.dumps(ip_info))
# 如果 ip 可用,则返回 ip 的信息
return json.dumps(ip_info)
else:
return None
def crawl_quick(page=1):
"""
请求 快代理 爬取高匿代理 ip\n
:param page:
:return:
"""
print(u'请求 快代理 爬取高匿代理 ip')
assert 1 <= page <= 10, '页数有效范围为(1 - 10)'
validate_ips = []
for i in range(page):
req = request.Request(url=PROXY_IP_QUICK_URL % str(i + 1), headers=PROXY_IP_QUICK_HEADERS)
response = request.urlopen(req)
if response.status == 200:
# 解析页面元素
soap = BeautifulSoup(str(response.read(), encoding='utf-8'), 'lxml')
ip_table = soap.select('#list > table > tbody > tr')
ips = []
# 获取当前页的所有 ip 信息
for data in ip_table:
item = data.text.split('\n')
info = {}
ip, port, area, proxy_type, protocol, alive_time, check_time = item[1], item[2], item[5], item[3], item[
4], '', item[7]
url = str.lower(protocol) + "://" + ip + ":" + port
# 将 ip 信息封装成字典
info.update(ip=ip, port=port, area=area, type=proxy_type, protocol=protocol, alive_time=alive_time,
check_time=check_time, url=url, add_time=int(time.time()))
ips.append(info)
# 遍历爬取的 ip 信息,校验 ip 是否连通
tasks = [executor.submit(sub_thread, (ip_info)) for ip_info in ips]
# 轮询所有完成的线程,查询线程的执行结果
for task in executor.completed_tasks(tasks):
data = task.result()
if data:
# 将线程执行结果返回
validate_ips.append(data)
# 降低爬取频率
time.sleep(2.5)
# 当还没有子线程返回可用的 ip 时,再次查询缓存
if not validate_ips:
validate_ips = cache.lrange(name=PROXY_IP_NAMESPACE, start=0, end=100)
# 设置缓存超时时间
cache.expire(name=PROXY_IP_NAMESPACE, time=PROXY_IP_EXPIRE)
print(u'本次爬取 ip :%d 条,有效:%d 条' % (15 * page, len(validate_ips)))
return validate_ips
def check_ip(ip):
"""
通过 ping ip 来验证 ip 是否有效\n
:param ip: 待 ping 的 ip
:return:
"""
assert ip, 'ip 不能为空!'
# CMD 命令(windows)
cmd = 'ping -n 4 -w 4 %s' % ip
# 参数 shell 设为 true,程序将通过 shell 来执行,subprocess.PIPE 可以初始化 stdin , stdout 或 stderr 参数。表示与子进程通信的标准流
p = sp.Popen(cmd, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, shell=True)
out = p.stdout.read().decode('gbk')
# 丢失率
lost_ratio = re.compile(u'(\d+)% 丢失', re.IGNORECASE).findall(out)
# 平均耗时
avg_time = re.compile(u'平均 = (\d+)', re.IGNORECASE).findall(out)
# 如果失败率高于最高丢包率则丢弃
if lost_ratio[0] and int(lost_ratio[0]) > MAX_LOST:
print('%s 失败率过高!丢弃' % ip)
return False
# 如果响应时间高于最大延迟时间则丢弃
if avg_time and int(avg_time[0]) > MAX_TIMEOUT:
print('%s 响应时间过长,网络不稳定,丢弃' % ip)
return False
return True
在以上脚本中,一次性从代理网站爬取了一页的 ip,每页 15 条数据,然后将爬取的 ip 存入缓存 Redis 中,并且我们设置了休眠时间为 2.5s ,避免爬取过快,被目标服务器拦截。另外,增加了线程池,用子线程来校验 ip 是否可用,并设定丢弃丢包率大于 75% 的ip。缓存工具类 cache_utils.py 以及线程池工具类 thread_pool,都在 utils 包下,完整代码如下:
cache_utils
# -*-* encoding:UTF-8 -*-
# author : mengy
# date : 2019/5/21
# python-version : Python 3.7.0
# description : Redis 缓存相关操作
import redis
# Redis 主机地址
CACHE_HOST = '127.0.0.1'
# Redis 端口
CACHE_PORT = '6379'
# 设置写入的键值对中的value为str类型
CACHE_DECODE_RESPONSES = True
class Cache(object):
__pool = redis.ConnectionPool(host=CACHE_HOST, port=CACHE_PORT, decode_responses=CACHE_DECODE_RESPONSES)
def __init__(self):
self.__redis = redis.Redis(connection_pool=self.__pool)
def delete(self, *names):
"""
根据name删除redis中的任意数据类型\n
:param names: key或者命名空间
:return:
"""
self.__redis.delete(*names)
def exists(self, name):
"""
检测redis的name是否存在\n
:param name: key或者命名空间
:return:
"""
return self.__redis.exists(name)
def keys(self, pattern='*'):
"""
根据* ?等通配符匹配获取redis的name\n
:param pattern: 通配符
:return:
"""
return self.__redis.keys(pattern)
def expire(self, name, time):
"""
为某个name设置超时时间\n
:param name: key或者命名空间\n
:param time: 超时时间(s)
:return:
"""
if not self.exists(name):
raise Exception(name + ' 不存在')
self.__redis.expire(name, time)
def type(self, name):
"""
获取name对应值的类型\n
:param name: key或者命名空间\n
:return:
"""
return self.__redis.type(name)
def rename(self, src, dst):
"""
重命名key或者命名空间\n\n
:param src: 原key或者命名空间\n
:param dst: 修改后的key或者命名空间\n
:return:
"""
if self.exists(dst):
raise Exception(dst + ' 已存在')
if not self.exists(src):
raise Exception(src + ' 不存在')
self.__redis.rename(src, dst)
# ------------------------字符串-----------------------------
def get(self, key):
"""
获取指定字符串值\n
:param key:单个键\n
:return:
"""
return self.__redis.get(key)
def mget(self, *keys):
"""
批量获取指定字符串值\n
:param keys:多个键\n
:return:
"""
return self.__redis.mget(keys)
def set(self, key, value, px=None):
"""
字符串设置值 \n
:param key:键\n
:param value:值\n
:param px:过期时间(ms)\n
:return:
"""
self.__redis.set(name=key, value=value, px=px)
def mset(self, **map):
"""
字符串批量设置值\n
:param map:批量设置的键值字典\n
:return:
"""
self.__redis.mset(mapping=map)
# -------------------------Hash-----------------------------
def hget(self, name, key):
"""
在name对应的hash中根据key获取value \n
:param name: 命名空间
:param key: 命名空间下对应的键
:return:
"""
return self.__redis.hget(name=name, key=key)
def hmget(self, name, *keys):
"""
在name对应的hash中获取多个key的值\n
:param name: 命名空间\n
:param keys: 命名空间下的多个键
:return:
"""
return self.__redis.hmget(name=name, keys=keys)
def hgetall(self, name):
"""
获取name对应hash的所有键值 \n
:param name:命名空间 \n
:return:
"""
return self.__redis.hgetall(name=name)
def hset(self, name, key, value):
"""
name对应的hash中设置一个键值对(不存在,则创建,否则,修改)\n
:param name: 命名空间
:param key: 命名空间下对应的键
:param value: 命名空间下对应的值
:return:
"""
self.__redis.hset(name=name, key=key, value=value)
def hmset(self, name, **map):
"""
在name对应的hash中批量设置键值对\n
:param name:命名空间\n
:param map:键值对\n
:return:
"""
self.__redis.hmset(name=name, mapping=map)
def hexists(self, name, key):
"""
检查name对应的hash是否存在当前传入的key\n
:param name: 命名空间\n
:param key: 命名空间下对应的键
:return:
"""
return self.__redis.hexists(name=name, key=key)
def hdel(self, name, keys):
"""
批量删除指定name对应的key所在的键值对\n
:param name:命名空间\n
:param keys:要删除的键\n
:return:
"""
self.__redis.hdel(name, keys)
# -------------------------List-----------------------------
def lpush(self, name, *values, left=True):
"""
在name对应的list中添加元素,每个新的元素都添加到列表的最左边\n
:param name: 命名空间
:param values: 值
:param left: 是否添加到列表的最左边,True:最左边,False:最右边,默认为True
:return:
"""
if left:
self.__redis.lpush(name, *values)
else:
self.__redis.rpush(name, *values)
def lset(self, name, index, value):
"""
对list中的某一个索引位置重新赋值\n
:param name: 命名空间
:param index: 索引位置
:param value: 要插入的值
:return:
"""
self.__redis.lset(name=name, index=index, value=value)
def lrem(self, name, count, value):
"""
删除name对应的list中的指定值\n
:param name:命名空间\n
:param count:num=0 删除列表中所有的指定值;num=2 从前到后,删除2个;num=-2 从后向前,删除2个
:param value:要删除的值
:return:
"""
self.__redis.lrem(name=name, count=count, value=value)
def lpop(self, name):
"""
移除列表的左侧第一个元素,返回值则是第一个元素\n
:param name: 命名空间\n
:return: 第一个元素
"""
return self.__redis.lpop(name=name)
def lindex(self, name, index):
"""
根据索引获取列表内元素\n
:param name: 命名空间\n
:param index: 索引位置
:return:
"""
return self.__redis.lindex(name=name, index=index)
def lrange(self, name, start, end):
"""
获取指定范围内的元素\n
:param name: 命名空间\n
:param start: 起始位置
:param end: 结束位置
:return:
"""
return self.__redis.lrange(name=name, start=start, end=end)
def ltrim(self, name, start, end):
"""
移除列表内没有在该索引之内的值\n
:param name: 命名空间\n
:param start: 起始位置
:param end: 结束位置
:return:
"""
self.__redis.ltrim(name=name, start=start, end=end)
# -------------------------Set-----------------------------
def sadd(self, name, *values):
"""
给name对应的集合中添加元素\n
:param name:命名空间\n
:param values:集合
:return:
"""
self.__redis.sadd(name, *values)
def smembers(self, name):
"""
获取name对应的集合的所有成员\n
:param name: 命名空间\n
:return:
"""
return self.__redis.smembers(name=name)
def sdiff(self, name, *others):
"""
在第一个name对应的集合中且不在其他name对应的集合的元素集合,即,name集合对于其他集合的差集\n
:param name:主集合\n
:param others:其他集合\n
:return:
"""
# print(*others)
return self.__redis.sdiff(name, *others)
def sinter(self, name, *names):
"""
获取多个name对应集合的交集\n
:param name: 主集合\n
:param names: 其他集合\n
:return:
"""
return self.__redis.sinter(name, *names)
def sunion(self, name, *names):
"""
获取多个name对应集合的并集\n
:param name: 主集合\n
:param names: 其他集合\n
:return:
"""
return self.__redis.sunion(name, *names)
def sismember(self, name, value):
"""
检查value是否是name对应的集合内的元素\n
:param name:命名空间\n
:param value:待检查的值\n
:return:
"""
return self.__redis.sismember(name=name, value=value)
def smove(self, src, dst, value):
"""
将某个元素从一个集合中移动到另外一个集合\n
:param src: 原集合\n
:param dst: 目标集合\n
:param value: 待移动的值
:return:
"""
self.__redis.smove(src=src, dst=dst, value=value)
def spop(self, name):
"""
从集合的右侧移除一个元素,并将其返回\n
:param name: 命名空间\n
:return:
"""
return self.__redis.spop(name=name)
def srem(self, name, *values):
"""
删除name对应的集合中的某些值\n
:param name: 命名空间\n
:param values: 要删除的值
:return:
"""
self.__redis.srem(name, *values)
thread_pool
# -*-* encoding:UTF-8 -*-
# author : mengy
# date : 2019/5/23
# python-version : Python 3.7.0
# description : 线程池
from concurrent.futures import ThreadPoolExecutor, as_completed
# 最大线程数
MAX_WORKERS = 50
class ThreadPool:
__instance = None
def __init__(self):
self.__executor = ThreadPoolExecutor(MAX_WORKERS)
def __new__(cls, *args, **kwargs):
"""
使用单例模式\n
:param args:
:param kwargs:
:return:
"""
if cls.__instance is None:
cls.__instance = object.__new__(cls)
return cls.__instance
def submit(self, func, *args, **kwargs):
return self.__executor.submit(func, *args, **kwargs)
def batch_submit(self, func, *args, **kwargs):
return [self.submit(func, *item, **kwargs) for item in args]
@staticmethod
def completed_tasks(tasks):
return as_completed(tasks)
至此,我们已经创建了自己的 ip 池,接下来就需要运用到我们的爬虫程序中。在 middleware.py 中新建 IPProxyMiddleware 类,在该类的构造方法中,首先从我们的 ip 池获取一页的 ip ,然后再请求之前,对请求对象 Request 设置代理的 url,然后在请求完成之后,根据目标网页的返回码进行判断,如果请求发生异常,则从 ip 池中随机取出一条 ip ,重新构造 Request,待下一次请求,另外,我们在 settings.py 文件中指定了失败的最大次数为 5 次,意味着,如果一个请求失败超过 5 次,则放弃该请求。该逻辑实现是在 process_response () 方法中。该方法如果返回值是 request,则表示重新将该请求发送到 Scheduler ,该请求将再次被执行;如果返回是 response ,则表示该请求已经完成,不会将对该请求返回的数据进行处理。具体代码如下:
class IPProxyMiddleware(object):
"""
IP 代理池中间件
"""
def __init__(self):
# 爬取有效 ip
self.ip_list = crawl_proxy.get_ips(pages=3)
# 请求已经失败的次数
self.retry_time = 0
self.index = random.randint(0, len(self.ip_list) - 1)
def process_request(self, request, spider):
"""
处理将要请求的 Request
:param request:
:param spider:
:return:
"""
# 失败重试次数
self.retry_time = 0
#
# if len(self.ip_list) < 5:
# self.ip_list.extend(crawl_proxy.get_ips(refresh=True))
# 随机选取 ip
proxy = json.loads(self.ip_list[self.index])
print('选取的 ip:' + proxy.get('url'))
# 设置代理
request.meta['Proxy'] = proxy.get('url')
def process_response(self, request, response, spider):
"""
处理返回的 Response
:param request:
:param response:
:param spider:
:return:
"""
# 针对4**、和5** 响应码,重新选取 ip
if re.findall('[45]\d+', str(response.status)):
print(u'[%s] 响应状态码:%s' % (response.url, response.status))
if self.retry_time > settings.get('MAX_RETRY', 5):
return response
if response.status == 418:
sec = random.randrange(30, 35)
print(u'休眠 %s 秒后重试' % sec)
# time.sleep(sec)
self.retry_time += 1
proxy = json.loads(random.choice(self.ip_list))
print('失败 %s 次后,重新选取的 ip:%s' % (self.retry_time, proxy.get('url')))
request.meta['Proxy'] = proxy.get('url')
return request
return response
最后,不要忘记在 settings.py 中启用 IPProxyMiddleware 中间件:
DOWNLOADER_MIDDLEWARES = {
'sina_scrapy.middlewares.RandomUserAgentMiddleware': 10,
'sina_scrapy.middlewares.IPProxyMiddleware': 30,
}
再次启动爬虫程序,我们可以看到,已经从我们的 ip 池中获取到 ip 并进行请求,当有异常发生时,将会再次从 ip 池中获取 ip,并重新请求。
Cookies 池
在本篇博客的开头就提到,新浪微博的反爬机制是针对账号的,但是我只有一个账号,因此这一小节提到的 Cookies 池对于我们 新浪微博爬虫程序来说,也许并不能起到太大的左右,但是如果你有多个微博账号,Cookies 池的方法将能够极大地提高你程序的爬取效率。方法都是一样的。Cookies 池和前面提到的 User-Agent 池、ip 池的原理是一样的,都是获取一组的数据,存入 Redis 中,等到需要使用的时候,在随机从 Redis 中取出数据,进行处理。
我们在 utils 包下新建一个文件 simulate_login.py 用于编写模拟登录新浪微博的脚本,在这里,我分别实现了微博移动网页版模拟登录(https://weibo.cn/)以及新浪微博 PC 网页版(https://weibo.com)。登录的难度不一致,可以根据需要自己选择。其中 PC 网页版的登录逻辑,可以参考《模拟新浪微博登录(Python+RSA加密算法)》这篇博客,里面有详细的分析,在这里就不一一赘述了,只是需要注意相应的 js 版本可能已经过时了。具体实现如下:
# -*-* encoding:UTF-8 -*-
# author : mengy
# date : 2019/6/26
# python-version : Python 3.7.0
# description : 模拟登录新浪微博
import base64
import urllib
import rsa
import binascii
import json
import re
import http.cookiejar
import urllib.request
from sina_scrapy.utils.cache_utils import Cache
# cookies 在缓存中的有效期(s)
COOKIES_EXPIRES = 3 * 24 * 60 * 60
def urlopen(url, callback=None, data=None, timeout=5):
"""
重写 urllib 的 urlopen 方法,该方法能够将 cookies 作为参数传给回调函数\n
:param url:请求的地址或者 url.request.Request() 对象\n
:param callback:回调函数\n
:param data:请求数据\n
:param timeout:超时时间(s),默认为 5s\n
:return:
"""
cookie = http.cookiejar.CookieJar()
handler = urllib.request.HTTPCookieProcessor(cookie)
opener = urllib.request.build_opener(handler)
response = opener.open(url, data=data, timeout=timeout)
if callback:
callback(cookie)
return response
class LoginBase(object):
"""
微博模拟登录基类,实现了新浪移动微博网页版(https://weibo.cn/)的模拟登录\n
"""
# 缓存工具
__cache = Cache()
# 移动网页版 cookies 在缓存中的命名
COOKIES_NAMESPACE = 'MOBILE_WEB_POOL_COOKIES'
def __init__(self, username, password):
# 微博账号
self.__username = username
# 微博密码
self.__password = password
# 记录 cookies,按照 domain 分组
self.cookies = {}
@property
def username(self) -> str:
return self.__username
@property
def password(self) -> str:
return self.__password
def save_cookies(self, cookie: http.cookiejar.CookieJar):
"""
保存 Cookies\n
:param cookie:
:return:
"""
# 按照 domain 分组记录所访问过 url 的 cookies
for item in cookie:
tmp = self.cookies.get(item.domain)
if tmp:
tmp.update({item.name: item.value})
else:
self.cookies.update({item.domain: {item.name: item.value}})
def login(self):
"""
微博移动网页版模拟登录(https://weibo.cn/),代码实现逻辑根据网页版 js,有一定的时效性\n
:return:
"""
print(u'微博移动网页版模拟登录(https://weibo.cn/)开始...')
# 登录地址
url = 'https://passport.weibo.cn/sso/login'
# 默认 Headers
headers = {
'Referer': 'https://passport.weibo.cn/signin/login?entry=mweibo&r=https%3A%2F%2Fweibo.cn&page=9.com&uid=1260427471&_T_WM=c6e864f47316ecbaf8607a214d4bb3fa',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36'
}
# 构造模拟登录请求的表单数据
data = {
'username': self.__username,
'password': self.__password,
'savestate': 1,
'r': 'https://weibo.cn',
'ec': 0,
'pagerefer': '',
'entry': 'mweibo',
'wentry': '',
'loginfrom': '',
'client_id': '',
'code': '',
'qq': '',
'mainpageflag': 1,
'hff': '',
'hfp': ''
}
try:
# 格式化 请求数据
post_data = urllib.parse.urlencode(data).encode('gbk')
# 构造请求
req = urllib.request.Request(url=url, data=post_data, headers=headers, method='POST')
# 使用自定义的请求方法,保存请求的 cookies
response = urlopen(url=req, callback=self.save_cookies, timeout=10)
# 将返回的数据转化成 dict
result = json.loads(response.read().decode('gbk'))
if result.get('retcode') == 20000000:
print(u'登录成功!')
# 登录成功后返回的 url
crossdomainlist = result.get('data').get('crossdomainlist')
# 依次访问 url,获取 cookies 并保存
if crossdomainlist:
for item in dict(crossdomainlist).values():
urlopen(item, self.save_cookies)
else:
print(u'登录失败!')
# 将 cookies 放入缓存 redis
self.push_cache()
return True
except Exception as e:
print(u'解析失败', e)
return False
def push_cache(self):
assert self.cookies, u'请先模拟登录'
# self.__cache.lpush(self.COOKIES_NAMESPACE, json.dumps(self.cookies))
self.__cache.hset(self.COOKIES_NAMESPACE, self.username, json.dumps(self.cookies))
# 设置 cookies 的有效时间(三天)
self.__cache.expire(self.COOKIES_NAMESPACE, COOKIES_EXPIRES)
def get_cookies(self, domain=None, is_force_login=False):
"""
获取 cookies\n
:param domain:域名
:param is_force_login:是否强制登录(默认为 False)\n
:return:
"""
# 从 redis 获取 cookies
# data = self.__cache.lrange(namespace, 0, 1)
data = self.__cache.hget(self.COOKIES_NAMESPACE, self.username)
if is_force_login or not data:
# 如果 redis 中没有 cookies,则模拟登录,重新获取 cookies
if self.login():
cookies = self.cookies
else:
raise Exception(u'获取 Cookies 失败!')
else:
print(u'从缓存中获取 cookies')
cookies = json.loads(data)
if domain:
return cookies.get(domain)
return cookies
class LoginForSinaCom(LoginBase):
"""
模拟新浪微博 PC 网页版(https://weibo.com)登录,登录后,将 cookies 保存到 redis 缓存中,并提供获取 cookies 的方法
"""
# PC 网页版 cookies 在缓存中的命名
COOKIES_NAMESPACE = 'PC_WEB_POOL_COOKIES'
def __init__(self, username, password):
LoginBase.__init__(self, username, password)
def encrypt_name(self) -> str:
"""
用 base64 加密用户名 \n
:return:
"""
return base64.encodebytes(bytes(urllib.request.quote(self.username), 'utf-8'))[:-1].decode('utf-8')
def encrypt_passwd(self, **kwargs) -> str:
"""
使用 rsa 加密密码\n
:param kwargs:
:return:
"""
try:
# 拼接明文
message = str(kwargs['servertime']) + '\t' + str(kwargs['nonce']) + '\n' + str(self.password)
# 10001 为 js 加密文件中的加密因子,16进制
key = rsa.PublicKey(int(kwargs['pubkey'], 16), 0x10001)
# 使用 rsa 加密拼接后的密码
encrypt_pwd = rsa.encrypt(message.encode('utf-8'), key)
# 将加密后的密文转化成 AscII 码
final_pwd = binascii.b2a_hex(encrypt_pwd)
return final_pwd
except Exception as e:
print(e)
return None
def pre_login(self) -> dict:
"""
预登录,请求 prelogin_url 链接地址 获取 servertime,nonce,pubkey 和 rsakv \n
:return:
"""
# 预登录地址
pre_login_url = 'http://login.sina.com.cn/sso/prelogin.php?entry=sso&callback=sinaSSOController.preloginCallBack&su=%s&rsakt=mod&client=ssologin.js(v1.4.19)' % self.encrypt_name()
try:
response = urlopen(pre_login_url, callback=self.save_cookies, timeout=5)
# 提取响应结果
preloginCallBack = re.compile('\((.*)\)').search(str(response.read(), 'UTF-8'))
if preloginCallBack:
result = json.loads(preloginCallBack.group(1))
else:
raise Exception(u'解析响应结果失败!')
return result
except Exception as e:
print(e)
return None
def login(self):
"""
登录新浪微博 PC 网页版(https://weibo.com)\n
:return:
"""
print(u'新浪微博 PC 网页版(https://weibo.com)登录开始...')
# 预登录
result = self.pre_login()
# 加密用户账号
encodedUserName = self.encrypt_name()
serverTime = result.get('servicetime')
nonce = result.get('nonce')
rsakv = result.get('rsakv')
# 加密密码
encodedPassWord = self.encrypt_passwd(**result)
# 构造请求数据
post_data = {
"entry": "weibo",
"gateway": "1",
"from": "",
"savestate": "7",
"qrcode_flag": 'false',
"useticket": "1",
"pagerefer": "https://login.sina.com.cn/crossdomain2.php?action=logout&r=https%3A%2F%2Fweibo.com%2Flogout.php%3Fbackurl%3D%252F",
"vsnf": "1",
"su": encodedUserName,
"service": "miniblog",
"servertime": serverTime,
"nonce": nonce,
"pwencode": "rsa2",
"rsakv": rsakv,
"sp": encodedPassWord,
"sr": "1680*1050",
"encoding": "UTF-8",
"prelt": "194",
"url": "https://weibo.com/ajaxlogin.php?framelogin=1&callback=parent.sinaSSOController.feedBackUrlCallBack",
"returntype": "META"
}
# 登录地址
url = 'https://login.sina.com.cn/sso/login.php?client=ssologin.js(v1.4.19)'
# 打包请求数据
data = urllib.parse.urlencode(post_data).encode('GBK')
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
}
try:
# 请求登录
req = urllib.request.Request(url=url, data=data, headers=headers)
response = urlopen(req, callback=self.save_cookies)
text = response.read().decode('GBK')
except Exception as e:
print(e)
try:
# 获取第一次重定向地址
login_url = re.compile('location\.replace\("(.*)"\)').search(text).group(1)
# 第一次重定向
response = urlopen(login_url, callback=self.save_cookies)
data = response.read().decode('GBK')
# 获取第二次重定向地址
jump_url = re.compile("location\.replace\('(.*)'\)").search(data).group(1)
# 第二次重定向
response = urlopen(jump_url, callback=self.save_cookies)
data = response.read().decode('utf-8')
# 获取服务器返回的加密的 用户名
name = re.compile('"userdomain":"(.*)"').search(data).group(1)
index = 'http://weibo.com/' + name
# 第三次跳转到首页
urlopen(index, callback=self.save_cookies)
print(u'登录成功!')
# 将 cookies 放入缓存 redis
self.push_cache()
return True
except Exception as e:
print(u'登录失败!,异常:', e)
return False
有了 Cookies 池以后,就需要编写中间件类使用 Cookies 池的数据了,在 middleware.py 中添加如下类:
class CookiesMiddleware(object):
"""
登录 Cookies 中间件
"""
def __init__(self):
self.cookies = simulate_login.LoginBase(settings.get('SINA_ACCOUNT'), settings.get('SINA_PASSWD')).get_cookies()
def process_request(self, request, spider):
cookies = self.cookies.get('.weibo.cn')
request.cookies = cookies
判断代码实现很简单,只是在请求之前,从 Cookies 中获取一个 Cookies,然后赋给 Request。另外,在前面的使用 ip 池的时候,我们有根据返回码来确定是不是要重新从 ip 池中拉取 ip,同样的,我们可以增加判断,如果响应码为 418 ,就重新从 Cookies 池中获取 Cookies,另外,我们的微博账号密码是配置在 settings.py 中的 SINA_ACCOUNT 以及 SINA_PASSWD,需要替换成自己的账号密码。代码如下:
if response.status == 418:
# 出现 418 重新获取 cookies
request.cookies = simulate_login.LoginBase(settings.get('SINA_ACCOUNT'),
settings.get('SINA_PASSWD')).get_cookies('.weibo.cn')
sec = random.randrange(30, 35)
print(u'休眠 %s 秒后重试' % sec)
# time.sleep(sec)
好了,接着,我们在 settings.py 中启用 CookiesMiddleware 中间件,代码如下:
DOWNLOADER_MIDDLEWARES = {
'sina_scrapy.middlewares.RandomUserAgentMiddleware': 10,
'sina_scrapy.middlewares.CookiesMiddleware': 20,
'sina_scrapy.middlewares.IPProxyMiddleware': 30,
}
SINA_ACCOUNT = 'XXXXXXXXXXX'
SINA_PASSWD = 'XXXXXXXXXXXX'
再次运行爬虫程序,将会从 Cookies 池中获取 Cookies。
总结
本系列博客代码均已提交至我的 GitHub,有需要的可以移步下载,如需更详细的代码(数据分析,使用 pyecharts2.0 和 pyplot 绘制图表,包括:日爬取量统计图、微博用户性别分布统计图、微博用户年龄分布统计图、新浪微博用户活跃度分布图、新浪微博用户粉丝和关注数分布图、新浪微博用户分布地图等),请从 CSDN下载。pyecharts2.0 数据分析效果预览:
-
日爬取量统计图
-
微博用户性别分布统计图
-
微博用户年龄分布统计图
-
新浪微博用户活跃度分布图
-
新浪微博用户粉丝和关注数分布图
-
新浪微博用户分布地图
写在最后
到这里,使用 Scrapy 爬取新浪微博用户信息系列博客就已经结束了,在这一系列博客中,我们学习了 Scrapy 框架的基本原理以及基本用法,当然 Scrapy 的强大之处远不止如此,例如:下载器中间件(DownloaderMiddleware)、基于 Redis 的分布式 Scrapy等。学无止境,希望大家不要止步于此。想要掌握 Scrapy 的全部功能,需要自己慢慢探索、实践。大家加油!
更多推荐
所有评论(0)