thermoelasticsim.md.thermostats 源代码

# 文件名: thermostats.py
# 作者: Gilbert Young
# 修改日期: 2025-08-12
# 文件描述: 实现分子动力学模拟中的各种恒温器。

import logging

import numpy as np

from thermoelasticsim.interfaces.cpp_interface import CppInterface
from thermoelasticsim.utils.utils import KB_IN_EV

# 配置日志记录
logger = logging.getLogger(__name__)


[文档] class Thermostat: """ 恒温器基类,定义恒温器的接口和通用属性 Parameters ---------- target_temperature : float 目标温度,单位K """
[文档] def __init__(self, target_temperature: float): self.target_temperature = target_temperature self.kb = KB_IN_EV # Boltzmann常数,单位eV/K # 用于记录历史数据 self.temperature_history = [] self.time_history = [] self.kinetic_energy_history = [] self.xi_history = [] # 初始化xi_history
[文档] def apply(self, atoms: list, dt: float) -> None: """ 应用恒温器,更新原子速度 Parameters ---------- atoms : list 原子列表 dt : float 时间步长 """ raise NotImplementedError
[文档] def get_kinetic_energy(self, atoms: list) -> float: """ 计算系统动能 Parameters ---------- atoms : list 原子列表 Returns ------- float 系统总动能 """ masses = np.array([atom.mass for atom in atoms], dtype=np.float64) velocities = np.array([atom.velocity for atom in atoms], dtype=np.float64) kinetic_energy = 0.5 * masses * np.sum(velocities**2, axis=1) return np.sum(kinetic_energy)
[文档] def get_temperature(self, atoms: list) -> float: """ 计算当前系统温度,扣除质心运动 Parameters ---------- atoms : list 原子列表 Returns ------- float 当前温度,单位K """ masses = np.array([atom.mass for atom in atoms], dtype=np.float64) velocities = np.array([atom.velocity for atom in atoms], dtype=np.float64) total_mass = np.sum(masses) com_velocity = np.sum(masses[:, np.newaxis] * velocities, axis=0) / total_mass relative_velocities = velocities - com_velocity kinetic_energy = 0.5 * masses * np.sum(relative_velocities**2, axis=1) dof = 3 * len(atoms) - 3 # 考虑质心运动约束 temperature = 2.0 * np.sum(kinetic_energy) / (dof * self.kb) return temperature
[文档] def remove_com_motion(self, atoms: list) -> None: """ 移除系统质心运动 Parameters ---------- atoms : list 原子列表 """ masses = np.array([atom.mass for atom in atoms]) velocities = np.array([atom.velocity for atom in atoms]) total_mass = np.sum(masses) com_velocity = np.sum(masses[:, np.newaxis] * velocities, axis=0) / total_mass for atom in atoms: atom.velocity -= com_velocity
[文档] def record_state(self, atoms: list, time: float) -> None: """记录系统状态用于后续分析""" temp = self.get_temperature(atoms) kinetic = self.get_kinetic_energy(atoms) self.temperature_history.append(temp) self.time_history.append(time) self.kinetic_energy_history.append(kinetic)
# 由子类决定是否记录 xi_chain
[文档] class BerendsenThermostat(Thermostat): r""" Berendsen 恒温器 通过速度缩放实现温度控制,使系统快速达到目标温度。 该方法不产生严格的正则系综,但具有良好的数值稳定性。 Parameters ---------- target_temperature : float 目标温度,单位为开尔文 (K)。 tau : float 耦合时间常数,控制温度达到目标值的速率。 较小的 `\\tau` 值表示更快的温度响应,但可能导致数值不稳定; 较大的 `\\tau` 值则响应较慢,温度调整更加平缓。 """
[文档] def __init__(self, target_temperature: float, tau: float): super().__init__(target_temperature) self.tau = tau
[文档] def apply(self, cell, dt: float, potential) -> None: r""" 应用 Berendsen 恒温器以控制系统温度。 该恒温器通过以下缩放因子 `\\lambda` 来调整原子速度: .. math:: \\lambda = \\sqrt{1 + \\frac{\\Delta t}{\\tau} \\left(\\frac{T_{\\text{target}}}{T_{\\text{current}}} - 1\\right)} 其中: - `\\Delta t` 为时间步长 `dt`, - `T_{\\text{target}}` 为目标温度 `target_temperature`, - `T_{\\text{current}}` 为当前系统温度。 然后通过 `\\lambda` 缩放原子速度,以达到近似的温度控制。 Parameters ---------- atoms : list 原子对象的列表。 dt : float 时间步长。 """ atoms = cell.atoms current_temp = self.get_temperature(atoms) if current_temp > 0: # 计算缩放因子 lambda lambda_scale = np.sqrt( 1.0 + (dt / self.tau) * (self.target_temperature / current_temp - 1.0) ) # 缩放速度 velocities = np.array([atom.velocity for atom in atoms], dtype=np.float64) velocities *= lambda_scale for i, atom in enumerate(atoms): atom.velocity = velocities[i] # 记录状态 current_time = len(self.time_history) * dt if self.time_history else 0.0 self.record_state(atoms, current_time)
[文档] class AndersenThermostat(Thermostat): r""" Andersen 恒温器 通过随机碰撞来控制温度,实现符合正则系综的采样。 该方法适合平衡态采样,但由于速度随机化,动力学轨迹会被打断。 Parameters ---------- target_temperature : float 目标温度,单位为开尔文 (K)。 collision_frequency : float 碰撞频率,单位为 `\\text{fs}^{-1}`。 较高的碰撞频率会更频繁地随机分配速度,导致温度波动较大; 较低的碰撞频率则速度随机化较少,温度控制更稳定。 """
[文档] def __init__(self, target_temperature: float, collision_frequency: float): super().__init__(target_temperature) self.collision_frequency = collision_frequency
[文档] def apply(self, cell, dt: float, potential) -> None: r""" 应用 Andersen 恒温器以控制系统温度。 在每个时间步,针对每个原子,发生碰撞的概率为 `p`: .. math:: p = \\text{collision\\_frequency} \\times \\Delta t 若发生碰撞,则根据目标温度 `T_{\\text{target}}` 的 Maxwell-Boltzmann 分布重新分配原子的速度: .. math:: v_i = \\mathcal{N}(0, \\sigma) 其中 `\\sigma` 为速度分布的标准差,定义为: .. math:: \\sigma = \\sqrt{\\frac{k_B \\times T_{\\text{target}}}{m}} 其中 `k_B` 为玻尔兹曼常数,`m` 为原子质量。 Parameters ---------- atoms : list 原子对象的列表。 dt : float 时间步长。 """ atoms = cell.atoms velocities = np.array([atom.velocity for atom in atoms], dtype=np.float64) masses = np.array([atom.mass for atom in atoms], dtype=np.float64) collision_probs = self.collision_frequency * dt random_values = np.random.random(len(atoms)) collision_indices = np.where(random_values < collision_probs)[0] if collision_indices.size > 0: # 计算 sigma sigma = np.sqrt( self.kb * self.target_temperature / masses[collision_indices] ) # 重新分配速度 velocities[collision_indices] = np.random.normal( 0, sigma[:, np.newaxis], (collision_indices.size, 3) ) # 更新原子速度 for i in collision_indices: atoms[i].velocity = velocities[i] # # 移除整体平动 # self.remove_com_motion(atoms) # 记录状态 current_time = len(self.time_history) * dt if self.time_history else 0.0 self.record_state(atoms, current_time)
[文档] class NoseHooverThermostat(Thermostat): """ Nose-Hoover 恒温器 (Python直接实现版本) 使用Velocity-Verlet + Nose-Hoover对称分解的方式: 1. 半步更新xi 2. 半步速度缩放 3. 半步力更新速度 4. 更新位置 5. 调用potential.calculate_forces(cell)以更新力 6. 半步力更新速度 7. 半步速度缩放 8. 半步更新xi """
[文档] def __init__( self, target_temperature: float, time_constant: float, Q: float | None = None ): super().__init__(target_temperature) self.time_constant = time_constant if Q is None: self.Q = self._calculate_Q() else: self.Q = Q # Nose-Hoover热浴变量xi初始化 self.xi = np.array([0.0], dtype=np.float64)
def _calculate_Q(self) -> float: """计算合适的热浴质量""" return 3.0 * self.kb * self.target_temperature * self.time_constant**2
[文档] def apply(self, cell, dt: float, potential) -> None: """ 应用 Nose-Hoover 恒温器 (Velocity-Verlet 对称分解实现) Parameters ---------- cell : Cell 包含原子信息的Cell对象,可以通过cell.atoms访问原子列表 dt : float 时间步长 potential : Potential 用于计算力场的对象,必须实现 potential.calculate_forces(cell) """ atoms = cell.atoms num_atoms = len(atoms) dof = 3 * num_atoms - 3 # 去除刚体平移自由度(如果需要,不需要则可用3*num_atoms) masses = np.array([atom.mass for atom in atoms], dtype=np.float64) velocities = np.array( [atom.velocity for atom in atoms], dtype=np.float64 ).flatten() forces = np.array([atom.force for atom in atoms], dtype=np.float64).flatten() total_mass = np.sum(masses) # 可选:移除质心运动,如果不需要可省略 com_velocity = np.zeros(3) for i, atom in enumerate(atoms): com_velocity += masses[i] * atom.velocity com_velocity /= total_mass for i in range(num_atoms): velocities[3 * i : 3 * i + 3] -= com_velocity def kinetic_energy_func(masses, velocities): v2 = velocities.reshape(num_atoms, 3) ** 2 ke = 0.5 * np.sum(masses[:, np.newaxis] * v2) return ke # 当前动能 kinetic_energy = kinetic_energy_func(masses, velocities) dt_half = 0.5 * dt # 计算G_xi G_xi = (2.0 * kinetic_energy - dof * self.kb * self.target_temperature) / self.Q # 半步更新xi self.xi[0] += dt_half * G_xi # 半步缩放速度 scale = np.exp(-self.xi[0] * dt_half) velocities *= scale # 半步力更新速度 for i in range(num_atoms): inv_mass = 1.0 / masses[i] velocities[3 * i] += dt_half * forces[3 * i] * inv_mass velocities[3 * i + 1] += dt_half * forces[3 * i + 1] * inv_mass velocities[3 * i + 2] += dt_half * forces[3 * i + 2] * inv_mass # 更新位置 velocities_reshaped = velocities.reshape(num_atoms, 3) for i, atom in enumerate(atoms): atom.velocity = velocities_reshaped[i] + com_velocity atom.position += atom.velocity * dt # 重新计算力 potential.calculate_forces(cell) # 获取更新后的力并再次移除质心运动(可选) forces = np.array([atom.force for atom in atoms], dtype=np.float64) velocities = np.array( [atom.velocity for atom in atoms], dtype=np.float64 ).flatten() com_velocity = np.zeros(3) for i, atom in enumerate(atoms): com_velocity += masses[i] * atom.velocity com_velocity /= total_mass for i in range(num_atoms): velocities[3 * i : 3 * i + 3] -= com_velocity # 第二次半步力更新速度 for i in range(num_atoms): inv_mass = 1.0 / masses[i] velocities[3 * i] += dt_half * forces[3 * i] * inv_mass velocities[3 * i + 1] += dt_half * forces[3 * i + 1] * inv_mass velocities[3 * i + 2] += dt_half * forces[3 * i + 2] * inv_mass # 第二次半步缩放速度 velocities *= scale # 再次计算动能和G_xi,更新xi kinetic_energy = kinetic_energy_func(masses, velocities) G_xi = (2.0 * kinetic_energy - dof * self.kb * self.target_temperature) / self.Q self.xi[0] += dt_half * G_xi # 更新原子速度(加回COM) velocities_reshaped = velocities.reshape(num_atoms, 3) for i, atom in enumerate(atoms): atom.velocity = velocities_reshaped[i] + com_velocity # 记录状态 current_time = len(self.time_history) * dt if self.time_history else 0.0 self.record_state(atoms, current_time) # 调试日志 logger.debug(f"Nose-Hoover xi: {self.xi[0]}") current_temp = self.get_temperature(atoms) logger.debug(f"Current Temperature: {current_temp}K")
[文档] class NoseHooverChainThermostat(Thermostat): """ Nose-Hoover 链恒温器 通过多个热浴形成链来改善遍历性,适合复杂系统。 采用C++后端实现核心积分步骤。 Parameters ---------- target_temperature : float 目标温度,单位K time_constant : float 时间常数,控制热浴耦合强度。 较大的time_constant意味着热浴链的响应较慢,温度调整更加平缓。 较小的time_constant则响应较快,但可能导致温度波动。 chain_length : int, optional 链的长度,默认为3。 链长度增加可以改善相空间的遍历性,但会增加计算复杂度。 Q : numpy.ndarray, optional 热浴质量参数数组,长度应等于 `chain_length`。 如果未提供,将根据默认方式初始化。 """
[文档] def __init__( self, target_temperature: float, time_constant: float, chain_length: int = 3, Q: np.ndarray | None = None, ): super().__init__(target_temperature) self.time_constant = time_constant self.chain_length = chain_length self.cpp_interface = CppInterface("nose_hoover_chain") if Q is not None: Q = np.asarray(Q, dtype=np.float64) if Q.size != self.chain_length: raise ValueError( f"Q array must have length equal to chain_length ({self.chain_length})." ) self.Q = Q else: self.Q = self._initialize_chain_masses() # 初始化链变量 self.xi_chain = np.zeros(self.chain_length, dtype=np.float64) # 记录链变量历史 self.xi_history = [] # 添加这个列表用于存储链变量历史
def _initialize_chain_masses(self) -> np.ndarray: """初始化热浴链的质量""" Q = np.empty(self.chain_length, dtype=np.float64) Q[0] = 3.0 * self.kb * self.target_temperature * self.time_constant**2 for i in range(1, self.chain_length): Q[i] = self.kb * self.target_temperature * self.time_constant**2 return Q
[文档] def apply(self, atoms: list, dt: float) -> None: """ 应用 Nose-Hoover 链恒温器 Parameters ---------- atoms : list 原子列表 dt : float 时间步长 """ num_atoms = len(atoms) # 提前分配并填充 numpy 数组以避免重复分配 masses = np.empty(num_atoms, dtype=np.float64) velocities = np.empty(3 * num_atoms, dtype=np.float64) forces = np.empty(3 * num_atoms, dtype=np.float64) for i, atom in enumerate(atoms): masses[i] = atom.mass velocities[3 * i : 3 * i + 3] = atom.velocity forces[3 * i : 3 * i + 3] = atom.force try: # 调用 C++ 实现 self.cpp_interface.nose_hoover_chain( dt, num_atoms, masses, velocities, forces, self.xi_chain, self.Q, self.chain_length, self.target_temperature, ) # 更新原子速度 velocities = velocities.reshape((num_atoms, 3)) for i, atom in enumerate(atoms): atom.velocity = velocities[i] # # 移除质心运动 # self.remove_com_motion(atoms) except Exception as e: logger.error(f"Nose-Hoover chain thermostat error: {e}") raise # 记录状态 current_time = len(self.time_history) * dt if self.time_history else 0.0 self.record_state(atoms, current_time) # 添加调试日志 logger.debug(f"Nose-Hoover Chain xi_chain: {self.xi_chain}") current_temp = self.get_temperature(atoms) logger.debug(f"Current Temperature: {current_temp}K") # 记录当前 xi_chain 状态到历史中 self.xi_history.append(self.xi_chain.copy())
[文档] def get_chain_state(self) -> dict: """获取热浴链的当前状态""" return { "xi_chain": self.xi_chain.copy(), "Q": self.Q.copy(), "chain_length": self.chain_length, }