Files
PoemClassify/backend/main.py
2026-03-23 22:31:48 +08:00

630 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
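Backend API for the classical Chinese poetry reading site.
A RESTful API built on FastAPI, providing:
- Poem data management (CRUD)
- Combined multi-category filtering
- Read-status tracking
- Statistics
- Bulk import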
"""
import json
import os
import sqlite3
import uuid
from contextlib import contextmanager
from datetime import datetime
from typing import Any, Dict, List, Optional

import uvicorn
from fastapi import FastAPI, File, HTTPException, Query, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
app = FastAPI(title="古诗词阅读 API", version="2.0.0")
# Enable CORS for every origin, method and header.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive — confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Configuration: the database file lives in this module's directory; the
# frontend directory sits next to the directory containing this module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(BASE_DIR)
DB_PATH = os.path.join(BASE_DIR, 'poems.db')
FRONTEND_DIR = os.path.join(PROJECT_ROOT, 'frontend')
# Classification tag taxonomy: maps each category key to its display name
# ("name") and its list of tag strings ("tags").
# NOTE(review): several tag lists below contain empty strings (""); this looks
# like characters that were lost before this copy of the file was captured —
# verify against the original data before relying on these lists.
CATEGORY_SYSTEM = {
    "season": {
        "name": "季节",
        "tags": ["", "", "", "", "四季", "无明确季节"]
    },
    "solar_terms": {
        "name": "节气",
        "tags": ["立春", "雨水", "惊蛰", "春分", "清明", "谷雨",
                 "立夏", "小满", "芒种", "夏至", "小暑", "大暑",
                 "立秋", "处暑", "白露", "秋分", "寒露", "霜降",
                 "立冬", "小雪", "大雪", "冬至", "小寒", "大寒"]
    },
    "time_of_day": {
        "name": "时辰",
        "tags": ["清晨", "上午", "正午", "下午", "黄昏", "夜晚", "深夜", "黎明", "不明确"]
    },
    "genre": {
        "name": "题材类型",
        "tags": ["山水田园", "边塞征战", "咏史怀古", "咏物言志", "送别怀人", "思乡怀远",
                 "爱情闺怨", "友情赠答", "羁旅漂泊", "隐逸闲适", "讽喻时事", "节日习俗",
                 "宴饮酬唱", "读书治学", "农耕劳作", "宗教禅理", "其他"]
    },
    "emotion_tone": {
        "name": "情感基调",
        "tags": ["喜悦欢快", "悲伤哀愁", "愤怒激愤", "忧郁伤感", "孤独寂寞",
                 "宁静淡泊", "豪迈激昂", "思念眷恋", "惆怅失落", "平和超脱", "复杂混合"]
    },
    "emotions": {
        "name": "具体情感",
        "tags": ["", "", "", "", "", "", "", "", "", "",
                 "", "", "", "", "", "", "", "", "", ""]
    },
    "nature_scenery": {
        "name": "自然景物",
        "tags": ["", "", "", "", "", "", "", "", "", "", "", "", "", ""]
    },
    "plants": {
        "name": "植物",
        "tags": ["", "", "", "", "", "", "", "", "", "", "", "", "梧桐", "芭蕉", "其他"]
    },
    "animals": {
        "name": "动物",
        "tags": ["", "", "", "", "", "", "", "", "", "", "", "", "", "其他"]
    },
    "buildings": {
        "name": "建筑",
        "tags": ["", "", "", "", "", "", "", "", "", "", "", "", "殿", "", "其他"]
    },
    "philosophy": {
        "name": "哲理思想",
        "tags": ["儒家思想", "道家思想", "佛家禅理", "人生感悟", "历史兴叹", "自然之道", "无明显哲理"]
    },
    "life_stage": {
        "name": "人生阶段",
        "tags": ["少年", "青年", "中年", "老年", "不明确"]
    },
    "social_role": {
        "name": "社会身份",
        "tags": ["士人", "官员", "隐士", "游子", "征人", "商贾", "农夫", "僧道", "闺中", "其他"]
    },
    "technique": {
        "name": "写作手法",
        "tags": ["比兴", "", "对仗", "用典", "借景抒情", "托物言志", "虚实结合", "动静结合", "其他"]
    },
    "rhetoric": {
        "name": "修辞手法",
        "tags": ["比喻", "拟人", "夸张", "对偶", "排比", "反复", "设问", "反问", "其他"]
    },
    "colors": {
        "name": "色彩意象",
        "tags": ["", "绿", "", "", "", "", "", "", "", ""]
    },
    "sounds": {
        "name": "声音意象",
        "tags": ["钟声", "鼓声", "笛声", "琴声", "风声", "雨声", "鸟鸣", "蝉鸣", "其他"]
    },
    "location": {
        "name": "地理方位",
        "tags": ["江南", "塞北", "中原", "巴蜀", "关中", "岭南", "吴越", "荆楚", "其他"]
    },
    "festival": {
        "name": "节日习俗",
        "tags": ["春节", "元宵", "清明", "端午", "七夕", "中秋", "重阳", "除夕", ""]
    }
}
@contextmanager
def get_db_connection():
    """Yield a SQLite connection to ``DB_PATH`` and close it afterwards.

    Rows come back as ``sqlite3.Row`` so columns can be read by name. The
    connection is closed when the ``with`` block exits, whether or not it
    raised; nothing is committed here, so callers commit explicitly.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    try:
        yield connection
    finally:
        connection.close()
def init_database():
    """Create the tables and indexes used by the API if they are missing.

    Creates ``poems``, ``classifications`` and ``reading_records`` followed by
    four lookup indexes, then commits. Every statement uses ``IF NOT EXISTS``.
    """
    # Poems table.
    poems_ddl = (
        "\n"
        "CREATE TABLE IF NOT EXISTS poems (\n"
        "id TEXT PRIMARY KEY,\n"
        "title TEXT NOT NULL,\n"
        "author TEXT NOT NULL,\n"
        "paragraphs TEXT,\n"
        "signature TEXT UNIQUE,\n"
        "created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n"
        "updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n"
        ")\n"
    )
    # Classification tags table (one row per poem and category).
    classifications_ddl = (
        "\n"
        "CREATE TABLE IF NOT EXISTS classifications (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "poem_id TEXT NOT NULL,\n"
        "category TEXT NOT NULL,\n"
        "tags TEXT,\n"
        "FOREIGN KEY (poem_id) REFERENCES poems(id) ON DELETE CASCADE\n"
        ")\n"
    )
    # Reading records table (at most one row per poem).
    reading_records_ddl = (
        "\n"
        "CREATE TABLE IF NOT EXISTS reading_records (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "poem_id TEXT UNIQUE NOT NULL,\n"
        "is_read BOOLEAN DEFAULT FALSE,\n"
        "read_at TIMESTAMP,\n"
        "created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n"
        "FOREIGN KEY (poem_id) REFERENCES poems(id) ON DELETE CASCADE\n"
        ")\n"
    )
    # Tables first, then the indexes that reference them.
    schema_statements = (
        poems_ddl,
        classifications_ddl,
        reading_records_ddl,
        'CREATE INDEX IF NOT EXISTS idx_classifications_poem ON classifications(poem_id)',
        'CREATE INDEX IF NOT EXISTS idx_classifications_category ON classifications(category)',
        'CREATE INDEX IF NOT EXISTS idx_reading_records_poem ON reading_records(poem_id)',
        'CREATE INDEX IF NOT EXISTS idx_poems_signature ON poems(signature)',
    )
    with get_db_connection() as conn:
        cursor = conn.cursor()
        for statement in schema_statements:
            cursor.execute(statement)
        conn.commit()
def poem_to_dict(row: sqlite3.Row, classifications: Optional[List[sqlite3.Row]] = None, reading_record: Optional[sqlite3.Row] = None) -> Dict:
    """Convert a poem row and its related rows into a plain dictionary.

    Args:
        row: Mapping with ``id``, ``title``, ``author``, ``paragraphs``
            (JSON text or empty), ``created_at`` and ``updated_at``.
        classifications: Optional rows with ``category`` and ``tags`` (JSON
            text). A missing or empty ``tags`` value yields an empty list.
        reading_record: Optional row with ``is_read`` and ``read_at``.

    Returns:
        The poem as a dict. ``classifications`` is always present (an empty
        dict when there are none) so the shape matches the list endpoint, and
        ``is_read`` / ``read_at`` default to ``False`` / ``None``.
    """
    result = {
        'id': row['id'],
        'title': row['title'],
        'author': row['author'],
        'paragraphs': json.loads(row['paragraphs']) if row['paragraphs'] else [],
        'created_at': row['created_at'],
        'updated_at': row['updated_at'],
        'classifications': {},
        'is_read': False,
        'read_at': None,
    }
    for clf in classifications or ():
        raw_tags = clf['tags']
        # The tags column is nullable; json.loads(None) would raise.
        result['classifications'][clf['category']] = json.loads(raw_tags) if raw_tags else []
    if reading_record is not None:
        result['is_read'] = bool(reading_record['is_read'])
        result['read_at'] = reading_record['read_at']
    return result
# ===== API endpoints =====
# Serve the frontend directory as static files under /static.
app.mount("/static", StaticFiles(directory=FRONTEND_DIR), name="static")
@app.get("/")
async def serve_frontend():
    """Serve the frontend page: ``index.html`` from ``FRONTEND_DIR``."""
    return FileResponse(os.path.join(FRONTEND_DIR, 'index.html'))
@app.get("/style.css")
async def serve_css():
    """Serve the stylesheet: ``style.css`` from ``FRONTEND_DIR``."""
    return FileResponse(os.path.join(FRONTEND_DIR, 'style.css'))
@app.get("/script.js")
async def serve_js():
    """Serve the script: ``script.js`` from ``FRONTEND_DIR``."""
    return FileResponse(os.path.join(FRONTEND_DIR, 'script.js'))
@app.get("/api/categories")
async def get_categories():
    """Return the classification tag taxonomy (``CATEGORY_SYSTEM``) as-is."""
    return CATEGORY_SYSTEM
@app.get("/api/stats")
async def get_statistics():
    """Return aggregate statistics about poems, reading progress and tags.

    Returns:
        A dict with the total poem count, read and unread counts, the number
        of classification rows per category, the 20 most frequent tags from
        the ``genre`` and ``emotion_tone`` categories, and the reading
        progress as a percentage rounded to one decimal place.
    """
    per_category_sql = (
        "\n"
        "SELECT category, COUNT(*) as count\n"
        "FROM classifications\n"
        "GROUP BY category\n"
    )
    tag_source_sql = (
        "\n"
        "SELECT category, tags FROM classifications\n"
        "WHERE category = 'genre' OR category = 'emotion_tone'\n"
    )
    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Total number of poems.
        cursor.execute('SELECT COUNT(*) FROM poems')
        total = cursor.fetchone()[0]

        # Number of poems marked as read.
        cursor.execute('SELECT COUNT(*) FROM reading_records WHERE is_read = TRUE')
        read_count = cursor.fetchone()[0]

        # Classification rows per category.
        cursor.execute(per_category_sql)
        category_stats = {}
        for record in cursor.fetchall():
            category_stats[record['category']] = record['count']

        # Tag frequencies across the two "popular" categories.
        cursor.execute(tag_source_sql)
        tag_frequency = {}
        for record in cursor.fetchall():
            for tag in json.loads(record['tags']):
                tag_frequency[tag] = tag_frequency.get(tag, 0) + 1

        ranked = sorted(tag_frequency.items(), key=lambda pair: pair[1], reverse=True)
        top_tags = ranked[:20]

        # Guard against division by zero on an empty database.
        percent = (read_count / total * 100) if total > 0 else 0
        return {
            'total_poems': total,
            'read_count': read_count,
            'unread_count': total - read_count,
            'category_stats': category_stats,
            'top_tags': top_tags,
            'reading_progress': round(percent, 1)
        }
_REQUIRED_POEM_FIELDS = ('title', 'author', 'paragraphs')

_INSERT_POEM_SQL = (
    "\n"
    "INSERT OR REPLACE INTO poems (id, title, author, paragraphs, signature)\n"
    "VALUES (?, ?, ?, ?, ?)\n"
)
_INSERT_CLASSIFICATION_SQL = (
    "\n"
    "INSERT INTO classifications (poem_id, category, tags)\n"
    "VALUES (?, ?, ?)\n"
)
_INSERT_READING_RECORD_SQL = (
    "\n"
    "INSERT OR IGNORE INTO reading_records (poem_id, is_read)\n"
    "VALUES (?, FALSE)\n"
)


def _parse_import_payload(content_str: str) -> List[Any]:
    """Parse upload text as one JSON document, falling back to JSON Lines."""
    try:
        data = json.loads(content_str)
    except json.JSONDecodeError:
        # Not a single JSON document: treat every non-blank line as its own
        # JSON object and drop lines that do not parse.
        items = []
        for line in content_str.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(obj, dict):
                items.append(obj)
        return items
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        return [data]
    return []


def _import_poem_item(cursor: sqlite3.Cursor, item: Any) -> bool:
    """Write one poem with its classifications and reading record.

    Returns ``True`` when the poem was stored and ``False`` when it was
    skipped (malformed item or a signature that already exists). All
    validation happens before the first write so a rejected item leaves no
    partial rows behind.
    """
    if not isinstance(item, dict) or not all(k in item for k in _REQUIRED_POEM_FIELDS):
        return False

    has_classifications = 'llm_classification' in item or 'classifications' in item
    classifications = item.get('llm_classification', item.get('classifications', {}))
    if has_classifications and not isinstance(classifications, dict):
        return False

    poem_id = item.get('id')
    if poem_id is None or poem_id == '':
        # A timestamp-based id can repeat inside a fast loop, and
        # INSERT OR REPLACE would then silently overwrite the earlier poem.
        poem_id = f"poem_{uuid.uuid4().hex}"

    # Store a missing signature as NULL: the column is UNIQUE, so storing ''
    # made every signature-less poem replace the previous one.
    signature = item.get('signature') or None
    paragraphs = json.dumps(item['paragraphs'], ensure_ascii=False)

    # Skip poems whose signature is already present.
    if signature is not None:
        cursor.execute('SELECT id FROM poems WHERE signature = ?', (signature,))
        if cursor.fetchone():
            return False

    cursor.execute(_INSERT_POEM_SQL, (poem_id, item['title'], item['author'], paragraphs, signature))

    if has_classifications:
        # Replace any previous classifications for this poem.
        cursor.execute('DELETE FROM classifications WHERE poem_id = ?', (poem_id,))
        for category, tags in classifications.items():
            tag_list = tags if isinstance(tags, list) else [tags]
            cursor.execute(
                _INSERT_CLASSIFICATION_SQL,
                (poem_id, category, json.dumps(tag_list, ensure_ascii=False)),
            )

    # Make sure a reading record exists; an existing one is left untouched.
    cursor.execute(_INSERT_READING_RECORD_SQL, (poem_id,))
    return True


@app.post("/api/poems/import")
async def import_poems(file: UploadFile = File(...)):
    """Bulk-import poems from an uploaded JSON or JSONL file.

    The upload may be a JSON array, a single JSON object, or one JSON object
    per line. Each item needs ``title``, ``author`` and ``paragraphs``;
    ``id``, ``signature`` and ``llm_classification`` / ``classifications``
    are optional. Items that are malformed, whose signature already exists,
    or that fail to insert are counted as skipped.

    Args:
        file: The uploaded file; its name must end in ``.json`` or ``.jsonl``.

    Returns:
        A dict with a summary ``message`` and the ``imported`` / ``skipped``
        counts.

    Raises:
        HTTPException: 400 for an unsupported extension, a file that is not
            valid UTF-8, or an empty file.
    """
    if not file.filename or not file.filename.lower().endswith(('.json', '.jsonl')):
        raise HTTPException(status_code=400, detail="仅支持 JSON 和 JSONL 格式")

    content = await file.read()
    try:
        # utf-8-sig also strips a leading BOM, which json.loads rejects.
        content_str = content.decode('utf-8-sig').strip()
    except UnicodeDecodeError:
        raise HTTPException(status_code=400, detail="文件必须为 UTF-8 编码") from None
    if not content_str:
        raise HTTPException(status_code=400, detail="文件内容为空")

    raw_items = _parse_import_payload(content_str)

    imported_count = 0
    skipped_count = 0
    with get_db_connection() as conn:
        cursor = conn.cursor()
        for item in raw_items:
            try:
                stored = _import_poem_item(cursor, item)
            except Exception as e:
                # Deliberately best-effort: one bad item must not abort the
                # whole import.
                print(f"导入失败:{e}")
                stored = False
            if stored:
                imported_count += 1
            else:
                skipped_count += 1
        conn.commit()

    return {
        'message': f'成功导入 {imported_count} 首诗词',
        'imported': imported_count,
        'skipped': skipped_count
    }
def _escape_like(term: str) -> str:
    """Escape ``\\``, ``%`` and ``_`` so *term* is matched literally by LIKE."""
    return term.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')


@app.get("/api/poems")
async def get_poems(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    categories: Optional[str] = Query(None, description="多类别筛选格式category1:tag1,category2:tag2"),
    search: Optional[str] = Query(None, description="搜索关键词"),
    is_read: Optional[bool] = Query(None, description="阅读状态筛选")
):
    """Return one page of poems, optionally filtered.

    Args:
        page: 1-based page number.
        page_size: Number of poems per page (1-100).
        categories: Comma-separated ``category:tag`` filters; a poem must
            match all of them. Entries without a ``:`` are ignored.
        search: Substring to look for in the title or the author.
        is_read: When given, keep only read (``True``) or unread (``False``)
            poems. A poem without a reading record counts as unread.

    Returns:
        A dict with ``total``, ``page``, ``page_size``, ``total_pages`` and
        ``poems`` (each with its classifications and read state).
    """
    conditions = []
    params = []

    # Read-status filter. COALESCE keeps poems that have no reading record
    # (NULL from the LEFT JOIN) in the "unread" result.
    if is_read is not None:
        conditions.append('COALESCE(rr.is_read, 0) = ?')
        params.append(1 if is_read else 0)

    # Keyword search. Wildcards typed by the user are escaped so that "%" or
    # "_" are matched literally instead of matching everything.
    if search:
        conditions.append("(p.title LIKE ? ESCAPE '\\' OR p.author LIKE ? ESCAPE '\\')")
        pattern = f'%{_escape_like(search)}%'
        params.extend([pattern, pattern])

    # Category filters: one EXISTS sub-query per category:tag pair.
    if categories:
        for cat_filter in categories.split(','):
            if ':' not in cat_filter:
                continue
            category, tag = cat_filter.split(':', 1)
            conditions.append(
                "EXISTS ("
                " SELECT 1 FROM classifications c"
                " WHERE c.poem_id = p.id"
                " AND c.category = ?"
                " AND c.tags LIKE ? ESCAPE '\\'"
                " )"
            )
            params.extend([category, f'%{_escape_like(tag)}%'])

    where_clause = ' AND '.join(conditions) if conditions else '1=1'

    count_sql = (
        "\n"
        "SELECT COUNT(DISTINCT p.id) FROM poems p\n"
        "LEFT JOIN reading_records rr ON p.id = rr.poem_id\n"
        f"WHERE {where_clause}\n"
    )
    # p.id is a tie-breaker: bulk-imported poems share the same created_at,
    # and without it rows could repeat or go missing between pages.
    page_sql = (
        "\n"
        "SELECT p.*, rr.is_read, rr.read_at\n"
        "FROM poems p\n"
        "LEFT JOIN reading_records rr ON p.id = rr.poem_id\n"
        f"WHERE {where_clause}\n"
        "ORDER BY p.created_at DESC, p.id\n"
        "LIMIT ? OFFSET ?\n"
    )

    with get_db_connection() as conn:
        cursor = conn.cursor()

        cursor.execute(count_sql, params)
        total = cursor.fetchone()[0]

        cursor.execute(page_sql, params + [page_size, (page - 1) * page_size])
        poems = cursor.fetchall()

        # Load the classifications of every poem on this page in one query.
        grouped = {}
        poem_ids = [row['id'] for row in poems]
        if poem_ids:
            placeholders = ','.join('?' * len(poem_ids))
            clf_sql = (
                "\n"
                "SELECT poem_id, category, tags FROM classifications\n"
                f"WHERE poem_id IN ({placeholders})\n"
            )
            cursor.execute(clf_sql, poem_ids)
            for clf in cursor.fetchall():
                grouped.setdefault(clf['poem_id'], []).append(clf)

        # The joined row carries is_read / read_at, so it doubles as the
        # reading record for the shared converter.
        result = []
        for row in poems:
            poem = poem_to_dict(row, grouped.get(row['id']), row)
            poem.setdefault('classifications', {})
            result.append(poem)

        return {
            'total': total,
            'page': page,
            'page_size': page_size,
            'total_pages': (total + page_size - 1) // page_size,
            'poems': result
        }
@app.get("/api/poems/random")
async def get_random_poem():
    """Return one randomly chosen poem with its classifications and read state.

    Raises:
        HTTPException: 404 when there are no poems.
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()
        picked = cursor.execute('SELECT * FROM poems ORDER BY RANDOM() LIMIT 1').fetchone()
        if picked is None:
            raise HTTPException(status_code=404, detail="没有可用的诗词")

        key = (picked['id'],)
        tag_rows = cursor.execute('SELECT * FROM classifications WHERE poem_id = ?', key).fetchall()
        record = cursor.execute('SELECT * FROM reading_records WHERE poem_id = ?', key).fetchone()
        return poem_to_dict(picked, tag_rows, record)
@app.get("/api/poems/{poem_id}")
async def get_poem(poem_id: str):
    """Return a single poem with its classifications and read state.

    Args:
        poem_id: Primary key of the poem.

    Raises:
        HTTPException: 404 when no poem has that id.
    """
    key = (poem_id,)
    with get_db_connection() as conn:
        cursor = conn.cursor()

        # The poem itself.
        found = cursor.execute('SELECT * FROM poems WHERE id = ?', key).fetchone()
        if found is None:
            raise HTTPException(status_code=404, detail="诗词不存在")

        # Its classification rows.
        tag_rows = cursor.execute('SELECT * FROM classifications WHERE poem_id = ?', key).fetchall()

        # Its reading record, if any.
        record = cursor.execute('SELECT * FROM reading_records WHERE poem_id = ?', key).fetchone()
        return poem_to_dict(found, tag_rows, record)
@app.put("/api/poems/{poem_id}/read")
async def toggle_read_status(poem_id: str, is_read: bool = Query(...)):
    """Set the read flag of a poem.

    Args:
        poem_id: Primary key of the poem.
        is_read: New read state; required query parameter.

    Returns:
        A dict with ``success``, the new ``is_read`` value and ``read_at``
        (an ISO timestamp taken from ``datetime.now()`` when marking as read,
        otherwise ``None``).

    Raises:
        HTTPException: 404 when no poem has that id.
    """
    upsert_sql = (
        "\n"
        "INSERT INTO reading_records (poem_id, is_read, read_at)\n"
        "VALUES (?, ?, ?)\n"
        "ON CONFLICT(poem_id) DO UPDATE SET\n"
        "is_read = excluded.is_read,\n"
        "read_at = excluded.read_at\n"
    )
    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Refuse to create a record for a poem that does not exist.
        cursor.execute('SELECT id FROM poems WHERE id = ?', (poem_id,))
        if cursor.fetchone() is None:
            raise HTTPException(status_code=404, detail="诗词不存在")

        # Insert the reading record, or update it when one already exists.
        if is_read:
            read_at = datetime.now().isoformat()
        else:
            read_at = None
        cursor.execute(upsert_sql, (poem_id, int(bool(is_read)), read_at))
        conn.commit()
        return {'success': True, 'is_read': is_read, 'read_at': read_at}
@app.get("/api/tags/cloud")
async def get_tag_cloud():
    """Return tag-cloud data: how often each tag occurs within each category.

    Returns:
        A dict with ``tags`` (a list of ``{category, tag, count}`` entries)
        and ``categories`` (category key -> display name from
        ``CATEGORY_SYSTEM``).
    """
    all_tags_sql = (
        "\n"
        "SELECT category, tags FROM classifications\n"
    )
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(all_tags_sql)

        tag_counts = {}
        for record in cursor.fetchall():
            category = record['category']
            for tag in json.loads(record['tags']):
                # Key on category and tag together so the same tag text in
                # two categories is counted separately.
                entry = tag_counts.setdefault(
                    f"{category}:{tag}",
                    {'category': category, 'tag': tag, 'count': 0},
                )
                entry['count'] += 1

        display_names = {}
        for key, definition in CATEGORY_SYSTEM.items():
            display_names[key] = definition['name']

        return {
            'tags': list(tag_counts.values()),
            'categories': display_names
        }
# Initialise the database when the application starts.
@app.on_event("startup")
async def startup_event():
    """Create the database schema and print a short startup banner."""
    init_database()

    banner = (
        f"\n数据库路径:{DB_PATH}",
        f"前端目录:{FRONTEND_DIR}",
        "\nAPI 端点:",
        " GET / - 前端页面",
        " GET /api/categories - 分类体系",
        " GET /api/stats - 统计数据",
        " POST /api/poems/import - 批量导入",
        " GET /api/poems - 诗词列表",
        " GET /api/poems/{id} - 诗词详情",
        " PUT /api/poems/{id}/read - 切换阅读状态",
        " GET /api/poems/random - 随机诗词",
        " GET /api/tags/cloud - 标签云",
        "\n按 Ctrl+C 停止服务\n",
    )
    for line in banner:
        print(line)
# When this file is executed as a script, start uvicorn listening on all
# interfaces (0.0.0.0), port 8000.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)