thermoelasticsim.md.propagators 源代码

r"""基础算符传播子实现

每个传播子对应一个基本物理过程,可通过对称 Trotter 分解组合为完整积分方案。

已实现的传播子
--------------

PositionPropagator
    位置演化::math:`\mathbf{r} \leftarrow \mathbf{r} + \mathbf{v}\,\Delta t`

VelocityPropagator
    速度演化::math:`\mathbf{v} \leftarrow \mathbf{v} + \mathbf{F}\,\Delta t/m`

ForcePropagator
    力计算::math:`\mathbf{F} = -\nabla U(\mathbf{r})`

设计要点
--------
1. 单一职责:每个传播子仅负责一种过程
2. 与 Velocity-Verlet 分解一致,减少重复力计算
3. 数值稳定、支持时间可逆的对称分解
"""

from typing import Any

import numpy as np

from ..core.structure import Cell
from ..elastic.mechanics import StressCalculator
from ..utils.utils import EV_TO_GPA, KB_IN_EV
from .interfaces import Propagator

# ====================================================================
# 四阶Suzuki-Yoshida分解系数 (High-Precision Integration)
# ====================================================================
# Ref: H. Yoshida, Phys. Lett. A 150, 262-268 (1990)
#      https://doi.org/10.1016/0375-9601(90)90092-3
#
# 这些系数用于高精度的扩展系统积分,特别是Nose-Hoover链恒温器。
# 四阶方案相比二阶具有显著更好的长期稳定性和精度。
FOURTH_ORDER_COEFFS = [
    1.0 / (2.0 - 2.0 ** (1.0 / 3.0)),  # w1 = 1.351207191959657...
    -(2.0 ** (1.0 / 3.0)) / (2.0 - 2.0 ** (1.0 / 3.0)),  # w2 = -1.702414383919315...
    1.0 / (2.0 - 2.0 ** (1.0 / 3.0)),  # w3 = 1.351207191959657...
]

# 验证系数:w1 + w2 + w3 = 1.0 (必须严格满足)
_coeff_sum = sum(FOURTH_ORDER_COEFFS)
if abs(_coeff_sum - 1.0) > 1e-14:
    raise ValueError(f"Suzuki-Yoshida系数和验证失败: {_coeff_sum} != 1.0")

# 精确值记录(用于文档和验证)
_W1 = _W3 = 1.3512071919596578  # 1/(2-2^(1/3))
_W2 = -1.7024143839193153  # -2^(1/3)/(2-2^(1/3))


