Novel_download/Download_Novel.py
2023-06-21 08:58:08 +08:00

304 lines
11 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import io
import os
import random
import shutil
import threading
import time
import requests
from bs4 import BeautifulSoup
import re
from ebooklib import epub
def get_user_agent():
    """Return a headers dict carrying a randomly picked browser User-Agent.

    Candidates are grouped by browser family (Chrome, Firefox, Safari);
    a family is chosen at random first, then one UA string inside it.
    """
    chrome_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
        'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.3',
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36',
    ]
    firefox_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:54.0) Gecko/20100101 Firefox/54.0',
        'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0',
        'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
    ]
    safari_agents = [
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.1 Safari/603.1.30',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Safari/604.1.38',
    ]
    # Two-stage random pick, same as the original: family first, then UA.
    family = random.choice([chrome_agents, firefox_agents, safari_agents])
    return {'User-Agent': random.choice(family)}
# Delay between consecutive HTTP requests, in seconds (politeness throttle).
interval = 2
# Request headers are produced per-request by get_user_agent() to mimic a browser.
# Index page of the novel to download (biquge mirror).
url = 'https://www.bqg221.com/xs/17931/'
# Fetch the index page.
response = requests.get(url, headers=get_user_agent())
# Parse the response into a BeautifulSoup tree.
soup = BeautifulSoup(response.text, 'html.parser')
# Book title; also used as the per-book download directory name later.
title = soup.select_one('.book h1').get_text(strip=True)
print(title)
# Book metadata lives in <div class="small"> as a series of <span> tags.
# print(soup.select('.small')[0])
div_tag = soup.find('div', {'class': 'small'})
# print(div_tag)
all_span_tags = div_tag.find_all('span')
# print(all_span_tags)
# [3:] strips the "作者:" ("author:") label from the first span's text.
author = all_span_tags[0].text.strip()[3:]
status = all_span_tags[1].text.strip()
update_time = all_span_tags[2].text.strip()
latest_update = all_span_tags[3].text.strip()
# for i in all_span_tags:
#     print(i.text.strip())
# Book blurb; [:-6] drops a trailing site suffix — NOTE(review): length is
# site-specific, confirm against the live page markup.
intro = soup.select_one('.intro').get_text(strip=True)[:-6]
print(intro)
# Cover image URL (embedded into the epub later).
cover = soup.select_one('.cover img')['src']
# print(cover)
# Build absolute chapter URLs from the chapter list, skipping the
# javascript "expand list" pseudo-link the site injects.
chapter_urls = [url + i.get('href').split('/')[-1] for i in soup.select('.listmain a') if
                i.get('href').split('/')[-1] != 'javascript:dd_show()']
# print(chapter_urls)
print('开始下载。。。')
# Pause before hammering the chapter pages.
time.sleep(interval)
# 多线程下载txt
def download_url(chapter_url, file_name):
    """Download one chapter page and save its cleaned text.

    Saves to ./<title>/<file_name>. Skips files that already exist and are
    non-empty. Retries up to 8 times on any fetch/parse error, sleeping
    `interval` seconds between attempts.

    :param chapter_url: URL of the chapter page
    :param file_name: target txt file name, e.g. '3.txt'
    """
    # Limit the number of concurrently running download threads.
    with semaphore:
        file_path = './' + title
        file_name = file_path + '/' + file_name
        # exist_ok avoids the check-then-create race between worker threads.
        if not os.path.exists(file_path):
            os.makedirs(file_path, exist_ok=True)
            print('文件夹不存在,创建文件夹')
        # Skip chapters already downloaded (non-empty file present).
        if os.path.exists(file_name) and os.path.getsize(file_name) > 0:
            print(file_name + ' 已存在,跳过...\n')
            return
        print('开始下载:' + file_name)
        retry = 8
        while retry > 0:
            try:
                response = requests.get(chapter_url, headers=get_user_agent(), timeout=5)
                soup = BeautifulSoup(response.text, 'html.parser')
                chapter_title = soup.select_one('.content h1').get_text()
                print(chapter_title)
                chapter_content = soup.select_one('div#chaptercontent').get_text().strip()
                # Full-width double spaces mark paragraph starts on this site;
                # turn them into newlines.
                chapter_content = chapter_content.replace('  ', '\n ')
                # Strip site boilerplate (chapter banner, "bookmark us", error-report links).
                content = re.sub(r'(第\d+章|请收藏本站|『点此报错).*$', '', chapter_content, flags=re.MULTILINE)
                # Bug fix: open the file only AFTER a successful fetch, so a
                # failed download no longer leaves an empty/truncated file behind.
                with open(file_name, 'w', encoding='utf-8') as f:
                    f.write(chapter_title + '\n' + content + '\n\n')
                return
            except Exception as e:
                print(e, '\n retry...')
                time.sleep(interval)
                retry -= 1
