Python爬虫实战---爬取豆瓣电影排行前250的字段并写入MySQL数据库
爬取豆瓣电影Top250。要求:1、提取:电影名,导演,主演,拍摄时间,拍摄地,电影类型,评分,评论人数,电影宣传图片的url。2、将提取的数据保存到mysql数据库。3、下载电影宣传图片。环境配置:Anaconda3 + MySql8.0。使用的IDE:wingIDE 和 Navicat Premium 12。简要过程:1.访问URL,获取总电影数和每页电影数。2.使用生成器生成每一个页面URL,获取:电影名
·
爬取豆瓣电影Top250
要求:
1、提取:电影名,导演,主演,拍摄时间,拍摄地,电影类型,评分,评论人数,电影宣传图片的url。
2、将提取的数据保存到mysql数据库
3、下载电影宣传图片
环境配置:Anaconda3 + MySql8.0
使用的IDE:wingIDE 和 Navicat Premium 12
简要过程:
1.访问URL,获取总电影数和每页电影数
2.使用生成器生成每一个页面URL,获取:电影名、导演、评分、评论人数、电影宣传图片的url
3.访问单个电影的URL,获取:主演、拍摄地、电影类型 、拍摄时间
4.保存电影宣传图片
5.数据入库
效果图
1.dbconfig.json 数据库配置文件
{
"ip": "127.0.0.1",
"port": "3306",
"user": "root",
"pwd": "123456",
"db": "python_test",
"charset": "utf8mb4"
}
2.config.py 解析配置文件
# -*- coding:utf-8 -*-
import json
class DBConfig:
    """Load database connection settings from a JSON file.

    The accompanying ``dbconfig.json`` holds one JSON object with the keys
    ``ip``, ``port``, ``user``, ``pwd``, ``db`` and ``charset``.
    """

    def __init__(self, file):
        """Remember the path of the JSON configuration file.

        Args:
            file: Path to the configuration file.
        """
        # NOTE: the attribute keeps its historical (misspelled) name so that
        # any existing code reading ``.flie`` continues to work.
        self.flie = file

    def config(self):
        """Parse the configuration file and return its contents.

        Returns:
            The decoded JSON document (a dict for ``dbconfig.json``).

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        # Decode explicitly as UTF-8 instead of relying on the platform
        # default, which would garble or reject non-ASCII values on systems
        # whose locale encoding is not UTF-8.
        with open(self.flie, encoding="utf-8") as f:
            return json.load(f)
if __name__ == "__main__":
    # Manual smoke test, left disabled: load and print the local config.
    # config_info = DBConfig("dbconfig.json")
    # print(config_info.config())
    pass  # an ``if`` needs at least one statement; comments alone do not count
3.dboperation.py 数据库操作类
# -*- coding:utf-8 -*-
import logging
import pymysql
from config import DBConfig
logging.basicConfig(format="%(asctime)s %(message)s",datefmt="%Y-%m-%d %I:%M:%S %p")
class OperatonBD:
    """Small helper around a single PyMySQL connection.

    The connection is opened when the object is created.  Every method logs
    failures and returns ``None`` instead of raising, so callers get a
    best-effort interface.  Success messages are emitted with
    ``logging.warning``, as in the rest of this project.
    """

    def __init__(self, config1):
        """Store the settings and try to connect.

        Args:
            config1: Mapping with the keys ``ip``, ``port``, ``user``,
                ``pwd``, ``db`` and ``charset``.
        """
        self.config1 = config1
        self.connect = self.connection_db()

    def connection_db(self):
        """Open a connection using the stored settings.

        Returns:
            The PyMySQL connection, or ``None`` if connecting failed
            (the error is logged).
        """
        try:
            connect = pymysql.connect(
                host=self.config1["ip"],
                # The port is stored as a string in the JSON file; int() is
                # enough and, unlike eval(), cannot execute arbitrary code.
                port=int(self.config1["port"]),
                user=self.config1["user"],
                password=self.config1["pwd"],
                db=self.config1["db"],
                charset=self.config1["charset"],
            )
        except Exception as e:
            logging.error("数据库连接失败: %s", e)
            return None
        logging.warning("数据库连接成功")
        return connect

    def search_db(self, search_sql, params=None):
        """Run a query and return every row.

        Args:
            search_sql: SQL text, optionally containing ``%s`` placeholders.
            params: Optional values for the placeholders; they are passed to
                ``cursor.execute`` so the driver does the quoting.

        Returns:
            The result of ``cursor.fetchall()``, or ``None`` on failure.
        """
        try:
            with self.connect.cursor() as cursor:
                cursor.execute(search_sql, params)
                data = cursor.fetchall()
        except Exception as e:
            logging.error("查询失败: %s", e)
            return None
        logging.warning("数据查询成功")
        return data

    def update_db(self, update_sql, params=None):
        """Run a data-modifying statement and commit it.

        On failure the error is logged and the transaction is rolled back.

        Args:
            update_sql: SQL text, optionally containing ``%s`` placeholders.
            params: Optional values for the placeholders.

        Returns:
            Always ``None``.
        """
        try:
            with self.connect.cursor() as cursor:
                cursor.execute(update_sql, params)
            self.connect.commit()
        except Exception as e:
            logging.error("数据更新失败: %s (%s)", update_sql, e)
            # There is nothing to roll back when the connection never opened.
            if self.connect is not None:
                self.connect.rollback()
            return None
        logging.warning("数据插入成功")
        return None

    def create_db(self, creat_sql):
        """Execute a DDL statement such as ``CREATE TABLE``.

        Args:
            creat_sql: The statement to execute.
        """
        try:
            with self.connect.cursor() as cursor:
                cursor.execute(creat_sql)
        except Exception as e:
            logging.error("数据表创建失败: %s", e)
            return None
        logging.warning("数据表创建成功")
        return None

    def __del__(self):
        """Close the connection, if one was ever opened."""
        # ``connect`` is None after a failed connection attempt and may be
        # missing entirely if ``__init__`` raised before assigning it.
        connect = getattr(self, "connect", None)
        if connect is not None:
            connect.close()
#if __name__ == "__main__":
#configflie = "dbconfig.json"
#configs = DBConfig(configflie)
#instance = OperatonBD(configs.config())
#instance.connection_db()
#instance.search_db("show databases;")
#instance.update_db("insert into test111 values('23231322111','zhagnshan')")
#instance.create_db("create table if not exists shy111(id int(10),name varchar(100))")
4. sqls.py sql语句类
# -*- coding:utf-8 -*-
class Sql:
    """SQL statements for the ``moviesDB`` table.

    Both builders are static methods, so they can be called on the class
    (``Sql.create_sqls()``) as well as on an instance.
    """

    _INSERT_COLUMNS = (
        "name1,director,actor,time,adress,movie_type,review,review_store,images"
    )

    @staticmethod
    def _escape(value):
        """Return ``str(value)`` made safe for a double-quoted SQL literal."""
        # Backslashes first, otherwise the backslash added for each quote
        # would itself be doubled.
        return str(value).replace("\\", "\\\\").replace('"', '\\"')

    @staticmethod
    def insert_sqls(datas, i=0):
        """Build the INSERT statement for one scraped movie.

        Args:
            datas: Sequence of rows; each row holds the nine column values in
                the order name, director, actor, time, address, type, rating,
                number of ratings, poster URL.
            i: Index of the row to insert (defaults to the first row).

        Returns:
            The INSERT statement as a string.

        Raises:
            IndexError: If ``datas`` has no row ``i`` or the row has fewer
                than nine fields.
        """
        row = datas[i]
        # Values come from scraped pages, so quotes and backslashes must be
        # escaped.  NOTE(review): passing the values as query parameters to
        # the driver would be more robust than building the string here.
        values = ",".join(f'"{Sql._escape(row[k])}"' for k in range(9))
        return f"insert into moviesDB({Sql._INSERT_COLUMNS}) values({values})"

    @staticmethod
    def create_sqls():
        """Return the CREATE TABLE statement for ``moviesDB``."""
        return """
        CREATE TABLE IF NOT EXISTS moviesDB (
            id int PRIMARY KEY AUTO_INCREMENT,
            name1 text COMMENT "电影名",
            director varchar(100) COMMENT "导演",
            actor text COMMENT "主演",
            time VARCHAR(30) COMMENT "拍摄时间",
            adress text COMMENT "拍摄地",
            movie_type VARCHAR(30) COMMENT "电影类型",
            review VARCHAR(30) COMMENT "评分",
            review_store VARCHAR(30) COMMENT "评论人数",
            images VARCHAR(1000) COMMENT "电影宣传图片的url"
        )
        """
5.myspiderClass.py 主程序
# -*- coding:utf-8 -*-
"""
提取的字段:电影名,导演,主演,拍摄时间,拍摄地,电影类型,评分,评论人数,电影宣传图片的url。
"""
import time
import requests
from lxml import etree
import re
import os
import logging
from dboperation import OperatonBD
from config import DBConfig
from sqls import Sql
logging.basicConfig(format="%(asctime)s %(message)s",datefmt="%Y-%m-%d %I:%M:%S %p") #格式化日志
class Movies:
    """Crawl the Douban Top250 list, store each movie and download its poster.

    Scraped fields: movie name, director, leading actors, year, country or
    region, genre, rating, number of ratings and the poster image URL.
    """

    # Seconds to wait for any single HTTP request before giving up, so one
    # stalled connection cannot hang the whole crawl.
    REQUEST_TIMEOUT = 10

    def __init__(self, instance, urls, data, header):
        """Store the crawl settings and make sure the target table exists.

        Args:
            instance: Database helper providing ``create_db`` and ``update_db``.
            urls: URL of the list page.
            data: Base query parameters for the list page.
            header: HTTP headers sent with every request.
        """
        self.urls = urls
        self.data = data
        self.header = header
        self.instance = instance
        self.sql = Sql
        self.instance.create_db(self.sql.create_sqls())

    def it_url(self, count, n):
        """Yield the query parameters of every list page.

        Args:
            count: Total number of movies.
            n: Number of movies per page.

        Yields:
            A fresh copy of ``self.data`` whose ``start`` is the offset of the
            page; the caller's dict is never mutated.
        """
        # range() stops before ``count``: no empty trailing page is requested
        # and a final, partially filled page is still covered.
        for start in range(0, count, n):
            yield {**self.data, "start": start}

    def movies_count(self):
        """Fetch the first page and read the movie totals from it.

        Returns:
            ``(total_movies, movies_per_page)``, or ``None`` if the page could
            not be fetched or parsed (the error is logged).
        """
        try:
            response = requests.get(
                self.urls, headers=self.header, timeout=self.REQUEST_TIMEOUT
            )
            page = etree.HTML(response.text)
            # The counter text looks like '(共250条)'.
            count_text = page.xpath("//span[@class='count']/text()")[0]
            counts = int(re.search(r"\d+", count_text).group())
            n = len(page.xpath("//div/a/span[1][@class='title']/text()"))
            return counts, n
        except Exception as e:
            logging.warning("请求访问失败: %s", e)
            return None

    def movies_url(self):
        """Request every list page in turn and hand its HTML to ``parsing``.

        A page that fails to download or parse is logged and skipped so the
        remaining pages are still processed.
        """
        totals = self.movies_count()
        if totals is None:
            return None
        count, n = totals
        if n == 0:
            # A zero page size would make range() raise in it_url.
            logging.warning("未解析到任何电影条目,停止抓取")
            return None
        for page_no, params in enumerate(self.it_url(count, n), start=1):
            try:
                response = requests.get(
                    self.urls,
                    headers=self.header,
                    params=params,
                    timeout=self.REQUEST_TIMEOUT,
                )
                if response.status_code != requests.codes.ok:
                    logging.warning(
                        "第%s请求失败,状态码%s", page_no, response.status_code
                    )
                    continue
                logging.warning("第%s请求成功,URL为%s", page_no, response.url)
                self.parsing(response.text)
            except Exception as e:
                logging.warning("第%s页处理失败: %s", page_no, e)
        return None

    def parsing(self, html):
        """Extract all movie fields from one list page and save them.

        Name, director, rating, number of ratings and poster URL come from the
        list page; actors, year, country/region and genre are read from each
        movie's detail page.

        Args:
            html: HTML text of a list page.
        """
        page = etree.HTML(html)
        name = page.xpath("//div[@class='hd']/a/span[1][@class='title']/text()")
        info_lines = page.xpath("//div[@class='bd']/p/text()")
        # Only the lines that mention a director carry the field we need.
        director_list = [
            self._extract_director(line) for line in info_lines if "导演" in line
        ]
        detail_urls = page.xpath("//div[@class='hd']/a/@href")

        actor = []
        year = []
        adress = []
        movie_type = []
        for url in detail_urls:
            detail_html = requests.get(
                url, headers=self.header, timeout=self.REQUEST_TIMEOUT
            ).text
            detail = etree.HTML(detail_html)
            actor.append(detail.xpath("//span[@class='actor']/span/a/text()"))
            # The year is rendered as '(1994)'; drop the parentheses.
            year.append(
                [y.strip("()") for y in detail.xpath("//span[@class='year']/text()")]
            )
            adress.append(
                re.findall('<span class="pl">制片国家/地区:</span>(.*?)<br/>', detail_html)
            )
            movie_type.append(detail.xpath("//span[@property='v:genre']/text()"))

        review = page.xpath("//span[@class='rating_num']/text()")
        # Texts look like '2100959人评价'; int() parses the number without
        # evaluating scraped text as code.
        review_store = [
            int(text.replace("人评价", ""))
            for text in page.xpath("//div[@class='star']/span[4]/text()")
        ]
        images = page.xpath("//li/div/div/a/img/@src")

        # Column order: name, director, actors, year, country/region, genre,
        # rating, number of ratings, poster URL.
        datas = list(
            zip(name, director_list, actor, year, adress, movie_type,
                review, review_store, images)
        )
        self.save_date(datas)

    def save_date(self, datas):
        """Insert every row into the database and download its poster.

        A failure on one row is logged and does not stop the remaining rows.

        Args:
            datas: Iterable of nine-field rows as built by ``parsing``.
        """
        for row in datas:
            try:
                self.instance.update_db(self._build_insert_sql(row))
                time.sleep(2)  # throttle requests to avoid an IP ban
                filename = f"./images/{self._safe_filename(row[0])}.png"
                self.download_image(row[8], filename)
            except Exception as e:
                logging.error("数据保存失败 %s: %s", row, e)

    def download_image(self, image_url, filename):
        """Download one image and write it to ``filename``.

        The target directory is created when missing.

        Args:
            image_url: URL of the image.
            filename: Path of the file to write.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        os.makedirs(os.path.dirname(filename) or ".", exist_ok=True)
        r = requests.get(
            image_url, headers=self.header, timeout=self.REQUEST_TIMEOUT
        )
        # Do not save an error page under an image file name.
        r.raise_for_status()
        with open(filename, "wb") as f:
            f.write(r.content)
        logging.warning("图片保存成功")

    @staticmethod
    def _extract_director(line):
        """Return the director part of a '导演: ... 主演: ...' line."""
        start = line.find(":") + 1
        end = line.find("主", start)
        if end == -1:
            # No actor section on this line: keep everything after the colon
            # instead of silently dropping the last character.
            end = len(line)
        return line[start:end].strip()

    @staticmethod
    def _build_insert_sql(row):
        """Build the INSERT statement for one nine-field row."""
        # Scraped text may contain quotes or backslashes; escape them so they
        # cannot break (or alter) the statement.  NOTE(review): passing the
        # values as query parameters would be more robust.
        values = ",".join(
            '"{}"'.format(str(row[k]).replace("\\", "\\\\").replace('"', '\\"'))
            for k in range(9)
        )
        return (
            "insert into moviesDB(name1,director,actor,time,adress,"
            f"movie_type,review,review_store,images) values({values})"
        )

    @staticmethod
    def _safe_filename(name):
        """Replace characters that are not allowed in file names."""
        return re.sub(r'[\\/:*?"<>|]', "_", str(name))
if __name__ == "__main__":
    # Script entry point: connect to MySQL with the settings from
    # dbconfig.json, then crawl the Douban Top250 list.
    #
    # ``k``, ``data`` and ``header`` are deliberately kept as module-level
    # names: code in this module may read them as globals.
    k = 1  # 1-based counter of the list page being requested
    configfile = "dbconfig.json"
    configs = DBConfig(configfile)
    instance = OperatonBD(configs.config())
    # Adjacent literals are joined by the parser; the trailing space keeps
    # "(KHTML, like Gecko)" and "Chrome/..." separate tokens, which a
    # backslash continuation inside the string did not guarantee.
    header = {
        "user-agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/84.0.4147.105 Safari/537.36"
        )
    }
    data = {"start": 0, "filter": ""}
    urls = "https://movie.douban.com/top250"
    m = Movies(instance, urls, data, header)
    m.movies_url()
注:纸上得来终觉浅,绝知此事要躬行。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献3条内容
所有评论(0)