[文档] class PositionPropagator(Propagator): r"""位置传播子::math:`\exp(iL_r\,\Delta t)` 实现位置的时间演化::math:`\mathbf{r}(t+\Delta t)=\mathbf{r}(t)+\mathbf{v}(t)\,\Delta t`。 Notes ----- 处理周期性边界;不改变速度与力;与势能无关。 """
[文档] def propagate(self, cell: Cell, dt: float, **kwargs: Any) -> None: """执行位置更新 Parameters ---------- cell : Cell 晶胞对象,将直接修改其中原子的位置 dt : float 时间步长 (fs) **kwargs : Any 未使用,保持接口一致性 """ # 向量化更新位置并一次性应用PBC,减少Python循环开销 positions = cell.get_positions() velocities = cell.get_velocities() new_positions = positions + velocities * dt new_positions = cell.apply_periodic_boundary(new_positions) cell.set_positions(new_positions)
[文档] class VelocityPropagator(Propagator): r"""速度传播子::math:`\exp(iL_v\,\Delta t)` 实现速度的时间演化::math:`\mathbf{v}(t+\Delta t)=\mathbf{v}(t)+\mathbf{F}(t)\,\Delta t/m`。 Notes ----- 假设 :code:`atom.force` 已计算;不在此处重新计算力。 """
[文档] def propagate(self, cell: Cell, dt: float, **kwargs: Any) -> None: """执行速度更新 Parameters ---------- cell : Cell 晶胞对象,将直接修改其中原子的速度 dt : float 时间步长 (fs) **kwargs : Any 未使用,保持接口一致性 Notes ----- 该方法假设所有原子的力(atom.force)已经正确计算。 如果力未计算或已过期,结果将不正确。 """ # 向量化速度更新,减少Python循环开销 velocities = cell.get_velocities() forces = cell.get_forces() masses = np.array([a.mass for a in cell.atoms], dtype=np.float64) new_velocities = velocities + (forces / masses[:, None]) * dt cell.set_velocities(new_velocities)
[文档] class ForcePropagator(Propagator): r"""力计算传播子::math:`\mathbf{F} = -\nabla U(\mathbf{r})` 封装势函数的力计算调用,控制力计算时机,避免重复计算。 """
[文档] def __init__(self, potential): """初始化力传播子 Parameters ---------- potential : Potential 势函数对象,必须实现calculate_forces方法 """ self.potential = potential
[文档] def propagate(self, cell: Cell, dt: float, **kwargs: Any) -> None: """计算并更新所有原子的力 Parameters ---------- cell : Cell 晶胞对象,将更新其中所有原子的force属性 dt : float 时间步长 (fs),在此方法中未使用,仅为接口一致性 **kwargs : Any 额外参数,可能包含势函数参数等 Notes ----- 该方法将: 1. 调用势函数计算所有原子间相互作用 2. 更新每个原子的force属性 3. 确保力的单位和符号正确 如果势函数计算失败,将抛出相应异常。 """ # dt参数在此处未使用,仅为保持接口一致性 # 调用势函数计算力 self.potential.calculate_forces(cell)
[文档] class BerendsenThermostatPropagator(Propagator): r"""Berendsen 恒温器传播子(速度缩放) 缩放因子 :math:`\lambda`: .. math:: \lambda = \sqrt{1 + \frac{\Delta t}{\tau}\left(\frac{T_0}{T}-1\right)} Notes ----- 后处理式恒温;不严格产生正则分布,常用于快速平衡。 References ---------- Berendsen, H. J. C.; Postma, J. P. M.; van Gunsteren, W. F.; DiNola, A.; Haak, J. R. (1984). Molecular dynamics with coupling to an external bath. The Journal of Chemical Physics, 81(8), 3684–3690. https://doi.org/10.1063/1.448118 """
[文档] def __init__( self, target_temperature: float, tau: float = None, mode: str = "equilibration" ): r"""初始化 Berendsen 恒温器 Parameters ---------- target_temperature : float 目标温度 (K) tau : float, optional 时间常数 :math:`\tau_T` (fs)。未给定时按 `mode` 选择: 500.0 平衡模式(中等耦合,收敛较快) 2000.0 生产模式(弱耦合,扰动更小) mode : str, optional 运行模式,'equilibration'(平衡) 或 'production'(生产) 影响默认τ_T值的选择 """ if target_temperature <= 0: raise ValueError(f"目标温度必须为正数,得到 {target_temperature} K") if mode not in ["equilibration", "production"]: raise ValueError(f"mode必须是'equilibration'或'production',得到 {mode}") self.target_temperature = target_temperature self.mode = mode # 默认 τ_T 参数选择 if tau is None: if mode == "equilibration": self.tau = 500.0 # fs,快速平衡 else: # production self.tau = 2000.0 # fs,弱耦合生产 else: if tau <= 0: raise ValueError(f"时间常数必须为正数,得到 {tau} fs") self.tau = tau print( f"Berendsen恒温器初始化: T={target_temperature}K, τ_T={self.tau}fs, mode={mode}" ) # 统计信息 self._total_scaling_steps = 0 self._cumulative_scaling = 0.0 self._max_scaling_factor = 1.0 self._min_scaling_factor = 1.0 self._limited_steps = 0 # 被限制的步数 self._upper_limited = 0 # 上限限制次数 self._lower_limited = 0 # 下限限制次数
[文档] def propagate(self, cell: Cell, dt: float, **kwargs: Any) -> None: r"""执行 Berendsen 温度调节 Parameters ---------- cell : Cell 晶胞对象,将修改其中所有原子的速度 dt : float 时间步长 (fs) **kwargs : Any 额外参数,未使用 Notes ----- 标准流程: 1. 计算瞬时温度 :math:`T` 2. 计算缩放因子 :math:`\lambda = \sqrt{1 + \tfrac{\Delta t}{\tau_T}(\tfrac{T_0}{T}-1)}` 3. 对所有原子速度缩放 :math:`\mathbf{v} \leftarrow \lambda\,\mathbf{v}` 4. 可选:数值稳定性限制与统计监控 """ # 根据研究报告算法步骤:计算当前系统瞬时温度 current_temp = cell.calculate_temperature() # 特殊情况处理:如果温度接近0(无运动),初始化Maxwell分布 if current_temp < 1e-6: # 接近0K print("⚠️ 检测到零温度,初始化Maxwell分布速度") self._initialize_maxwell_velocities(cell) current_temp = cell.calculate_temperature() # 数值稳定性保护:避免除零错误,设置最小温度阈值 # 根据研究报告,这是必要的数值保护措施 if current_temp < 0.1: # 最小0.1K current_temp = 0.1 # 根据研究报告的Berendsen温度弛豫方程: # dT/dt = (T_0 - T)/τ_T # 速度缩放因子: λ = sqrt(1 + dt/τ_T * (T_0/T_current - 1)) raw_scaling_factor = np.sqrt( 1.0 + (dt / self.tau) * (self.target_temperature / current_temp - 1.0) ) # 数值稳定性保护:限制缩放因子在较窄范围,避免长时间积累导致漂移/爆炸 scaling_factor = raw_scaling_factor limited = False upper, lower = 1.05, 0.95 if raw_scaling_factor > upper: scaling_factor = upper limited = True self._upper_limited += 1 elif raw_scaling_factor < lower: scaling_factor = lower limited = True self._lower_limited += 1 # 更新限制统计 if limited: self._limited_steps += 1 # 增强的监控和警告机制 if self._total_scaling_steps % 500 == 0: # 每500步输出统计 avg_scaling = ( self._cumulative_scaling / self._total_scaling_steps if self._total_scaling_steps > 0 else 1.0 ) limitation_rate = ( (self._limited_steps / self._total_scaling_steps * 100) if self._total_scaling_steps > 0 else 0.0 ) temp_error = ( abs(current_temp - self.target_temperature) / self.target_temperature * 100 ) print( f"Berendsen统计 (#{self._total_scaling_steps}步): T={current_temp:.1f}K, 误差={temp_error:.2f}%, 平均缩放={avg_scaling:.3f}" ) print( f" 限制率: {limitation_rate:.2f}% (上限:{self._upper_limited}, 下限:{self._lower_limited})" ) if limited and self._total_scaling_steps % 100 == 0: print( f"⚠️ Berendsen缩放因子被限制: 原始={raw_scaling_factor:.3f}, 限制后={scaling_factor:.3f}" ) print( f" 当前温度={current_temp:.1f}K, 目标温度={self.target_temperature}K, τ_T={self.tau}fs" ) # 应用速度缩放: v_new = λ * v_old(向量化) v = cell.get_velocities() v *= scaling_factor cell.set_velocities(v) # 更新统计信息 self._total_scaling_steps += 1 self._cumulative_scaling += scaling_factor self._max_scaling_factor = max(self._max_scaling_factor, scaling_factor) self._min_scaling_factor = min(self._min_scaling_factor, scaling_factor)
def _initialize_maxwell_velocities(self, cell: Cell) -> None: """初始化Maxwell分布速度""" for atom in cell.atoms: # Maxwell分布标准差: σ = sqrt(kB*T/m) sigma = np.sqrt(KB_IN_EV * self.target_temperature / atom.mass) atom.velocity = np.random.normal(0, sigma, 3) # 移除质心运动 cell.remove_com_motion()
[文档] def get_statistics(self) -> dict: r"""获取 Berendsen 恒温器统计信息 Returns ------- dict 统计信息字典: total_steps 总调节步数 average_scaling 平均缩放因子 max_scaling 最大缩放因子 min_scaling 最小缩放因子 target_temperature 目标温度 (K) tau 时间常数 :math:`\tau_T` (fs) mode 运行模式(equilibration/production) limited_steps 被限制的步数 upper_limited / lower_limited 上/下限限制次数 limitation_rate 限制比例 (%) effective_coupling 有效耦合强度评估 """ if self._total_scaling_steps == 0: return { "total_steps": 0, "average_scaling": 1.0, "max_scaling": 1.0, "min_scaling": 1.0, "target_temperature": self.target_temperature, "tau": self.tau, "mode": self.mode, "limited_steps": 0, "upper_limited": 0, "lower_limited": 0, "limitation_rate": 0.0, "effective_coupling": "unknown", } avg_scaling = self._cumulative_scaling / self._total_scaling_steps limitation_rate = self._limited_steps / self._total_scaling_steps * 100 # 评估有效耦合强度 scaling_deviation = abs(avg_scaling - 1.0) if scaling_deviation < 0.01: effective_coupling = "weak" # 接近NVE elif scaling_deviation < 0.05: effective_coupling = "moderate" # 中等耦合 else: effective_coupling = "strong" # 强耦合 return { "total_steps": self._total_scaling_steps, "average_scaling": avg_scaling, "max_scaling": self._max_scaling_factor, "min_scaling": self._min_scaling_factor, "target_temperature": self.target_temperature, "tau": self.tau, "mode": self.mode, "limited_steps": self._limited_steps, "upper_limited": self._upper_limited, "lower_limited": self._lower_limited, "limitation_rate": limitation_rate, "effective_coupling": effective_coupling, }
[文档] def reset_statistics(self) -> None: """重置统计信息""" self._total_scaling_steps = 0 self._cumulative_scaling = 0.0 self._max_scaling_factor = 1.0 self._min_scaling_factor = 1.0 self._limited_steps = 0 self._upper_limited = 0 self._lower_limited = 0
[文档] class AndersenThermostatPropagator(Propagator): r"""Andersen 随机碰撞恒温器传播子 每个原子在每个时间步以概率 :math:`p=\nu\,\Delta t` 与热浴碰撞, 并从 Maxwell–Boltzmann 分布重新采样速度。 Notes ----- 产生正确的正则 (NVT) 分布,但会打断动力学连续性。 References ---------- Andersen, H. C. (1980). Molecular dynamics simulations at constant pressure and/or temperature. The Journal of Chemical Physics, 72(4), 2384–2393. https://doi.org/10.1063/1.439486 """
[文档] def __init__(self, target_temperature: float, collision_frequency: float = None): r"""初始化 Andersen 恒温器 Parameters ---------- target_temperature : float 目标温度 (K) collision_frequency : float, optional 碰撞频率 :math:`\nu` (fs⁻¹)。未给定时按系统尺寸选取: 小系统 (≤ 50 原子) 0.005 fs⁻¹ 中系统 (50–150 原子) 0.03 fs⁻¹ 大系统 (> 150 原子) 0.08 fs⁻¹ Raises ------ ValueError 如果target_temperature或collision_frequency为非正数 """ if target_temperature <= 0: raise ValueError(f"目标温度必须为正数,得到 {target_temperature} K") if collision_frequency is not None and collision_frequency <= 0: raise ValueError(f"碰撞频率必须为正数,得到 {collision_frequency} fs⁻¹") self.target_temperature = target_temperature self.base_collision_frequency = collision_frequency # 保存用户设置的基础频率 # 延迟初始化实际使用的频率(需要知道系统尺寸) self._effective_collision_frequency = collision_frequency # 统计信息 self._total_steps = 0 self._total_collisions = 0 self._collision_history = [] self._system_size_initialized = False
def _initialize_system_size_adaptive_frequency(self, num_atoms: int) -> float: """根据系统尺寸初始化适应性频率 根据研究报告的经验公式和实际测试结果, 不同尺寸系统需要不同的碰撞频率以保持有效控温。 Parameters ---------- num_atoms : int 系统中的原子数 Returns ------- float 适合该系统尺寸的碰撞频率 (fs⁻¹) """ if self.base_collision_frequency is not None: # 用户指定了频率,但仍需要根据系统尺寸调整 base_freq = self.base_collision_frequency else: # 根据系统尺寸选择基础频率 if num_atoms <= 50: base_freq = 0.005 # 小系统 elif num_atoms <= 150: base_freq = 0.03 # 中系统 else: base_freq = 0.08 # 大系统 print( f"Andersen恒温器初始化: {num_atoms}原子系统, 选择频率={base_freq:.5f} fs⁻¹" ) self._system_size_initialized = True return base_freq
[文档] def propagate(self, cell: Cell, dt: float, **kwargs: Any) -> None: r"""执行 Andersen 随机碰撞 Parameters ---------- cell : Cell 晶胞对象,将修改其中部分原子的速度 dt : float 时间步长 (fs) **kwargs : Any 额外参数,未使用 Notes ----- 1. 计算系统级碰撞概率 :math:`P=\nu\,\Delta t` 2. 逐原子以概率 :math:`P/N` 判定是否碰撞 3. 若碰撞,从 Maxwell–Boltzmann 分布采样新速度 """ num_atoms = len(cell.atoms) # 延迟初始化:根据系统尺寸确定碰撞频率 if not self._system_size_initialized: self._effective_collision_frequency = ( self._initialize_system_size_adaptive_frequency(num_atoms) ) # 计算当前系统温度 current_temp = cell.calculate_temperature() # 每个原子的碰撞概率(Andersen 原始定义):p_atom = ν × Δt # ν 单位采用 fs⁻¹,Δt 为 fs;期望每步碰撞数为 N × p_atom。 per_atom_collision_probability = self._effective_collision_frequency * dt # 约束到 [0, 1] if per_atom_collision_probability < 0.0: per_atom_collision_probability = 0.0 elif per_atom_collision_probability > 1.0: per_atom_collision_probability = 1.0 # 生成碰撞掩码 rand = np.random.random(num_atoms) mask = rand < per_atom_collision_probability step_collisions = int(np.count_nonzero(mask)) if step_collisions > 0: # 为被选中的原子采样Maxwell-Boltzmann速度 v = cell.get_velocities() masses = np.array([a.mass for a in cell.atoms], dtype=np.float64) sigmas = np.sqrt(KB_IN_EV * self.target_temperature / masses) # 只更新 mask 对应的行 k = step_collisions new_v = np.random.normal(0.0, 1.0, (k, 3)) * sigmas[mask][:, None] v[mask, :] = new_v # 回写到原子对象 for i, m in enumerate(mask): if m: cell.atoms[i].velocity = v[i] # 定期移除质心运动(维持系统稳定性) if step_collisions > 0 or self._total_steps % 20 == 0: # 有碰撞时或每20步 cell.remove_com_motion() # 统计信息监控 self._total_steps += 1 self._total_collisions += step_collisions self._collision_history.append(step_collisions) # 保持历史记录在合理长度 if len(self._collision_history) > 1000: self._collision_history.pop(0) # 定期输出统计信息 if self._total_steps % 1000 == 0: # 实际/期望的均是“每步碰撞数”口径 actual_collisions_per_step = self._total_collisions / self._total_steps expected_collisions_per_step = ( num_atoms * self._effective_collision_frequency * dt ) rate_ratio = ( (actual_collisions_per_step / expected_collisions_per_step) if expected_collisions_per_step > 0 else 0.0 ) temp_error = ( abs(current_temp - self.target_temperature) / self.target_temperature * 100 ) print( f"Andersen统计 (#{self._total_steps}步): T={current_temp:.1f}K, 误差={temp_error:.2f}%" ) print( f" 每步碰撞数: 实际={actual_collisions_per_step:.4f}, 期望={expected_collisions_per_step:.4f}, 比值={rate_ratio:.3f}" )
def _sample_maxwell_boltzmann_velocity(self, atom) -> None: """为单个原子采样Maxwell-Boltzmann分布速度(标准算法) 根据研究报告(Lines 198-202)的标准Maxwell-Boltzmann分布: p(v_α) = sqrt(m_i/(2π*k_B*T_0)) * exp(-m_i*v_α²/(2*k_B*T_0)) 每个速度分量是独立的高斯分布,标准差: σ = sqrt(k_B*T/m) Parameters ---------- atom : Atom 要重新采样速度的原子 """ # Maxwell分布的标准差: σ = sqrt(k_B*T_0/m) # 使用目标温度,不做缓变处理(符合研究报告的标准算法) sigma = np.sqrt(KB_IN_EV * self.target_temperature / atom.mass) # 从高斯分布采样三维速度 # 每个分量独立采样,符合Maxwell-Boltzmann理论 atom.velocity = np.random.normal(0, sigma, 3)
[文档] def get_statistics(self) -> dict: """获取 Andersen 恒温器统计信息 Returns ------- dict 统计信息字典: total_steps 总步数 total_collisions 总碰撞次数 collision_rate 平均碰撞率 (次/步) expected_rate 理论碰撞率 (fs⁻¹) target_temperature 目标温度 (K) effective_collision_frequency 有效碰撞频率 (fs⁻¹) base_collision_frequency 基础频率 (fs⁻¹) recent_collisions 最近 100 步的碰撞数 system_size_initialized 是否已根据系统尺寸初始化 """ if self._total_steps == 0: return { "total_steps": 0, "total_collisions": 0, "collision_rate": 0.0, "expected_rate": 0.0, "target_temperature": self.target_temperature, "effective_collision_frequency": self._effective_collision_frequency or 0.0, "base_collision_frequency": self.base_collision_frequency or 0.0, "recent_collisions": 0, "system_size_initialized": self._system_size_initialized, } collision_rate = self._total_collisions / self._total_steps recent_collisions = ( sum(self._collision_history[-100:]) if self._collision_history else 0 ) return { "total_steps": self._total_steps, "total_collisions": self._total_collisions, "collision_rate": collision_rate, "expected_rate": self._effective_collision_frequency or 0.0, "target_temperature": self.target_temperature, "effective_collision_frequency": self._effective_collision_frequency or 0.0, "base_collision_frequency": self.base_collision_frequency or 0.0, "recent_collisions": recent_collisions, "system_size_initialized": self._system_size_initialized, }
[文档] def reset_statistics(self) -> None: """重置统计信息""" self._total_steps = 0 self._total_collisions = 0 self._collision_history = []
[文档] class NoseHooverChainPropagator(Propagator): r"""Nose–Hoover 链恒温器传播子(四阶 Suzuki–Yoshida 分解) Notes ----- 链式热浴变量自洽演化,时间可逆对称分解,保证正则系综采样质量。 参考文献: Martyna et al., J. Chem. Phys. 97, 2635 (1992); Yoshida, Phys. Lett. A 150, 262 (1990). """
[文档] def __init__( self, target_temperature: float, tdamp: float = 100.0, tchain: int = 3, tloop: int = 1, ): """初始化Nose-Hoover链恒温器 Parameters ---------- target_temperature : float 目标温度 (K),必须为正数 tdamp : float, optional 特征时间常数 (fs),默认100fs 推荐值:50-100*dt,控制耦合强度 过小值(如10*dt)会导致数值不稳定 tchain : int, optional 热浴链长度,默认3 M=3通常足够保证遍历性和稳定性 tloop : int, optional Suzuki-Yoshida循环次数,默认1 通常1次循环已足够,增加会提升精度但增大计算量 Raises ------ ValueError 如果参数设置不合理 """ if target_temperature <= 0: raise ValueError(f"目标温度必须为正数,得到 {target_temperature} K") if tdamp <= 0: raise ValueError(f"时间常数必须为正数,得到 {tdamp} fs") if tchain < 1: raise ValueError(f"链长度必须≥1,得到 {tchain}") if tloop < 1: raise ValueError(f"循环次数必须≥1,得到 {tloop}") self.target_temperature = target_temperature self.tdamp = tdamp self.tchain = tchain self.tloop = tloop # 质量参数Q矩阵 - 延迟初始化(需要系统信息) self.Q = None self._num_atoms_global = None self._dof = None # 体系用于恒温器的自由度(与温度统计口径一致) self._masses = None # 原子质量数组(向量化计算用) self._initialized = False # 热浴变量状态 self.p_zeta = np.zeros(self.tchain) # 热浴动量 self.zeta = np.zeros(self.tchain) # 热浴位置(用于能量计算) # 统计信息 self._total_steps = 0 self._temp_history = [] self._conserved_energy_history = [] # 可选:上下文标签,用于日志打印(例如 "NVT预平衡 C11 +0.30%") self.context_label: str | None = None
def _initialize_Q_parameters(self, cell: Cell) -> None: """初始化质量参数Q矩阵 标准实现: Q[0] = N_f * k_B * T₀ * τ² (第一个热浴) Q[j] = k_B * T₀ * τ² (后续热浴) 其中N_f是系统自由度,τ是特征时间常数。 Parameters ---------- cell : Cell 晶胞对象,用于计算系统自由度 """ self._num_atoms_global = len(cell.atoms) # 计算用于恒温器的一致自由度口径: # - 单原子: 3 # - 多原子: 3N-3(扣除质心平动),与 Cell.calculate_temperature 一致 N_f = 3 if self._num_atoms_global <= 1 else 3 * self._num_atoms_global - 3 self._dof = N_f # 缓存质量数组以便后续向量化计算 try: self._masses = np.array([a.mass for a in cell.atoms], dtype=np.float64) except Exception: self._masses = None # 温度单位转换 kB_T = KB_IN_EV * self.target_temperature # Q参数计算 (ASE公式) self.Q = np.zeros(self.tchain) self.Q[0] = N_f * kB_T * self.tdamp**2 # 第一个热浴(按一致自由度) self.Q[1:] = kB_T * self.tdamp**2 # 后续热浴 self._initialized = True ctx = f" [{self.context_label}]" if getattr(self, "context_label", None) else "" print( f"NHC初始化{ctx}: N_atoms={self._num_atoms_global}, N_f={N_f}, " f"T={self.target_temperature}K, τ={self.tdamp}fs" ) if self.tchain > 1: print(f"Q参数: Q[0]={self.Q[0]:.3e}, Q[1]={self.Q[1]:.3e}") else: print(f"Q参数: Q[0]={self.Q[0]:.3e}") def _calculate_instantaneous_kinetic_energy(self, cell: Cell) -> float: """计算瞬时动能 计算Σ(p_i²/2m_i),不移除质心运动。 在算符分离中间步骤,保持所有动量信息的完整性。 Parameters ---------- cell : Cell 晶胞对象 Returns ------- float 总动能 (eV) """ v = cell.get_velocities() # (N,3) m = ( self._masses if self._masses is not None else np.array([a.mass for a in cell.atoms], dtype=np.float64) ) vsq = np.einsum("ij,ij->i", v, v) return float(0.5 * np.dot(m, vsq))
[文档] def propagate(self, cell: Cell, dt: float, **kwargs: Any) -> None: r"""执行 Nose–Hoover 链传播(四阶 Suzuki–Yoshida) 实现完整的 NHC 算符: .. math:: \exp\big(iL_{\mathrm{NHC}}\,\Delta t\big) 采用三步四阶 Suzuki–Yoshida 分解保证高精度和长期稳定性。 算法流程: 1. 遍历每个 SY 系数 :math:`w_k` 2. 对每个系数执行 :math:`t_{loop}` 次内循环 3. 调用 :code:`_nhc_integration_loop(w_k \cdot \Delta t / t_{loop})` Parameters ---------- cell : Cell 晶胞对象,将修改原子速度 dt : float 传播时间步长 (fs) **kwargs : Any 额外参数(当前未使用,保留接口一致性) Notes ----- 与 NVE 的对称包裹关系: .. math:: \exp\big(iL_{\mathrm{NHC}}\tfrac{\Delta t}{2}\big)\,\exp\big(iL_{\mathrm{NVE}}\,\Delta t\big)\,\exp\big(iL_{\mathrm{NHC}}\tfrac{\Delta t}{2}\big) """ # 延迟初始化Q参数 if not self._initialized: self._initialize_Q_parameters(cell) # 四阶Suzuki-Yoshida主循环 for _ in range(self.tloop): for coeff in FOURTH_ORDER_COEFFS: # 每个系数对应的子步长 sub_delta = coeff * dt / self.tloop # 执行单次NHC积分循环 self._nhc_integration_loop(cell, sub_delta) # 更新统计信息(传递 potential 以评估扩展哈密顿量) self._update_statistics(cell, potential=kwargs.get("potential"))
def _nhc_integration_loop(self, cell: Cell, delta: float) -> None: r"""单次 NHC 积分循环(对称回文序列) 实现 MTK 方法的核心:对称回文更新保证时间可逆性。 步骤: 1. 从后向前传播热浴链(:math:`M-1\to 0`) 2. 更新热浴位置并缩放粒子速度(热浴耦合) 3. 从前向后传播热浴链(:math:`0\to M-1`) Parameters ---------- cell : Cell 晶胞对象 delta : float 积分子步长 """ delta2 = delta / 2.0 # 半步长 delta4 = delta / 4.0 # 四分之一步长 # 步骤1: "向前"传播热浴链(M-1, M-2, ..., 0) for j in range(self.tchain): idx = self.tchain - 1 - j # 从链末端开始 self._update_single_thermostat(cell, idx, delta2, delta4) # 步骤2: 更新热浴位置(积分p_ζ/Q) self.zeta += delta * self.p_zeta / self.Q # 步骤3: 缩放粒子速度(关键的热浴耦合) # v *= exp(-delta * p_ζ[0] / Q[0]) # 数值保护:限制指数参数,避免溢出导致NaN _arg = -delta * self.p_zeta[0] / self.Q[0] scaling_factor = np.exp(np.clip(_arg, -50.0, 50.0)) # 统一缩放速度(向量化) v = cell.get_velocities() v *= scaling_factor cell.set_velocities(v) # 步骤4: "向后"传播热浴链(0, 1, ..., M-1) for j in range(self.tchain): self._update_single_thermostat(cell, j, delta2, delta4) def _update_single_thermostat( self, cell: Cell, j: int, delta2: float, delta4: float ) -> None: """更新单个热浴变量 实现单个热浴p_ζ[j]的Trotter分解更新,包含: 1. 邻居热浴的耦合作用(指数缩放) 2. "力"G_j的作用(线性更新) 3. 再次邻居热浴的耦合作用 这个三步对称分解保证算法的时间可逆性。 Parameters ---------- cell : Cell 晶胞对象 j : int 热浴索引 (0 ≤ j < tchain) delta2 : float 半步长 delta4 : float 四分之一步长 """ # 第一部分:来自后续热浴的耦合(如果存在) if j < self.tchain - 1: _arg = -delta4 * self.p_zeta[j + 1] / self.Q[j + 1] coupling_factor = np.exp(np.clip(_arg, -50.0, 50.0)) self.p_zeta[j] *= coupling_factor # 计算热浴"力" G_j if j == 0: # 第一个热浴:与物理系统耦合(向量化实现,口径一致) v = cell.get_velocities() # (N,3) m = ( self._masses if self._masses is not None else np.array([a.mass for a in cell.atoms], dtype=np.float64) ) if self._num_atoms_global > 1: total_mass = float(np.sum(m)) com_velocity = (m[:, None] * v).sum(axis=0) / total_mass dv = v - com_velocity[None, :] momentum_squared_over_mass = float( np.sum(m * np.einsum("ij,ij->i", dv, dv)) ) else: momentum_squared_over_mass = float( np.sum(m * np.einsum("ij,ij->i", v, v)) ) G_j = ( momentum_squared_over_mass - self._dof * KB_IN_EV * self.target_temperature ) else: # 后续热浴:与前一个热浴耦合 G_j = ( self.p_zeta[j - 1] ** 2 / self.Q[j - 1] ) - KB_IN_EV * self.target_temperature # 应用"力"(中心更新) self.p_zeta[j] += delta2 * G_j # 第二部分:再次来自后续热浴的耦合 if j < self.tchain - 1: _arg2 = -delta4 * self.p_zeta[j + 1] / self.Q[j + 1] coupling_factor = np.exp(np.clip(_arg2, -50.0, 50.0)) self.p_zeta[j] *= coupling_factor def _update_statistics(self, cell: Cell, potential=None) -> None: """更新统计信息""" self._total_steps += 1 # 记录温度 current_temp = cell.calculate_temperature() self._temp_history.append(current_temp) # 记录守恒量(扩展哈密顿量)——为降低开销,仅低频评估 try: if (self._total_steps % 100) == 0 and potential is not None: conserved_energy = self.get_conserved_energy(cell, potential=potential) else: conserved_energy = float("nan") self._conserved_energy_history.append(conserved_energy) except Exception: # 如果计算失败,记录NaN self._conserved_energy_history.append(float("nan")) # 限制历史长度 max_history = 1000 if len(self._temp_history) > max_history: self._temp_history.pop(0) self._conserved_energy_history.pop(0)
[文档] def get_conserved_energy(self, cell: Cell, potential=None) -> float: r"""计算 NHC 扩展哈密顿量(守恒量) .. math:: H' = E_{\mathrm{kin}} + E_{\mathrm{pot}} + E_{\mathrm{thermo}} 其中热浴能量 :math:`E_{\mathrm{thermo}}` 为: .. math:: E_{\mathrm{thermo}} = \sum_{j=0}^{M-1} \frac{p_{\zeta_j}^2}{2 Q_j} + N_f k_B T_0\, \zeta_0 + k_B T_0 \sum_{j=1}^{M-1} \zeta_j Returns ------- float 扩展哈密顿量 (eV) """ if not self._initialized: return float("nan") # 动能和势能 kinetic = self._calculate_instantaneous_kinetic_energy(cell) # 势能计算的向后兼容处理: # 1) 若显式传入 potential,对象应实现 calculate_energy(cell) # 2) 否则优先使用 cell._last_potential_object(若存在) # 3) 再否则,若 Cell 提供 calculate_potential_energy()(无参),则直接调用以获取标量势能 # 4) 若以上均不可用,则报错并提示调用方式 if potential is not None: potential_energy = potential.calculate_energy(cell) else: if ( hasattr(cell, "_last_potential_object") and cell._last_potential_object is not None ): potential_energy = cell._last_potential_object.calculate_energy(cell) elif hasattr(cell, "calculate_potential_energy") and callable( cell.calculate_potential_energy ): # 兼容旧测试/用法:mock 或简化函数直接返回势能标量 potential_energy = cell.calculate_potential_energy() else: raise RuntimeError( "NHC需要计算势能,请传入 potential 或在 Cell 上提供 _last_potential_object。" ) # 热浴动能 thermostat_kinetic = np.sum(0.5 * self.p_zeta**2 / self.Q) # 热浴势能 kB_T = KB_IN_EV * self.target_temperature thermostat_potential = self._dof * kB_T * self.zeta[0] + kB_T * np.sum( self.zeta[1:] ) return kinetic + potential_energy + thermostat_kinetic + thermostat_potential
[文档] def get_statistics(self) -> dict: """获取详细统计信息""" if self._total_steps == 0: return { "total_steps": 0, "target_temperature": self.target_temperature, "tdamp": self.tdamp, "tchain": self.tchain, "tloop": self.tloop, "average_temperature": 0.0, "temperature_std": 0.0, "current_p_zeta": self.p_zeta.tolist(), "current_zeta": self.zeta.tolist(), "Q_parameters": self.Q.tolist() if self.Q is not None else None, "conserved_energy_drift": 0.0, } # 温度统计 temps = np.array(self._temp_history) avg_temp = np.mean(temps) temp_std = np.std(temps) # 守恒量漂移(最近500步的线性拟合斜率) conserved_drift = 0.0 if len(self._conserved_energy_history) > 100: recent_energies = self._conserved_energy_history[-500:] if not any(np.isnan(recent_energies)): x = np.arange(len(recent_energies)) slope, _ = np.polyfit(x, recent_energies, 1) conserved_drift = slope return { "total_steps": self._total_steps, "target_temperature": self.target_temperature, "tdamp": self.tdamp, "tchain": self.tchain, "tloop": self.tloop, "average_temperature": float(avg_temp), "temperature_std": float(temp_std), "current_p_zeta": self.p_zeta.tolist(), "current_zeta": self.zeta.tolist(), "Q_parameters": self.Q.tolist() if self.Q is not None else None, "conserved_energy_drift": float(conserved_drift), }
[文档] def reset_statistics(self) -> None: """重置统计信息""" self._total_steps = 0 self._temp_history = [] self._conserved_energy_history = []
[文档] def reset_thermostat_state(self) -> None: """完全重置热浴状态""" self.p_zeta.fill(0.0) self.zeta.fill(0.0) self.reset_statistics()
# 为了方便导入,提供一个创建基础NVE传播子的工厂函数
[文档] def create_nve_propagators(potential): r"""创建 NVE 积分所需的基础传播子 Parameters ---------- potential : Potential 势函数对象 Returns ------- dict 传播子字典: ``'position'`` :class:`~thermoelasticsim.md.propagators.PositionPropagator` 实例 ``'velocity'`` :class:`~thermoelasticsim.md.propagators.VelocityPropagator` 实例 ``'force'`` :class:`~thermoelasticsim.md.propagators.ForcePropagator` 实例 Examples -------- >>> propagators = create_nve_propagators(eam_potential) >>> pos_prop = propagators['position'] >>> vel_prop = propagators['velocity'] >>> force_prop = propagators['force'] """ return { "position": PositionPropagator(), "velocity": VelocityPropagator(), "force": ForcePropagator(potential), }
[文档] class LangevinThermostatPropagator(Propagator): r"""Langevin 动力学恒温器传播子(BBK 积分) 通过阻尼力和随机力模拟热浴作用,实现温度控制。 Notes ----- Langevin 运动方程: .. math:: m_i \frac{d^2\mathbf{r}_i}{dt^2} = \mathbf{F}_i(\mathbf{r}) - \gamma m_i \frac{d\mathbf{r}_i}{dt} + \mathbf{R}_i(t) 其中: - :math:`\mathbf{F}_i(\mathbf{r})` - 保守力 - :math:`\gamma` - 摩擦系数(friction coefficient) - :math:`\mathbf{R}_i(t)` - 随机力,满足涨落–耗散定理 BBK 积分常数: - :math:`c_1 = \exp(-\gamma\,\Delta t)` - 速度阻尼因子 - :math:`\sigma = \sqrt{k_B T\,(1-c_1^2)/m}` - 随机力标准差 参考文献: Brünger, Brooks & Karplus, Chem. Phys. Lett. 105, 495 (1984). """
[文档] def __init__(self, target_temperature: float, friction: float = 1.0): r"""初始化 Langevin 恒温器。 Parameters ---------- target_temperature : float 目标温度 (K),必须为正数。 friction : float, optional 摩擦系数 gamma (ps⁻¹),默认值 1.0 ps⁻¹。 Raises ------ ValueError 如果 target_temperature 或 friction 为非正数。 """ if target_temperature <= 0: raise ValueError(f"目标温度必须为正数,得到 {target_temperature} K") if friction <= 0: raise ValueError(f"摩擦系数必须为正数,得到 {friction} ps⁻¹") self.target_temperature = target_temperature self.friction = friction # γ in ps⁻¹ # 统计信息 self._total_steps = 0 self._temperature_history = [] self._friction_work_history = [] # 摩擦做功历史 self._random_work_history = [] # 随机力做功历史 print(f"Langevin恒温器初始化: T={target_temperature}K, γ={friction:.3f} ps⁻¹") print(f" 阻尼时间: τ_damp = {1 / friction:.3f} ps") print( f" 预期特性: {'强耦合' if friction >= 5.0 else '弱耦合' if friction <= 0.5 else '中等耦合'}" )
[文档] def propagate(self, cell: Cell, dt: float, **kwargs: Any) -> None: r"""执行 Langevin 恒温器传播(BBK 速度更新)。 Parameters ---------- cell : Cell 晶胞对象,将修改其中所有原子的速度。 dt : float 时间步长 (fs)。 **kwargs : Any 额外参数,当前版本未使用。 Raises ------ ValueError 如果 dt <= 0。 RuntimeError 如果温度计算或随机数生成失败。 """ if dt <= 0: raise ValueError(f"时间步长必须为正数,得到 dt={dt} fs") try: # 时间单位转换:dt从fs转换为ps(摩擦系数单位为ps⁻¹) dt_ps = dt / 1000.0 # fs -> ps # 计算当前温度用于监控 current_temp = self._calculate_temperature(cell) # BBK (Brünger-Brooks-Karplus) 算法常数 gamma_dt = self.friction * dt_ps damping_factor = np.exp(-gamma_dt) # 速度阻尼因子 c1 = exp(-γΔt) # ===== 核心修复:在零质心子空间中执行Ornstein-Uhlenbeck过程 ===== # 获取系统状态 velocities = cell.get_velocities() # 所有原子速度 (N,3) masses = np.array([a.mass for a in cell.atoms], dtype=np.float64) total_mass = float(np.sum(masses)) # 步骤1:投影到零质心子空间 # 计算并移除质心速度,确保在约束子空间内演化 com_velocity = (masses[:, None] * velocities).sum(axis=0) / total_mass velocities_in_com_frame = velocities - com_velocity # 零质心坐标系下的速度 # 保存用于做功计算 velocities_old = velocities_in_com_frame.copy() # 步骤2:计算满足涨落-耗散定理的随机力标准差 # σ_i = sqrt(kB*T*(1-exp(-2γΔt))/m_i) # 注意:(1-c1²) = (1-exp(-2γΔt)) 保证正确的温度涨落 variance_per_atom = ( KB_IN_EV * self.target_temperature * (1.0 - damping_factor**2) / masses ) variance_per_atom = np.maximum(variance_per_atom, 0.0) # 数值稳定性 random_force_sigma = np.sqrt(variance_per_atom) # 步骤3:生成满足质心动量守恒的随机增量 # 关键:随机力必须在零质心子空间内,即 Σ(m_i * Δv_i^random) = 0 gaussian_noise = np.random.normal(0.0, 1.0, velocities.shape) random_velocity_increment = ( random_force_sigma[:, None] * gaussian_noise ) # 未约束的随机增量 # 投影到零质心子空间(移除质心分量) random_com = (masses[:, None] * random_velocity_increment).sum( axis=0 ) / total_mass random_velocity_increment -= random_com # 满足质心动量守恒约束 # 步骤4:Ornstein-Uhlenbeck更新(BBK积分) # v(t+Δt) = exp(-γΔt) * v(t) + σ * R(t) # 这里v和R都在零质心子空间内 velocities_new = ( damping_factor * velocities_in_com_frame + random_velocity_increment ) # 步骤5:计算做功(用于诊断涨落-耗散平衡) friction_velocity_change = ( damping_factor - 1.0 ) * velocities_old # 摩擦力导致的速度变化 friction_work = float( np.sum( masses * np.einsum("ij,ij->i", velocities_old, friction_velocity_change) ) ) random_work = float( np.sum( masses * np.einsum("ij,ij->i", velocities_old, random_velocity_increment) ) ) # 步骤6:更新原子速度 # 重要:不再调用remove_com_motion(),因为已经在零质心子空间内 # 批量更新速度(向量化赋值) cell.set_velocities(velocities_new) # 更新统计信息 self._total_steps += 1 self._temperature_history.append(current_temp) self._friction_work_history.append(friction_work) self._random_work_history.append(random_work) # 定期输出诊断信息(每1000步) if self._total_steps % 1000 == 0: final_temp = self._calculate_temperature(cell) temp_deviation = abs(final_temp - self.target_temperature) temp_relative_error = temp_deviation / self.target_temperature * 100 # 计算最近1000步的平均做功 recent_friction_work = np.mean(self._friction_work_history[-1000:]) recent_random_work = np.mean(self._random_work_history[-1000:]) # 涨落-耗散平衡检查:理想情况下摩擦做功与随机做功应相互抵消 energy_balance_ratio = abs( (recent_friction_work + recent_random_work) / max(abs(recent_friction_work), 1e-10) ) print( f"Langevin恒温器 [步数={self._total_steps:6d}]: " f"T={final_temp:6.1f}K (目标={self.target_temperature:.0f}K, 误差={temp_relative_error:4.1f}%)" ) if self._total_steps <= 5000: # 初期显示更多诊断信息 print( f" 涨落-耗散平衡: 摩擦做功={recent_friction_work:+.3e} eV, " f"随机做功={recent_random_work:+.3e} eV, 平衡度={energy_balance_ratio:.3f}" ) except Exception as e: raise RuntimeError(f"Langevin恒温器传播失败: {str(e)}") from e
def _calculate_temperature(self, cell: Cell) -> float: """计算当前系统温度(与structure.py保持一致) Parameters ---------- cell : Cell 晶胞对象 Returns ------- float 当前温度 (K) """ num_atoms = len(cell.atoms) if num_atoms == 0: return 0.0 if num_atoms == 1: # 单原子系统 atom = cell.atoms[0] kinetic = 0.5 * atom.mass * np.dot(atom.velocity, atom.velocity) dof = 3 else: # 多原子系统:扣除质心运动 total_mass = sum(atom.mass for atom in cell.atoms) total_momentum = sum(atom.mass * atom.velocity for atom in cell.atoms) com_velocity = total_momentum / total_mass kinetic = sum( 0.5 * atom.mass * np.dot(atom.velocity - com_velocity, atom.velocity - com_velocity) for atom in cell.atoms ) dof = 3 * num_atoms - 3 if dof <= 0: return 0.0 temperature = 2.0 * kinetic / (dof * KB_IN_EV) return temperature
[文档] def get_statistics(self) -> dict: """获取 Langevin 恒温器统计信息 Returns ------- dict 统计信息字典: total_steps 总传播步数 target_temperature 目标温度 (K) friction 摩擦系数 (ps⁻¹) damping_time 阻尼时间 (ps) coupling_strength 耦合强度描述 average_temperature 平均温度 (K) temperature_std 温度标准差 (K) temperature_error 平均温度误差 (%) average_friction_work 平均摩擦做功 (eV) average_random_work 平均随机做功 (eV) energy_balance 摩擦做功与随机做功比值 recent_temperatures 最近 100 步的温度历史 """ if self._total_steps == 0: return { "total_steps": 0, "target_temperature": self.target_temperature, "friction": self.friction, "damping_time": 1.0 / self.friction, "coupling_strength": self._get_coupling_strength_description(), "average_temperature": 0.0, "temperature_std": 0.0, "temperature_error": 0.0, "average_friction_work": 0.0, "average_random_work": 0.0, "energy_balance": 0.0, "recent_temperatures": [], } temps = np.array(self._temperature_history) friction_works = np.array(self._friction_work_history) random_works = np.array(self._random_work_history) avg_temp = np.mean(temps) temp_std = np.std(temps) temp_error = ( abs(avg_temp - self.target_temperature) / self.target_temperature * 100 ) avg_friction_work = np.mean(friction_works) if len(friction_works) > 0 else 0.0 avg_random_work = np.mean(random_works) if len(random_works) > 0 else 0.0 # 能量平衡:理想情况下摩擦做功与随机做功应该基本抵消 energy_balance = (avg_friction_work + avg_random_work) / max( abs(avg_friction_work), 1e-10 ) return { "total_steps": self._total_steps, "target_temperature": self.target_temperature, "friction": self.friction, "damping_time": 1.0 / self.friction, "coupling_strength": self._get_coupling_strength_description(), "average_temperature": avg_temp, "temperature_std": temp_std, "temperature_error": temp_error, "average_friction_work": avg_friction_work, "average_random_work": avg_random_work, "energy_balance": energy_balance, "recent_temperatures": ( temps[-100:].tolist() if len(temps) >= 100 else temps.tolist() ), }
def _get_coupling_strength_description(self) -> str: """获取耦合强度的描述性字符串 Returns ------- str 耦合强度描述 """ if self.friction >= 5.0: return "strong" # 强耦合:快速温度控制,动力学扰动大 elif self.friction <= 0.5: return "weak" # 弱耦合:温度控制慢,动力学保持好 else: return "moderate" # 中等耦合:平衡控温速度和动力学保持
[文档] def reset_statistics(self) -> None: """重置所有统计信息""" self._total_steps = 0 self._temperature_history.clear() self._friction_work_history.clear() self._random_work_history.clear()
[文档] def set_friction(self, new_friction: float) -> None: """动态调整摩擦系数 Parameters ---------- new_friction : float 新的摩擦系数 (ps⁻¹) Raises ------ ValueError 如果new_friction为非正数 Notes ----- 这允许在模拟过程中调整恒温器的耦合强度, 例如在平衡阶段使用大摩擦系数,在生产阶段使用小摩擦系数。 """ if new_friction <= 0: raise ValueError(f"摩擦系数必须为正数,得到 {new_friction} ps⁻¹") old_friction = self.friction self.friction = new_friction print(f"摩擦系数已更新: {old_friction:.3f} -> {new_friction:.3f} ps⁻¹") print(f"阻尼时间已更新: {1 / old_friction:.3f} -> {1 / new_friction:.3f} ps")
[文档] def get_effective_parameters(self) -> dict: r"""获取当前有效参数 Returns ------- dict 参数字典: friction 摩擦系数 (ps⁻¹) damping_time 阻尼时间 (ps),:math:`\tau_{\mathrm{damp}} = 1/\gamma` target_temperature 目标温度 (K) coupling_description 耦合强度描述(weak/moderate/strong) """ return { "friction": self.friction, "damping_time": 1.0 / self.friction, "target_temperature": self.target_temperature, "coupling_description": self._get_coupling_strength_description(), }
# ==================================================================== # MTK-NPT 恒压器算符 (Martyna-Tobias-Klein Barostat) # ====================================================================
[文档] def exprel(x: np.ndarray) -> np.ndarray: """ 计算 (exp(x) - 1) / x,避免x接近0时的数值误差 Parameters ---------- x : np.ndarray 输入数组 Returns ------- np.ndarray 计算结果 """ # 对于小值使用Taylor展开:exprel(x) = 1 + x/2 + x²/6 + x³/24 + ... mask_small = np.abs(x) < 1e-6 result = np.zeros_like(x) # 大值情况:直接计算 mask_large = ~mask_small result[mask_large] = (np.exp(x[mask_large]) - 1) / x[mask_large] # 小值情况:Taylor展开到4阶 x_small = x[mask_small] result[mask_small] = 1 + x_small / 2 + x_small**2 / 6 + x_small**3 / 24 return result
[文档] class MTKBarostatPropagator(Propagator): r"""MTK(Martyna–Tobias–Klein)恒压器传播子 Notes ----- 对称分解 + 矩阵指数实现: - 晶格共轭动量 :math:`\mathbf{P}_g` 的演化由应力与外压驱动 - 在 :math:`\mathbf{P}_g` 的特征基中进行精确积分(矩阵指数) 核心关系: .. math:: d\mathbf{P}_g/dt = V\,(\boldsymbol{\sigma}_{\mathrm{int}} - P_{\mathrm{ext}}\,\mathbf{I}) + \text{kinetic correction} References ---------- Martyna, G. J.; Tobias, D. J.; Klein, M. L. (1994). Constant pressure molecular dynamics algorithms. The Journal of Chemical Physics, 101(5), 4177–4189. https://doi.org/10.1063/1.467468 Parameters ---------- target_temperature : float 目标温度 (K) target_pressure : float 目标压力 (eV/ų) pdamp : float 压力阻尼时间 (fs),典型值约 :math:`100\,\times\,dt` pchain : int, optional 恒压器链长度,默认 3 ploop : int, optional 恒压器链积分循环次数,默认 1 """
[文档] def __init__( self, target_temperature: float, target_pressure: float, pdamp: float, pchain: int = 3, ploop: int = 1, ): super().__init__() # 基本参数 self.target_temperature = target_temperature self.target_pressure = target_pressure # eV/ų self.pdamp = pdamp # fs self.pchain = pchain self.ploop = ploop # 热力学常数 self.kT = KB_IN_EV * target_temperature # eV # 应力计算器 self.stress_calculator = StressCalculator() # 初始化链变量 self._xi = np.zeros(self.pchain) # 恒压器坐标 self._p_xi = np.zeros(self.pchain) # 恒压器动量 # 晶格共轭动量 (3x3矩阵) self._P_g = np.zeros((3, 3)) # 初始化标记 self._initialized = False # 统计信息 self._total_steps = 0 self._pressure_history = [] self._volume_history = [] self._barostat_energy_history = [] print("MTK恒压器初始化:") print(f" 目标温度: {target_temperature:.1f} K") print(f" 目标压力: {target_pressure:.6f} eV/ų") print(f" 压力阻尼: {pdamp:.1f} fs") print(f" 恒压器链: {pchain} 变量")
def _initialize_parameters(self, cell: Cell) -> None: """ 初始化质量参数和内部变量 Parameters ---------- cell : Cell 晶胞对象 """ if self._initialized: return N_atoms = len(cell.atoms) # 晶格质量参数 W (关键参数) # 经典选择:W = (N + 1) * kB * T * τ_p² self.W = (N_atoms + 1) * self.kT * (self.pdamp**2) # 恒压器链质量参数 R_j self._R = np.zeros(self.pchain) cell_dof = 9 # 3x3晶格矩阵的自由度 # R[0]对应晶格自由度,R[j>0]对应链变量 self._R[0] = cell_dof * self.kT * (self.pdamp**2) for j in range(1, self.pchain): self._R[j] = self.kT * (self.pdamp**2) self._initialized = True print("MTK参数初始化完成:") print(f" 晶格质量 W: {self.W:.2e}") print(f" 链质量 R[0]: {self._R[0]:.2e}") print(f" 链质量 R[1:]: {self._R[1]:.2e}")
[文档] def propagate(self, cell: Cell, dt: float, potential=None) -> None: """ 执行恒压器传播一个时间步dt 基于对称Trotter分解的恒压器链积分,包含: 1. 恒压器链传播 (dt/2) 2. 晶格动量更新 3. 恒压器链传播 (dt/2) Parameters ---------- cell : Cell 晶胞对象 dt : float 时间步长 (fs) potential : Potential, optional 势能对象,用于应力计算 """ if not self._initialized: self._initialize_parameters(cell) if potential is None: print("警告: MTK恒压器需要势能对象计算应力") return # 对称分解:前半步恒压器链传播 dt_half = dt / 2.0 self._integrate_barostat_chain(cell, dt_half) # 更新晶格动量(受应力驱动) self._update_cell_momenta(cell, potential, dt_half) # 对称分解:后半步恒压器链传播 self._integrate_barostat_chain(cell, dt_half) # 更新统计 self._update_statistics(cell, potential) self._total_steps += 1
def _integrate_barostat_chain(self, cell: Cell, dt: float) -> None: """ 积分恒压器Nose-Hoover链 实现与NHC恒温器类似的链式积分,但"力"来源于晶格动能。 使用四阶Suzuki-Yoshida分解确保高精度。 Parameters ---------- cell : Cell 晶胞对象 dt : float 时间步长 """ # 确保参数已初始化 if not self._initialized: self._initialize_parameters(cell) # 四阶Suzuki-Yoshida系数 dt_suzuki = [w * dt for w in FOURTH_ORDER_COEFFS] * self.ploop for dt_sub in dt_suzuki: self._single_barostat_chain_step(cell, dt_sub) def _single_barostat_chain_step(self, cell: Cell, dt: float) -> None: """ 执行单步恒压器链积分 算法流程(对称回文): 1. 更新链尾变量p_xi[M-1] (dt/2) 2. 依次更新p_xi[j], xi[j] (j从M-1到0) 3. 缩放晶格动量P_g 4. 反向更新xi[j], p_xi[j] (j从0到M-1) 5. 更新链尾变量p_xi[M-1] (dt/2) Parameters ---------- cell : Cell 晶胞对象 dt : float 时间步长 """ dt_half = dt / 2.0 dt_quarter = dt / 4.0 # 1. 更新链尾 (M-1) 动量 if self.pchain > 1: G_M_minus_1 = (self._p_xi[-2] ** 2 / self._R[-2] - self.kT) / self._R[-1] self._p_xi[-1] += G_M_minus_1 * dt_quarter # 2. 前向更新链 (从M-1到1) for j in range(self.pchain - 1, 0, -1): # 计算"力" G_j if j == self.pchain - 1: if j == 1: # 第一个链变量的力来自晶格动能 trace_P2 = np.trace(self._P_g.T @ self._P_g) G_j = (trace_P2 / self.W - 9 * self.kT) / self._R[j] else: G_j = (self._p_xi[j - 1] ** 2 / self._R[j - 1] - self.kT) / self._R[ j ] else: if j == 1: trace_P2 = np.trace(self._P_g.T @ self._P_g) G_j = (trace_P2 / self.W - 9 * self.kT) / self._R[j] else: G_j = (self._p_xi[j - 1] ** 2 / self._R[j - 1] - self.kT) / self._R[ j ] # 更新动量和坐标 self._p_xi[j] += G_j * dt_half self._xi[j] += (self._p_xi[j] / self._R[j]) * dt_half # 3. 更新第一个链变量 (j=0) if self.pchain > 0: # 第0个变量的"力"来自晶格动能 trace_P2 = np.trace(self._P_g.T @ self._P_g) G_0 = (trace_P2 / self.W - 9 * self.kT) / self._R[0] self._p_xi[0] += G_0 * dt_half self._xi[0] += (self._p_xi[0] / self._R[0]) * dt_half # 缩放晶格动量 scale_factor = np.exp(-self._p_xi[0] / self._R[0] * dt) self._P_g *= scale_factor # 4. 反向更新链 (从0到M-1) - 时间可逆性 for j in range(self.pchain): if j == 0: trace_P2 = np.trace(self._P_g.T @ self._P_g) G_0 = (trace_P2 / self.W - 9 * self.kT) / self._R[0] self._p_xi[0] += G_0 * dt_half else: if j == 1: trace_P2 = np.trace(self._P_g.T @ self._P_g) G_j = (trace_P2 / self.W - 9 * self.kT) / self._R[j] else: G_j = (self._p_xi[j - 1] ** 2 / self._R[j - 1] - self.kT) / self._R[ j ] self._p_xi[j] += G_j * dt_half self._xi[j] += (self._p_xi[j] / self._R[j]) * dt_half # 5. 最终更新链尾动量 if self.pchain > 1: G_M_minus_1 = (self._p_xi[-2] ** 2 / self._R[-2] - self.kT) / self._R[-1] self._p_xi[-1] += G_M_minus_1 * dt_quarter def _update_cell_momenta(self, cell: Cell, potential, dt: float) -> None: """ 更新晶格共轭动量P_g 基于内外应力差驱动晶格动量变化: dP_g/dt = V * (σ_internal - P_external * I) + kinetic_correction Parameters ---------- cell : Cell 晶胞对象 potential : Potential 势能对象 dt : float 时间步长 """ # 确保参数已初始化 if not self._initialized: self._initialize_parameters(cell) # 计算应力张量 (eV/ų) stress_tensor = self.stress_calculator.calculate_total_stress(cell, potential) # 计算体积和动能修正项 volume = cell.volume N_atoms = len(cell.atoms) # 动能修正项:Tr(p²/m) / (3*N) * I total_kinetic = 0.0 for atom in cell.atoms: velocity = atom.velocity # Å/fs mass = atom.mass # 原子质量单位 total_kinetic += 0.5 * mass * np.dot(velocity, velocity) # 转换动能到正确单位并计算修正项 kinetic_correction = (2.0 * total_kinetic) / (3.0 * N_atoms) * np.eye(3) # 应力驱动项: 使用 "pressure-like" 张量 -σ # G = V * ( -σ - P_ext * I ) + kinetic_correction pressure_tensor_ext = self.target_pressure * np.eye(3) internal_pressure_tensor = -stress_tensor stress_drive = volume * (internal_pressure_tensor - pressure_tensor_ext) # 总驱动力 G = stress_drive + kinetic_correction # 更新晶格动量 self._P_g += G * dt
[文档] def integrate_momenta(self, cell: Cell, potential, dt: float) -> None: r""" 更新粒子动量(与恒压器耦合的半步,MTK 公式) 在 :math:`\mathbf{P}_g` 的特征基下: .. math:: \mathbf{p}' = \mathbf{p}\,\exp\!\Big(-\frac{\boldsymbol{\kappa}\,\Delta t}{W}\Big) + \Delta t\,\mathbf{F}\,\operatorname{exprel}\!\Big(-\frac{\boldsymbol{\kappa}\,\Delta t}{W}\Big) 其中 :math:`\boldsymbol{\kappa} = \boldsymbol{\lambda} + \operatorname{Tr}(\mathbf{P}_g)/(3N)`。 Notes ----- - 此方法已包含力贡献,无需额外的 NVE 速度半步 - 调用前后需配合 ``_update_cell_momenta(dt/2)`` 与位置/晶格传播 """ if not self._initialized: self._initialize_parameters(cell) # 确保力为最新 potential.calculate_forces(cell) # 特征分解 P_g = U diag(lambda) U^T try: eigvals, U = np.linalg.eigh(self._P_g) except np.linalg.LinAlgError: eigvals = np.zeros(3) U = np.eye(3) N = len(cell.atoms) if N == 0: return masses = np.array([atom.mass for atom in cell.atoms], dtype=np.float64) # (N,) velocities = cell.get_velocities() # (N,3) momenta = velocities * masses[:, None] # p = m v forces = cell.get_forces() # (N,3) # 变换到特征坐标 y = momenta @ U # (N,3) fU = forces @ U # (N,3) # 计算kappa和缩放系数 kappa = eigvals + np.trace(self._P_g) / (3.0 * N) # (3,) x = -kappa * (dt / self.W) # (3,) scale = np.exp(x) # (3,) exr = exprel(x) # (3,) y_new = y * scale[None, :] + dt * fU * exr[None, :] p_new = y_new @ U.T # 回写速度 v_new = p_new / masses[:, None] for i, atom in enumerate(cell.atoms): atom.velocity = v_new[i]
[文档] def propagate_positions_and_cell(self, cell: Cell, dt: float) -> None: r""" 使用矩阵指数精确传播粒子位置与晶格(MTK 核心步骤) 流程: 1. 特征分解 :math:`\mathbf{P}_g = \mathbf{U}\,\operatorname{diag}(\boldsymbol{\lambda})\,\mathbf{U}^\top` 2. 在特征坐标系中用 :math:`\exp(\boldsymbol{\lambda}\,\Delta t/W)` 精确积分 3. 变换回原坐标系 Parameters ---------- cell : Cell 晶胞对象 dt : float 时间步长 (fs) """ if not self._initialized: self._initialize_parameters(cell) # 对P_g进行特征值分解 try: eigvals, U = np.linalg.eigh(self._P_g) except np.linalg.LinAlgError: print("警告: P_g矩阵特征值分解失败,使用单位矩阵") eigvals = np.zeros(3) U = np.eye(3) # 1. 更新晶格矩阵 h h_matrix = cell.lattice_vectors.T # 转换为列向量形式 h_transformed = h_matrix @ U # 变换到特征坐标系 # 在特征坐标系中精确积分 exp_factors = np.exp(eigvals * dt / self.W) h_evolved = h_transformed * exp_factors[None, :] # 广播 # 变换回原坐标系 h_new = h_evolved @ U.T # 使用接口以确保体积和逆矩阵更新 cell.set_lattice_vectors(h_new.T) # 转换回行向量形式 # 2. 更新原子位置与速度(仅恒压器耦合的部分) positions = cell.get_positions() # (N, 3) velocities = cell.get_velocities() # (N, 3) # 在特征坐标系中变换 pos_transformed = positions @ U # (N, 3) @ (3, 3) = (N, 3) vel_transformed = velocities @ U # 精确积分位置 - 使用exprel避免数值误差 dt_over_W = dt / self.W lambda_dt = eigvals * dt_over_W # (3,) # ✅ 修复:正确的MTK位置更新公式 # ASE公式: r_new = r * exp(λt) + v * dt * exprel(λt) # 其中exprel项已经包含dt因子,不需要额外乘dt exp_factors = np.exp(lambda_dt) exprel_factors = exprel(lambda_dt) * dt # dt因子在这里 pos_evolved = ( pos_transformed * exp_factors[None, :] + vel_transformed * exprel_factors[None, :] ) # 移除多余的dt # 变换回原坐标系 positions_new = pos_evolved @ U.T cell.set_positions(positions_new)
# 注意:此处不更新原子速度。动量的指数缩放与力贡献由 # integrate_momenta() 两个半步负责,避免重复缩放。 def _update_statistics(self, cell: Cell, potential) -> None: """更新统计信息""" # 计算当前压力(取迹的1/3) stress_tensor = self.stress_calculator.calculate_total_stress(cell, potential) current_pressure = -np.trace(stress_tensor) / 3.0 # 负号:拉伸为正压 # 记录统计量 self._pressure_history.append(current_pressure) self._volume_history.append(cell.volume) # 计算恒压器能量 barostat_energy = self.get_barostat_energy() self._barostat_energy_history.append(barostat_energy) # 保持历史长度 max_history = 10000 if len(self._pressure_history) > max_history: self._pressure_history = self._pressure_history[-max_history:] self._volume_history = self._volume_history[-max_history:] self._barostat_energy_history = self._barostat_energy_history[-max_history:]
[文档] def get_barostat_energy(self) -> float: r""" 计算恒压器贡献的能量 .. math:: E_{\mathrm{baro}} = \sum_j \frac{p_{\xi_j}^2}{2 R_j} + \frac{\operatorname{Tr}(\mathbf{P}_g^{\top}\mathbf{P}_g)}{2W} + \text{external work} Returns ------- float 恒压器能量 (eV) """ if not self._initialized: return 0.0 # 恒压器链动能 chain_kinetic = 0.0 for j in range(self.pchain): chain_kinetic += 0.5 * self._p_xi[j] ** 2 / self._R[j] # 晶格动能 lattice_kinetic = 0.5 * np.trace(self._P_g.T @ self._P_g) / self.W # 外压做功项 (暂时忽略,因为需要积分P*dV) external_work = 0.0 return chain_kinetic + lattice_kinetic + external_work
[文档] def get_current_pressure(self, cell: Cell, potential) -> float: """ 获取当前系统压力 Returns ------- float 当前压力 (eV/ų) """ if not self._initialized: return 0.0 stress_tensor = self.stress_calculator.calculate_total_stress(cell, potential) return -np.trace(stress_tensor) / 3.0
[文档] def get_statistics(self) -> dict: """获取恒压器统计信息""" if self._total_steps == 0 or len(self._pressure_history) == 0: return { "total_steps": self._total_steps, "target_pressure": self.target_pressure, "target_pressure_GPa": self.target_pressure * EV_TO_GPA, "average_pressure": 0.0, "pressure_error": 0.0, "average_volume": 0.0, "volume_std": 0.0, "average_barostat_energy": 0.0, } pressures = np.array(self._pressure_history) volumes = np.array(self._volume_history) barostat_energies = np.array(self._barostat_energy_history) avg_pressure = np.mean(pressures) pressure_error = abs(avg_pressure - self.target_pressure) avg_volume = np.mean(volumes) volume_std = np.std(volumes) avg_barostat_energy = np.mean(barostat_energies) return { "total_steps": self._total_steps, "target_pressure": self.target_pressure, "target_pressure_GPa": self.target_pressure * EV_TO_GPA, "average_pressure": avg_pressure, "average_pressure_GPa": avg_pressure * EV_TO_GPA, "pressure_error": pressure_error, "pressure_error_GPa": pressure_error * EV_TO_GPA, "average_volume": avg_volume, "volume_std": volume_std, "average_barostat_energy": avg_barostat_energy, "recent_pressures": ( pressures[-100:].tolist() if len(pressures) >= 100 else pressures.tolist() ), "recent_volumes": ( volumes[-100:].tolist() if len(volumes) >= 100 else volumes.tolist() ), }