Novel_download/Download_Novel.py

458 lines
18 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import io
import json
import os
import random
import re
import shutil
import threading
import time
from bs4 import BeautifulSoup
import requests
from ebooklib import epub
def get_user_agent():
    """Return a requests-style header dict with a randomly chosen User-Agent.

    Agents are grouped by browser family (Chrome/Win, Firefox/Win,
    Safari+Chrome/macOS); a family is picked first, then one of its strings,
    so each family is equally likely regardless of how many strings it has.
    """
    chrome_windows = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
        'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.3',
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36',
    ]
    firefox_windows = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:54.0) Gecko/20100101 Firefox/54.0',
        'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0',
        'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
    ]
    safari_macos = [
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.1 Safari/603.1.30',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Safari/604.1.38',
    ]
    family = random.choice([chrome_windows, firefox_windows, safari_macos])
    return {'User-Agent': random.choice(family)}
class Download_Novel:
    """Search, download and assemble web novels from the biquge mirror sites.

    Typical flow: search_novel() -> get_novel_info() -> download_process(),
    which fetches every chapter concurrently into per-chapter .txt files
    under ./downloads/<title>/ and then merges them into one TXT or EPUB.
    """

    def search_novel(self):
        """Query the site's search endpoint for self.name.

        Returns the parsed list of result dicts with each 'url_list' field
        rewritten into an absolute novel-page URL, or None when self.name
        is unset (direct-URL mode).
        """
        if self.name is not None:
            url = f'https://www.bqg222.com/user/search.html?q={self.name}'
            # The endpoint wraps its JSON payload in one extra character on
            # each side; strip them before parsing.  Fetch once and reuse
            # the text (the original issued the same request twice).
            raw = requests.get(url, headers=get_user_agent()).text[1:-1]
            print(raw)
            response = json.loads(raw)
            for i, book in enumerate(response):
                # NOTE(review): search hits bqg222.com but pages are served
                # from bqg221.com -- presumably mirror domains; confirm.
                response[i]['url_list'] = 'https://www.bqg221.com' + str(book['url_list'])
            return response

    def get_novel_info(self, response=None):
        """Scrape the novel landing page for metadata and chapter links.

        response: one result dict from search_novel(); when None, falls
        back to self.search_url (direct-URL mode).
        Populates title/author/status/update_time/latest_update/intro/
        cover/chapter_urls and creates the per-book download directory.
        """
        if response is not None:
            url = response['url_list']
        else:
            url = self.search_url
        page = requests.get(url, headers=get_user_agent())
        soup = BeautifulSoup(page.text, 'html.parser')
        self.title = soup.select_one('.book h1').get_text(strip=True)
        print(self.title)
        # The .small block holds four <span>s in fixed order:
        # author / status / update time / latest chapter.
        div_tag = soup.find('div', {'class': 'small'})
        all_span_tags = div_tag.find_all('span')
        self.author = all_span_tags[0].text.strip()[3:]  # drop the 3-char label prefix
        self.status = all_span_tags[1].text.strip()
        self.update_time = all_span_tags[2].text.strip()
        self.latest_update = all_span_tags[3].text.strip()
        # Last 6 characters of the intro are site boilerplate.
        self.intro = soup.select_one('.intro').get_text(strip=True)[:-6]
        print(self.intro)
        self.cover = soup.select_one('.cover img')['src']
        # Chapter hrefs are relative; skip the javascript "show more" link.
        self.chapter_urls = [url + i.get('href').split('/')[-1]
                             for i in soup.select('.listmain a')
                             if i.get('href').split('/')[-1] != 'javascript:dd_show()']
        print('开始下载。。。')
        dir_path = os.path.dirname(os.path.abspath(__file__))
        self.download_path = dir_path + '/downloads/'
        self.file_path = self.download_path + self.title + '/'
        if not os.path.exists(self.file_path):
            os.makedirs(self.file_path)
        # Be polite to the site between page requests.
        time.sleep(self.interval)

    def get_multi_txt_file_status(self, file_name):
        """Return (absolute_path, downloaded) for one chapter file.

        A chapter counts as downloaded only when its file exists AND is
        non-empty -- failed downloads leave zero-byte files behind.
        """
        file_name = self.file_path + file_name
        if os.path.exists(file_name) and os.path.getsize(file_name) > 0:
            print(file_name + ' 已存在,跳过...\n')
            return file_name, True
        return file_name, False

    def download_url(self, chapter_url, file_name):
        """Download one chapter into file_name (worker-thread body).

        Retries up to 8 times on any error; concurrency is bounded by
        self.semaphore (created in download_process()).
        """
        with self.semaphore:
            file_name, status = self.get_multi_txt_file_status(file_name=file_name)
            if status:
                return  # already downloaded; helper printed the skip notice
            print('开始下载:' + file_name)
            with open(file_name, 'w', encoding='utf-8') as f:
                retry = 8
                while retry > 0:
                    try:
                        response = requests.get(chapter_url, headers=get_user_agent(), timeout=5)
                        soup = BeautifulSoup(response.text, 'html.parser')
                        chapter_title = soup.select_one('.content h1').get_text()
                        print(chapter_title)
                        chapter_content = soup.select_one('div#chaptercontent').get_text().strip()
                        # Paragraphs are indented with two ideographic
                        # spaces; turn each into a line break.
                        chapter_content = chapter_content.replace('  ', '\n ')
                        # Strip site boilerplate: chapter banner, "bookmark
                        # this site", and the error-report link.
                        content = re.sub(r'(第\d+章|请收藏本站|『点此报错).*$', '', chapter_content,
                                         flags=re.MULTILINE)
                        f.write(chapter_title + '\n' + content + '\n\n')
                        break
                    except Exception as e:
                        print(e, '\n retry...')
                        time.sleep(self.interval)
                        retry -= 1

    def multi_thread_download(self):
        """Spawn one thread per chapter and wait for all of them.

        Chapter i is saved as '<i>.txt'; actual parallelism is bounded by
        the semaphore, so starting all threads at once is harmless.
        """
        self.threads = []
        for idx, chapter_url in enumerate(self.chapter_urls):
            thread = threading.Thread(target=self.download_url,
                                      args=(chapter_url, str(idx) + '.txt'))
            self.threads.append(thread)
            thread.start()
        for thread in self.threads:
            thread.join()

    def download_process(self):
        """Drive the whole download, verify completeness, then merge.

        Offers an interactive retry for failed chapters and finally merges
        the per-chapter files into one TXT or EPUB.
        """
        # 4 concurrent downloads is the sweet spot; more tends to trip the
        # site's rate limiting and multiply errors.
        max_concurrent_threads = 4
        self.semaphore = threading.Semaphore(max_concurrent_threads)
        self.multi_thread_download()
        time.sleep(self.interval)
        # Any missing or empty chapter file means the run was incomplete.
        failed = any(not self.get_multi_txt_file_status(str(i) + '.txt')[1]
                     for i in range(len(self.chapter_urls)))
        if failed:
            print('部分文件下载失败,限制线程数可以提高下载成功率,是否重新下载个别文件?')
            download = input('0 退出\n1 重试\n')
            # BUG FIX: input() returns a string, so the original comparison
            # `download == 0` never matched and '0' retried instead of exiting.
            if download == '0':
                exit(0)
            else:
                self.download_process()
        else:
            convert_type = int(input('下载成功!\n请输入要合并的格式:\n0 TxT文件\n1 Epub文件\n'))
            convert_status = True
            if convert_type == 0:
                print(self.file_path, self.download_path + self.title + '.txt')
                convert_status = self.merge_txt_file(self.download_path + self.title + '.txt')
            elif convert_type == 1:
                txt_files = [self.file_path + str(n) + '.txt'
                             for n in range(len(self.chapter_urls))]
                convert_status = self.merge_txt_to_epub(txt_files,
                                                        self.download_path + self.title + '.epub')
            if convert_status:
                print('合并成功!')
            else:
                print('合并失败请删除downloads下面目录后重新运行程序')
                exit(1)

    def merge_txt_file(self, merged_file_name=''):
        """Concatenate all chapter .txt files (numeric order) into one file.

        :param merged_file_name: destination path for the merged file
        :returns bool: True on success; False when a chapter file is
            missing/empty or an error occurs (partial output removed on error)
        """
        if os.path.exists(merged_file_name):
            os.remove(merged_file_name)
        # Files are named '<index>.txt', so sort by the numeric prefix.
        chapter_files = sorted(os.listdir(self.file_path), key=lambda x: int(x.split('.')[0]))
        print('merge file : ', chapter_files)
        time.sleep(self.interval)
        with open(merged_file_name, 'wb') as outfile:
            try:
                for filename in chapter_files:
                    print(filename)
                    if filename.endswith('.txt'):
                        src = self.file_path + '/' + filename
                        # A zero-byte chapter file means its download failed.
                        if os.path.exists(src) and os.path.getsize(src) > 0:
                            with open(src, 'rb') as infile:
                                shutil.copyfileobj(infile, outfile)
                        else:
                            return False
            except Exception as e:
                os.remove(merged_file_name)
                print(e)
                return False
        return True

    def merge_txt_to_epub(self, txt_files, epub_file):
        """Assemble the chapter txt files into a styled EPUB.

        txt_files (list): ordered list of chapter file paths
        epub_file (str): output path for the finished .epub
        Returns True on success, False when a chapter file is empty.
        """
        book = epub.EpubBook()
        if os.path.exists(epub_file):
            os.remove(epub_file)
        # Metadata gathered earlier by get_novel_info().
        book.set_title(self.title)
        book.set_language('zh')
        book.add_author(self.author)
        book.add_metadata('DC', 'description', self.intro)
        # Fetch the cover image and embed it.
        response = requests.get(self.cover)
        stream = io.BytesIO(response.content)
        book.set_cover('cover.jpg', stream.getvalue(), 'image/jpeg')
        print('合并中。。。。。。')
        # Read the stylesheets once up front (the original re-opened the css
        # files for every chapter and never closed the handles).
        with open('./css/page_styles1.css', 'r', encoding='utf-8') as fh:
            page_style1 = fh.read()
        with open('./css/stylesheet.css', 'r', encoding='utf-8') as fh:
            style = fh.read()
        for i, txt_file in enumerate(txt_files):
            with open(txt_file, 'r', encoding='utf-8') as file:
                content = file.readlines()
            try:
                # Drop blank lines and trailing newlines; line 0 is the title.
                content = [s.replace('\n', '') for s in content if len(s.strip()) > 0]
                chapter_title = content[0]
                content[0] = f""" <div class="calibre2" id="calibre_pb_0"></div>\n<h1 class="kindle-cn-heading" id="calibre_pb_1">
{content[0]} </h1> """
                for j, line in enumerate(content[1:]):
                    content[j + 1] = '<p class="calibre3">' + line + '</p>\n'
            except IndexError as e:
                # Empty chapter file -> abort; caller reports the failure.
                print(e)
                return False
            chapter = epub.EpubHtml(title=chapter_title, file_name='text/' + str(i) + '.xhtml')
            chapter.content = ''.join(content)
            # Attach the per-chapter stylesheet references.
            chapter.add_item(
                epub.EpubItem(uid="page_style1", file_name="../style/page_styles1.css",
                              media_type="text/css", content=page_style1))
            chapter.add_item(
                epub.EpubItem(uid="style_default", file_name="../style/stylesheet.css",
                              media_type="text/css", content=style))
            book.add_item(chapter)
            book.spine.append(chapter)
            book.toc.append(epub.Link('text/' + str(i) + '.xhtml', chapter_title, str(i)))
        book.add_item(epub.EpubNcx())
        book.add_item(epub.EpubNav())
        # Register the book-level stylesheet items.
        with open('./css/page_styles.css', 'r', encoding='utf-8') as fh:
            page_style = fh.read()
        book.add_item(
            epub.EpubItem(uid="page_style", file_name="style/page_styles.css",
                          media_type="text/css", content=page_style))
        book.add_item(
            epub.EpubItem(uid="page_style1", file_name="style/page_styles1.css",
                          media_type="text/css", content=page_style1))
        book.add_item(
            epub.EpubItem(uid="style_default", file_name="style/stylesheet.css",
                          media_type="text/css", content=style))
        epub.write_epub(epub_file, book, {})
        return True

    def __init__(self, name=None, search_url=None):
        # Metadata filled in later by get_novel_info().
        self.file_path = None
        self.chapter_urls = None
        self.cover = None
        self.intro = None
        self.status = None
        self.author = None
        self.title = None
        self.name = name
        self.search_url = search_url
        # Delay (seconds) between requests, to stay under rate limits.
        self.interval = 2
