Commit 9de60412 by hanhusheng

作者和机构部分,外文翻译为中文

parent 6e3eacb1
import json
import os
import threading
import random
import time
from pypinyin import pinyin, Style
from openai import OpenAI
# ANSI SGR escape sequences used to colorize console messages.
RED = "\033[91m"    # bright red foreground
GREEN = "\033[92m"  # bright green foreground
BLUE = "\033[94m"   # bright blue foreground
RESET = "\033[0m"   # restore the terminal's default attributes
def _is_rate_limit_error(exc):
    """Return True when *exc* looks like a rate-limit (HTTP 429) failure."""
    # Some HTTP/SDK exceptions carry the response status as an attribute;
    # prefer that over guessing from the message text.
    if getattr(exc, "status_code", None) == 429:
        return True
    message = str(exc).lower()
    markers = ("rate limit", "ratelimit", "429", "tpm limit")
    return any(marker in message for marker in markers)


def api_call_with_retry(func, *args, max_retries=10, initial_delay=1, **kwargs):
    """Call ``func`` and retry with exponential backoff on rate-limit errors.

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        max_retries: Maximum number of retries after the first attempt, so
            ``func`` is called at most ``max_retries + 1`` times.
        initial_delay: Seconds to wait before the first retry; the wait is
            doubled after every retry.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns.

    Raises:
        Exception: The exception raised by ``func`` is re-raised unchanged
            when it is not a rate-limit error, or when the retry budget is
            exhausted.
    """
    delay = initial_delay
    retries = 0
    while True:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if not _is_rate_limit_error(e):
                # Not a rate-limit problem: propagate immediately.
                raise
            retries += 1
            if retries > max_retries:
                print(f"{RED}达到最大重试次数({max_retries}),放弃请求{RESET}")
                # Bare raise keeps the original traceback intact.
                raise
            # Exponential backoff with up to 10% random jitter so that
            # concurrent callers do not all retry at the same instant.
            jitter = random.uniform(0, 0.1 * delay)
            wait_time = delay + jitter
            print(f"{BLUE}遇到速率限制,等待{wait_time:.2f}秒后重试 ({retries}/{max_retries}){RESET}")
            time.sleep(wait_time)
            delay *= 2
class ModelPool:
    """Round-robin pool of OpenAI-compatible clients for the SiliconFlow API.

    Each pool entry is a dict with the keys ``client``, ``model``, ``name``
    and ``key``. ``get_next_client`` hands the entries out in rotation.
    """

    # Environment variable that, when set to a non-empty value, replaces the
    # API key of every configured entry.
    API_KEY_ENV_VAR = "SILICONFLOW_API_KEY"

    def __init__(self):
        """Build one client per configured API entry."""
        self.clients = []
        self.current_index = 0
        self.lock = threading.Lock()
        # SiliconFlow API endpoint
        base_url = "https://api.siliconflow.cn/v1"
        # SECURITY: this key is committed to version control and must be
        # treated as leaked. Rotate it, supply the replacement through the
        # SILICONFLOW_API_KEY environment variable, and then delete the
        # literal below. It is kept only so existing setups keep working.
        api_configs = [
            {"name": "郑新翰", "model": "deepseek-ai/DeepSeek-V3", "key": "sk-qhyllkxnvsynlygdzitqhdlmvsyurystkapfimjvljmelgap"},
        ]
        env_key = os.environ.get(self.API_KEY_ENV_VAR)
        # Create one client per configuration entry
        for api_cfg in api_configs:
            key = env_key or api_cfg["key"]
            client = OpenAI(api_key=key, base_url=base_url)
            self.clients.append({
                "client": client,
                "model": api_cfg["model"],
                "name": api_cfg["name"],
                "key": key,
            })

    def get_next_client(self):
        """Return the next ``(client, model, name, key)`` tuple in rotation.

        The index update is guarded by ``self.lock`` so concurrent callers
        each receive a distinct position in the rotation.
        """
        with self.lock:
            client_info = self.clients[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.clients)
            return client_info["client"], client_info["model"], client_info["name"], client_info["key"]
# System prompt sent with every institution-name translation request.
_INSTITUTION_SYSTEM_PROMPT = """
You are a professional academic institution name translator.
Translate the given Chinese institution name to its standard English name.
Guidelines:
1. Use official English names for well-known institutions
2. For universities, follow the pattern: "University of [Location]" or "[Location] University"
3. Keep abbreviations if commonly used (e.g. "CAS" for Chinese Academy of Sciences)
4. For research institutes, use "Institute of [Field]" pattern
5. Remove department-level details, only keep university/institute level
6. Return only the English name, no additional text or explanations
Example Input: 清华大学计算机科学与技术系
Example Output: Tsinghua University
Example Input: 中国科学院自动化研究所
Example Output: Institute of Automation, Chinese Academy of Sciences
donot show output: Institute of Applied Physics and Computational Mathematics, Beijing
should show output: Institute of Applied Physics and Computational Mathematics
"""


