Files
PoemClassify/classify_pois.py
2026-03-23 22:31:48 +08:00

616 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Poem Classification System with LLM API Integration
基于大模型的中国古代诗词多维度分类系统
分类维度包括:
- 季节时序(四季、节气、时辰)
- 题材类型(山水、边塞、咏史、咏物等)
- 情感心境(喜怒哀乐、忧思愁绪等)
- 景物意象(自然、植物、动物、建筑等)
- 哲理思想(儒释道、人生感悟等)
- 艺术手法(比兴、用典、对仗等)
- 人物关系(送别、怀人、思乡等)
- 生活场景(宴饮、耕作、读书等)
Configuration via environment variables:
- LLM_BASE_URL: API base URL (e.g., http://localhost:11434/v1)
- LLM_API_KEY: API key (optional for local models)
- LLM_MODEL: Model name (e.g., qwen:7b, gpt-4)
"""
import json
import os
import re
import hashlib
from typing import Dict, List, Set, Optional
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import urllib.error
import threading
def generate_poem_signature(poem_data: Dict) -> str:
    """
    Build a unique MD5 signature for a poem from its title, author and body.

    The signature is used to detect duplicate poems across runs.
    """
    title = poem_data.get('title', '').strip()
    author = poem_data.get('author', '').strip()

    lines: List[str] = []
    # Collect body text from 'paragraphs' (entries may be strings or
    # nested lists of strings).
    if 'paragraphs' in poem_data:
        for entry in poem_data['paragraphs']:
            if isinstance(entry, str):
                lines.append(entry.strip())
            elif isinstance(entry, list):
                lines.extend(item.strip() for item in entry)
    # 'content' may be a single string or a flat list of strings.
    if 'content' in poem_data:
        body = poem_data['content']
        if isinstance(body, str):
            lines.append(body.strip())
        elif isinstance(body, list):
            lines.extend(item.strip() for item in body)

    fingerprint = f"title:{title}|author:{author}|content:" + '\n'.join(lines)
    return hashlib.md5(fingerprint.encode('utf-8')).hexdigest()
def load_existing_poems(output_file: str) -> Set[str]:
    """
    Load signatures of poems already present in the output JSONL file.

    Each record written by ``LLMClassifier.classify_poem`` carries a
    precomputed ``signature`` field — use it directly.  (The previous
    implementation rebuilt the signature from the ``analysis`` text of
    ``llm_classification``, which never matches the stored signature, so
    resume-dedup silently skipped nothing.)  For legacy lines without a
    ``signature``, recompute it from the record's own
    title/author/paragraphs.

    Args:
        output_file: Path of the JSONL output file.

    Returns:
        Set of MD5 signature strings; empty if the file does not exist
        or cannot be read.
    """
    existing_signatures: Set[str] = set()
    if not os.path.exists(output_file):
        return existing_signatures
    try:
        with open(output_file, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    print(f" Warning: Skipping invalid JSON on line {line_num}")
                    continue
                if not isinstance(data, dict):
                    continue
                sig = data.get('signature')
                if not sig and 'title' in data and 'author' in data:
                    # Legacy record: recompute from the stored poem fields.
                    sig = generate_poem_signature({
                        'title': data.get('title', ''),
                        'author': data.get('author', ''),
                        'paragraphs': data.get('paragraphs') or []
                    })
                if sig:
                    existing_signatures.add(sig)
    except IOError as e:
        print(f" Warning: Could not read existing output file: {e}")
    return existing_signatures
def append_to_output_file(output_file: str, data: Dict, lock: threading.Lock):
    """
    Append one record to *output_file* as a single JSON line.

    Acquires *lock* around the write so concurrent worker threads never
    interleave their output lines.
    """
    serialized = json.dumps(data, ensure_ascii=False)
    with lock, open(output_file, 'a', encoding='utf-8') as sink:
        sink.write(serialized + '\n')
class LLMClassifier:
    """
    LLM-based multi-dimensional classifier for classical Chinese poetry.

    Talks to a local or remote model through an OpenAI-compatible
    ``/chat/completions`` endpoint and labels each poem along 20+
    dimensions (season, genre, emotional tone, imagery, philosophy, ...).
    """

    # Prompt template; {title}/{author}/{content} are filled per poem via
    # str.format (hence the doubled {{ }} around the JSON example).
    # NOTE(review): many label lists below contain empty strings (e.g. the
    # season options) — the single-character Chinese labels appear to have
    # been stripped/garbled in this copy of the file.  Confirm against the
    # original source before trusting the prompt's label vocabulary.
    CLASSIFICATION_PROMPT = """你是一位中国古代诗词分类专家。请深入分析以下诗词,并从多个维度进行分类标注。
【诗词信息】
- 标题:{title}
- 作者:{author}
- 内容:{content}
【重要说明】
- 如果诗词内容是繁体中文,请在输出中包含简体中文版本
- 分类标签一律使用简体中文
【分类维度与标签选项】
1. 季节season["", "", "", "", "四季", "无明确季节"]
2. 节气solar_terms24 节气中的具体节气,如 "立春""清明""冬至" 等,无则空数组
3. 时辰time_of_day["清晨", "上午", "正午", "下午", "黄昏", "夜晚", "深夜", "黎明", "不明确"]
4. 题材类型genre["山水田园", "边塞征战", "咏史怀古", "咏物言志", "送别怀人", "思乡怀远",
"爱情闺怨", "友情赠答", "羁旅漂泊", "隐逸闲适", "讽喻时事", "节日习俗",
"宴饮酬唱", "读书治学", "农耕劳作", "宗教禅理", "其他"]
5. 情感基调emotion_tone["喜悦欢快", "悲伤哀愁", "愤怒激愤", "忧郁伤感", "孤独寂寞",
"宁静淡泊", "豪迈激昂", "思念眷恋", "惆怅失落", "平和超脱", "复杂混合"]
6. 具体情感emotions从以下选择 3-5 个最贴切的:
["", "", "", "", "", "", "", "", "", "", "", "",
"", "", "", "", "", "", "", ""]
7. 景物 - 自然nature_scenery["", "", "", "", "", "", "", "", "", "", "", "", "", ""]
8. 景物 - 植物plants["", "", "", "", "", "", "", "", "", "", "", "", "梧桐", "芭蕉", "其他"]
9. 景物 - 动物animals["", "", "", "", "", "", "", "", "", "", "", "", "", "其他"]
10. 景物 - 建筑buildings["", "", "", "", "", "", "", "", "", "", "", "", "殿", "", "其他"]
11. 意象关键词imagery提取 5-10 个诗中最具代表性的意象词汇(简体中文)
12. 哲理思想philosophy["儒家思想", "道家思想", "佛家禅理", "人生感悟", "历史兴叹", "自然之道", "无明显哲理"]
13. 人生阶段life_stage["少年", "青年", "中年", "老年", "不明确"]
14. 社会身份social_role["士人", "官员", "隐士", "游子", "征人", "商贾", "农夫", "僧道", "闺中", "其他"]
15. 写作手法technique["比兴", "", "对仗", "用典", "借景抒情", "托物言志", "虚实结合", "动静结合", "其他"]
16. 修辞手法rhetoric["比喻", "拟人", "夸张", "对偶", "排比", "反复", "设问", "反问", "其他"]
17. 色彩意象colors提取诗中的色彩词如 ["", "绿", "", "", "", "", "", "", "", ""]
18. 声音意象sounds["钟声", "鼓声", "笛声", "琴声", "风声", "雨声", "鸟鸣", "蝉鸣", "其他"]
19. 地理方位location["江南", "塞北", "中原", "巴蜀", "关中", "岭南", "吴越", "荆楚", "其他"]
20. 节日习俗festival["春节", "元宵", "清明", "端午", "七夕", "中秋", "重阳", "除夕", ""]
【返回格式】
请返回严格的 JSON 格式,结构如下:
{{
"original_text": ["空山新雨後,天氣晚來秋。", ...], // 原始文本(如果输入是繁体)
"simplified_text": ["空山新雨后,天气晚来秋。", ...], // 简体中文版本
"season": [""],
"solar_terms": ["白露"],
"time_of_day": "黄昏",
"genre": ["山水田园", "隐逸闲适"],
"emotion_tone": "宁静淡泊",
"emotions": ["", "", ""],
"nature_scenery": ["", "", ""],
"plants": ["", ""],
"animals": [""],
"buildings": [],
"imagery": ["空山", "新雨", "明月", "青松"],
"philosophy": ["道家思想", "自然之道"],
"life_stage": "中年",
"social_role": "隐士",
"technique": ["借景抒情", "动静结合"],
"rhetoric": ["拟人"],
"colors": ["", ""],
"sounds": [],
"location": "终南山",
"festival": "",
"analysis": "简要分析这首诗的主题思想、艺术特色和情感内涵100-200 字,使用简体中文)"
}}
【注意事项】
- 只返回 JSON不要有任何其他文字说明
- 每个维度根据诗意选择最贴切的标签,可以是 1 个或多个
- 如果某个维度没有明确对应,选择"""不明确"或空数组
- 意象关键词应从原诗中提取或合理归纳(使用简体中文)
- 分析要准确、深入、简洁(使用简体中文)
- 如果输入是繁体中文original_text 和 simplified_text 都要填写
- 如果输入已经是简体中文original_text 和 simplified_text 填写相同内容"""

    def __init__(self, base_url: Optional[str] = None, api_key: Optional[str] = None,
                 model: Optional[str] = None, output_file: Optional[str] = None):
        """
        Initialize the classifier.

        Args:
            base_url: API base URL (falls back to the LLM_BASE_URL env var,
                then a hard-coded default).
            api_key: API key; optional for local models (falls back to the
                LLM_API_KEY env var).
            model: Model name (falls back to the LLM_MODEL env var).
            output_file: Output file path; when given, each result is
                appended to it immediately under a lock.
        """
        self.base_url = (base_url or os.getenv('LLM_BASE_URL') or 'https://api.siliconflow.cn/v1').rstrip('/')
        self.api_key = api_key or os.getenv('LLM_API_KEY') or ''
        self.model = model or os.getenv('LLM_MODEL') or 'qwen:7b'
        self.output_file = output_file
        # Lock is only needed when several worker threads append to one file.
        self.file_lock = threading.Lock() if output_file else None

    def _call_api(self, messages: List[Dict], temperature: float = 0.3, max_retries: int = 3) -> str:
        """
        Call the LLM chat-completions API with a simple retry loop.

        Args:
            messages: Chat messages in OpenAI format.
            temperature: Sampling temperature.
            max_retries: Maximum attempts for network-level failures.

        Returns:
            The assistant message content, or "" on failure.
        """
        url = f"{self.base_url}/chat/completions"
        headers = {
            'Content-Type': 'application/json',
        }
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        payload = {
            'model': self.model,
            'messages': messages,
            'temperature': temperature,
            'stream': False
        }
        for attempt in range(max_retries):
            try:
                data = json.dumps(payload).encode('utf-8')
                req = urllib.request.Request(url, data=data, headers=headers, method='POST')
                # Generous timeout: local models can be slow on long prompts.
                with urllib.request.urlopen(req, timeout=180) as response:
                    result = json.loads(response.read().decode('utf-8'))
                    return result['choices'][0]['message']['content']
            except urllib.error.URLError as e:
                print(f" API request failed (attempt {attempt + 1}/{max_retries}): {e}")
                if attempt == max_retries - 1:
                    return ""
            except json.JSONDecodeError as e:
                # A malformed body is not retried — unlikely to improve.
                print(f" Failed to parse API response: {e}")
                return ""
            except Exception as e:
                print(f" Unexpected error (attempt {attempt + 1}/{max_retries}): {e}")
                if attempt == max_retries - 1:
                    return ""
        return ""

    def classify_poem(self, poem_data: Dict, skip_if_exists: bool = False,
                      existing_signatures: Optional[Set[str]] = None) -> Optional[Dict]:
        """
        Classify a single poem with the LLM.

        Args:
            poem_data: Poem record (expects 'title', 'author' and
                'paragraphs' and/or 'content').
            skip_if_exists: When True, return None for poems whose raw
                signature is already in *existing_signatures*.
            existing_signatures: Signatures of already-processed poems.

        Returns:
            The classification result dict (also appended to
            ``self.output_file`` when configured), or None on skip/failure.
        """
        title = poem_data.get('title', '')
        author = poem_data.get('author', '')
        content_parts = []
        if 'paragraphs' in poem_data:
            for para in poem_data['paragraphs']:
                if isinstance(para, str):
                    content_parts.append(para)
                elif isinstance(para, list):
                    content_parts.extend(para)
        if 'content' in poem_data:
            if isinstance(poem_data['content'], str):
                content_parts.append(poem_data['content'])
            elif isinstance(poem_data['content'], list):
                content_parts.extend(poem_data['content'])
        content = '\n'.join(content_parts)
        if not title and not content:
            return None
        # Skip poems that were already processed (resume support).
        if skip_if_exists and existing_signatures is not None:
            poem_sig = generate_poem_signature(poem_data)
            if poem_sig in existing_signatures:
                return None
        prompt = self.CLASSIFICATION_PROMPT.format(
            title=title or '无题',
            author=author or '佚名',
            content=content
        )
        messages = [
            {'role': 'system', 'content': '你是一位中国古代诗词分类专家,精通诗词鉴赏和分类,能够准确识别诗词的题材、情感、意象和艺术特色。'},
            {'role': 'user', 'content': prompt}
        ]
        response = self._call_api(messages, temperature=0.3)
        if not response:
            return None
        try:
            # Extract the JSON object — models sometimes wrap it in prose.
            json_match = re.search(r'\{[\s\S]*\}', response)
            if json_match:
                result = json.loads(json_match.group())
            else:
                result = json.loads(response)
            # Resolve traditional->simplified text conversion.
            paragraphs = poem_data.get('paragraphs', [])
            simplified_paragraphs = []
            # Prefer the simplified version returned by the LLM.
            if 'simplified_text' in result:
                simplified_paragraphs = result['simplified_text']
            elif 'original_text' in result:
                # Only original_text present: input was already simplified.
                simplified_paragraphs = paragraphs
            else:
                # No conversion info at all: keep the original content.
                simplified_paragraphs = paragraphs
            classification_result = {
                'id': poem_data.get('id', 'unknown'),
                'title': title,
                'author': author,
                'paragraphs': simplified_paragraphs,  # simplified version
                'original_paragraphs': paragraphs if paragraphs != simplified_paragraphs else None,  # kept only if different
                'llm_classification': result,
                # Signature derived from the SIMPLIFIED text.
                # NOTE(review): main() dedups on the signature of the RAW
                # input poem — the two differ for traditional-script input;
                # verify dedup consistency.
                'signature': generate_poem_signature({
                    'title': title,
                    'author': author,
                    'paragraphs': simplified_paragraphs
                }),
                'timestamp': datetime.now().isoformat(),
                'read_mark': False
            }
            # Write incrementally so progress survives interruption.
            if self.output_file and self.file_lock:
                append_to_output_file(self.output_file, classification_result, self.file_lock)
            return classification_result
        except json.JSONDecodeError as e:
            print(f" Failed to parse LLM response as JSON: {e}")
            print(f" Raw response: {response[:300]}...")
            return None
def is_valid_poem_data(poem_data: Dict) -> bool:
    """
    Check whether a poem record has the structure this pipeline expects.

    A valid record is a dict with a string 'id', non-blank string 'author'
    and 'title', and a 'paragraphs' list containing only strings.
    """
    if not isinstance(poem_data, dict):
        return False
    if not {'author', 'paragraphs', 'title'} <= poem_data.keys():
        return False

    paragraphs = poem_data.get('paragraphs')
    if not isinstance(paragraphs, list):
        return False
    if not all(isinstance(line, str) for line in paragraphs):
        return False

    if not isinstance(poem_data.get('id'), str):
        return False

    author = poem_data['author']
    if not (isinstance(author, str) and author.strip()):
        return False
    title = poem_data['title']
    return isinstance(title, str) and bool(title.strip())
def load_poems_from_file(file_path: str) -> List[Dict]:
    """
    Load poem records from a file.

    Accepts three layouts: a JSON array, a single JSON object, or JSONL
    (one object per line).  Unreadable files yield an empty list, and
    malformed JSONL lines are silently skipped.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as fh:
            raw = fh.read().strip()
    except IOError as e:
        print(f"Error reading file {file_path}: {e}")
        return []
    if not raw:
        return []

    # First attempt: the whole file is one JSON document.
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        pass
    else:
        if isinstance(parsed, list):
            return parsed
        if isinstance(parsed, dict):
            return [parsed]
        return []

    # Fallback: treat the content as JSONL.
    poems: List[Dict] = []
    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(record, dict):
            poems.append(record)
    return poems
def main():
    """CLI entry point.

    Scans an input folder for poem JSON/JSONL files (recursively by
    default), classifies every poem through the LLM, streams results to a
    JSONL output file, and prints summary statistics.  Poems whose
    signatures already appear in the output file are skipped, so an
    interrupted run can be resumed.
    """
    import argparse
    parser = argparse.ArgumentParser(
        description='基于大模型的中国古代诗词多维度分类系统',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
# 使用本地模型分类(递归搜索子目录)
python classify_pois.py ./poems ./output.jsonl 10 --llm \\
--base-url http://localhost:11434/v1 --model qwen:7b
# 使用远程模型分类
python classify_pois.py ./poems ./output.jsonl 10 --llm \\
--base-url https://api.openai.com/v1 --model gpt-4 --api-key sk-xxx
# 断点续跑(自动跳过已处理的诗词)
python classify_pois.py ./poems ./output.jsonl 10 --llm \\
--base-url http://localhost:11434/v1 --model qwen:7b
# 只扫描当前目录,不递归子目录
python classify_pois.py ./poems ./output.jsonl 10 --llm --no-recursive \\
--base-url http://localhost:11434/v1 --model qwen:7b
环境变量(备选):
LLM_BASE_URL, LLM_API_KEY, LLM_MODEL
"""
    )
    parser.add_argument('input_folder', help='输入文件夹路径(包含诗词 JSON/JSONL 文件)')
    parser.add_argument('output_file', help='输出 JSONL 文件路径')
    parser.add_argument('max_workers', nargs='?', type=int, default=10, help='并发数默认10')
    parser.add_argument('--llm', action='store_true', help='使用 LLM 分类(默认)')
    parser.add_argument('--base-url', type=str, help='LLM API 基础 URL')
    parser.add_argument('--api-key', type=str, help='LLM API 密钥')
    parser.add_argument('--model', type=str, help='LLM 模型名称')
    parser.add_argument('--no-recursive', action='store_true', help='不递归搜索子目录')
    args = parser.parse_args()
    # Resolve configuration: CLI flags win over environment variables.
    base_url = args.base_url or os.getenv('LLM_BASE_URL', 'http://localhost:11434/v1')
    api_key = args.api_key or os.getenv('LLM_API_KEY', '')
    model = args.model or os.getenv('LLM_MODEL', 'qwen:7b')
    print(f"\n{'='*70}")
    print(f"基于大模型的诗词多维度分类系统")
    print(f"{'='*70}")
    print(f"输入目录:{args.input_folder}")
    print(f"输出文件:{args.output_file}")
    print(f"API 地址:{base_url}")
    print(f"模型名称:{model}")
    print(f"{'='*70}\n")
    # Load signatures of already-written poems for resume/dedup.
    existing_signatures = load_existing_poems(args.output_file)
    if existing_signatures:
        print(f"检测到输出文件中已有 {len(existing_signatures)} 首诗词,将自动跳过重复项\n")
    # Build the classifier; passing output_file enables incremental,
    # lock-protected writes from worker threads.
    llm_classifier = LLMClassifier(
        base_url=base_url,
        api_key=api_key,
        model=model,
        output_file=args.output_file
    )
    # Collect input files (recursing into subdirectories unless disabled).
    valid_extensions = {'.json', '.jsonl'}
    json_files = []
    if args.no_recursive:
        # Non-recursive: only the top-level directory.
        for f in os.listdir(args.input_folder):
            if not any(f.lower().endswith(ext) for ext in valid_extensions):
                continue
            file_path = os.path.join(args.input_folder, f)
            if not os.path.isfile(file_path):
                continue
            json_files.append(file_path)
    else:
        # Recursive walk over all subdirectories.
        for root, dirs, files in os.walk(args.input_folder):
            # Prune hidden directories in place so os.walk skips them.
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            for f in files:
                if not any(f.lower().endswith(ext) for ext in valid_extensions):
                    continue
                # Skip hidden files.
                if f.startswith('.'):
                    continue
                file_path = os.path.join(root, f)
                if os.path.isfile(file_path):
                    json_files.append(file_path)
    # Sort for a deterministic processing order.
    json_files.sort()
    print(f"发现 {len(json_files)} 个有效的 JSON/JSONL 文件\n")
    # Show which subdirectories contributed files.  Files sitting directly
    # in input_folder map to '' — discard it so the line is only printed
    # (and only lists real entries) when subdirectories are involved.
    subdirs = set(os.path.dirname(f).replace(args.input_folder, '').lstrip('\\').lstrip('/') for f in json_files)
    subdirs.discard('')
    if subdirs:
        print(f"包含子目录:{', '.join(sorted(subdirs))}")
    # Aggregate counters, merged under stats_lock in the collector loop.
    stats = {
        'processed': 0,
        'skipped': 0,
        'failed': 0,
        'invalid': 0,
        'total': 0
    }
    stats_lock = threading.Lock()
    def process_poem_batch(file_path: str) -> dict:
        """Classify every poem in one input file; return per-file counters."""
        file_stats = {'processed': 0, 'skipped': 0, 'failed': 0, 'invalid': 0}
        poems = load_poems_from_file(file_path)
        for idx, poem in enumerate(poems):
            if not is_valid_poem_data(poem):
                file_stats['invalid'] += 1
                print(f" [{idx+1}/{len(poems)}] 格式无效:{poem.get('title', 'Unknown')}")
                continue
            poem_sig = generate_poem_signature(poem)
            if poem_sig in existing_signatures:
                file_stats['skipped'] += 1
                print(f" [{idx+1}/{len(poems)}] 跳过(已存在): {poem.get('title', 'Unknown')}")
                continue
            print(f" [{idx+1}/{len(poems)}] 分类中:{poem.get('title', 'Unknown')}")
            result = llm_classifier.classify_poem(poem, skip_if_exists=True, existing_signatures=existing_signatures)
            if result:
                file_stats['processed'] += 1
                # NOTE: existing_signatures is shared across worker threads.
                # set.add is atomic in CPython, but the check-then-add window
                # means two threads may classify the same poem twice (benign
                # duplicate work, not corruption).
                existing_signatures.add(poem_sig)
                print(f" ✓ 成功")
            else:
                file_stats['failed'] += 1
                print(f" ✗ 失败")
        return file_stats
    # Fan files out over a thread pool (I/O-bound: blocking API calls).
    print(f"使用 {args.max_workers} 个并发线程处理\n")
    with ThreadPoolExecutor(max_workers=args.max_workers) as executor:
        future_to_file = {executor.submit(process_poem_batch, fp): fp for fp in json_files}
        # Merge per-file counters as each future completes.
        for future in as_completed(future_to_file):
            file_path = future_to_file[future]
            try:
                file_stats = future.result()
                with stats_lock:
                    stats['processed'] += file_stats['processed']
                    stats['skipped'] += file_stats['skipped']
                    stats['failed'] += file_stats['failed']
                    stats['invalid'] += file_stats['invalid']
                    stats['total'] += file_stats['processed'] + file_stats['skipped'] + file_stats['failed'] + file_stats['invalid']
            except Exception as e:
                print(f"处理文件 {file_path} 时出错:{e}")
    print()
    # Final summary.
    print(f"\n{'='*70}")
    print(f"分类完成!")
    print(f"{'='*70}")
    print(f"诗词总数:{stats['total']}")
    print(f"成功分类:{stats['processed']}")
    print(f"跳过重复:{stats['skipped']}")
    print(f"分类失败:{stats['failed']}")
    print(f"格式无效:{stats['invalid']}")
    print(f"输出文件:{args.output_file}")
    print(f"{'='*70}\n")
# Script entry point.
if __name__ == "__main__":
    main()