使用Python爬取B站UP主所有视频信息
运行整个流程self.get_url() # 获取当前页面的视频URLself.next_page() # 遍历所有页面获取视频URL# 使用多线程提高数据获取效率thread.join() # 等待所有线程完成# 所有线程完成后,保存数据到Excel。
目录
一、背景
之前B站有刷到一个视频,关于某UP主的视频播放数统计,这视频的UP主是个狠人,一个个视频统计,最后算所有视频的播放数,可以是可以,就是有点废人,突然有了想用爬虫的思路了.
首先我们知道B站的每一个视频是BV号,视频的URL是由B站域名加唯一BV号如:
https://www.bilibili.com/video/BV1Qm411S7BP/5分钟了解 蓝色监狱【『剧场版 蓝色监狱 -EPIOSODE 凪-』2024年4月19日公開】_哔哩哔哩_bilibili
那这样爬取的大致思路就是获取所有BV号,再用requests,去爬取视频,获取视频信息
二、爬虫的实现思路
1、直接requests爬取
发现行不通,这里直接在up主页面访问得不到每一个视频的BV号
看样子这个JS脚本是来用于检测用户的信息,并根据这些条件可能会重定向不同的页面。此外,还有一些用于页面性能监控、日志上报和webp图片格式支持检测的脚本。
本菜鸟技术有限,打不过就绕过,直接上selenium来获取所有视频的BV号
2、综合思路
selenium用来获取BV号,爬取到具体的信息保存下来,配合多线程、requests用来爬取具体视频的信息,提高爬取效率,最后用execl保存爬取到的所有信息
二、代码实现
1、初始化Selenium WebDriver:
使用Selenium的Chrome WebDriver来模拟用户在浏览器中的操作,这样可以处理JavaScript渲染的页面
class GetInfo():
def __init__(self, user_id):
self.a_list = [] # 存储每一个视频的url
self.d = webdriver.Chrome() # 初始化Chrome浏览器驱动
self.user_id = user_id
self.base_url = f'https://space.bilibili.com/{user_id}/video'
self.d.get(self.base_url)
#这篇文章写于2022年,当时B站免登入可以搜索视频,查看视频,但是这段时间再次尝试爬取资源时,加了必须认证登入,尝试过很多次,没有获取token,只能老老实实,登入后再去爬取信息
time.sleep(10)
print("速度扫码登入")
实际效果,是可以直接访问,
但是会弹出登入验证,甚至因为未登入无法获取视频列表,接口返回无权限,遇到这种情况,只能登入,尝试过selenium带cookie,但是没用,甚至登入后copy as curl再postman导入请求,都返回无权限,说明搜索接口再请求服务器时候,没有带上cookie
2、访问用户视频列表页面
这里使用XPath定位到视频列表的ul元素,然后遍历其中的li元素,提取每个视频的URL。
def get_url(self):
# 从当前页面获取所有视频的URL并保存到本地文件
ul=WebDriverWait(self.d, 10).until(lambda x: x.find_element(By.XPATH, '//*[@id="submit-video-list"]/ul[1]'))
lis = ul.find_elements(By.XPATH, "li")
for li in lis:
self.a_list.append(li.get_attribute("data-aid"))
with open("url.json", "w+", encoding="utf-8") as f:
data = json.dumps(self.a_list, ensure_ascii=False) # 确保中文字符正常保存
f.write(data) # 使用write而不是writelines
3、翻页处理
首先获取总页数,然后通过点击“下一页”按钮来遍历所有页面,并在每个页面上调用get_urls方法来提取视频URL
def next_page(self):
# 遍历所有页面,获取所有视频的URL)
total = WebDriverWait(self.d, 10).until(lambda x: x.find_element(By.XPATH, '//*[@id="submit-video-list"]/ul[3]/span[1]'))
number = re.findall(r"\d+", total.text)
total = int(number[0])
for page in range(1, total):
try:
self.d.find_element(By.LINK_TEXT, '下一页').click()
time.sleep(2) # 等待页面加载
self.get_url() # 修复方法名错误
except Exception as e:
print(f"Failed to click next page: {e}")
return self.a_list
4、获取视频详细信息
使用requests库来获取每个视频页面的HTML内容,然后调用parse_video_page方法来解析并提取视频的详细信息
def get_video(self, urls, start, end):
# 使用requests.Session()来复用TCP连接
with requests.Session() as session:
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
})
base_url = "http://www.bilibili.com/video/"
# 预编译正则表达式以提高性能
title_pattern = re.compile(r'<title data-vue-meta="true">([^&]+)</title>')
play_count_pattern = re.compile(r'视频播放量 (\d+)')
danmu_count_pattern = re.compile(r'弹幕量 (\d+)')
like_count_pattern = re.compile(r'点赞数 (\d+)')
coin_count_pattern = re.compile(r'投硬币枚数 (\d+)')
favorite_count_pattern = re.compile(r'收藏人数 (\d+)')
share_count_pattern = re.compile(r'转发人数 (\d+)')
for url in urls[int(start):int(end)]:
full_url = base_url + url
try:
response = session.get(full_url)
if response.status_code == 200:
string = response.text
# 使用正则表达式提取视频信息
title_match = title_pattern.search(string)
title = title_match.group(1) if title_match else "未找到匹配的内容"
# 提取视频播放量、弹幕量等信息
play_count = play_count_pattern.search(string).group(1) if play_count_pattern.search(
string) else '0'
danmu_count = danmu_count_pattern.search(string).group(1) if danmu_count_pattern.search(
string) else '0'
like_count = like_count_pattern.search(string).group(1) if like_count_pattern.search(
string) else '0'
coin_count = coin_count_pattern.search(string).group(1) if coin_count_pattern.search(
string) else '0'
favorite_count = favorite_count_pattern.search(string).group(
1) if favorite_count_pattern.search(string) else '0'
share_count = share_count_pattern.search(string).group(1) if share_count_pattern.search(
string) else '0'
# 将提取的信息添加到self.data_list中
video_info = {
"url":full_url,
"title": title,
"play_count": play_count,
"danmu_count": danmu_count,
"like_count": like_count,
"coin_count": coin_count,
"favorite_count": favorite_count,
"share_count": share_count
}
self.data_list.append(video_info)
else:
print(f"Failed {full_url}: HTTP {response.status_code}")
except Exception as e:
print(f"Failed to get video info for url {full_url}: {e}")
5、保存数据
导入openpyxl库,用于将数据保存到Excel
def save_to_excel(self, filename):
wb = Workbook()
ws = wb.active
ws.append(['url','标题', '播放量', '弹幕数', '点赞数', '投币数', '收藏数', '分享数']) # 添加表头
for video_info in self.data_list:
ws.append([
video_info['url'],
video_info['title'],
video_info['play_count'],
video_info['danmu_count'],
video_info['like_count'],
video_info['coin_count'],
video_info['favorite_count'],
video_info['share_count']
])
wb.save(filename)
6、定义一个运行函数串联所有方法功能
def run(self):
# 运行整个流程
self.get_url() # 获取当前页面的视频URL
self.next_page() # 遍历所有页面获取视频URL
with open("url.json", "r", encoding="utf-8") as f:
data = json.load(f)
# 使用多线程提高数据获取效率
threads = []
part = int(len(data) / 3)
for i in range(3):
start = i * part
end = (i + 1) * part if i != 2 else len(data)
thread = threading.Thread(target=self.get_video, args=(data, start, end))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # 等待所有线程完成
# 所有线程完成后,保存数据到Excel
self.save_to_excel('final_video_info1.xlsx')
三、实际运行效果
if __name__ == '__main__':
obj = GetInfo()
obj.run() # 运行整个流程
1、selenium爬取视频BV号
自动翻页至最后一页
2、BV号的json文件
json文件列表长度和视频号一致
3、execl文件信息
有了这些文件,想统计具体的数据,就很轻松
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)