if __name__ == '__main__':
    # Interactive entry point: pick search-by-name or direct-URL mode.
    search_type = input('请选择你要下载的方式(0 or 1)\n0) 使用名称搜索\n1) 直接输入url(格式如https://www.bqg221.com/biquge/17931/)\n')
    download_novel = Download_Novel()
    if search_type == '0':
        download_novel.name = input('请输入要搜索的书籍名称: ')
    else:
        # Anything other than '0' is treated as a direct novel URL.
        download_novel.search_url = search_type
    response = download_novel.search_novel()
    if download_novel.name is not None:
        print('搜索到 ' + str(len(response)) + ' 个结果\n')
        print('---------------------------------------\n')
        # List results in original order but numbered from the bottom up,
        # so the most relevant entries end up nearest the prompt.
        for i, book in enumerate(reversed(response)):
            print(str(len(response) - 1 - i) + ' 书籍名称:' + book['articlename'] + '\n作者:' + book['author'] + '\n简介:' + book[
                'intro'] + '...\n')
            print('---------------------------------------')
        print('---------------------------------------\n')
        select_book = int(
            input(f'选择要下载的书籍序号(从0-{str(len(response) - 1)}中选择)'))
        # BUG FIX: valid indices are 0..len-1; the original's
        # `<= len(response)` accepted an out-of-range index and crashed.
        # (int() above already guarantees an int, so isinstance was redundant;
        # NOTE(review): non-numeric input still raises ValueError here.)
        if 0 <= select_book < len(response):
            download_novel.get_novel_info(response[select_book])
            download_novel.download_process()
        else:
            print('输入内容不合法!')
    else:
        # Direct-URL mode: scrape the given page and start downloading.
        download_novel.get_novel_info()
        download_novel.download_process()