# txt转换为epub
def merge_txt_to_epub(txt_files=None, epub_file='', author='', cover='', direction=''):
    """Merge chapter txt files into a single EPUB.

    Reads each chapter file from the ./<title>/ directory (chdirs into it,
    then back out), wraps the lines in HTML, and writes the epub plus the
    cover image and stylesheet references.

    :param txt_files: list of chapter txt file names, in reading order
        (default None — treated as an empty list; was a mutable `[]` default)
    :param epub_file: output epub file name, relative to the starting cwd
    :param author: book author for the epub metadata
    :param cover: URL of the cover image
    :param direction: text-direction metadata passed to set_direction()
    :returns bool: True on success, False if a chapter file is empty/malformed
    """
    # Avoid the shared-mutable-default pitfall.
    if txt_files is None:
        txt_files = []
    book = epub.EpubBook()
    # Start from a clean output file.
    if os.path.exists(epub_file):
        os.remove(epub_file)
    # Metadata; `title` is the module-level book title.
    book.set_title(title)
    book.set_language('zh')
    book.add_author(author)
    book.set_direction(direction)
    # Download the cover image and embed the raw bytes.
    response = requests.get(cover, timeout=10)
    book.set_cover('cover.jpg', response.content, 'image/jpeg')
    print('合并中。。。。。。')
    # Reading order of the chapters.
    book_spine = []
    # Read the stylesheets ONCE — they are loop-invariant (the original
    # re-opened all three files for every chapter and leaked the handles).
    with open('./css/page_styles.css', 'r', encoding='utf-8') as css:
        page_style = css.read()
    with open('./css/page_styles1.css', 'r', encoding='utf-8') as css:
        page_style1 = css.read()
    with open('./css/stylesheet.css', 'r', encoding='utf-8') as css:
        style = css.read()
    # Chapter files live inside the book directory.
    os.chdir(title)
    for i, txt_file in enumerate(txt_files):
        with open(txt_file, 'r', encoding='utf-8') as file:
            content = file.readlines()
        try:
            # Drop blank lines and trailing newlines.
            content = [s.replace('\n', '') for s in content if len(s.strip()) > 0]
            # First line of each chapter file is its title.
            chapter_title = content[0]
            content[0] = f""" <div class="calibre2" id="calibre_pb_0"></div><h1 class="kindle-cn-heading" id="calibre_pb_1">
{content[0]} </h1> """
            for j, line in enumerate(content[1:]):
                content[j + 1] = '<p class="calibre3">' + line + '</p>'
        except IndexError as e:
            print(e)
            # Bug fix: restore the cwd before bailing out, otherwise the
            # caller's retry loop resolves paths from inside the book dir.
            os.chdir('../')
            return False
        # One XHTML chapter per txt file.
        chapter = epub.EpubHtml(title=chapter_title, file_name='text/' + str(i) + '.xhtml')
        chapter.content = ''.join(content)
        # Reference the stylesheets from this chapter.
        chapter.add_item(
            epub.EpubItem(uid="page_style", file_name="../style/page_styles.css", media_type="text/css",
                          content=page_style))
        chapter.add_item(
            epub.EpubItem(uid="page_style1", file_name="../style/page_styles1.css", media_type="text/css",
                          content=page_style1))
        chapter.add_item(
            epub.EpubItem(uid="style_default", file_name="../style/stylesheet.css", media_type="text/css",
                          content=style))
        book.add_item(chapter)
        book_spine.append(chapter)
    # NCX + nav, then spine/toc from the chapters in order.
    book.add_item(epub.EpubNcx())
    book.add_item(epub.EpubNav())
    book.spine = book_spine
    book.toc = book_spine
    # Back to the starting directory before writing the book-level assets.
    os.chdir('../')
    book.add_item(
        epub.EpubItem(uid="page_style", file_name="style/page_styles.css", media_type="text/css", content=page_style))
    book.add_item(
        epub.EpubItem(uid="page_style1", file_name="style/page_styles1.css", media_type="text/css",
                      content=page_style1))
    book.add_item(
        epub.EpubItem(uid="style_default", file_name="style/stylesheet.css", media_type="text/css", content=style))
    # Package everything into the epub file.
    epub.write_epub('./' + epub_file, book, {})
    return True
