import io
import json
import os
import random
import re
import shutil
import threading
import time

from bs4 import BeautifulSoup
import requests
from ebooklib import epub


def get_user_agent():
    """Return a request-headers dict with a randomly chosen User-Agent.

    Several UA pools (Chrome / Firefox / macOS browsers) are defined; a pool
    is picked at random first, then one UA string from inside it.
    """
    user_agents = [
        [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
            'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.3',
            'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36'],
        [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:54.0) Gecko/20100101 Firefox/54.0',
            'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0',
            'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0'],
        [
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.1 Safari/603.1.30',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Safari/604.1.38']
    ]
    user_agent_list = random.choice(user_agents)
    user_agent = random.choice(user_agent_list)
    return {'User-Agent': user_agent}


class Download_Novel:
    """Search, download (multi-threaded) and package a novel as TXT or EPUB."""

    def __init__(self, name):
        self.file_path = None
        self.chapter_urls = None
        self.cover = None
        self.intro = None
        self.status = None
        self.author = None
        self.title = None
        self.name = name
        # Delay (seconds) between consecutive requests, to stay polite.
        self.interval = 2

    def search_novel(self):
        """Query the search API for ``self.name``.

        :returns: list of result dicts (keys observed downstream:
            ``articlename``, ``author``, ``intro``, ``url_img``, ``url_list``).
        """
        # First fetch the anti-bot 'hm' token the search endpoint requires.
        hm_url = f'https://user.bqgso.cc/hm.html?&q={self.name}'
        result = requests.get(hm_url, headers=get_user_agent()).text
        hm = result[2:-2]  # strip the wrapper characters around the token
        url = f'https://user.bqgso.cc/search.html?&q={self.name}&hm={hm}'
        # The endpoint wraps its JSON payload in one extra character on
        # each side; strip them before parsing.
        response = json.loads(requests.get(url, headers=get_user_agent()).text[1:-1])
        # Rewrite mobile-site links to the desktop site.
        for i, book in enumerate(response):
            response[i]['url_list'] = book['url_list'].replace('https://m', 'https://www')
        return response

    def get_novel_info(self, response):
        """Fetch the book's home page and populate the instance metadata.

        Sets title/author/status/update times/intro/cover/chapter_urls and
        creates the per-book download directory.

        :param response: one result dict from :meth:`search_novel`.
        """
        url = response['url_list']
        url_response = requests.get(url, headers=get_user_agent())
        soup = BeautifulSoup(url_response.text, 'html.parser')
        self.title = response['articlename']
        print(self.title)
        # Metadata spans inside the <div class="small"> block:
        # [0]=author, [1]=status, [2]=update time, [3]=latest chapter.
        div_tag = soup.find('div', {'class': 'small'})
        all_span_tags = div_tag.find_all('span')
        self.author = response['author']
        self.status = all_span_tags[1].text.strip()
        self.update_time = all_span_tags[2].text.strip()
        self.latest_update = all_span_tags[3].text.strip()
        # The intro's last 6 characters are site boilerplate; drop them.
        self.intro = soup.select_one('.intro').get_text(strip=True)[:-6]
        print(self.intro)
        self.cover = response['url_img']
        # Collect chapter links, skipping the "show more" javascript anchor.
        self.chapter_urls = [url + i.get('href').split('/')[-1]
                             for i in soup.select('.listmain a')
                             if i.get('href').split('/')[-1] != 'javascript:dd_show()']
        print('开始下载。。。')
        dir_path = os.path.dirname(os.path.abspath(__file__))
        self.download_path = dir_path + '/downloads/'
        self.file_path = self.download_path + self.title + '/'
        if not os.path.exists(self.file_path):
            os.makedirs(self.file_path)
        # Pause before the chapter downloads start.
        time.sleep(self.interval)

    def get_multi_txt_file_status(self, file_name):
        """Return ``(absolute_path, done)`` for one chapter file.

        A chapter counts as done only when the file exists AND is non-empty
        (a failed download may leave an empty file behind).
        """
        file_name = self.file_path + file_name
        if os.path.exists(file_name) and os.path.getsize(file_name) > 0:
            print(file_name + ' 已存在,跳过...\n')
            return file_name, True
        else:
            return file_name, False

    def download_url(self, chapter_url, file_name):
        """Download one chapter into ``file_name`` (skipped when present).

        Concurrency is bounded by ``self.semaphore``; on failure the request
        is retried up to 8 times with ``self.interval`` seconds between tries.
        """
        with self.semaphore:
            file_name, status = self.get_multi_txt_file_status(file_name=file_name)
            if status:
                print(file_name + ' 已存在,跳过...\n')
            else:
                print('开始下载:' + file_name)
                with open(file_name, 'w', encoding='utf-8') as f:
                    retry = 8
                    while retry > 0:
                        try:
                            response = requests.get(chapter_url, headers=get_user_agent(), timeout=5)
                            soup = BeautifulSoup(response.text, 'html.parser')
                            chapter_title = soup.select_one('.content h1').get_text()
                            print(chapter_title)
                            chapter_content = soup.select_one('div#chaptercontent').get_text().strip()
                            # Turn the double-space paragraph markers into newlines.
                            chapter_content = chapter_content.replace('  ', '\n ')
                            # Strip trailing site boilerplate / ad lines.
                            content = re.sub(r'(第\d+章|请收藏本站|『点此报错).*$', '',
                                             chapter_content, flags=re.MULTILINE)
                            f.write(chapter_title + '\n' + content + '\n\n')
                            break
                        except Exception as e:
                            print(e, '\n retry...')
                            time.sleep(self.interval)
                            retry -= 1

    def multi_thread_download(self):
        """Start one download thread per chapter and wait for all of them."""
        self.threads = []
        for index, chapter_url in enumerate(self.chapter_urls):
            args = (chapter_url, str(index) + '.txt')
            thread = threading.Thread(target=self.download_url, args=args)
            self.threads.append(thread)
            thread.start()
        for thread in self.threads:
            thread.join()

    def download_process(self):
        """Run the threaded download, verify completeness, then merge."""
        # Limit to 4 simultaneous downloads; more threads raise error rates.
        max_concurrent_threads = 4
        self.semaphore = threading.Semaphore(max_concurrent_threads)
        self.multi_thread_download()
        time.sleep(self.interval)
        failed = 0
        # One missing/empty chapter file is enough to flag a failed run.
        for i in range(0, len(self.chapter_urls)):
            status = self.get_multi_txt_file_status(str(i) + '.txt')[1]
            if not status:
                failed += 1
                break
        if not failed:
            convert_type = int(input('下载成功!\n请输入要合并的格式:\n0 TxT文件\n1 Epub文件\n'))
            convert_status = True
            if convert_type == 0:
                print(self.file_path, self.download_path + self.title + '.txt')
                convert_status = self.merge_txt_file(self.download_path + self.title + '.txt')
            elif convert_type == 1:
                txt_files = []
                for n in range(0, len(self.chapter_urls)):
                    txt_files.append(self.file_path + str(n) + '.txt')
                convert_status = self.merge_txt_to_epub(txt_files,
                                                        self.download_path + self.title + '.epub')
            if convert_status:
                print('合并成功!')
            else:
                print('合并失败!请删除downloads下面目录后重新运行程序!')
                exit(1)
        else:
            print('部分文件下载失败,限制线程数可以提高下载成功率,是否重新下载个别文件?')
            download = input('0 退出\n1 重试\n')
            # BUG FIX: input() returns a str; the original compared against
            # int 0, so the exit choice could never match.
            if download == '0':
                exit(0)
            else:
                self.download_process()

    def merge_txt_file(self, merged_file_name=''):
        """Concatenate the chapter txt files (numeric order) into one file.

        :param merged_file_name: path of the merged output file.
        :returns: True on success, False if a chapter is missing/empty or an
            error occurs (the partial output file is removed on error).
        """
        if os.path.exists(merged_file_name):
            os.remove(merged_file_name)
        print('merge file : ',
              sorted(os.listdir(self.file_path), key=lambda x: int(x.split('.')[0])))
        time.sleep(self.interval)
        with open(merged_file_name, 'wb') as outfile:
            try:
                for filename in sorted(os.listdir(self.file_path),
                                       key=lambda x: int(x.split('.')[0])):
                    print(filename)
                    if filename.endswith('.txt'):
                        source = self.file_path + '/' + filename
                        # An empty chapter file means the download failed.
                        if os.path.exists(source) and os.path.getsize(source) > 0:
                            with open(source, 'rb') as infile:
                                shutil.copyfileobj(infile, outfile)
                        else:
                            return False
            except Exception as e:
                os.remove(merged_file_name)
                print(e)
                return False
        return True

    def merge_txt_to_epub(self, txt_files, epub_file):
        """Build an EPUB book from the ordered chapter txt files.

        :param txt_files: ordered list of chapter txt paths.
        :param epub_file: output path of the generated .epub file.
        :returns: True on success, False when a chapter file is empty.
        """
        book = epub.EpubBook()
        if os.path.exists(epub_file):
            os.remove(epub_file)
        book.set_title(self.title)
        book.set_language('zh')
        book.add_author(self.author)
        # BUG FIX: the intro is the book description; the original passed it
        # to set_direction(), which sets the page-progression direction.
        book.add_metadata('DC', 'description', self.intro)
        # Fetch the cover image and embed it as bytes.
        response = requests.get(self.cover)
        stream = io.BytesIO(response.content)
        book.set_cover('cover.jpg', stream.getvalue(), 'image/jpeg')
        print('合并中。。。。。。')

        # Read the stylesheets once (they were re-read per chapter before);
        # the same css text is attached per-chapter and book-wide.
        with open('./css/page_styles.css', 'r', encoding='utf-8') as css:
            page_style = css.read()
        with open('./css/page_styles1.css', 'r', encoding='utf-8') as css:
            page_style1 = css.read()
        with open('./css/stylesheet.css', 'r', encoding='utf-8') as css:
            style = css.read()

        book_spine = []
        for i, txt_file in enumerate(txt_files):
            with open(txt_file, 'r', encoding='utf-8') as file:
                content = file.readlines()
            try:
                # Drop blank lines and newline characters.
                content = [s.replace('\n', '') for s in content if len(s.strip()) > 0]
                chapter_title = content[0]
                # NOTE(review): the original HTML wrappers were garbled in
                # this file's extraction; reconstructed as an <h1> heading
                # plus <p> paragraphs — confirm against the original source.
                content[0] = f'<h1>{content[0]}</h1>'
                for j, line in enumerate(content[1:]):
                    content[j + 1] = '<p>' + line + '</p>\n'
            except IndexError as e:
                # An empty chapter file has no content[0].
                print(e)
                return False
            chapter = epub.EpubHtml(title=chapter_title, file_name='text/' + str(i) + '.xhtml')
            chapter.content = ''.join(content)
            # Reference the css files from each chapter document.
            chapter.add_item(
                epub.EpubItem(uid="page_style", file_name="../style/page_styles.css",
                              media_type="text/css", content=page_style))
            chapter.add_item(
                epub.EpubItem(uid="page_style1", file_name="../style/page_styles1.css",
                              media_type="text/css", content=page_style1))
            chapter.add_item(
                epub.EpubItem(uid="style_default", file_name="../style/stylesheet.css",
                              media_type="text/css", content=style))
            book.add_item(chapter)
            book_spine.append(chapter)

        # Navigation files, reading order and table of contents.
        book.add_item(epub.EpubNcx())
        book.add_item(epub.EpubNav())
        book.spine = book_spine
        book.toc = book_spine
        # Book-level copies of the stylesheets.
        book.add_item(
            epub.EpubItem(uid="page_style", file_name="style/page_styles.css",
                          media_type="text/css", content=page_style))
        book.add_item(
            epub.EpubItem(uid="page_style1", file_name="style/page_styles1.css",
                          media_type="text/css", content=page_style1))
        book.add_item(
            epub.EpubItem(uid="style_default", file_name="style/stylesheet.css",
                          media_type="text/css", content=style))
        epub.write_epub(epub_file, book, {})
        return True


if __name__ == '__main__':
    search_name = input('请输入要搜索的书籍名称: ')
    if search_name:
        download_novel = Download_Novel(search_name)
        response = download_novel.search_novel()
        print(response)
        print('搜索到 ' + str(len(response)) + ' 个结果\n')
        print('---------------------------------------\n')
        for i, book in enumerate(response):
            print(str(i) + ' 书籍名称:' + book['articlename'] + '\n作者:' + book['author']
                  + '\n简介:' + book['intro'] + '...\n')
            print('---------------------------------------')
        print('---------------------------------------\n')
        select_book = int(input(f'选择要下载的书籍序号(从0-{str(len(response) - 1)}中选择):'))
        # BUG FIX: the upper bound must be exclusive — len(response) - 1 is
        # the last valid index. (The old isinstance() check was always true
        # after int() and has been dropped.)
        if 0 <= select_book < len(response):
            download_novel.get_novel_info(response[select_book])
            download_novel.download_process()
        else:
            print('输入内容不合法!')
    else:
        exit(0)