forked from mrlan/EnglishPal
176 lines
6.5 KiB
Python
176 lines
6.5 KiB
Python
|
from WordFreq import WordFreq
|
|||
|
from difficulty import WordDifficultyEvaluator
|
|||
|
from wordfreqCMD import youdao_link, sort_in_descending_order
|
|||
|
import pickle_idea, pickle_idea2
|
|||
|
import os
|
|||
|
import random, glob
|
|||
|
import hashlib
|
|||
|
from datetime import datetime
|
|||
|
from flask import Flask, request, redirect, render_template, url_for, session, abort, flash, get_flashed_messages
|
|||
|
from model.article import get_all_articles, get_article_by_id, get_number_of_articles
|
|||
|
import logging
|
|||
|
import re
|
|||
|
|
|||
|
path_prefix = './'
|
|||
|
db_path_prefix = './db/' # 部署时请注释掉此行
|
|||
|
oxford_words_path = './db/oxford_words.txt'
|
|||
|
|
|||
|
|
|||
|
def count_oxford_words(text, oxford_words):
|
|||
|
words = re.findall(r'\b\w+\b', text.lower())
|
|||
|
total_words = len(words)
|
|||
|
oxford_word_count = sum(1 for word in words if word in oxford_words)
|
|||
|
return oxford_word_count, total_words
|
|||
|
|
|||
|
|
|||
|
def calculate_ratio(oxford_word_count, total_words):
|
|||
|
if total_words == 0:
|
|||
|
return 0
|
|||
|
return oxford_word_count / total_words
|
|||
|
|
|||
|
|
|||
|
def load_oxford_words(file_path):
|
|||
|
oxford_words = {}
|
|||
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|||
|
for line in file:
|
|||
|
parts = line.strip().split()
|
|||
|
word = parts[0]
|
|||
|
pos = parts[1]
|
|||
|
level = parts[2]
|
|||
|
oxford_words[word] = {'pos': pos, 'level': level}
|
|||
|
return oxford_words
|
|||
|
|
|||
|
|
|||
|
def total_number_of_essays():
|
|||
|
return get_number_of_articles()
|
|||
|
|
|||
|
|
|||
|
def get_article_title(s):
|
|||
|
return s.split('\n')[0]
|
|||
|
|
|||
|
|
|||
|
def get_article_body(s):
|
|||
|
lst = s.split('\n')
|
|||
|
lst.pop(0) # 移除第一行标题
|
|||
|
return '\n'.join(lst)
|
|||
|
|
|||
|
|
|||
|
def get_today_article(user_word_list, visited_articles):
|
|||
|
if visited_articles is None:
|
|||
|
visited_articles = {
|
|||
|
"index": 0, # 文章索引
|
|||
|
"article_ids": [] # 已显示文章的id列表,越后越新
|
|||
|
}
|
|||
|
|
|||
|
if visited_articles["index"] > len(visited_articles["article_ids"]) - 1: # 获取新的文章
|
|||
|
result = get_all_articles()
|
|||
|
else: # 获取已阅读过的文章
|
|||
|
if visited_articles["article_ids"][visited_articles["index"]] == 'null': # 如果刷新页面导致查询了'null'
|
|||
|
visited_articles["index"] -= 1
|
|||
|
visited_articles["article_ids"].pop()
|
|||
|
article_id = visited_articles["article_ids"][visited_articles["index"]]
|
|||
|
result = get_article_by_id(article_id)
|
|||
|
|
|||
|
random.shuffle(result)
|
|||
|
|
|||
|
# 根据读者的水平选择文章
|
|||
|
logging.debug('* get_today_article(): 开始加载用户词汇表 d1...')
|
|||
|
d1 = load_freq_history(user_word_list)
|
|||
|
d2 = load_freq_history(path_prefix + 'static/words_and_tests.p')
|
|||
|
logging.debug(' ... get_today_article(): 调用 get_difficulty_level_for_user() 开始')
|
|||
|
wordDifficultyEvaluator = WordDifficultyEvaluator()
|
|||
|
d3 = wordDifficultyEvaluator.get_difficulty_level_for_user(d1, d2)
|
|||
|
logging.debug(' ... get_today_article(): 完成')
|
|||
|
|
|||
|
d = None
|
|||
|
result_of_generate_article = "not found"
|
|||
|
|
|||
|
d_user = load_freq_history(user_word_list)
|
|||
|
logging.debug('* get_today_article(): 调用 user_difficulty_level() 开始')
|
|||
|
|
|||
|
user_level = wordDifficultyEvaluator.user_difficulty_level(d_user, d3) # 用户行为是动态的,考虑时间因素
|
|||
|
logging.debug('* get_today_article(): 完成')
|
|||
|
text_level = 0
|
|||
|
|
|||
|
if visited_articles["index"] > len(visited_articles["article_ids"]) - 1: # 生成新的文章
|
|||
|
amount_of_visited_articles = len(visited_articles["article_ids"])
|
|||
|
amount_of_existing_articles = len(result)
|
|||
|
if amount_of_visited_articles == amount_of_existing_articles: # 所有文章都已读过
|
|||
|
result_of_generate_article = "had read all articles"
|
|||
|
else:
|
|||
|
for _ in range(3): # 尝试最多3次
|
|||
|
for reading in result:
|
|||
|
text_level = wordDifficultyEvaluator.text_difficulty_level(reading['text'], d3)
|
|||
|
factor = random.gauss(0.8, 0.1) # 从均值为0.8,标准差为0.1的高斯分布中抽取一个数
|
|||
|
if reading['article_id'] not in visited_articles["article_ids"] and within_range(text_level,
|
|||
|
user_level, (
|
|||
|
8.0 - user_level) * factor): # 新文章且符合难度范围
|
|||
|
d = reading
|
|||
|
visited_articles["article_ids"].append(d['article_id']) # 添加新文章ID到列表
|
|||
|
result_of_generate_article = "found"
|
|||
|
break
|
|||
|
if result_of_generate_article == "found": # 成功找到文章后退出循环
|
|||
|
break
|
|||
|
if result_of_generate_article != "found": # 没有找到适合的文章,则添加空(“null”)
|
|||
|
visited_articles["article_ids"].append('null')
|
|||
|
else: # 生成已经阅读过的文章
|
|||
|
d = random.choice(result)
|
|||
|
text_level = wordDifficultyEvaluator.text_difficulty_level(d['text'], d3)
|
|||
|
result_of_generate_article = "found"
|
|||
|
|
|||
|
today_article = None
|
|||
|
if d:
|
|||
|
oxford_words = load_oxford_words(oxford_words_path)
|
|||
|
oxford_word_count, total_words = count_oxford_words(d['text'], oxford_words)
|
|||
|
ratio = calculate_ratio(oxford_word_count, total_words)
|
|||
|
today_article = {
|
|||
|
"user_level": '%4.1f' % user_level,
|
|||
|
"text_level": '%4.1f' % text_level,
|
|||
|
"date": d['date'],
|
|||
|
"article_title": get_article_title(d['text']),
|
|||
|
"article_body": get_article_body(d['text']),
|
|||
|
"source": d["source"],
|
|||
|
"question": get_question_part(d['question']),
|
|||
|
"answer": get_answer_part(d['question']),
|
|||
|
"ratio": ratio
|
|||
|
}
|
|||
|
|
|||
|
return visited_articles, today_article, result_of_generate_article
|
|||
|
|
|||
|
|
|||
|
def load_freq_history(path):
|
|||
|
d = {}
|
|||
|
if os.path.exists(path):
|
|||
|
d = pickle_idea.load_record(path)
|
|||
|
return d
|
|||
|
|
|||
|
|
|||
|
def within_range(x, y, r):
|
|||
|
return x > y and abs(x - y) <= r
|
|||
|
|
|||
|
|
|||
|
def get_question_part(s):
|
|||
|
result = []
|
|||
|
flag = False
|
|||
|
for line in s.split('\n'):
|
|||
|
line = line.strip()
|
|||
|
if line == 'QUESTION':
|
|||
|
result.append(line)
|
|||
|
flag = True
|
|||
|
elif line == 'ANSWER':
|
|||
|
flag = False
|
|||
|
elif flag:
|
|||
|
result.append(line)
|
|||
|
return '\n'.join(result)
|
|||
|
|
|||
|
|
|||
|
def get_answer_part(s):
|
|||
|
result = []
|
|||
|
flag = False
|
|||
|
for line in s.split('\n'):
|
|||
|
line = line.strip()
|
|||
|
if line == 'ANSWER':
|
|||
|
flag = True
|
|||
|
elif flag:
|
|||
|
result.append(line)
|
|||
|
return '\n'.join(result)
|