一、创作来源
最近搞数据分析需要爬取B站上相关视频的内容,但打开两年前的代码却发现已经跑不通了,或者说根本就是漏洞百出。经过一段时间的缝缝补补,我发现是B站的网页代码更换的原因。(应该是吧,不确定哈!)由于当时写代码的时候也是东抄西抄,最后搞得自己也看不懂是什么意思(鬼知道当时的程序怎么跑起来的)。索性从头来过,自己学自己写。
二、第一部分:利用Selenium获取BV_ID
对于B站视频来说,只要知道了他的BV号就相当于一个人你知道了他的身份证号,想要知晓他的更多的信息也就不是什么难事儿了,因此在本文中,我们要进行的第一步就是获取到我们想要爬取信息的B站视频的身份证——BV_ID。
这是第一步,也是最关键的一步。这个时候就需要用到selenium这个库,关于这个库的详细介绍和科普可以去别的博客下了解,这里我们不过多赘述。
Selenium库的安装:
pip install selenium
具体使用方法如下:
引入必要的库
from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.chrome.options import Options
定义一个selenium的爬虫
这里只用关键词作为形参,其余的部分都可以以这些关键词为核心进行生成
def spider_bvid(keyword):"""利用seleniume获取搜索结果的bvid,供给后续程序使用:param keyword: 搜索关键词:return: 生成去重的output_filename = f'{keyword}BV号.csv'"""
首先,定义一个用于写入爬取结果的文件,这里因为数据量大,我选择了csv文件,大家在自己运行的时候,可以选择自己喜欢的文件形式。
接着设置无界面爬虫,窗口大小并禁用gpu加速,以减少浏览器的内存占用,防止出现浏览器崩溃的情况。
# 保存的文件名input_filename = f'{keyword}BV号.csv'# 启动爬虫options = Options()options.add_argument('--headless')options.add_argument('--disable-gpu')browser = webdriver.Chrome(options=options)# 设置无界面爬虫browser.set_window_size(1400, 900)# 设置全屏,注意把窗口设置太小的话可能导致有些button无法点击browser.get('https://bilibili.com')# 刷新一下,防止搜索button被登录弹框遮住browser.refresh()print("============成功进入B站首页!!!===========")
利用网页元素定位找到B站首页的搜索框和搜索按钮,输入我们要搜索的关键词,确定点击搜索按钮。
input = browser.find_element(By.CLASS_NAME, 'nav-search-input')button = browser.find_element(By.CLASS_NAME, 'nav-search-btn')# 输入关键词并点击搜索input.send_keys(keyword)button.click()print(f'==========成功搜索{keyword}相关内容==========')
成功进入搜索结果页面后,我们本来的技术思路是:1、根据网页元素定位,css、xpath或其他方法定位到页面最下方的页数box和下一页box;2、通过获取最后一页的box中的text值和下一页的text值,利用循环不断模拟点击下一页,从而达到爬取所有结果页面的内容的目的。
但B站网页代码更改后,显示为34页,网页内容检查后显示为42页(至多),因而会导致页面定位的不同,在我个人运行程序的过程中,出现过第一个关键词搜索可以定位到最大页数box,但是利用循环输入的下一个keyword就无法定位到相同的box。
因而我们更换思路:B站搜索结果显示的并非全部内容,而是至多一千多个,也就是说页面数最大是42页。因而当我们手动搜索后发现结果页面页数较多时,可以直接设置最大页数为42。
同样,由于本小白并不会定位这个新的下一页按钮,因而我通过循环输入页码数,拼成搜索的网页URL进而达到与模拟点击下一页相同的效果。但这种操作方法的结果就是会出现重复爬取第情况,因而需要在最后进行去重操作。
另外值得一说的是,如果各位像研究元素定位到话,我之前看到本站里有一位大佬“潘帕斯的雄鹰”,他写了一个只用selenium进行滚动爬取搜索结果下所有视频的一级、二级评论的博客。代码是开源的,那个里面的定位和断点续爬都做得很好,有时间的话可以研究一下。
# 设置窗口all_h = browser.window_handlesbrowser.switch_to.window(all_h[1])# B站最多显示42页total_page = 42# 同样由于B站网页代码的更改,通过找到并点击下一页的方式个人暂不能实现#(对,不会分析那个破网页!!!)for i in range(0, total_page):# url 需要根据不同关键词进行调整内容!!!# 这里的url需要自己先搜索一下然后复制网址进来url = (f"https://search.bilibili.com/all?keyword={keyword}" f"&from_source=webtop_search&spm_id_from='你自己的'&search_source='你自己的'&page={i}")print(f"===========正在尝试获取第{i + 1}页网页内容===========")print(f"===========本次的url为:{url}===========")browser.get(url)# 这里请求访问网页的时间也比较久(可能因为我是macos),所以是否需要等待因设备而异# 取消刷新并长时间休眠爬虫以避免爬取太快导致爬虫抓取到js动态加载源码# browser.refresh()print('正在等待页面加载:3')time.sleep(1)print('正在等待页面加载:2')time.sleep(1)print('正在等待页面加载:1')time.sleep(1)
能够顺利获取到所有页面结果之后,我们就可以直接分析页面,因为我们只需要获取到BV号就可以,因此并不需要重复爬取一些后面可以轻易获得的数据。
这里直接使用bs4对页面进行分析,直接定位到card中的herf,获取每个视频的详情页URL,在这个URL中可以拆分出我们需要的BV号。
# 直接分析网页html = browser.page_source# print("网页源码" + html) 用于判断是否获取成功soup = BeautifulSoup(html, 'lxml')infos = soup.find_all(class_='bili-video-card')bv_id_list = []for info in infos:# 只定位视频链接href = info.find('a').get('href')# 拆分split_url_data = href.split('/')# 利用循环删除拆分出现的空白for element in split_url_data:if element == '':split_url_data.remove(element)# 打印检验内容# print(split_url_data)# 获取bvidbvid = split_url_data[2]# 利用if语句直接去重if bvid not in bv_id_list:bv_id_list.append(bvid)for bvid_index in range(0, len(bv_id_list)):# 写入 input_filenamewrite_to_csv_bvid(input_filename, bv_id_list[bvid_index])# 输出提示进度print('写入文件成功')print("===========成功获取第" + str(i + 1) + "次===========")time.sleep(1)i += 1# 退出爬虫browser.quit()# 打印信息显示是否成功print(f'==========爬取完成。退出爬虫==========')
写入文件后,我们就能得到去重之后的BV号了,下面就可以通过BV号来爬取我们需要的视频的基本信息了。
三、第二部分 Request函数请求访问
Request函数不用多说,涉及到爬虫的程序大多都会用到,这里也不再赘述。另一方面,使用request函数的原因是bilibili的api开放接口可以轻松地获取到我们想要的信息。
这里也可以使用bilibili-api库,但本文不使用的原因是:bilibili-api库在获取视频详细信息时需要进行异步请求,但直接在循环中调用异步请求的函数会导致各种报错,小白肯定是无法解决的,哪怕翻阅资料也很难看懂。(对,就是我没看懂。)例如:aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected、pipe broken等。
因此这里用比较笨的方法,通过BV号拼接成视频数据api接口的URL在进行访问,返回的页面转成json格式,然后直接读取json字典中的值,进行调用。
这里想要了解更多B站API接口的,可以去Github上查看。
https://github.com/SocialSisterYi/bilibili-API-collect
此处的关于视频信息的API接口URL为:
#A EXAMPLE : https://api.bilibili.com/x/web-interface/view?bvid=BV1n24y1D75Vapi_url = f'https://api.bilibili.com/x/web-interface/view?bvid={bv_id}'
写在最前面:使用Request函数时,一定要注意时间间隔。如果你的设备接入URL很快,那么可以适当增加间隔,如果接入较慢,可以适当减少间隔。Time.sleep()的数值至少要大于1.5s,不然轻则报错,重则被叔叔封网络IP。(当然,如果真的被封IP了,换个网络环境就行了。比如你在家里用的无线网,那么切换到自己的手机流量热点就可以解决。)
requests.exceptions.SSLError: HTTPSConnectionPool(host='api.bilibili.com', port=443)
具体的调用代码为:
首先,我们需要编写我们自己的请求头。User-Agenta是我们自己的网络代理,可以有chrome、firefox等,Cookies就是网站获取到一些记录文件,主要用于识别。这两个值都可以通过网页抓包来获取。以Chrome为例,登录B站后,点开任意一个视频播放后,按下F12(win)或option+command+I(mac)后,进入network部分,尽量找到total?list开头的js文件,里面就可以比较轻松地找到我们需要的这两个值。
接着,传入BV号后,拼接成可用的URL后,可以自己拼好后先用浏览器打开进行检验和分析一下,以确保URL的有效性。之后,利用json库返回json形式的网页源码。返回的值基本是字典,很好操作。
def get_video_info(bv_id):headers = {'User-Agent': "你的",'Cookie': "你的"}api_url = f'https://api.bilibili.com/x/web-interface/view?bvid={bv_id}'# 打印本次要获取的bvid,用于错误时确认print(f"正在进行爬取uid为:{bv_id}的UP主的粉丝数量与作品总数")print(f"==========本次获取数据的视频BV号为:{bv_id}==========")print(f"url为:{api_url}")# https://api.bilibili.com/x/web-interface/view?BV1n24y1D75Vvideo_info = requests.get(url=api_url, headers=headers)video_info_json = json.loads(video_info.text)
得到网页源码后,我们需要的值都存放在”data”标签中,直接根据我们对需要进行调用就可以。我这里是新建了一个字典,进行存储值,大家也可以不用这么麻烦。对于相关值,英文名对应的中文意义,可以参考这篇知乎专栏的介绍,也可在上面的github文档中进行查看。
# 创建存放的字典info_dict = {}# 信息解读# https://zhuanlan.zhihu.com/p/618885790# 视频bvid,即bv号bvid = video_info_json['data']['bvid']info_dict['bvid'] = bvid# 视频aid,即av号aid = video_info_json['data']['aid']info_dict['aid'] = aid# 视频cid,用于获取弹幕信息cid = video_info_json['data']['cid']info_dict['cid'] = cid# 作者idmid = video_info_json['data']['owner']['mid']info_dict['mid'] = mid# up主昵称name = video_info_json['data']['owner']['name']info_dict['name'] = name# 视频标题title = video_info_json['data']['title']info_dict['title'] = title# 视频标签tname = video_info_json['data']['tname']info_dict['tname'] = tname# 视频发布时间戳pubdate = video_info_json['data']['pubdate']# 转化时间戳pub_datatime = datetime.fromtimestamp(pubdate)# 整体格式pub_datatime_strf = pub_datatime.strftime('%Y-%m-%d %H:%M:%S')# 日期date = re.search(r"(\d{4}-\d{1,2}-\d{1,2})", pub_datatime_strf)info_dict['pub_date'] = date.group()# 时间pub_time = re.search(r"(\d{1,2}:\d{1,2}:\d{1,2})", pub_datatime_strf)info_dict['pub_time'] = pub_time.group()# 视频创建时间戳# ctime = info['ctime']# 视频简介desc = video_info_json['data']['desc']info_dict['desc'] = desc# 视频播放量view = video_info_json['data']['stat']['view']info_dict['view'] = view# 点赞数like = video_info_json['data']['stat']['like']info_dict['like'] = like# 投币数coin = video_info_json['data']['stat']['coin']info_dict['coin'] = coin# 收藏数favorite = video_info_json['data']['stat']['favorite']info_dict['favorite'] = favorite# 分享数share = video_info_json['data']['stat']['share']info_dict['share'] = share# 评论数repiy = video_info_json['data']['stat']['reply']info_dict['reply'] = repiy# 视频弹幕数量danmaku = video_info_json['data']['stat']['danmaku']info_dict['danmaku'] = danmakuprint(f'=========={bv_id} 的视频基本信息已成功获取==========')# 发布作品时的动态# dynamic = info['dynamic']print('正在等待,以防访问过于频繁\n')time.sleep(3)return info_dict
如此就返回了带有我们数据的字典,后续可以直接调用。
获取UP主信息的整体思路相同,这里就不再赘述,直接贴上代码:
def get_user_info(uid):"""通过uid(即mid)获取UP主的粉丝总数和作品总数:param uid: mid:return:user_info_dict"""# 定义空字典用于存放数据# 粉丝数 follower# 作品总数 archiveuser_info_dict = {}# 首先写入请求头# 设置用户代理 User_Agent及Cookiesheaders = {'User-Agent': "",'Cookie': ""}# 将传入的的uid组成up主主页的api_url# A Example: https://api.bilibili.com/x/web-interface/card?mid=1177893348api_url = f'https://api.bilibili.com/x/web-interface/card?mid={uid}'# https://api.bilibili.com/x/web-interface/view?BV1n24y1D75V# 打印次数,数据量大,便于查看进程print(f"正在进行爬取uid为:{uid}的UP主的粉丝数量与作品总数")# 打印本次要获取的uid,用于错误时确认print(f"==========本次获取数据的up主的uid为:{uid}==========")print(f"url为{api_url}")# 利用requests进行访问,并返回需要的封装信息up_info = requests.get(url=api_url, headers=headers)# 不知道会不会被封ip,保险起见# time.sleep(2)# 将数据转化为json格式up_info_json = json.loads(up_info.text)# 利用json定位相关数据fans_number = up_info_json['data']['card']['fans']user_info_dict['follower'] = fans_numberarchive_count = up_info_json['data']['archive_count']user_info_dict['archive'] = archive_countprint(f'=========={bv_id} 的作者基本信息已成功获取==========\n')# 等待print('正在等待,以防访问过于频繁\n')time.sleep(1.5)return user_info_dict
四、第三部分 最后的调用
上面的函数都写好之后,我们只需要创建一个主入口,之后直接调用函数就可以。
if __name__ == '__main__':# 针对不同内容修改搜索关键词!!!!keywords = ["1", "2"]for keyword in keywords:# 自动爬取多个主题时须注意上面的最大页数定位问题# 爬取后生成去重了的len(keywords)个f'{keyword}BV号.csv'文件spider_bvid(keyword)for keyword in keywords:# 拼接成文件名csv_to_merge = f'{keyword}BV号.csv'# 合并后生成未去重的文件merge_csv(input_filename=csv_to_merge, output_filename='BV号合并.csv')# 遍历读取bv_idfilename = 'BV号合并.csv'# 打开文件并去重open_csv = pd.read_csv(filename)open_csv.drop_duplicates(subset='BV号')bv_id_list = np.array(open_csv['BV号'])"""# 第一次调用,若读取csv进行爬取时,意外中断# 则更改为读取txt文本,将已爬取第bvid删除,以达到断点续爬的目的for bvid in bv_id_list:with open("bv_id_list.txt", 'a') as f:f.write(bvid+'\n')"""with open("bv_id_list.txt", 'r') as f:bv_id_list = f.readlines()# 循环写入内容for i in range(0, len(bv_id_list)):bv_id = bv_id_list[i]print(f'正在进行第{i+1}次爬取\n')# 获取视频所有的基本信息video_info = get_video_info(bv_id)bvid = video_info['bvid']aid = video_info['aid']cid = video_info['cid']mid = video_info['mid']name = video_info['name']title = video_info['title']tname = video_info['tname']pub_date = video_info['pub_date']pub_time = video_info['pub_time']desc = video_info['desc']view = video_info['view']like = video_info['like']coin = video_info['coin']favorite = video_info['favorite']share = video_info['share']reply = video_info['reply']danmaku = video_info['danmaku']# 传播效果计算公式Communication_Index = math.log(0.5 * int(view) + 0.3 * (int(like) + int(coin) + int(favorite)) + 0.2 * (int(reply) + int(danmaku)))# 获取作者的相关信息user_info = get_user_info(uid=mid)follower = user_info['follower']archive = user_info['archive']write_to_csv(filename='视频基本信息.csv', bvid=bvid, aid=aid, cid=cid, mid=mid, name=name, follower=follower, archive=archive, title=title, tname=tname, pub_date=pub_date, pub_time=pub_time, desc=desc, view=view, like=like, coin=coin, favorite=favorite, share=share, reply=reply, danmaku=danmaku, communication_index=Communication_Index)print(f'==========第{i+1}个BV号:{bv_id}的相关数据已写入csv文件中==========')print('==================================================\n')
五、完整代码
需要注明的一些事情:
1、writr_to_csv()函数是抄的大佬的代码,虽然后面else基本用不到,但是前面的真的很好用,大家可以自己理解一下再改一改。
2、关于传播效果计算公式,这个是引用自己大佬的论文,如果有需要请注明引用,学术不端是很严重的事情。
引用:陈强,张杨一,马晓悦,等.政务B站号信息传播效果影响因素与实证研究[J].图书情报工作,2020,64(22) :126 – 134.
3、代码中的一些部分,我是以非常笨的方法解决到,有同学优化了之后请贴在评论区交流学习。
# -*- coding: utf-8 -*-"""@ Project : pythonProject@ File : spider bilibi.py@ IDE : PyCharm@ Auther : Avi-OvO-CreapDiem@ Date : 2023/9/2 08:49@ Purpose : """import reimport osimport csvimport timeimport mathimport jsonimport requestsimport numpy as npimport pandas as pdfrom datetime import datetimefrom bs4 import BeautifulSoupfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.chrome.options import Optionsdef merge_csv(input_filename, output_filename):"""读取csv文件内容,并写入新的文件:param input_filename: 传入的文件名称:param output_filename: 写入的新文件的名称:return: 向新文件中写入input_filename中的内容"""# 读取文件csv_data_read = pd.read_csv(input_filename)# 获取文件总行数number_of_row = (len(csv_data_read))# 循环该csv文件中的所有行,并写入信息for i in range(0, number_of_row):row_info = csv_data_read.values[i]# 输出查看内容# print(row_info)# 具体内容row_content = row_info[0]# 写入write_to_csv_bvid(output_filename, row_content)# 退出循环# 打印进度print(f'成功向{output_filename}中写入了{input_filename}的全部信息')def write_to_csv_bvid(input_filename, bvid):"""写入新的csv文件,若没有则创建,须根据不同程序进行修改:param input_filename: 写入的文件名称:param bvid: BV号:return: 生成写入的input_filename文件"""# OS 判断路径是否存在file_exists = os.path.isfile(input_filename)# 设置最大尝试次数max_retries = 50retries = 0while retries < max_retries:try:with open(input_filename, mode='a', encoding='utf-8', newline='') as csvfile:fieldnames = ['BV号']writer = csv.DictWriter(csvfile, fieldnames=fieldnames)if not file_exists:writer.writeheader()writer.writerow({'BV号': bvid})# print('写入文件成功')break# 如果成功写入,跳出循环except PermissionError as e:retries += 1print(f"将爬取到的数据写入csv时,遇到权限错误Permission denied,文件可能被占用或无写入权限: {e}")print(f"等待3s后重试,将会重试50次... (尝试 {retries}/{max_retries})")time.sleep(3)# 等待10秒后重试else:print("将爬取到的数据写入csv时遇到权限错误,且已达到最大重试次数50次,退出程序")def spider_bvid(keyword):"""利用seleniume获取搜索结果的bvid,供给后续程序使用:param keyword: 搜索关键词:return: 生成去重的output_filename = f'{keyword}BV号.csv'"""# 保存的文件名input_filename = f'{keyword}BV号.csv'# 启动爬虫options = Options()options.add_argument('--headless')options.add_argument('--disable-gpu')browser = webdriver.Chrome(options=options)# 设置无界面爬虫browser.set_window_size(1400, 900)# 设置全屏,注意把窗口设置太小的话可能导致有些button无法点击browser.get('https://bilibili.com')# 刷新一下,防止搜索button被登录弹框遮住browser.refresh()print("============成功进入B站首页!!!===========")input = browser.find_element(By.CLASS_NAME, 'nav-search-input')button = browser.find_element(By.CLASS_NAME, 'nav-search-btn')# 输入关键词并点击搜索input.send_keys(keyword)button.click()print(f'==========成功搜索{keyword}相关内容==========')# 设置窗口all_h = browser.window_handlesbrowser.switch_to.window(all_h[1])"""# 这里可以通过xpath或者其他方法找到B站搜索结果页最下方的页码数值# 但B站网页代码更改后,显示为34页,网页内容检查后显示为42页(至多)# 由于我们的搜索结果很多,肯定超出B站最大显示的42页,故而直接设置最大页数为42# 找到最后一个页码所在位置,并获取值# total_btn = browser.find_element(By.XPATH,"//*[@id="i_cecream"]/div/div[2]/div[2]/div/div/div/div[4]/div/div/button[9]"")# //*[@id="i_cecream"]/div/div[2]/div[2]/div/div/div/div[4]/div/div/button[9]# total = int(total_btn)# print(f'==========成功搜索! 总页数: {total}==========')"""# B站最多显示42页total_page = 42# 同样由于B站网页代码的更改,通过找到并点击下一页的方式个人暂不能实现(对,不会分析那个破网页!!!)# 因此这里利用总页数进行循环访问来实现自动翻页的效果for i in range(0, total_page):# url 需要根据不同关键词进行调整内容!!!url = (f"https://search.bilibili.com/all?keyword={keyword}" f"&from_source=webtop_search&spm_id_from=333.1007&search_source=5&page={i}")print(f"===========正在尝试获取第{i + 1}页网页内容===========")print(f"===========本次的url为:{url}===========")browser.get(url)# 这里请求访问网页的时间也比较久(可能因为我是macos),所以是否需要等待因设备而异# 取消刷新并长时间休眠爬虫以避免爬取太快导致爬虫抓取到js动态加载源码# browser.refresh()print('正在等待页面加载:3')time.sleep(1)print('正在等待页面加载:2')time.sleep(1)print('正在等待页面加载:1')time.sleep(1)# 直接分析网页html = browser.page_source# print("网页源码" + html) 用于判断是否获取成功soup = BeautifulSoup(html, 'lxml')infos = soup.find_all(class_='bili-video-card')bv_id_list = []for info in infos:# 只定位视频链接href = info.find('a').get('href')# 拆分split_url_data = href.split('/')# 利用循环删除拆分出现的空白for element in split_url_data:if element == '':split_url_data.remove(element)# 打印检验内容# print(split_url_data)# 获取bvidbvid = split_url_data[2]# 利用if语句直接去重if bvid not in bv_id_list:bv_id_list.append(bvid)for bvid_index in range(0, len(bv_id_list)):# 写入 input_filenamewrite_to_csv_bvid(input_filename, bv_id_list[bvid_index])# 输出提示进度print('写入文件成功')print("===========成功获取第" + str(i + 1) + "次===========")time.sleep(1)i += 1# 退出爬虫browser.quit()# 打印信息显示是否成功print(f'==========爬取完成。退出爬虫==========')def write_to_csv(filename, bvid, aid, cid, mid, name, follower, archive, title, tname, pub_date, pub_time, desc, view, like, coin, favorite, share, reply, danmaku, communication_index):"""向csv文件中写入B站视频相关的基本信息,未按路径找到文件,则新建文件:param filename: 写入数据的文件名:param bvid: BV号:param aid: AV号:param cid: 用于获取弹幕文本的:param mid: UP主的ID:param name: UP主名称:param follower: UP主粉丝数:param archive: UP主作品总数:param title: 标题:param tname: tag名称:param pub_date: 发布日期:param pub_time: 发布时间:param desc: 视频简介:param view: 播放量:param like: 点赞数:param coin: 投币数:param favorite: 收藏数:param share: 分享数:param reply: 评论数:param danmaku: 弹幕数:param communication_index: 传播效果公式的值:return:"""file_exists = os.path.isfile(filename)max_retries = 50retries = 0while retries < max_retries:try:with open(filename, mode='a', encoding='utf-8', newline='') as csvfile:fieldnames = ['BV号', 'AV号', 'CID', 'UP主ID', 'UP主名称', 'UP主粉丝数', '作品总数', '视频标题','视频分类标签','发布日期', '发布时间', '视频简介', '播放量', '点赞数', '投币数', '收藏数', '分享数','评论数','弹幕数', '传播效果指数']writer = csv.DictWriter(csvfile, fieldnames=fieldnames)if not file_exists:writer.writeheader()writer.writerow({'BV号': bvid, 'AV号': aid, 'CID': cid, 'UP主ID': mid, 'UP主名称': name, 'UP主粉丝数': follower,'作品总数': archive, '视频标题': title, '视频分类标签': tname, '发布日期': pub_date,'发布时间': pub_time,'视频简介': desc, '播放量': view, '点赞数': like, '投币数': coin, '收藏数': favorite,'分享数': share,'评论数': reply, '弹幕数': danmaku, '传播效果指数': communication_index})break# 如果成功写入,跳出循环except PermissionError as e:retries += 1print(f"将爬取到的数据写入csv时,遇到权限错误Permission denied,文件可能被占用或无写入权限: {e}")print(f"等待3s后重试,将会重试50次... (尝试 {retries}/{max_retries})")else:print("将爬取到的数据写入csv时遇到权限错误,且已达到最大重试次数50次,退出程序")def get_user_info(uid):"""通过uid(即mid)获取UP主的粉丝总数和作品总数:param uid: mid:return:user_info_dict"""# 定义空字典用于存放数据# 粉丝数 follower# 作品总数 archiveuser_info_dict = {}# 首先写入请求头# 设置用户代理 User_Agent及Cookiesheaders = {'User-Agent': "",'Cookie': ""}# 将传入的的uid组成up主主页的api_url# A Example: https://api.bilibili.com/x/web-interface/card?mid=1177893348api_url = f'https://api.bilibili.com/x/web-interface/card?mid={uid}'# https://api.bilibili.com/x/web-interface/view?BV1n24y1D75V# 打印次数,数据量大,便于查看进程print(f"正在进行爬取uid为:{uid}的UP主的粉丝数量与作品总数")# 打印本次要获取的uid,用于错误时确认print(f"==========本次获取数据的up主的uid为:{uid}==========")print(f"url为{api_url}")# 利用requests进行访问,并返回需要的封装信息up_info = requests.get(url=api_url, headers=headers)# 不知道会不会被封ip,保险起见# time.sleep(2)# 将数据转化为json格式up_info_json = json.loads(up_info.text)# 利用json定位相关数据fans_number = up_info_json['data']['card']['fans']user_info_dict['follower'] = fans_numberarchive_count = up_info_json['data']['archive_count']user_info_dict['archive'] = archive_countprint(f'=========={bv_id} 的作者基本信息已成功获取==========\n')# 等待print('正在等待,以防访问过于频繁\n')time.sleep(1.5)return user_info_dictdef get_video_info(bv_id):headers = {'User-Agent': "",'Cookie': ""}api_url = f'https://api.bilibili.com/x/web-interface/view?bvid={bv_id}'# 打印本次要获取的bvid,用于错误时确认print(f"正在进行爬取uid为:{bv_id}的UP主的粉丝数量与作品总数")print(f"==========本次获取数据的视频BV号为:{bv_id}==========")print(f"url为:{api_url}")# https://api.bilibili.com/x/web-interface/view?BV1n24y1D75Vvideo_info = requests.get(url=api_url, headers=headers)video_info_json = json.loads(video_info.text)# 创建存放的字典info_dict = {}# 信息解读# https://zhuanlan.zhihu.com/p/618885790# 视频bvid,即bv号bvid = video_info_json['data']['bvid']info_dict['bvid'] = bvid# 视频aid,即av号aid = video_info_json['data']['aid']info_dict['aid'] = aid# 视频cid,用于获取弹幕信息cid = video_info_json['data']['cid']info_dict['cid'] = cid# 作者idmid = video_info_json['data']['owner']['mid']info_dict['mid'] = mid# up主昵称name = video_info_json['data']['owner']['name']info_dict['name'] = name# 视频标题title = video_info_json['data']['title']info_dict['title'] = title# 视频标签tname = video_info_json['data']['tname']info_dict['tname'] = tname# 视频发布时间戳pubdate = video_info_json['data']['pubdate']# 转化时间戳pub_datatime = datetime.fromtimestamp(pubdate)# 整体格式pub_datatime_strf = pub_datatime.strftime('%Y-%m-%d %H:%M:%S')# 日期date = re.search(r"(\d{4}-\d{1,2}-\d{1,2})", pub_datatime_strf)info_dict['pub_date'] = date.group()# 时间pub_time = re.search(r"(\d{1,2}:\d{1,2}:\d{1,2})", pub_datatime_strf)info_dict['pub_time'] = pub_time.group()# 视频创建时间戳# ctime = info['ctime']# 视频简介desc = video_info_json['data']['desc']info_dict['desc'] = desc# 视频播放量view = video_info_json['data']['stat']['view']info_dict['view'] = view# 点赞数like = video_info_json['data']['stat']['like']info_dict['like'] = like# 投币数coin = video_info_json['data']['stat']['coin']info_dict['coin'] = coin# 收藏数favorite = video_info_json['data']['stat']['favorite']info_dict['favorite'] = favorite# 分享数share = video_info_json['data']['stat']['share']info_dict['share'] = share# 评论数repiy = video_info_json['data']['stat']['reply']info_dict['reply'] = repiy# 视频弹幕数量danmaku = video_info_json['data']['stat']['danmaku']info_dict['danmaku'] = danmakuprint(f'=========={bv_id} 的视频基本信息已成功获取==========')# 发布作品时的动态# dynamic = info['dynamic']print('正在等待,以防访问过于频繁\n')time.sleep(1.5)return info_dictif __name__ == '__main__':# 针对不同内容修改搜索关键词!!!!keywords = ["1", "2"]for keyword in keywords:# 自动爬取多个主题时须注意上面的最大页数定位问题# 爬取后生成去重了的len(keywords)个f'{keyword}BV号.csv'文件spider_bvid(keyword)for keyword in keywords:# 拼接成文件名csv_to_merge = f'{keyword}BV号.csv'# 合并后生成未去重的文件merge_csv(input_filename=csv_to_merge, output_filename='BV号合并.csv')# 遍历读取bv_idfilename = 'BV号合并.csv'# 打开文件并去重open_csv = pd.read_csv(filename)open_csv.drop_duplicates(subset='BV号')bv_id_list = np.array(open_csv['BV号'])"""# 第一次调用,若读取csv进行爬取时,意外中断# 则更改为读取txt文本,将已爬取第bvid删除,以达到断点续爬的目的for bvid in bv_id_list:with open("bv_id_list.txt", 'a') as f:f.write(bvid+'\n')with open("bv_id_list.txt", 'r') as f:bv_id_list = f.readlines()"""# 循环写入内容for i in range(0, len(bv_id_list)):bv_id = bv_id_list[i]print(f'正在进行第{i+1}次爬取\n')# 获取视频所有的基本信息video_info = get_video_info(bv_id)bvid = video_info['bvid']aid = video_info['aid']cid = video_info['cid']mid = video_info['mid']name = video_info['name']title = video_info['title']tname = video_info['tname']pub_date = video_info['pub_date']pub_time = video_info['pub_time']desc = video_info['desc']view = video_info['view']like = video_info['like']coin = video_info['coin']favorite = video_info['favorite']share = video_info['share']reply = video_info['reply']danmaku = video_info['danmaku']# 传播效果计算公式Communication_Index = math.log(0.5 * int(view) + 0.3 * (int(like) + int(coin) + int(favorite)) + 0.2 * (int(reply) + int(danmaku)))# 获取作者的相关信息user_info = get_user_info(uid=mid)follower = user_info['follower']archive = user_info['archive']write_to_csv(filename='视频基本信息.csv', bvid=bvid, aid=aid, cid=cid, mid=mid, name=name, follower=follower, archive=archive, title=title, tname=tname, pub_date=pub_date, pub_time=pub_time, desc=desc, view=view, like=like, coin=coin, favorite=favorite, share=share, reply=reply, danmaku=danmaku, communication_index=Communication_Index)print(f'==========第{i+1}个BV号:{bv_id}的相关数据已写入csv文件中==========')print('==================================================\n')