Python -- 获取王者荣耀皮肤壁纸(二)
???? Python 学习笔记 文章目录网页分析下载准备设计生产者和消费者实现cache_image_url方法,调用接口获取列表代码实现save_pictrue方法,下载图片资源代码总结完整代码通过上一篇文章中的方法,我们获取到了游戏内所有人物的名称。接下来我们就要找到显示皮肤壁纸的网页,获取壁纸路径,并将壁纸放到对应文件夹内。网页分析我们点击【游戏壁纸】,我们可以看到一个【高清壁纸
📖 Python 学习笔记
通过上一篇文章中的方法,我们获取到了游戏内所有人物的名称。接下来我们就要找到显示皮肤壁纸的网页,获取壁纸路径,并将壁纸放到对应文件夹内。
网页分析
我们点击【游戏壁纸】,我们可以看到一个【高清壁纸】栏,这里的内容是分页现实的,所以是动态加载的。
这里面的壁纸只是缩略图,我们点击图片之后会显示大图和各种分辨率链接
我们点击最清晰的分辨率链接,可以看到最高清的图片
下载准备
通过分析页面,我们明确了需要进行的工作:
- 通过调用HTTP接口,获取高清壁纸列表
- 分析信息中的图片所属人物,以及解析出最高请的图片地址
- 下载放入指定文件夹
设计生产者和消费者
我们先设计好生产者和消费者,生产者只管获取图片地址以及解析图片信息,消费者按照生产者提供的信息下载图片。
# 生产者
class Producer(threading.Thread):
# 接收页面队列和图片信息队列
def __init__(self,page_queue,pic_queue,*args,**kwargs):
super(Producer, self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.pic_queue = pic_queue
def run(self) -> None:
# 页码队列不为空就获取页码调用接口并解析
while not self.page_queue.empty():
page_num = self.page_queue.get()
# 获取图片信息 放到图片信息队列中
cache_image_url(page_num,self.pic_queue)
# 消费者
class Consumer(threading.Thread):
# 接收一个图片信息队列
def __init__(self, pic_queue, *args, **kwargs):
super(Consumer, self).__init__(*args, **kwargs)
self.pic_queue = pic_queue
def run(self) -> None:
# 永久循环,直到内部逻辑判断无法获取到图片信息后终止循环
while True:
try:
# 十秒获取不到数据抛出异常
pic_msg = self.pic_queue.get(timeout=10)
try:
# 下载图片的方法
save_pictrue(pic_msg["role_name"],pic_msg["pic_name"],pic_msg["pic_url"])
except:
#下载失败 抛出异常
print("%s %s %s 下载失败" % (pic_msg["role_name"], pic_msg["pic_name"], pic_msg["pic_url"]))
except:
# 终止循环
break;
实现cache_image_url方法,调用接口获取列表
-
F12 打开开发者工具,切换高清壁纸页,找到页面请求的后台接口
https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20&totalpage=0&page=1&iOrder=0&iSortNumClose=1&jsoncallback=jQuery17107996966853258656_1639548055525&iAMSActivityId=51991&everyRead=true&iTypeId=2&iFlowId=267733&iActId=2735&iM oduleId=2735&=1639548680748
返回的数据结构如下:
-
分析接口,接口中通过page参数传递页码,我们需要通过更改页码循环请求后台资源
-
返回的信息内容是经过编码的,我们需要进行解码
-
sProdName 属性是图片的描述(一般带有人物名称)
-
sProdImgNo_8 按理说解码后是最高请图片的地址,但是这个地址现实的仍然是个缩略图,我们和高清壁纸的网址比较发现,两者地址为
高清 https://shp.qpic.cn/ishow/2735120110/1638324367_84828260_10377_sProdImgNo_8.jpg/0
缩略 https://shp.qpic.cn/ishow/2735120110/1638324367_84828260_10377_sProdImgNo_8.jpg/200所以我们要把sProdImgNo_8 中链接最后的200替换为0就可以了
代码
def cache_image_url(page_num,pic_queue):
headers = {
"User_Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0",
"Referer": "https://pvp.qq.com/"
}
wangzhe_url = "https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?" \
"activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20" \
"&totalpage=0&page=" + str(page_num) + \
"&iOrder=0&iSortNumClose=1&_everyRead=true&iTypeId=2" \
"&iFlowId=267733&iActId=2735&iModuleId=2735&_=1639123481381"
resp = requests.get(wangzhe_url, headers)
resp_txt = resp.text
# 字符串转为字典
resp_dic = json.loads(resp_txt)
# 获取图片列表信息
pic_list = resp_dic["List"]
if None != pic_list and len(pic_list)>0:
# 循环列表中的图片信息
for a in pic_list:
# 皮肤名称
pic_name = parse.unquote(a["sProdName"])
# 皮肤地址 地址最后面的参数默认是200缩略图 ,将最后面的200修改为 0
pic_url = parse.unquote(a["sProdImgNo_8"])
# 不能直接用替换 ,因为可能链接中间存在 200
if pic_url.endswith("200"):
pic_url = pic_url[:len(pic_url) - 3] + "0"
# 默认人物名称 其他
role_name="其他"
# 根据英雄列表中的名字去匹配,图片标题中带有该英雄的名字,就放到该英雄的文件夹下
for hero in herolist:
if hero in pic_name:
role_name = hero
# 将图片信息 存入到队列中
pic_queue.put({"pic_name":pic_name,"pic_url":pic_url,"role_name":role_name})
在这里面还有两点需要注意:
一、我们通过请求网页上获取到的接口,获取到的信息和想要的有出入,多出了一些信息,通过打印 text 内容可以发现:
这是jsonCallBack的格式,调用jQuery17107996966853258656_1639548055525方法对返回的数据进行解码,因为我们获取并执行这个js方法很困难,并且我们可以自己进行解码,所以不需要这个方法:
第一个 ( 和最后一个 ) 中的内容才是我们想要的能狗转换成 json 数据,我们可以通过截断字符串,截取想要的信息。
在这里我们通过删除 jsoncallback=jQuery17107996966853258656_1639548055525 属性去避免这个问题。
二、页面链接是有时效的,超过一定时间之后接口返回的数据就不完整了。要重新去获取下最新的
实现save_pictrue方法,下载图片资源
代码
def save_pictrue(role_name,pic_name,pic_url):
# 人物名字做文件夹 没有则创建 wangzheimages文件夹要提前创建
dir_path = os.path.join("wangzheimages", role_name)
if not os.access(dir_path, os.F_OK):
os.mkdir(dir_path)
try:
# 拼接图片路径
pic_path = os.path.join(dir_path, "%s.jpg" % pic_name)
# 判断图片不存在就下载
if not os.access(pic_path, os.F_OK):
request.urlretrieve(pic_url,pic_path)
print("%s %s 下载成功" % (pic_name, pic_url))
else:
print("%s %s 已存在" % (pic_name, pic_url))
except:
print("%s %s 下载失败" % (pic_name, pic_url))
总结
综上,获取壁纸的代码就完成了,这个代码主要是让自己熟悉一下爬虫过程中较常用到的工具和方法。
- 使用生产者消费者的模式,提高执行效率
- 多线程类型的创建方式
- 使用 Queue 队列 保证线程安全
- json – 字符串转 json 对象 方便key-value的获取
- urllib.parse – 对返回内容解码
- BeautifulSoup – 获取元素十分便利
完整代码
import json
import os
import threading
import queue
import requests
from bs4 import BeautifulSoup
from urllib import parse
from urllib import request
# 英雄列表
herolist = []
# 直接通过请求获取英雄列表
def get_rolenames_2():
global herolist
herolist_url = "https://pvp.qq.com/web201605/js/herolist.json"
headers = {
"User_Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0",
"Referer": "https://pvp.qq.com/web201605/herolist.shtml"
}
resp = requests.get(herolist_url, headers)
resp_txt = resp.text
# 字符串转为字典
resp_dic = json.loads(resp_txt)
herolist = [heromsg["cname"] for heromsg in resp_dic]
herolist.append("其他")
# 解析网页 获取网页上的所有英雄名字
def get_rolenames():
#url
url ="https://pvp.qq.com/web201605/herolist.shtml"
# # 可以将字符串转换为html格式 并自动修正HTML格式
html = request.urlopen(url).read()
htmltext = html.decode('gbk')
soup = BeautifulSoup(htmltext, "html.parser")
body = soup.find('body')
div_0 = body.find('div',attrs={'class':'wrapper'})
div_1 = div_0.find('div', attrs={'class': 'zkcontent'})
div_2 = div_1.find('div', attrs={'class': 'zk-con-box'})
div_3 = div_2.find('div', attrs={'class': 'herolist-box'})
div_4 = div_3.find('div', attrs={'class': 'herolist-content'})
ul_0 = div_4.find('ul', attrs={'class': 'herolist clearfix'})
for elm in ul_0.select("li"):
# 循环li元素查找名字 先看看全不全
print(elm)
# 解析网址的方法
def cache_image_url(page_num,pic_queue):
headers = {
"User_Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0",
"Referer": "https://pvp.qq.com/"
}
wangzhe_url = "https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?" \
"activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20" \
"&totalpage=0&page=" + str(page_num) + \
"&iOrder=0&iSortNumClose=1&_everyRead=true&iTypeId=2" \
"&iFlowId=267733&iActId=2735&iModuleId=2735&_=1639123481381"
resp = requests.get(wangzhe_url, headers)
resp_txt = resp.text
# 字符串转为字典
resp_dic = json.loads(resp_txt)
# 获取图片列表信息
pic_list = resp_dic["List"]
if None != pic_list and len(pic_list)>0:
# 循环列表中的图片信息
for a in pic_list:
# 下载最高清的图片
# 皮肤名称
pic_name = parse.unquote(a["sProdName"]).replace(":","")
# 皮肤地址 地址最后面的参数默认是200缩略图 ,将最后面的200修改为 0
pic_url = parse.unquote(a["sProdImgNo_8"])
if pic_url.endswith("200"):
pic_url = pic_url[:len(pic_url) - 3] + "0"
# 默认角色名 其他
role_name="其他"
# 根据英雄列表中的名字去匹配,图片标题中带有该英雄的名字,就放到该英雄的文件夹下
for hero in herolist:
if hero in pic_name:
role_name = hero
pic_queue.put({"pic_name":pic_name,"pic_url":pic_url,"role_name":role_name})
# 保存图片的方法
def save_pictrue(role_name,pic_name,pic_url):
# 人物名字做文件夹 没有则创建
dir_path = os.path.join("wangzheimages", role_name)
if not os.access(dir_path, os.F_OK):
os.mkdir(dir_path)
try:
pic_path = os.path.join(dir_path, "%s.jpg" % pic_name)
if not os.access(pic_path, os.F_OK):
request.urlretrieve(pic_url,pic_path)
print("%s %s 下载成功" % (pic_name, pic_url))
else:
print("%s %s 已存在" % (pic_name, pic_url))
except:
print("%s %s 下载失败" % (pic_name, pic_url))
# 生产者
class Producer(threading.Thread):
def __init__(self,page_queue,pic_queue,*args,**kwargs):
super(Producer, self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.pic_queue = pic_queue
def run(self) -> None:
while not self.page_queue.empty():
page_num = self.page_queue.get()
# 下载图片
cache_image_url(page_num,self.pic_queue)
# 消费者
class Consumer(threading.Thread):
def __init__(self, pic_queue, *args, **kwargs):
super(Consumer, self).__init__(*args, **kwargs)
self.pic_queue = pic_queue
def run(self) -> None:
while True:
try:
# 十秒获取不到数据抛出异常
pic_msg = self.pic_queue.get(timeout=10)
# 下载图片
try:
save_pictrue(pic_msg["role_name"],pic_msg["pic_name"],pic_msg["pic_url"])
except:
print("%s %s %s 下载失败" % (pic_msg["role_name"], pic_msg["pic_name"], pic_msg["pic_url"]))
except:
break;
if __name__ == "__main__":
# 先获取所有英雄列表
get_rolenames_2()
# 爬取图片
page_queue = queue.Queue(20)
pic_queue = queue.Queue(2000)
for i in range(20):
page_queue.put(i)
for i in range(3):
th = Producer(page_queue,pic_queue,name="生产者%d正在执行"%i)
th.start()
for i in range(3):
th = Consumer(pic_queue,name="消费者%d正在执行"%i)
th.start()
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)