# 合并为txt文件
def merge_txt_file(file_path='', merged_file_name=''):
"""
:param file_path: txt文件的保存位置
:param merged_file_name: 合并后文件保存位置
:returns bool: 返回合并成功或者失败状态
"""
os.chdir(file_path)
if os.path.exists(merged_file_name):
os.rmdir(merged_file_name)
print('merge file : ', sorted(os.listdir('.'), key=lambda x: int(x.split('.')[0])))
with open(merged_file_name, 'wb') as outfile:
for filename in sorted(os.listdir('.'), key=lambda x: int(x.split('.')[0])):
print(filename)
if filename.endswith('.txt'):
# 判断文件是否为空
if os.path.exists(filename) and os.path.getsize(filename) > 0:
# print(filename + ' 已存在,跳过...\n')
with open(filename, 'rb') as infile:
shutil.copyfileobj(infile, outfile)
else:
return False
return True
def multi_thread_download():
    """Spawn one download thread per chapter URL, then wait for them all.

    Chapter files are named by their index in `chapter_urls` ('0.txt',
    '1.txt', ...). Actual concurrency is capped inside download_url via
    the module-level semaphore.
    """
    workers = []
    for idx, chapter_url in enumerate(chapter_urls):
        worker = threading.Thread(
            target=download_url,
            args=(chapter_url, '{}.txt'.format(idx)),
        )
        workers.append(worker)
        worker.start()
    # Block until every chapter download has finished (or given up).
    for worker in workers:
        worker.join()
# Cap concurrent download threads at 4 — more tends to increase errors.
max_concurrent_threads = 4
# Shared semaphore, acquired inside download_url by each worker thread.
semaphore = threading.Semaphore(max_concurrent_threads)
multi_thread_download()
time.sleep(interval)
while True:
    # merge_txt_file('./' + title, '../' + title + '.txt')
    # Chapter files are '<index>.txt'; sort numerically, not lexicographically.
    txt_files = sorted(os.listdir(title), key=lambda x: int(x.split('.')[0]))
    epub_file_path = title + '.epub'
    result = merge_txt_to_epub(txt_files, epub_file_path, author, cover, intro)
    if not result:
        # Merge failed (some chapter file empty/malformed) — offer a retry.
        print('下载失败:', result, '\t是否重试?')
        num = int(input('0 重试\n1 退出\n'))
        if num == 0:
            # Re-download missing chapters (existing non-empty files are
            # skipped), then try merging again.
            multi_thread_download()
            merge_txt_to_epub(txt_files, epub_file_path, author, cover, intro)
        else:
            break
    else:
        print('合并成功!')
        break