def translate_institution(chinese_text, model_pool):
    """Translate a Chinese institution name to English with an LLM.

    Args:
        chinese_text: Institution name. Empty values, non-strings and
            pure-ASCII strings are returned unchanged without an API call.
        model_pool: Object whose ``get_next_client()`` returns a
            ``(client, model, name, key)`` tuple.

    Returns:
        The translated name, or ``chinese_text`` unchanged when the request
        fails or the model returns no text.
    """
    if not chinese_text or not isinstance(chinese_text, str) or chinese_text.isascii():
        return chinese_text
    client, model, _, _ = model_pool.get_next_client()
    try:
        response = api_call_with_retry(
            client.chat.completions.create,
            model=model,
            messages=[
                {"role": "system", "content": _INSTITUTION_SYSTEM_PROMPT},
                {"role": "user", "content": chinese_text},
            ],
            temperature=0.1,
            max_tokens=1000,
        )
        # The message content may be None; treat that like an empty reply.
        translated = (response.choices[0].message.content or "").strip()
    except Exception as e:
        # Best effort: a failed translation must not abort the whole batch.
        print(f"翻译机构名失败: {str(e)}")
        return chinese_text
    # Never replace a real institution name with an empty string.
    return translated or chinese_text
def convert_to_pinyin(ls):
    """Romanize Chinese names to pinyin in "given-name family-name" order.

    Args:
        ls: A name string, or an arbitrarily nested list of them.

    Returns:
        A value of the same shape as ``ls``. Lists are converted element by
        element. Pure-ASCII strings and non-string values (for example
        ``None`` from a JSON ``null``) are returned unchanged. For any other
        string, the first syllable is treated as the family name and moved
        to the end, after the remaining syllables joined without spaces.
    """
    # Lists are handled recursively, element by element.
    if isinstance(ls, list):
        return [convert_to_pinyin(item) for item in ls]
    # Nothing to romanize: non-string leaves and ASCII-only names pass through.
    if not isinstance(ls, str) or ls.isascii():
        return ls
    # pinyin() yields one list of candidate readings per unit; take the first.
    syllables = [item[0] for item in pinyin(ls, style=Style.NORMAL)]
    if len(syllables) >= 2:
        # NOTE(review): assumes a single-character family name; compound
        # surnames would be split - confirm whether that matters here.
        return ''.join(syllables[1:]) + ' ' + syllables[0]
    return ' '.join(syllables)
def process_json_files():
    """Convert every JSON file under the configured result directory.

    Reads ``result_dir`` from ``config.json`` in the current working
    directory and walks it recursively. For each ``*.json`` file, the
    ``Authors`` and ``Corresponding Authors`` fields are converted to pinyin
    and ``Institutions`` is translated to English. The result is written as
    ``<name>_piny.json`` into a ``<dir>-piny`` directory created next to the
    file's own directory. Source files are not modified.
    """
    # Initialize the model pool
    model_pool = ModelPool()
    # config.json supplies the root directory to process
    with open('config.json', 'r', encoding='utf-8') as f:
        config = json.load(f)
    result_dir = config['result_dir']
    for root, dirs, files in os.walk(result_dir):
        # Prune in place so os.walk does not descend into output directories
        # left by an earlier run; otherwise each re-run would reprocess its
        # own output into "-piny-piny" directories.
        dirs[:] = [d for d in dirs if not d.endswith('-piny')]
        # normpath drops a trailing separator, which would otherwise make
        # basename() empty and put the output inside the input directory.
        source_dir = os.path.normpath(root)
        piny_dir = os.path.join(os.path.dirname(source_dir), f"{os.path.basename(source_dir)}-piny")
        for filename in files:
            if not filename.endswith('.json'):
                continue
            filepath = os.path.join(root, filename)
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Only JSON objects carry the fields we rewrite; anything else
            # is copied through unchanged.
            if isinstance(data, dict):
                for field in ('Authors', 'Corresponding Authors'):
                    if field in data:
                        data[field] = convert_to_pinyin(data[field])
                institutions = data.get('Institutions')
                if isinstance(institutions, list):
                    data['Institutions'] = [translate_institution(inst, model_pool) for inst in institutions]
                elif isinstance(institutions, str):
                    # A bare string must be translated as a whole, not
                    # iterated character by character.
                    data['Institutions'] = translate_institution(institutions, model_pool)
            # New file name: original base name plus the "_piny" suffix
            base_name = os.path.splitext(filename)[0]
            new_filename = f"{base_name}_piny.json"
            os.makedirs(piny_dir, exist_ok=True)
            new_filepath = os.path.join(piny_dir, new_filename)
            with open(new_filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=4)
            print(f"已处理并保存文件: {filepath} -> {new_filepath}")
# Script entry point: run the batch conversion only when executed directly.
if __name__ == "__main__":
    process_json_files()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment