#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Poem Classification System with LLM API Integration

A multi-dimensional classifier for classical Chinese poetry backed by a
large language model. Classification dimensions include:

- Season and time (four seasons, solar terms, time of day)
- Subject genre (landscape, frontier, historical reflection, object chanting, ...)
- Emotional tone (joy, anger, sorrow, melancholy, longing, ...)
- Imagery (nature, plants, animals, architecture, ...)
- Philosophy (Confucian / Daoist / Buddhist thought, reflections on life)
- Artistic technique (metaphor and allusion, parallelism, use of classical references, ...)
- Interpersonal themes (farewell, remembrance, homesickness, ...)
- Scenes of daily life (banquets, farming, study, ...)

Configuration via environment variables:
- LLM_BASE_URL: API base URL (e.g., http://localhost:11434/v1)
- LLM_API_KEY: API key (optional for local models)
- LLM_MODEL: Model name (e.g., qwen:7b, gpt-4)
"""

import hashlib
import json
import os
import re
import threading
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Dict, List, Optional, Set


def generate_poem_signature(poem_data: Dict) -> str:
    """
    Build a unique MD5 signature for a poem from its title, author, and
    content. Used to detect duplicate poems across runs.
    """
    title = poem_data.get('title', '').strip()
    author = poem_data.get('author', '').strip()

    content_parts = []
    if 'paragraphs' in poem_data:
        for para in poem_data['paragraphs']:
            if isinstance(para, str):
                content_parts.append(para.strip())
            elif isinstance(para, list):
                content_parts.extend([p.strip() for p in para])
    if 'content' in poem_data:
        if isinstance(poem_data['content'], str):
            content_parts.append(poem_data['content'].strip())
        elif isinstance(poem_data['content'], list):
            content_parts.extend([p.strip() for p in poem_data['content']])

    content = '\n'.join(content_parts)
    signature_str = f"title:{title}|author:{author}|content:{content}"
    return hashlib.md5(signature_str.encode('utf-8')).hexdigest()


def load_existing_poems(output_file: str) -> Set[str]:
    """
    Load the signatures of poems already present in the output file,
    so that interrupted runs can resume without reprocessing.
    """
    existing_signatures = set()
    if not os.path.exists(output_file):
        return existing_signatures

    try:
        with open(output_file, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    # Prefer the signature stored with the record; fall back
                    # to recomputing it from the stored fields for records
                    # that lack one.
                    sig = data.get('signature')
                    if not sig and 'title' in data and 'author' in data:
                        sig = generate_poem_signature({
                            'title': data.get('title', ''),
                            'author': data.get('author', ''),
                            'paragraphs': data.get('paragraphs', []),
                        })
                    if sig:
                        existing_signatures.add(sig)
                except json.JSONDecodeError:
                    print(f"  Warning: Skipping invalid JSON on line {line_num}")
    except IOError as e:
        print(f"  Warning: Could not read existing output file: {e}")

    return existing_signatures


def append_to_output_file(output_file: str, data: Dict, lock: threading.Lock):
    """Append one record to the output file in a thread-safe way."""
    with lock:
        with open(output_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(data, ensure_ascii=False) + '\n')
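
# Usage sketch (illustrative only, not part of the program flow): how the
# helpers above combine for resumable de-duplication. The poem dict below is
# hypothetical sample data.
#
#     poem = {'title': '山居秋暝', 'author': '王维',
#             'paragraphs': ['空山新雨后,天气晚来秋。']}
#     seen = load_existing_poems('output.jsonl')
#     if generate_poem_signature(poem) not in seen:
#         ...  # classify the poem and append the result
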
class LLMClassifier:
    """
    LLM-backed poem classifier.

    Talks to any OpenAI-compatible chat-completions API (local or remote)
    and produces classification labels across 20+ dimensions.
    """

    CLASSIFICATION_PROMPT = """你是一位中国古代诗词分类专家。请深入分析以下诗词,并从多个维度进行分类标注。

【诗词信息】
- 标题:{title}
- 作者:{author}
- 内容:{content}

【重要说明】
- 如果诗词内容是繁体中文,请在输出中包含简体中文版本
- 分类标签一律使用简体中文

【分类维度与标签选项】
1. 季节(season):["春", "夏", "秋", "冬", "四季", "无明确季节"]
2. 节气(solar_terms):24 节气中的具体节气,如 "立春"、"清明"、"冬至" 等,无则空数组
3. 时辰(time_of_day):["清晨", "上午", "正午", "下午", "黄昏", "夜晚", "深夜", "黎明", "不明确"]
4. 题材类型(genre):["山水田园", "边塞征战", "咏史怀古", "咏物言志", "送别怀人", "思乡怀远", "爱情闺怨", "友情赠答", "羁旅漂泊", "隐逸闲适", "讽喻时事", "节日习俗", "宴饮酬唱", "读书治学", "农耕劳作", "宗教禅理", "其他"]
5. 情感基调(emotion_tone):["喜悦欢快", "悲伤哀愁", "愤怒激愤", "忧郁伤感", "孤独寂寞", "宁静淡泊", "豪迈激昂", "思念眷恋", "惆怅失落", "平和超脱", "复杂混合"]
6. 具体情感(emotions):从以下选择 3-5 个最贴切的:["喜", "怒", "哀", "乐", "忧", "思", "悲", "恐", "惊", "愁", "恨", "爱", "恋", "盼", "悔", "愧", "傲", "谦", "静", "躁"]
7. 景物 - 自然(nature_scenery):["山", "水", "云", "雨", "雪", "风", "雷", "电", "日", "月", "星", "霜", "露", "霞"]
8. 景物 - 植物(plants):["松", "竹", "梅", "兰", "菊", "荷", "柳", "桃", "李", "杏", "梨", "枫", "梧桐", "芭蕉", "其他"]
9. 景物 - 动物(animals):["鸟", "雁", "燕", "鹊", "蝉", "蛙", "鱼", "龙", "凤", "马", "牛", "羊", "犬", "其他"]
10. 景物 - 建筑(buildings):["楼", "阁", "亭", "台", "轩", "榭", "桥", "寺", "塔", "城", "关", "宫", "殿", "院", "其他"]
11. 意象关键词(imagery):提取 5-10 个诗中最具代表性的意象词汇(简体中文)
12. 哲理思想(philosophy):["儒家思想", "道家思想", "佛家禅理", "人生感悟", "历史兴叹", "自然之道", "无明显哲理"]
13. 人生阶段(life_stage):["少年", "青年", "中年", "老年", "不明确"]
14. 社会身份(social_role):["士人", "官员", "隐士", "游子", "征人", "商贾", "农夫", "僧道", "闺中", "其他"]
15. 写作手法(technique):["比兴", "赋", "对仗", "用典", "借景抒情", "托物言志", "虚实结合", "动静结合", "其他"]
16. 修辞手法(rhetoric):["比喻", "拟人", "夸张", "对偶", "排比", "反复", "设问", "反问", "其他"]
17. 色彩意象(colors):提取诗中的色彩词,如 ["青", "绿", "红", "白", "黄", "紫", "碧", "翠", "苍", "金"]
18. 声音意象(sounds):["钟声", "鼓声", "笛声", "琴声", "风声", "雨声", "鸟鸣", "蝉鸣", "其他"]
19. 地理方位(location):["江南", "塞北", "中原", "巴蜀", "关中", "岭南", "吴越", "荆楚", "其他"]
20. 节日习俗(festival):["春节", "元宵", "清明", "端午", "七夕", "中秋", "重阳", "除夕", "无"]

【返回格式】
请返回严格的 JSON 格式,结构如下:
{{
  "original_text": ["空山新雨後,天氣晚來秋。", ...],  // 原始文本(如果输入是繁体)
  "simplified_text": ["空山新雨后,天气晚来秋。", ...],  // 简体中文版本
  "season": ["秋"],
  "solar_terms": ["白露"],
  "time_of_day": "黄昏",
  "genre": ["山水田园", "隐逸闲适"],
  "emotion_tone": "宁静淡泊",
  "emotions": ["静", "喜", "乐"],
  "nature_scenery": ["山", "水", "月"],
  "plants": ["松", "竹"],
  "animals": ["鸟"],
  "buildings": [],
  "imagery": ["空山", "新雨", "明月", "青松"],
  "philosophy": ["道家思想", "自然之道"],
  "life_stage": "中年",
  "social_role": "隐士",
  "technique": ["借景抒情", "动静结合"],
  "rhetoric": ["拟人"],
  "colors": ["青", "白"],
  "sounds": [],
  "location": "终南山",
  "festival": "无",
  "analysis": "简要分析这首诗的主题思想、艺术特色和情感内涵(100-200 字,使用简体中文)"
}}

【注意事项】
- 只返回 JSON,不要有任何其他文字说明
- 每个维度根据诗意选择最贴切的标签,可以是 1 个或多个
- 如果某个维度没有明确对应,选择"无"、"不明确"或空数组
- 意象关键词应从原诗中提取或合理归纳(使用简体中文)
- 分析要准确、深入、简洁(使用简体中文)
- 如果输入是繁体中文,original_text 和 simplified_text 都要填写
- 如果输入已经是简体中文,original_text 和 simplified_text 填写相同内容"""

    def __init__(self, base_url: Optional[str] = None, api_key: Optional[str] = None,
                 model: Optional[str] = None, output_file: Optional[str] = None):
        """
        Initialize the classifier.

        Args:
            base_url: API base URL
            api_key: API key (optional for local models)
            model: model name
            output_file: output file path, enabling incremental writes
        """
        self.base_url = (base_url or os.getenv('LLM_BASE_URL')
                         or 'https://api.siliconflow.cn/v1').rstrip('/')
        self.api_key = api_key or os.getenv('LLM_API_KEY') or ''
        self.model = model or os.getenv('LLM_MODEL') or 'qwen:7b'
        self.output_file = output_file
        self.file_lock = threading.Lock() if output_file else None

    def _call_api(self, messages: List[Dict], temperature: float = 0.3,
                  max_retries: int = 3) -> str:
        """
        Call the LLM API, retrying on network errors.

        Args:
            messages: chat messages
            temperature: sampling temperature
            max_retries: maximum number of attempts

        Returns:
            The text content returned by the API, or "" on failure.
        """
        url = f"{self.base_url}/chat/completions"
        headers = {'Content-Type': 'application/json'}
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        payload = {
            'model': self.model,
            'messages': messages,
            'temperature': temperature,
            'stream': False,
        }

        for attempt in range(max_retries):
            try:
                data = json.dumps(payload).encode('utf-8')
                req = urllib.request.Request(url, data=data, headers=headers, method='POST')
                with urllib.request.urlopen(req, timeout=180) as response:
                    result = json.loads(response.read().decode('utf-8'))
                    return result['choices'][0]['message']['content']
            except urllib.error.URLError as e:
                print(f"  API request failed (attempt {attempt + 1}/{max_retries}): {e}")
                if attempt == max_retries - 1:
                    return ""
            except json.JSONDecodeError as e:
                print(f"  Failed to parse API response: {e}")
                return ""
            except Exception as e:
                print(f"  Unexpected error (attempt {attempt + 1}/{max_retries}): {e}")
                if attempt == max_retries - 1:
                    return ""
        return ""
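
    # Request sketch (illustrative): the OpenAI-compatible body that _call_api
    # POSTs to {base_url}/chat/completions. The model name is only an example;
    # the real value comes from --model / LLM_MODEL.
    #
    #     {
    #       "model": "qwen:7b",
    #       "messages": [{"role": "system", "content": "..."},
    #                    {"role": "user", "content": "<CLASSIFICATION_PROMPT>"}],
    #       "temperature": 0.3,
    #       "stream": false
    #     }
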
    def classify_poem(self, poem_data: Dict, skip_if_exists: bool = False,
                      existing_signatures: Optional[Set[str]] = None) -> Optional[Dict]:
        """
        Classify a single poem with the LLM.

        Args:
            poem_data: poem record
            skip_if_exists: skip poems whose signature is already known
            existing_signatures: set of known signatures

        Returns:
            The classification record, or None on skip/failure.
        """
        title = poem_data.get('title', '')
        author = poem_data.get('author', '')

        content_parts = []
        if 'paragraphs' in poem_data:
            for para in poem_data['paragraphs']:
                if isinstance(para, str):
                    content_parts.append(para)
                elif isinstance(para, list):
                    content_parts.extend(para)
        if 'content' in poem_data:
            if isinstance(poem_data['content'], str):
                content_parts.append(poem_data['content'])
            elif isinstance(poem_data['content'], list):
                content_parts.extend(poem_data['content'])
        content = '\n'.join(content_parts)

        if not title and not content:
            return None

        # Skip poems that were already processed.
        if skip_if_exists and existing_signatures is not None:
            poem_sig = generate_poem_signature(poem_data)
            if poem_sig in existing_signatures:
                return None

        prompt = self.CLASSIFICATION_PROMPT.format(
            title=title or '无题',
            author=author or '佚名',
            content=content,
        )
        messages = [
            {'role': 'system',
             'content': '你是一位中国古代诗词分类专家,精通诗词鉴赏和分类,能够准确识别诗词的题材、情感、意象和艺术特色。'},
            {'role': 'user', 'content': prompt},
        ]

        response = self._call_api(messages, temperature=0.3)
        if not response:
            return None

        try:
            # Extract the JSON part of the response (models sometimes wrap it in prose).
            json_match = re.search(r'\{[\s\S]*\}', response)
            result = json.loads(json_match.group()) if json_match else json.loads(response)

            # Prefer the simplified text returned by the LLM; if it is absent
            # or malformed, the input was (or is treated as) already simplified.
            paragraphs = poem_data.get('paragraphs', [])
            simplified = result.get('simplified_text')
            if isinstance(simplified, list) and simplified:
                simplified_paragraphs = simplified
            else:
                simplified_paragraphs = paragraphs

            classification_result = {
                'id': poem_data.get('id', 'unknown'),
                'title': title,
                'author': author,
                'paragraphs': simplified_paragraphs,  # simplified version
                # Keep the original text only when it differs from the simplified one.
                'original_paragraphs': paragraphs if paragraphs != simplified_paragraphs else None,
                'llm_classification': result,
                # Signature of the *input* poem, so it matches the resume check
                # that load_existing_poems/main perform on newly scanned files.
                'signature': generate_poem_signature(poem_data),
                'timestamp': datetime.now().isoformat(),
                'read_mark': False,
            }

            # Write the record immediately so progress survives interruptions.
            if self.output_file and self.file_lock:
                append_to_output_file(self.output_file, classification_result, self.file_lock)

            return classification_result
        except json.JSONDecodeError as e:
            print(f"  Failed to parse LLM response as JSON: {e}")
            print(f"  Raw response: {response[:300]}...")
            return None


def is_valid_poem_data(poem_data: Dict) -> bool:
    """Check that a poem record has the required fields and types."""
    required_fields = {'author', 'paragraphs', 'title'}
    if not isinstance(poem_data, dict):
        return False
    if not required_fields.issubset(poem_data.keys()):
        return False

    paragraphs = poem_data.get('paragraphs')
    if not isinstance(paragraphs, list):
        return False
    for para in paragraphs:
        if not isinstance(para, str):
            return False

    if 'id' not in poem_data or not isinstance(poem_data['id'], str):
        return False
    if not isinstance(poem_data['author'], str) or not poem_data['author'].strip():
        return False
    if not isinstance(poem_data['title'], str) or not poem_data['title'].strip():
        return False
    return True
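
# Input sketch (illustrative): the minimal record shape accepted by
# is_valid_poem_data. Field values below are made-up sample data.
#
#     {"id": "poem-0001", "title": "山居秋暝", "author": "王维",
#      "paragraphs": ["空山新雨后,天气晚来秋。", "明月松间照,清泉石上流。"]}
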
def load_poems_from_file(file_path: str) -> List[Dict]:
    """
    Load poem records from a file.

    Supports three formats: a JSON array, a single JSON object, and JSONL.
    """
    poems = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read().strip()
        if not content:
            return poems

        # First try to parse the whole file as JSON.
        try:
            data = json.loads(content)
            if isinstance(data, list):
                poems = data
            elif isinstance(data, dict):
                poems = [data]
        except json.JSONDecodeError:
            # Fall back to JSONL: one object per line, bad lines skipped.
            for line in content.splitlines():
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                    if isinstance(obj, dict):
                        poems.append(obj)
                except json.JSONDecodeError:
                    continue
    except IOError as e:
        print(f"Error reading file {file_path}: {e}")

    return poems
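
# Parsing sketch (illustrative): how load_poems_from_file treats each format.
#
#     [{"title": ...}, {"title": ...}]    JSON array    -> returned as-is
#     {"title": ...}                      single object -> wrapped in a list
#     one JSON object per line            JSONL         -> parsed line by line
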
def main():
    """CLI entry point."""
    import argparse

    parser = argparse.ArgumentParser(
        description='Multi-dimensional LLM-based classifier for classical Chinese poetry',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Classify with a local model (subdirectories searched recursively)
  python classify_pois.py ./poems ./output.jsonl 10 --llm \\
      --base-url http://localhost:11434/v1 --model qwen:7b

  # Classify with a remote model
  python classify_pois.py ./poems ./output.jsonl 10 --llm \\
      --base-url https://api.openai.com/v1 --model gpt-4 --api-key sk-xxx

  # Resume an interrupted run (already-processed poems are skipped automatically)
  python classify_pois.py ./poems ./output.jsonl 10 --llm \\
      --base-url http://localhost:11434/v1 --model qwen:7b

  # Scan only the top-level directory, without recursing
  python classify_pois.py ./poems ./output.jsonl 10 --llm --no-recursive \\
      --base-url http://localhost:11434/v1 --model qwen:7b

Environment variables (fallbacks for the flags above):
  LLM_BASE_URL, LLM_API_KEY, LLM_MODEL
"""
    )
    parser.add_argument('input_folder', help='input folder containing poem JSON/JSONL files')
    parser.add_argument('output_file', help='output JSONL file path')
    parser.add_argument('max_workers', nargs='?', type=int, default=10,
                        help='number of concurrent workers (default: 10)')
    parser.add_argument('--llm', action='store_true', help='classify with an LLM (the default)')
    parser.add_argument('--base-url', type=str, help='LLM API base URL')
    parser.add_argument('--api-key', type=str, help='LLM API key')
    parser.add_argument('--model', type=str, help='LLM model name')
    parser.add_argument('--no-recursive', action='store_true',
                        help='do not search subdirectories recursively')
    args = parser.parse_args()

    # Resolve configuration: flags take precedence over environment variables.
    base_url = args.base_url or os.getenv('LLM_BASE_URL', 'http://localhost:11434/v1')
    api_key = args.api_key or os.getenv('LLM_API_KEY', '')
    model = args.model or os.getenv('LLM_MODEL', 'qwen:7b')

    print(f"\n{'=' * 70}")
    print("LLM-based multi-dimensional poem classification")
    print(f"{'=' * 70}")
    print(f"Input folder: {args.input_folder}")
    print(f"Output file:  {args.output_file}")
    print(f"API base URL: {base_url}")
    print(f"Model:        {model}")
    print(f"{'=' * 70}\n")

    # Load signatures of poems that are already in the output file.
    existing_signatures = load_existing_poems(args.output_file)
    if existing_signatures:
        print(f"Found {len(existing_signatures)} poems in the output file; "
              f"duplicates will be skipped\n")

    llm_classifier = LLMClassifier(
        base_url=base_url,
        api_key=api_key,
        model=model,
        output_file=args.output_file,
    )

    # Collect input files, optionally recursing into subdirectories.
    valid_extensions = {'.json', '.jsonl'}
    json_files = []
    if args.no_recursive:
        # Scan only the top-level directory.
        for f in os.listdir(args.input_folder):
            if not any(f.lower().endswith(ext) for ext in valid_extensions):
                continue
            file_path = os.path.join(args.input_folder, f)
            if os.path.isfile(file_path):
                json_files.append(file_path)
    else:
        # Walk all subdirectories, skipping hidden directories and files.
        for root, dirs, files in os.walk(args.input_folder):
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            for f in files:
                if f.startswith('.'):
                    continue
                if not any(f.lower().endswith(ext) for ext in valid_extensions):
                    continue
                file_path = os.path.join(root, f)
                if os.path.isfile(file_path):
                    json_files.append(file_path)

    # Sort for a deterministic processing order.
    json_files.sort()
    print(f"Found {len(json_files)} JSON/JSONL files\n")

    # Show which subdirectories are involved, if any.
    subdirs = {os.path.relpath(os.path.dirname(fp), args.input_folder) for fp in json_files}
    subdirs.discard('.')
    if subdirs:
        print(f"Subdirectories: {', '.join(sorted(subdirs))}")

    stats = {'processed': 0, 'skipped': 0, 'failed': 0, 'invalid': 0, 'total': 0}
    stats_lock = threading.Lock()

    def process_poem_batch(file_path: str) -> dict:
        """Process all poems in one input file."""
        file_stats = {'processed': 0, 'skipped': 0, 'failed': 0, 'invalid': 0}
        poems = load_poems_from_file(file_path)

        for idx, poem in enumerate(poems):
            if not is_valid_poem_data(poem):
                file_stats['invalid'] += 1
                title = poem.get('title', 'Unknown') if isinstance(poem, dict) else 'Unknown'
                print(f"  [{idx + 1}/{len(poems)}] invalid format: {title}")
                continue

            poem_sig = generate_poem_signature(poem)
            if poem_sig in existing_signatures:
                file_stats['skipped'] += 1
                print(f"  [{idx + 1}/{len(poems)}] skipped (already classified): "
                      f"{poem.get('title', 'Unknown')}")
                continue

            print(f"  [{idx + 1}/{len(poems)}] classifying: {poem.get('title', 'Unknown')}")
            result = llm_classifier.classify_poem(poem, skip_if_exists=True,
                                                  existing_signatures=existing_signatures)
            if result:
                file_stats['processed'] += 1
                existing_signatures.add(poem_sig)
                print("    ✓ done")
            else:
                file_stats['failed'] += 1
                print("    ✗ failed")

        return file_stats

    # Process files concurrently; each worker handles one whole file.
    print(f"Processing with {args.max_workers} worker threads\n")
    with ThreadPoolExecutor(max_workers=args.max_workers) as executor:
        future_to_file = {executor.submit(process_poem_batch, fp): fp for fp in json_files}
        for future in as_completed(future_to_file):
            file_path = future_to_file[future]
            try:
                file_stats = future.result()
                with stats_lock:
                    for key in ('processed', 'skipped', 'failed', 'invalid'):
                        stats[key] += file_stats[key]
                    stats['total'] += sum(file_stats.values())
            except Exception as e:
                print(f"Error while processing {file_path}: {e}")

    # Final summary.
    print(f"\n{'=' * 70}")
    print("Classification finished!")
    print(f"{'=' * 70}")
    print(f"Total poems: {stats['total']}")
    print(f"Classified:  {stats['processed']}")
    print(f"Skipped:     {stats['skipped']}")
    print(f"Failed:      {stats['failed']}")
    print(f"Invalid:     {stats['invalid']}")
    print(f"Output file: {args.output_file}")
    print(f"{'=' * 70}\n")


if __name__ == "__main__":
    main()
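
# Output sketch (illustrative): each line appended to the output JSONL file is
# one record of roughly this shape; field values here are made-up sample data.
#
#     {"id": "poem-0001", "title": "山居秋暝", "author": "王维",
#      "paragraphs": ["空山新雨后,天气晚来秋。"], "original_paragraphs": null,
#      "llm_classification": {"season": ["秋"], "...": "..."},
#      "signature": "<md5 hex>", "timestamp": "2024-05-01T12:00:00",
#      "read_mark": false}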