#!/usr/bin/env python3
"""
弹性波动力学模拟模块
本模块提供弹性波在晶格中传播的分子动力学模拟功能。
主要功能
--------
- 平面波激发:支持位移型和速度型激发,以及时间域源注入
- NVE传播模拟:零温条件下的短时间线性响应模拟
- 速度测量:多种算法(互相关、到达时间拟合、k-ω谱分析)
- 可视化输出:时空图(x-t图)显示位移场和包络
物理模型
--------
- 传播方向:当前仅支持[100]方向(x轴)
- 边界条件:y/z方向周期性边界,x方向准无限介质
- 激发模式:小振幅线性响应区,避免非线性效应
- 吸收边界:可选的海绵层减少边界反射
单位约定
--------
- 长度:Å(埃)
- 时间:fs(飞秒)
- 速度:Å/fs(内部),km/s(输出,换算关系:1 Å/fs = 100 km/s)
Notes
-----
本模块为教学演示设计,追求物理清晰性和代码可理解性。
后续可扩展支持更多传播方向和材料类型。
"""
from __future__ import annotations
import contextlib
import logging
import time
from dataclasses import dataclass
from typing import Literal
import numpy as np
from ...core import CrystallineStructureBuilder
from ...core.structure import Cell
from ...md.schemes import NVEScheme
from ...potentials.eam import EAMAl1Potential
from ...utils.plot_config import plt
Axis = Literal["x", "y", "z"]
Polarization = Literal["L", "Ty", "Tz"]
logger = logging.getLogger(__name__)
[文档]
@dataclass(slots=True)
class WaveExcitation:
"""平面波激发器(教学版)
Parameters
----------
direction : Axis
传播方向,当前仅支持 "x"(等价于[100])。
polarization : {"L", "Ty", "Tz"}
极化:纵波 L(平行 x),或横波 Ty/Tz(沿 y/z)。
n_waves : int
超胞长度内包含的整波数(满足 PBC 的允许波矢)。
amplitude_velocity : float
初始速度幅值(Å/fs),建议 1e-4–1e-3。
amplitude_displacement : float | None
初始位移幅值(Å);若提供则叠加位移型激发(可为空)。
phase : float
初始相位(弧度)。
Notes
-----
- 平面波形式: s(r) = A cos(k·r + φ)
- k = 2π n / Lx · ex(n 为整数,满足 PBC)
- 纵波:极化 e = ex;横波:e = ey 或 ez。
- 施加后移除质心平动,避免总动量漂移。
"""
direction: Axis = "x"
polarization: Polarization = "L"
n_waves: int = 2
amplitude_velocity: float = 1e-4
amplitude_displacement: float | None = None
phase: float = 0.0
mode: Literal["standing", "traveling"] = "standing"
phase_speed_km_s: float | None = None # 仅当 mode="traveling" 时用于设定 ω = k v
[文档]
def apply(self, cell: Cell) -> None:
"""对晶胞施加平面波激发(原地修改 cell)。"""
if self.direction != "x":
raise NotImplementedError("MVP仅支持[100]方向(x轴)")
# 基本参数
Lx = float(np.linalg.norm(cell.lattice_vectors[0]))
k = 2.0 * np.pi * float(self.n_waves) / Lx # 仅沿 x
# 极化单位向量
if self.polarization == "L":
e = np.array([1.0, 0.0, 0.0])
elif self.polarization == "Ty":
e = np.array([0.0, 1.0, 0.0])
elif self.polarization == "Tz":
e = np.array([0.0, 0.0, 1.0])
else:
raise ValueError(f"未知极化: {self.polarization}")
# 初始位置与速度
R = cell.get_positions() # (N,3)
x = R[:, 0]
phase = k * x + float(self.phase)
c = np.cos(phase)
s = np.sin(phase)
# traveling: 设定 u(x,0) = A cos(kx+φ), v(x,0) = + ω A sin(kx+φ)(沿 +x 行波)
if self.mode == "traveling":
v_phase_km_s = self.phase_speed_km_s
if v_phase_km_s is None or v_phase_km_s <= 0:
raise ValueError("traveling 模式需要有效的 phase_speed_km_s(km/s)")
v_aa_fs = v_phase_km_s / 100.0 # Å/fs
omega = k * v_aa_fs
# 自动匹配位移/速度幅值关系:A_v = ω A_u
A_v = float(self.amplitude_velocity)
A_u = self.amplitude_displacement
if (A_u is None or A_u == 0.0) and (A_v is not None and A_v != 0.0):
A_u = A_v / max(omega, 1e-12)
elif (A_v is None or A_v == 0.0) and (A_u is not None and A_u != 0.0):
A_v = omega * A_u
elif (A_v is None or A_v == 0.0) and (A_u is None or A_u == 0.0):
# 回退到默认速度幅值,推导位移幅值
A_v = 1e-4
A_u = A_v / max(omega, 1e-12)
# 应用位移(cos)
if A_u and A_u != 0.0:
du = (float(A_u) * c)[:, None] * e[None, :]
Rn = R + du
Rn = cell.apply_periodic_boundary(Rn)
cell.set_positions(Rn)
# 应用速度(+ωA sin)
if A_v and A_v != 0.0:
dv = (float(A_v) * s)[:, None] * e[None, :]
V = cell.get_velocities()
V = V + dv
for i, atom in enumerate(cell.atoms):
atom.velocity = V[i]
else:
# standing: 速度/位移均按 cos(kx+φ)
if self.amplitude_velocity and self.amplitude_velocity != 0.0:
dv = (self.amplitude_velocity * c)[:, None] * e[None, :]
V = cell.get_velocities()
V = V + dv
for i, atom in enumerate(cell.atoms):
atom.velocity = V[i]
if (
self.amplitude_displacement is not None
and self.amplitude_displacement != 0.0
):
du = (float(self.amplitude_displacement) * c)[:, None] * e[None, :]
Rn = R + du
Rn = cell.apply_periodic_boundary(Rn)
cell.set_positions(Rn)
# 去除整体平动(数值稳健)
cell.remove_com_motion()
[文档]
@dataclass(slots=True)
class DynamicsConfig:
"""MD 波传播最小配置。"""
supercell: tuple[int, int, int] = (64, 12, 12)
dt_fs: float = 0.5
steps: int = 6000 # 3 ps @ 0.5 fs(默认更轻量)
sample_every: int = 50
direction: Axis = "x"
polarization: Polarization = "L"
n_waves: int = 2
amplitude_velocity: float = 1e-4
amplitude_displacement: float | None = None
# 源注入(无需解析速度):沿x=0薄片区域在时间域施加高斯脉冲速度
use_source: bool = True
source_slab_fraction: float = 0.06 # 源区域厚度占Lx比例
source_amplitude_velocity: float = 5e-4 # Å/fs
source_t0_fs: float = 200.0 # 脉冲中心 (fs) - 仅 Gaussian 使用
source_sigma_fs: float = 80.0 # 脉冲宽度 (fs) - 仅 Gaussian 使用
source_type: Literal["gaussian", "tone_burst"] = "gaussian"
source_cycles: int = 4 # tone burst 周期数
source_freq_thz: float = 1.0 # tone burst 载频(THz)
record_trajectory: bool = False
trajectory_file: str | None = None
detector_frac_a: float = 0.20
detector_frac_b: float = 0.70
measure_method: str = (
"auto" # auto | arrival | arrival_T_guard | xcorr | threshold | komega
)
v_max_km_s: float | None = None
# 吸收边界(海绵层)
absorber_enabled: bool = False
absorber_slab_fraction: float = 0.10 # 左右各占Lx的比例
absorber_tau_fs: float = 250.0 # 衰减时间常数(fs),越小吸收越强
absorber_profile: Literal["cos2", "linear"] = "cos2"
def _bin_average_signal(
cell: Cell,
u: np.ndarray,
n_bins: int,
) -> tuple[np.ndarray, np.ndarray]:
"""对投影信号按 x 方向分箱取 yz 平均。
Parameters
----------
cell : Cell
晶胞
u : ndarray, shape (N,)
原子标量信号(如位移在极化方向的投影)
n_bins : int
x 方向分箱数
Returns
-------
(x_centers, avg)
箱中心坐标(Å)与分箱平均值(shape=(n_bins,))。
"""
R = cell.get_positions()
Lx = float(np.linalg.norm(cell.lattice_vectors[0]))
s = (R[:, 0] / Lx) % 1.0 # 分数坐标(x)
edges = np.linspace(0.0, 1.0, int(n_bins) + 1)
idx = np.clip(np.digitize(s, edges) - 1, 0, int(n_bins) - 1)
avg = np.zeros(int(n_bins), dtype=float)
cnt = np.zeros(int(n_bins), dtype=int)
for i in range(cell.num_atoms):
avg[idx[i]] += float(u[i])
cnt[idx[i]] += 1
cnt = np.maximum(cnt, 1)
avg = avg / cnt
x_centers = (0.5 * (edges[:-1] + edges[1:])) * Lx
return x_centers, avg
def _estimate_velocity_by_xcorr(
t_fs: np.ndarray,
xa: float,
xb: float,
x_centers: np.ndarray,
U_xt: np.ndarray,
v_pred_km_s: float | None = None,
max_speed_km_s: float = 10.0,
t_gate_fs: float | None = None,
use_bandpass: bool = True,
) -> tuple[float | None, dict]:
"""用互相关法估计群速度。
通过计算两个空间位置的信号互相关峰值,确定波传播的时间延迟,
进而计算群速度。
Parameters
----------
t_fs : ndarray, shape (T,)
采样时间序列(fs)
xa : float
上游探针位置(Å)
xb : float
下游探针位置(Å)
x_centers : ndarray, shape (X,)
空间分箱中心坐标(Å)
U_xt : ndarray, shape (T, X)
时空图:位移在极化方向的投影u·e
v_pred_km_s : float | None
预估速度,用于限定搜索窗口(km/s)
max_speed_km_s : float
物理速度上限,用于设置最小滞后约束(km/s)
t_gate_fs : float | None
时间窗口起始点,去除源附近近场效应(fs)
use_bandpass : bool
是否应用带通滤波器
Returns
-------
v_kms : float | None
估计的群速度(km/s),失败时返回None
info : dict
调试信息,包含滞后时间、探针位置等
"""
# 找到最近箱
ia = int(np.argmin(np.abs(x_centers - xa)))
ib = int(np.argmin(np.abs(x_centers - xb)))
if ia == ib:
return None, {"reason": "ia==ib"}
sa = np.asarray(U_xt[:, ia], dtype=float)
sb = np.asarray(U_xt[:, ib], dtype=float)
# 时间窗口:去除源附近的近场/起振阶段
if t_gate_fs is not None:
gate_mask = t_fs >= float(t_gate_fs)
if np.count_nonzero(gate_mask) > 4:
sa = sa.copy()
sb = sb.copy()
sa[~gate_mask] = 0.0
sb[~gate_mask] = 0.0
# 去均值并单位化,提升互相关鲁棒性
sa = sa - np.nanmean(sa)
sb = sb - np.nanmean(sb)
sa_std = float(np.nanstd(sa)) or 1.0
sb_std = float(np.nanstd(sb)) or 1.0
sa = sa / sa_std
sb = sb / sb_std
# 可选频带:围绕上游信号的主频做简单带通,压制低频漂移与高频噪声
if use_bandpass and len(sa) >= 16:
dt_fs = float(t_fs[1] - t_fs[0]) if len(t_fs) >= 2 else 1.0
fs = 1.0 / dt_fs # fs^-1
# FFT频率
S = np.fft.rfft(sa - np.nanmean(sa))
freqs = np.fft.rfftfreq(len(sa), d=dt_fs)
# 排除直流,从第2个bin开始找峰
if len(S) > 3:
k0 = 1
k_peak = int(np.argmax(np.abs(S[k0:])) + k0)
f_peak = max(freqs[k_peak], 0.0)
if f_peak > 0.0:
f_lo = max(0.5 * f_peak, 0.0)
f_hi = min(1.5 * f_peak, fs * 0.45) # 避免靠近Nyquist
def bp(x):
X = np.fft.rfft(x - np.nanmean(x))
H = (freqs >= f_lo) & (freqs <= f_hi)
X_f = X * H
xr = np.fft.irfft(X_f, n=len(x))
return xr
sa = bp(sa)
sb = bp(sb)
# 互相关:sb(t) 与 sa(t-τ),只考虑非负时延
cc = np.correlate(sb, sa, mode="full")
T = len(sa)
mid = len(cc) // 2
cc_pos = cc[mid:]
if len(cc_pos) <= 1:
return None, {"reason": "too_short_time_series"}
dt_fs = float(t_fs[1] - t_fs[0]) if len(t_fs) >= 2 else 1.0
# 引导窗口:若给定预测速度,则在 [0.5τ_pred, 1.5τ_pred] 内寻找最大峰
if v_pred_km_s is not None and v_pred_km_s > 0:
v_pred_aa_fs = v_pred_km_s / 100.0
dx_A = float(abs(xb - xa))
tau_pred_fs = dx_A / max(v_pred_aa_fs, 1e-12)
lo = int(max(1, 0.5 * tau_pred_fs / dt_fs))
hi = int(min(len(cc_pos), 1.5 * tau_pred_fs / dt_fs))
if hi <= lo + 1:
lo = max(1, int(0.03 * T))
hi = len(cc_pos)
search = np.abs(cc_pos[lo:hi])
if len(search) == 0:
return None, {"reason": "empty_window"}
lag_idx = int(lo + np.argmax(search))
else:
cc_use = np.abs(cc_pos.copy())
# 排除前若干个样本,避免拾取近零滞后的伪峰;
# 同时基于最大物理速度设置最小滞后阈值
dx_A = float(abs(xb - xa))
v_max_aa_fs = max_speed_km_s / 100.0
min_lag_fs_phys = dx_A / max(v_max_aa_fs, 1e-12)
# 使用向上取整,确保严格满足物理最小滞后约束
import math as _math
ignore_head = max(int(0.03 * T), int(_math.ceil(min_lag_fs_phys / dt_fs)))
cc_use[:ignore_head] = -np.inf
lag_idx = int(np.argmax(cc_use))
lag_fs = lag_idx * dt_fs
dx_A = float(abs(xb - xa))
if lag_fs <= 0 or dx_A <= 0:
return None, {"reason": "nonpositive lag or dx", "lag_fs": lag_fs, "dx_A": dx_A}
v_kms = (dx_A / lag_fs) * 100.0 # Å/fs → km/s
return v_kms, {
"lag_fs": lag_fs,
"dx_A": dx_A,
"ia": ia,
"ib": ib,
"lag_idx": lag_idx,
}
def _estimate_velocity_by_threshold(
t_fs: np.ndarray,
sa: np.ndarray,
sb: np.ndarray,
dx_A: float,
smooth_window: int = 7,
alpha: float = 0.2,
) -> tuple[float | None, dict]:
"""基于包络阈值的到达时间差估计速度。
Parameters
----------
t_fs : ndarray
时间序列(fs)
sa, sb : ndarray
两个探针的时间信号
dx_A : float
探针间距(Å)
smooth_window : int
平滑窗口(奇数)
alpha : float
阈值比例(相对最大值)
"""
if dx_A <= 0 or len(t_fs) < 5:
return None, {"reason": "invalid_inputs"}
w = max(3, int(smooth_window) | 1) # odd
def smooth_abs(x):
y = np.abs(x)
k = np.ones(w) / w
z = np.convolve(y, k, mode="same")
return z
ea = smooth_abs(sa)
eb = smooth_abs(sb)
ta = np.argmax(ea >= (alpha * np.max(ea)))
tb = np.argmax(eb >= (alpha * np.max(eb)))
if tb <= ta:
return None, {"reason": "non_positive_dt", "ta_idx": int(ta), "tb_idx": int(tb)}
lag_fs = t_fs[min(tb, len(t_fs) - 1)] - t_fs[min(ta, len(t_fs) - 1)]
if lag_fs <= 0:
return None, {"reason": "nonpositive_lag"}
v_kms = (dx_A / lag_fs) * 100.0
return v_kms, {"ta_fs": float(t_fs[ta]), "tb_fs": float(t_fs[tb]), "alpha": alpha}
def _estimate_velocity_by_envelope_peak(
t_fs: np.ndarray,
sa: np.ndarray,
sb: np.ndarray,
dx_A: float,
smooth_window: int = 21,
) -> tuple[float | None, dict]:
"""基于包络峰值时间差的速度估计。
使用移动均方根近似包络:env ≈ sqrt(moving_average(s^2))。
取各自最大值的时间差作为到达时间差。
"""
if dx_A <= 0 or len(t_fs) < 5:
return None, {"reason": "invalid_inputs"}
w = max(5, int(smooth_window) | 1)
def envelope(x):
x2 = np.square(x)
k = np.ones(w) / w
m = np.convolve(x2, k, mode="same")
return np.sqrt(np.maximum(m, 0.0))
ea = envelope(sa)
eb = envelope(sb)
ta_idx = int(np.argmax(ea))
tb_idx = int(np.argmax(eb))
if tb_idx <= ta_idx:
return None, {
"reason": "non_positive_dt_env",
"ta_idx": ta_idx,
"tb_idx": tb_idx,
}
lag_fs = float(t_fs[min(tb_idx, len(t_fs) - 1)] - t_fs[min(ta_idx, len(t_fs) - 1)])
if lag_fs <= 0:
return None, {"reason": "nonpositive_lag_env"}
v_kms = (dx_A / lag_fs) * 100.0
return v_kms, {"ta_fs": float(t_fs[ta_idx]), "tb_fs": float(t_fs[tb_idx])}
def _estimate_velocity_by_arrival_fit(
t_fs: np.ndarray,
x_centers: np.ndarray,
U_xt: np.ndarray,
x_min_A: float,
x_max_A: float,
t_gate_fs: float | None = None,
alpha: float = 0.3,
smooth_window: int = 21,
t_max_fs: float | None = None,
) -> tuple[float | None, dict]:
"""多探针到达时间直线拟合估计速度。
对每个空间位置计算包络达到阈值的最早时间,随后做 t(x) 的线性拟合,
斜率 b=dt/dx,速度 v=1/b(Å/fs → km/s 乘以100)。
"""
X = np.asarray(x_centers, dtype=float)
T = np.asarray(t_fs, dtype=float)
U = np.asarray(U_xt, dtype=float)
if U.shape != (len(T), len(X)):
return None, {"reason": "shape_mismatch"}
# 选择空间范围
mask_x = (x_min_A <= X) & (x_max_A >= X)
if np.count_nonzero(mask_x) < 5:
return None, {"reason": "insufficient_x_bins"}
Xs = X[mask_x]
Us = U[:, mask_x]
# 时间窗口与早期截断裁剪(避免晚期干涉抬高阈值)
mask_t = np.ones(len(T), dtype=bool)
if t_gate_fs is not None:
mask_t &= float(t_gate_fs) <= T
if t_max_fs is not None:
mask_t &= float(t_max_fs) >= T
if np.count_nonzero(mask_t) > 4:
T = T[mask_t]
Us = Us[mask_t, :]
# 构造包络与到达时间
w = max(5, int(smooth_window) | 1)
def envelope(col):
x2 = np.square(col)
k = np.ones(w) / w
m = np.convolve(x2, k, mode="same")
return np.sqrt(np.maximum(m, 0.0))
t_arr = []
x_sel = []
for j in range(Us.shape[1]):
env = envelope(Us[:, j])
# 使用早期窗内的最大值作为阈值参考,避免晚期干涉抬高阈值
thr = alpha * (np.max(env) if len(env) == 0 else float(np.max(env)))
idx = int(np.argmax(env >= thr))
if env[idx] >= thr and 0 < idx < len(T) - 1:
t_arr.append(T[idx])
x_sel.append(Xs[j])
if len(t_arr) < 5:
return None, {"reason": "insufficient_arrivals"}
x_sel = np.asarray(x_sel, dtype=float)
t_arr = np.asarray(t_arr, dtype=float)
# 线性拟合 t = a + b x
A = np.vstack([np.ones_like(x_sel), x_sel]).T
try:
sol, *_ = np.linalg.lstsq(A, t_arr, rcond=None)
a, b = float(sol[0]), float(sol[1])
if b <= 0:
return None, {"reason": "nonpositive_slope"}
v_kms = (1.0 / b) * 100.0
return v_kms, {"a_fs": a, "b_fs_per_A": b, "n_points": int(len(x_sel))}
except Exception as e:
return None, {"reason": f"lstsq_fail: {e}"}
def _estimate_velocity_by_komega(
t_fs: np.ndarray,
x_centers: np.ndarray,
U_xt: np.ndarray,
Lx_A: float,
t_gate_fs: float | None = None,
time_window_fs: float = 300.0,
) -> tuple[float | None, dict]:
"""基于 k–ω 谱峰的相速度估计 v = ω/k(Å/fs→km/s)。
简化实现:
- 时间窗:以 t_gate 为中心取一段窗口(或从开头截取窗口)
- k:对空间方向做FFT,取主峰索引 m→k=2π m / L
- ω:对上游探针做时间FFT,取主峰频率 f→ω=2π f
"""
T = np.asarray(t_fs, dtype=float)
X = np.asarray(x_centers, dtype=float)
U = np.asarray(U_xt, dtype=float)
if U.shape != (len(T), len(X)):
return None, {"reason": "shape_mismatch"}
# 时间窗索引
center = float(t_gate_fs) if t_gate_fs is not None else float(T[len(T) // 2])
half = time_window_fs / 2.0
mask_t = (center - half <= T) & (center + half >= T)
if np.count_nonzero(mask_t) < 8:
mask_t = slice(None)
Uw = U[mask_t, :]
# 空间FFT(对每个时间求谱再平均)
Umean = np.mean(Uw, axis=0)
SX = np.fft.rfft(Umean - np.mean(Umean))
m = np.argmax(np.abs(SX[1:])) + 1 if len(SX) > 2 else 0
if m <= 0:
return None, {"reason": "no_spatial_peak"}
k = 2.0 * np.pi * m / max(Lx_A, 1e-12)
# 时间FFT(上游探针处)
ia = int(np.argmin(np.abs(X - (0.2 * Lx_A))))
sa = U[:, ia]
# 同样时间窗
sa_win = sa[mask_t] if not isinstance(mask_t, slice) else sa
dt_fs = float(T[1] - T[0]) if len(T) >= 2 else 1.0
SF = np.fft.rfft(sa_win - np.mean(sa_win))
freqs = np.fft.rfftfreq(len(sa_win), d=dt_fs)
p = np.argmax(np.abs(SF[1:])) + 1 if len(SF) > 2 else 0
if p <= 0:
return None, {"reason": "no_temporal_peak"}
f = freqs[p]
omega = 2.0 * np.pi * f
if k <= 0 or omega <= 0:
return None, {"reason": "nonpositive_k_or_omega", "k": k, "omega": omega}
v_aa_fs = omega / k
v_kms = v_aa_fs * 100.0
return v_kms, {"k_Ainv": k, "omega_fs_inv": omega}
def _first_arrival_time(
T: np.ndarray, s: np.ndarray, alpha: float = 0.25, smooth_window: int = 21
) -> float | None:
"""返回包络首次超过阈值的时间(fs),失败返回None。"""
if len(T) < 5:
return None
w = max(5, int(smooth_window) | 1)
x2 = np.square(s)
k = np.ones(w) / w
m = np.convolve(x2, k, mode="same")
env = np.sqrt(np.maximum(m, 0.0))
thr = float(alpha) * float(np.max(env) if np.max(env) > 0 else 1.0)
idx = int(np.argmax(env >= thr))
if env[idx] >= thr:
return float(T[min(idx, len(T) - 1)])
return None
def _estimate_t_velocity_with_l_constraint(
t_fs: np.ndarray,
x_centers: np.ndarray,
U_T: np.ndarray,
U_L: np.ndarray,
x_min_A: float,
x_max_A: float,
t_gate_fs: float | None = None,
alpha_T: float = 0.25,
alpha_L: float = 0.25,
margin_fs: float = 100.0,
smooth_window: int = 21,
amplitude_ratio: float = 1.2,
t_max_fs: float | None = None,
) -> tuple[float | None, dict]:
"""横波速度估计(使用纵波到达时间约束)。
通过纵波先到达的物理约束,剔除横波信号中的纵波成分污染,
提高横波速度测量的准确性。
算法流程
---------
1. 空间范围限制:x∈[x_min, x_max]
2. 对每个空间位置x:
- 计算纵波通道的首次到达时间tL(x)
- 通过投影去除纵波成分:U_T_res = U_T - k*U_L
- 在残差信号上计算横波到达时间tT(x)
- 应用物理约束:tT ≥ tL + margin_fs
3. 线性拟合t(x)获取波速:v = 1/b (Å/fs) × 100 → km/s
Parameters
----------
t_fs : ndarray
时间序列(fs)
x_centers : ndarray
空间分箱中心(Å)
U_T : ndarray
横波通道信号(Ty或Tz分量)
U_L : ndarray
纵波通道信号(x分量)
x_min_A : float
空间范围最小值(Å)
x_max_A : float
空间范围最大值(Å)
t_gate_fs : float | None
时间窗口起始点(fs)
alpha_T : float
横波阈值比例
alpha_L : float
纵波阈值比例
margin_fs : float
纵横波时间间隔约束(fs)
smooth_window : int
包络平滑窗口大小
amplitude_ratio : float
幅值比例阈值,用于过滤纵波污染严重的信号
t_max_fs : float | None
时间窗口截止点(fs)
Returns
-------
v_kms : float | None
估计的横波速度(km/s)
info : dict
包含拟合参数和数据点数
"""
X = np.asarray(x_centers, dtype=float)
T = np.asarray(t_fs, dtype=float)
UT = np.asarray(U_T, dtype=float)
UL = np.asarray(U_L, dtype=float)
if UT.shape != (len(T), len(X)) or UL.shape != (len(T), len(X)):
return None, {"reason": "shape_mismatch"}
mask_x = (x_min_A <= X) & (x_max_A >= X)
if np.count_nonzero(mask_x) < 5:
return None, {"reason": "insufficient_x_bins"}
Xs = X[mask_x]
UTs = UT[:, mask_x]
ULs = UL[:, mask_x]
# 时间窗口与早期截断
mask_t = np.ones(len(T), dtype=bool)
if t_gate_fs is not None:
mask_t &= float(t_gate_fs) <= T
if t_max_fs is not None:
mask_t &= float(t_max_fs) >= T
if np.count_nonzero(mask_t) > 4:
T = T[mask_t]
UTs = UTs[mask_t, :]
ULs = ULs[mask_t, :]
t_arr_T = []
x_sel = []
ncols = UTs.shape[1]
for j in range(ncols):
colL = ULs[:, j]
colT = UTs[:, j]
# L 到达
tL = _first_arrival_time(T, colL, alpha=alpha_L, smooth_window=smooth_window)
# 去L分量
denom = float(np.dot(colL, colL)) + 1e-12
k = float(np.dot(colT, colL)) / denom
colRes = colT - k * colL
# 幅值门控:要求T残差包络峰值显著大于L通道
# 以避免L污染导致的过早到达
# 计算包络峰值
def env_max(x):
x2 = np.square(x)
w_local = max(5, int(smooth_window) | 1)
kk = np.ones(w_local) / w_local
mm = np.convolve(x2, kk, mode="same")
return float(np.sqrt(np.max(np.maximum(mm, 0.0))))
if env_max(colRes) < amplitude_ratio * env_max(colL):
continue
# T 到达
tT = _first_arrival_time(T, colRes, alpha=alpha_T, smooth_window=smooth_window)
if tT is None:
continue
if tL is not None and tT < tL + float(margin_fs):
continue
t_arr_T.append(tT)
x_sel.append(Xs[j])
if len(t_arr_T) < 5:
return None, {"reason": "insufficient_T_arrivals"}
x_sel = np.asarray(x_sel, dtype=float)
t_arr_T = np.asarray(t_arr_T, dtype=float)
A = np.vstack([np.ones_like(x_sel), x_sel]).T
try:
sol, *_ = np.linalg.lstsq(A, t_arr_T, rcond=None)
a, b = float(sol[0]), float(sol[1])
if b <= 0:
return None, {"reason": "nonpositive_slope_T"}
v_kms = (1.0 / b) * 100.0
return v_kms, {"a_fs": a, "b_fs_per_A": b, "n_points": int(len(x_sel))}
except Exception as e:
return None, {"reason": f"lstsq_fail_T: {e}"}
def _estimate_velocity_by_multi_xcorr(
t_fs: np.ndarray,
x_centers: np.ndarray,
U_xt: np.ndarray,
xa_frac: float,
xb_fracs: list[float],
Lx_A: float,
v_max_km_s: float,
t_gate_fs: float | None = None,
use_bandpass: bool = True,
) -> tuple[float | None, dict]:
"""多对探针的互相关飞行时间拟合。
- 固定上游 xa=Lx*xa_frac,多个下游 xb=Lx*xb_frac
- 每对计算 τ_i(带时间门控/最小物理滞后/频带过滤)
- 用最小二乘拟合 Δx = v·τ(过原点),求 v
"""
T = np.asarray(t_fs, dtype=float)
X = np.asarray(x_centers, dtype=float)
U = np.asarray(U_xt, dtype=float)
if U.shape != (len(T), len(X)):
return None, {"reason": "shape_mismatch"}
xa = float(xa_frac) * Lx_A
ia = int(np.argmin(np.abs(X - xa)))
sa = U[:, ia]
# 时间窗口过滤
if t_gate_fs is not None:
gate_mask = float(t_gate_fs) <= T
if np.count_nonzero(gate_mask) > 4:
sa = sa.copy()
sa[~gate_mask] = 0.0
dXs = []
Taus = []
for frac in xb_fracs:
xb = float(frac) * Lx_A
if xb <= xa:
continue
ib = int(np.argmin(np.abs(X - xb)))
sb = U[:, ib]
# 时间窗口过滤
sb_use = sb.copy()
if t_gate_fs is not None and np.count_nonzero(float(t_gate_fs) <= T) > 4:
sb_use[float(t_gate_fs) > T] = 0.0
# xcorr 估计 τ
# 计算滞后(fs)
tau_fs = _xcorr_lag_sa_sb(
T,
sa,
sb_use,
dx_A=(xb - xa),
v_max_km_s=v_max_km_s,
t_gate_fs=t_gate_fs,
use_bandpass=use_bandpass,
)
if tau_fs is None or tau_fs <= 0:
continue
dXs.append(xb - xa)
Taus.append(tau_fs)
if len(Taus) < 3:
return None, {"reason": "insufficient_pairs"}
dXs = np.asarray(dXs)
Taus = np.asarray(Taus)
# 线性拟合 Δx = v·τ(过原点)
b = float(np.dot(dXs, Taus) / np.dot(Taus, Taus)) if np.dot(Taus, Taus) > 0 else 0.0
if b <= 0:
return None, {"reason": "nonpositive_slope_dx_tau"}
v_kms = b * 100.0
return v_kms, {"pairs": int(len(Taus))}
def _xcorr_lag_sa_sb(
t_fs: np.ndarray,
sa: np.ndarray,
sb: np.ndarray,
dx_A: float,
v_max_km_s: float,
t_gate_fs: float | None = None,
use_bandpass: bool = True,
) -> float | None:
"""对两条时间序列做互相关,返回正滞后峰的时间(fs)。"""
sa = np.asarray(sa, dtype=float)
sb = np.asarray(sb, dtype=float)
T = np.asarray(t_fs, dtype=float)
if len(sa) != len(sb) or len(sa) != len(T):
return None
# 时间窗口过滤
if t_gate_fs is not None:
gate_mask = float(t_gate_fs) <= T
if np.count_nonzero(gate_mask) > 4:
sa = sa.copy()
sb = sb.copy()
sa[~gate_mask] = 0.0
sb[~gate_mask] = 0.0
# 去均值/标准化
sa = sa - np.nanmean(sa)
sb = sb - np.nanmean(sb)
sstd_a = float(np.nanstd(sa)) or 1.0
sstd_b = float(np.nanstd(sb)) or 1.0
sa /= sstd_a
sb /= sstd_b
# 带通
if use_bandpass and len(sa) >= 16:
dt_fs = float(T[1] - T[0]) if len(T) >= 2 else 1.0
fs = 1.0 / dt_fs
S = np.fft.rfft(sa)
freqs = np.fft.rfftfreq(len(sa), d=dt_fs)
if len(S) > 3:
k0 = 1
k_peak = int(np.argmax(np.abs(S[k0:])) + k0)
f_peak = max(freqs[k_peak], 0.0)
if f_peak > 0.0:
f_lo = max(0.5 * f_peak, 0.0)
f_hi = min(1.5 * f_peak, fs * 0.45)
def bp(x):
X = np.fft.rfft(x)
H = (freqs >= f_lo) & (freqs <= f_hi)
Xf = X * H
return np.fft.irfft(Xf, n=len(x))
sa = bp(sa)
sb = bp(sb)
# 相关
cc = np.correlate(sb, sa, mode="full")
mid = len(cc) // 2
cc_pos = cc[mid:]
if len(cc_pos) <= 1:
return None
dt_fs = float(T[1] - T[0]) if len(T) >= 2 else 1.0
# 最小物理滞后
v_max_aa_fs = v_max_km_s / 100.0
min_lag_fs_phys = dx_A / max(v_max_aa_fs, 1e-12)
import math as _math
ignore_head = max(1, int(_math.ceil(min_lag_fs_phys / dt_fs)))
cc_use = np.abs(cc_pos.copy())
cc_use[:ignore_head] = -np.inf
lag_idx = int(np.argmax(cc_use))
lag_fs = lag_idx * dt_fs
if lag_fs <= 0:
return None
return float(lag_fs)
[文档]
def simulate_plane_wave_mvp(
material_symbol: str = "Al",
dynamics: DynamicsConfig | None = None,
excitation: WaveExcitation | None = None,
out_xt_path: str | None = None,
) -> dict:
"""运行最小版 MD 平面波传播并测速(返回结果字典)。
为教学简化,当前固定使用 EAM Al1 势与 FCC 结构。后续可抽象至 CLI。
"""
mat_symbol = material_symbol
dyn = dynamics or DynamicsConfig()
exc = excitation or WaveExcitation(
direction=dyn.direction,
polarization=dyn.polarization,
n_waves=dyn.n_waves,
amplitude_velocity=dyn.amplitude_velocity,
amplitude_displacement=dyn.amplitude_displacement,
)
# 1) 构建超胞(FCC 对齐 x 方向)
a_ref = 4.045 # Å(EAM Al1 参考晶格常数)
builder = CrystallineStructureBuilder()
cell = builder.create_fcc(mat_symbol, a_ref, dyn.supercell)
# 2) 初始条件(可选)。默认不注入 standing/traveling,依靠源驱动
if not dyn.use_source:
exc.apply(cell)
# 3) NVE 传播并采样位移投影的 x–t 图
pot = EAMAl1Potential()
scheme = NVEScheme()
# 预计算初始力,确保第一步半步更新使用正确 F(t)
with contextlib.suppress(Exception):
pot.calculate_forces(cell)
R0 = cell.get_positions().copy()
pol_vec = (
np.array([1.0, 0.0, 0.0])
if exc.polarization == "L"
else (
np.array([0.0, 1.0, 0.0])
if exc.polarization == "Ty"
else np.array([0.0, 0.0, 1.0])
)
)
n_bins = dyn.supercell[0] * 4 # x 方向分辨率(每胞4个层面)
xs_ref, _ = _bin_average_signal(cell, np.zeros(cell.num_atoms), n_bins)
times = []
U_xt = [] # projection on polarization vector (legacy)
UX_xt = [] # longitudinal (x) component
UY_xt = [] # transverse-y component
UZ_xt = [] # transverse-z component
# 轨迹记录(可选)
writer = None
if dyn.record_trajectory:
from thermoelasticsim.utils.trajectory import TrajectoryWriter
traj_path = dyn.trajectory_file or "wave_trajectory.h5"
writer = TrajectoryWriter(traj_path)
writer.initialize(
cell.num_atoms, n_frames_estimate=max(2, dyn.steps // dyn.sample_every + 2)
)
t0 = time.time()
log_every = max(100, int(dyn.steps // 20))
for s in range(int(dyn.steps)):
# 时间(fs, ps)
t_fs = s * float(dyn.dt_fs)
# 源注入(位于 x ∈ [0, frac*Lx] 的薄片),高斯时间包络
if dyn.use_source and dyn.source_amplitude_velocity != 0.0:
Lx = float(np.linalg.norm(cell.lattice_vectors[0]))
slab_x = dyn.source_slab_fraction * Lx
R = cell.get_positions()
mask = R[:, 0] <= slab_x
# 注入方向按极化
e_inj = (
np.array([1.0, 0.0, 0.0])
if exc.polarization == "L"
else (
np.array([0.0, 1.0, 0.0])
if exc.polarization == "Ty"
else np.array([0.0, 0.0, 1.0])
)
)
amp = dyn.source_amplitude_velocity
dv_vec = None
if dyn.source_type == "gaussian" and dyn.source_sigma_fs > 0.0:
g = np.exp(
-0.5 * ((t_fs - dyn.source_t0_fs) / dyn.source_sigma_fs) ** 2
)
if g > 1e-8:
dv_vec = amp * float(g) * e_inj
elif (
dyn.source_type == "tone_burst"
and dyn.source_freq_thz > 0.0
and dyn.source_cycles > 0
):
# f_fs = THz / 1000, ω = 2π f_fs
f_fs = dyn.source_freq_thz / 1000.0
T_burst_fs = dyn.source_cycles / max(f_fs, 1e-12)
if 0.0 <= t_fs <= T_burst_fs:
# 汉宁窗包络
w = 0.5 * (1.0 - np.cos(2.0 * np.pi * t_fs / T_burst_fs))
dv_vec = amp * w * np.sin(2.0 * np.pi * f_fs * t_fs) * e_inj
if dv_vec is not None:
for i, atom in enumerate(cell.atoms):
if mask[i]:
atom.velocity += dv_vec
# 注入产生净动量,移除体系整体质心平动,避免整体飘移/拼接错觉
cell.remove_com_motion()
# 正常NVE推进
scheme.step(cell, pot, float(dyn.dt_fs))
# 吸收边界(海绵层):在两端对速度做指数衰减,减少绕回/反射
if (
dyn.absorber_enabled
and dyn.absorber_slab_fraction > 0
and dyn.absorber_tau_fs > 0
):
Lx = float(np.linalg.norm(cell.lattice_vectors[0]))
slab = float(dyn.absorber_slab_fraction) * Lx
if slab > 0:
R = cell.get_positions()
X = R[:, 0]
w = np.zeros(cell.num_atoms, dtype=float)
left = slab > X
right = (Lx - slab) < X
if np.any(left):
xi = (slab - X[left]) / slab # 0→1 从内到外
if dyn.absorber_profile == "linear":
w[left] = np.clip(xi, 0.0, 1.0)
else: # cos2 平滑
w[left] = np.square(np.sin(0.5 * np.pi * np.clip(xi, 0.0, 1.0)))
if np.any(right):
xi = (X[right] - (Lx - slab)) / slab
if dyn.absorber_profile == "linear":
w[right] = np.clip(xi, 0.0, 1.0)
else:
w[right] = np.square(
np.sin(0.5 * np.pi * np.clip(xi, 0.0, 1.0))
)
if np.any(w > 0):
V = cell.get_velocities()
# 指数衰减因子:exp(-dt/τ * w)
factors = np.exp(
-(float(dyn.dt_fs) / float(dyn.absorber_tau_fs)) * w
)
V = (V.T * factors).T
for i, atom in enumerate(cell.atoms):
atom.velocity = V[i]
# 防止引入净动量
cell.remove_com_motion()
if s % int(dyn.sample_every) == 0:
times.append(t_fs)
R = cell.get_positions()
disp = R - R0
u_pol = disp @ pol_vec # 位移在极化方向投影
ux = disp[:, 0]
uy = disp[:, 1]
uz = disp[:, 2]
x_centers, avg_pol = _bin_average_signal(cell, u_pol, n_bins)
_, avg_x = _bin_average_signal(cell, ux, n_bins)
_, avg_y = _bin_average_signal(cell, uy, n_bins)
_, avg_z = _bin_average_signal(cell, uz, n_bins)
U_xt.append(avg_pol)
UX_xt.append(avg_x)
UY_xt.append(avg_y)
UZ_xt.append(avg_z)
if writer is not None:
# 以 ps 计时间
writer.write_frame(
positions=R,
box=cell.lattice_vectors,
time=t_fs / 1000.0,
step=s,
)
if s and (s % log_every == 0):
elapsed = time.time() - t0
pct = 100.0 * s / float(dyn.steps)
eta = elapsed * (dyn.steps - s) / s
logger.info(
f"MD传播 {s}/{dyn.steps} ({pct:.1f}%), 采样帧={len(times)}, 用时={elapsed:.1f}s, 预计剩余={eta:.1f}s"
)
t_fs_arr = np.asarray(times, dtype=float)
U_xt_arr = np.asarray(U_xt, dtype=float) # shape (T, X)
UX_arr = np.asarray(UX_xt, dtype=float)
UY_arr = np.asarray(UY_xt, dtype=float)
UZ_arr = np.asarray(UZ_xt, dtype=float)
# 4) 互相关测速:选用 0.25L 与 0.50L;裁剪时间窗口避免绕回与后期干涉
Lx = float(np.linalg.norm(cell.lattice_vectors[0]))
xa = float(dyn.detector_frac_a) * Lx
xb = float(dyn.detector_frac_b) * Lx
# 物理最大速度上限(用于互相关最小滞后约束)
if dyn.v_max_km_s is not None and dyn.v_max_km_s > 0:
v_max_km_s = float(dyn.v_max_km_s)
else:
v_max_km_s = 8.0 if dyn.polarization == "L" else 5.0
# 时间窗口过滤:对高斯取 t0+2σ;对tone burst取 burst持续时间
if dyn.use_source:
if dyn.source_type == "gaussian":
t_gate = dyn.source_t0_fs + 1.2 * dyn.source_sigma_fs
else:
f_fs = dyn.source_freq_thz / 1000.0 if dyn.source_freq_thz > 0 else 0.0
T_burst_fs = (dyn.source_cycles / f_fs) if f_fs > 0 else 0.0
t_gate = T_burst_fs
else:
t_gate = None
t_cut_fs = min(t_fs_arr.max(), 0.8 * Lx / (v_max_km_s / 100.0))
mask_t = t_fs_arr <= t_cut_fs
t_win = t_fs_arr[mask_t]
U_win = U_xt_arr[mask_t, :]
v_kms = None
xinfo = {}
candidates: list[tuple[str, float, dict]] = []
ia = int(np.argmin(np.abs(xs_ref - xa)))
ib = int(np.argmin(np.abs(xs_ref - xb)))
# 优先选择估计方法:Gaussian → 多探针到达拟合(早期窗);ToneBurst → 互相关
if dyn.measure_method in ("arrival", "arrival_T_guard") or (
dyn.use_source and dyn.source_type == "gaussian"
):
# 多点拟合(更稳健)
x_min_A = float(dyn.detector_frac_a) * Lx
x_max_A = float(dyn.detector_frac_b) * Lx
# 早期截止时间:避免晚期绕回/干涉抬高阈值
t_early_end = float(t_gate if t_gate is not None else t_win.min()) + 0.6 * (
Lx / (v_max_km_s / 100.0)
)
t_early_end = min(t_early_end, t_cut_fs)
if dyn.polarization == "L" and dyn.measure_method != "arrival_T_guard":
v0, info0 = _estimate_velocity_by_arrival_fit(
t_win,
xs_ref,
UX_arr[mask_t, :],
x_min_A,
x_max_A,
t_gate_fs=t_gate,
alpha=0.18,
t_max_fs=t_early_end,
)
else:
# 对横波:用L通道做护栏,剔除早于L到达的分量
U_T = UY_arr if dyn.polarization == "Ty" else UZ_arr
v0, info0 = _estimate_t_velocity_with_l_constraint(
t_win,
xs_ref,
U_T[mask_t, :],
UX_arr[mask_t, :],
x_min_A,
x_max_A,
t_gate_fs=t_gate,
alpha_T=0.20,
alpha_L=0.20,
margin_fs=150.0,
t_max_fs=t_early_end,
)
if v0 is not None:
candidates.append(("arrival_fit", v0, info0))
if dyn.measure_method in ("auto", "xcorr"):
v1, info1 = _estimate_velocity_by_xcorr(
t_win,
xa,
xb,
xs_ref,
U_win,
v_pred_km_s=None,
max_speed_km_s=v_max_km_s,
t_gate_fs=t_gate,
use_bandpass=True,
)
if v1 is not None:
candidates.append(("xcorr", v1, info1))
# multi_xcorr(多对探针)
if dyn.measure_method in ("auto", "multi_xcorr", "arrival", "arrival_T_guard"):
xb_fracs = [float(dyn.detector_frac_a) + 0.05 * i for i in range(2, 9)]
xb_fracs = [f for f in xb_fracs if f < float(dyn.detector_frac_b)]
v5, info5 = _estimate_velocity_by_multi_xcorr(
t_win,
xs_ref,
U_win,
xa_frac=float(dyn.detector_frac_a),
xb_fracs=xb_fracs,
Lx_A=Lx,
v_max_km_s=v_max_km_s,
t_gate_fs=t_gate,
use_bandpass=True,
)
if v5 is not None:
candidates.append(("multi_xcorr", v5, info5))
# 如不在物理合理范围,尝试阈值法回退
if dyn.measure_method in ("auto", "threshold"):
v2, info2 = _estimate_velocity_by_threshold(
t_win, U_win[:, ia], U_win[:, ib], abs(xb - xa)
)
if v2 is not None:
candidates.append(("threshold", v2, info2))
# 如仍不合理,使用包络峰值回退
if dyn.measure_method in ("auto", "envelope"):
v3, info3 = _estimate_velocity_by_envelope_peak(
t_win, U_win[:, ia], U_win[:, ib], abs(xb - xa)
)
if v3 is not None:
candidates.append(("envelope_peak", v3, info3))
# k–ω 候选
if dyn.measure_method in ("auto", "komega"):
v4, info4 = _estimate_velocity_by_komega(
t_win, xs_ref, U_win, Lx, t_gate_fs=t_gate
)
if v4 is not None:
candidates.append(("komega", v4, info4))
# 选择候选:
# - 若为 Gaussian 源:优先 arrival_fit(L: 直接;T: 带 L 护栏)
# - 若用户显式 method=arrival/arrival_T_guard:强制使用 arrival_fit
# - 否则:优先 multi_xcorr;再回退到在物理范围内的中位数;再回退到最接近 5 km/s
method_requested = str(dyn.measure_method or "auto")
# 整理候选
cand_map: dict[str, tuple[float, dict]] = {}
for m, v, info in candidates:
# 仅保留每种方法的一个代表值(首个)
if m not in cand_map:
cand_map[m] = (float(v), info)
def _in_range(v: float) -> bool:
return 1.0 <= float(v) <= float(v_max_km_s)
# 1) 用户强制 arrival/arrival_T_guard
if method_requested in ("arrival", "arrival_T_guard"):
if "arrival_fit" in cand_map and _in_range(cand_map["arrival_fit"][0]):
v, info = cand_map["arrival_fit"]
v_kms, xinfo = v, {"method": "arrival_fit", **info}
elif "multi_xcorr" in cand_map and _in_range(cand_map["multi_xcorr"][0]):
v, info = cand_map["multi_xcorr"]
v_kms, xinfo = v, {"method": "multi_xcorr", **info}
# 2) Gaussian 源:尽量使用 arrival_fit
if (
v_kms is None
and dyn.use_source
and dyn.source_type == "gaussian"
and "arrival_fit" in cand_map
):
v, info = cand_map["arrival_fit"]
if _in_range(v):
v_kms, xinfo = v, {"method": "arrival_fit", **info}
# 3) 优先 multi_xcorr
if (
v_kms is None
and "multi_xcorr" in cand_map
and _in_range(cand_map["multi_xcorr"][0])
):
v, info = cand_map["multi_xcorr"]
v_kms, xinfo = v, {"method": "multi_xcorr", **info}
# 4) 在物理范围内的中位数
if v_kms is None:
valid = [(m, v, info) for (m, v, info) in candidates if _in_range(v)]
if valid:
vs = sorted([v for (_, v, __) in valid])
v_pick = vs[len(vs) // 2]
for m, v, info in valid:
if abs(v - v_pick) < 1e-9:
v_kms, xinfo = v, {"method": m, **info}
break
elif candidates:
# 选择最接近 5 km/s 的
m, v, info = min(candidates, key=lambda x: abs(x[1] - 5.0))
v_kms, xinfo = v, {"method": m, **info}
# 5) x-t 双面板图:左侧显示位移场,右侧显示包络(便于识别波前)
if out_xt_path:
# 尝试创建增强版4子图,失败则回退到简单版2子图
enhanced_plot_success = False
try:
# 创建更复杂的布局:2x2子图
fig = plt.figure(figsize=(12, 8))
# 左上:位移场
ax1 = plt.subplot(2, 2, 1)
extent = [
xs_ref.min(),
xs_ref.max(),
t_fs_arr.min() / 1000.0,
t_cut_fs / 1000.0,
]
# 位移场(对称色标)
vmax = float(np.nanmax(np.abs(U_win))) or 1e-12
vlim = 0.85 * vmax
im1 = ax1.imshow(
U_win,
origin="lower",
aspect="auto",
extent=[extent[0], extent[1], extent[2], extent[3]],
cmap="RdBu_r",
vmin=-vlim,
vmax=+vlim,
)
ax1.set_xlabel("位置 x (Å)")
ax1.set_ylabel("时间 t (ps)")
ax1.set_title("位移场 u·e")
cbar1 = fig.colorbar(im1, ax=ax1)
cbar1.set_label("位移 (Å)", rotation=270, labelpad=15)
# 标记探测点位置
ax1.axvline(
x=xa,
color="k",
lw=0.8,
alpha=0.5,
linestyle="--",
label=f"探测点A: {xa:.1f}Å",
)
ax1.axvline(
x=xb,
color="k",
lw=0.8,
alpha=0.5,
linestyle=":",
label=f"探测点B: {xb:.1f}Å",
)
# 右上:包络(RMS)
ax2 = plt.subplot(2, 2, 2)
w_env = max(5, int(len(t_win) // 20) | 1)
K = np.ones(w_env) / w_env
U2 = U_win * U_win
# 沿时间轴卷积
env = np.apply_along_axis(
lambda col: np.convolve(col, K, mode="same"), axis=0, arr=U2
)
env = np.sqrt(np.maximum(env, 0.0))
vmax_env = float(np.nanpercentile(env, 99.0)) or 1e-12
im2 = ax2.imshow(
env,
origin="lower",
aspect="auto",
extent=[extent[0], extent[1], extent[2], extent[3]],
cmap="hot",
vmin=0.0,
vmax=vmax_env,
)
ax2.set_xlabel("位置 x (Å)")
ax2.set_ylabel("时间 t (ps)")
ax2.set_title("包络 |u·e|")
cbar2 = fig.colorbar(im2, ax=ax2)
cbar2.set_label("包络幅度 (Å)", rotation=270, labelpad=15)
# 标记探测点
ax2.axvline(x=xa, color="w", lw=0.8, alpha=0.5, linestyle="--")
ax2.axvline(x=xb, color="w", lw=0.8, alpha=0.5, linestyle=":")
# 如果有到达拟合结果,添加波前线
if isinstance(xinfo, dict) and ("a_fs" in xinfo and "b_fs_per_A" in xinfo):
xs_line = np.linspace(xs_ref.min(), xs_ref.max(), 200)
t_line_ps = (
float(xinfo["a_fs"]) + float(xinfo["b_fs_per_A"]) * xs_line
) / 1000.0
ax1.plot(
xs_line,
t_line_ps,
"g--",
lw=1.5,
alpha=0.7,
label=f"波前 (v={v_kms:.2f} km/s)",
)
ax2.plot(xs_line, t_line_ps, "y--", lw=1.5, alpha=0.7)
ax1.legend(loc="upper right", fontsize=8)
# 左下:探测点A的时域信号
ax3 = plt.subplot(2, 2, 3)
ia_plot = int(np.argmin(np.abs(xs_ref - xa)))
signal_a = U_win[:, ia_plot]
ax3.plot(t_win / 1000.0, signal_a, "b-", lw=1.0, label=f"x={xa:.1f}Å")
ax3.set_xlabel("时间 t (ps)")
ax3.set_ylabel("位移 u·e (Å)")
ax3.set_title("探测点A信号")
ax3.grid(True, alpha=0.3)
ax3.legend()
# 右下:探测点B的时域信号
ax4 = plt.subplot(2, 2, 4)
ib_plot = int(np.argmin(np.abs(xs_ref - xb)))
signal_b = U_win[:, ib_plot]
ax4.plot(t_win / 1000.0, signal_b, "r-", lw=1.0, label=f"x={xb:.1f}Å")
ax4.set_xlabel("时间 t (ps)")
ax4.set_ylabel("位移 u·e (Å)")
ax4.set_title("探测点B信号")
ax4.grid(True, alpha=0.3)
ax4.legend()
# 添加互相关信息(如果有)
if v_kms is not None and "lag_fs" in xinfo:
lag_ps = xinfo.get("lag_fs", 0) / 1000.0
ax4.axvline(
x=lag_ps,
color="g",
linestyle="--",
alpha=0.5,
label=f"延迟: {lag_ps:.2f} ps",
)
ax4.legend()
# 总标题
method_str = (
xinfo.get("method", "unknown") if isinstance(xinfo, dict) else "unknown"
)
title = (
f"弹性波传播分析 - {dyn.polarization}波 | v≈{v_kms:.2f} km/s (方法: {method_str})"
if v_kms is not None
else f"弹性波传播分析 - {dyn.polarization}波 | 速度估计失败"
)
fig.suptitle(title, fontsize=12, y=0.98)
fig.tight_layout()
fig.savefig(out_xt_path, dpi=300, bbox_inches="tight")
plt.close(fig)
enhanced_plot_success = True
except Exception:
enhanced_plot_success = False
# 如果增强版失败,使用简单版
if not enhanced_plot_success:
fig, axes = plt.subplots(1, 2, figsize=(10.5, 4), sharey=True)
extent = [
xs_ref.min(),
xs_ref.max(),
t_fs_arr.min() / 1000.0,
t_cut_fs / 1000.0,
]
# 左:位移(对称色标)
vmax = float(np.nanmax(np.abs(U_win))) or 1e-12
vlim = 0.85 * vmax
im0 = axes[0].imshow(
U_win,
origin="lower",
aspect="auto",
extent=[extent[0], extent[1], extent[2], extent[3]],
cmap="RdBu_r",
vmin=-vlim,
vmax=+vlim,
)
axes[0].set_xlabel("x (Å)")
axes[0].set_ylabel("time (ps)")
cbar0 = fig.colorbar(im0, ax=axes[0])
cbar0.set_label("u·e (Å)")
axes[0].axvline(x=xa, color="k", lw=0.8, alpha=0.35)
axes[0].axvline(x=xb, color="k", lw=0.8, alpha=0.35)
# 右:包络(RMS)
w_env = max(5, int(len(t_win) // 20) | 1)
K = np.ones(w_env) / w_env
U2 = U_win * U_win
# 沿时间轴卷积
env = np.apply_along_axis(
lambda col: np.convolve(col, K, mode="same"), axis=0, arr=U2
)
env = np.sqrt(np.maximum(env, 0.0))
vmax_env = float(np.nanpercentile(env, 99.0)) or 1e-12
im1 = axes[1].imshow(
env,
origin="lower",
aspect="auto",
extent=[extent[0], extent[1], extent[2], extent[3]],
cmap="magma",
vmin=0.0,
vmax=vmax_env,
)
axes[1].set_xlabel("x (Å)")
cbar1 = fig.colorbar(im1, ax=axes[1])
cbar1.set_label("|u·e| (RMS Å)")
axes[1].axvline(x=xa, color="w", lw=0.8, alpha=0.35)
axes[1].axvline(x=xb, color="w", lw=0.8, alpha=0.35)
# 若有到达拟合直线,同步叠加在两幅图上
if isinstance(xinfo, dict) and ("a_fs" in xinfo and "b_fs_per_A" in xinfo):
xs_line = np.linspace(xs_ref.min(), xs_ref.max(), 200)
t_line_ps = (
float(xinfo["a_fs"]) + float(xinfo["b_fs_per_A"]) * xs_line
) / 1000.0
for ax in axes:
ax.plot(
xs_line,
t_line_ps,
"w--" if ax is axes[1] else "k--",
lw=1.0,
alpha=0.8,
)
title = (
f"弹性波传播时空图(v≈{v_kms:.2f} km/s)"
if v_kms is not None
else "弹性波传播时空图(速度估计失败)"
)
fig.suptitle(title, y=1.02)
fig.tight_layout()
try:
fig.savefig(out_xt_path, dpi=300, bbox_inches="tight")
finally:
plt.close(fig)
result = {
"material": mat_symbol,
"supercell": dyn.supercell,
"dt_fs": dyn.dt_fs,
"steps": dyn.steps,
"sample_every": dyn.sample_every,
"direction": dyn.direction,
"polarization": dyn.polarization,
"n_waves": dyn.n_waves,
"amplitude_velocity": dyn.amplitude_velocity,
"amplitude_displacement": dyn.amplitude_displacement,
"use_source": dyn.use_source,
"record_trajectory_requested": bool(dyn.record_trajectory),
"source": {
"slab_fraction": dyn.source_slab_fraction,
"amplitude_velocity": dyn.source_amplitude_velocity,
"t0_fs": dyn.source_t0_fs,
"sigma_fs": dyn.source_sigma_fs,
},
"x_centers_A": xs_ref.tolist(),
"t_fs": t_fs_arr.tolist(),
"velocity_estimate_km_s": v_kms,
"xcorr_info": xinfo,
}
if writer is not None:
# 写入元数据,便于GIF叠加信息
with contextlib.suppress(Exception):
writer.write_metadata(
{
"polarization": str(dyn.polarization),
"source_type": str(dyn.source_type) if dyn.use_source else "none",
"detector_frac_a": float(dyn.detector_frac_a),
"detector_frac_b": float(dyn.detector_frac_b),
"dt_fs": float(dyn.dt_fs),
"steps": int(dyn.steps),
"supercell_x": int(dyn.supercell[0]),
"supercell_y": int(dyn.supercell[1]),
"supercell_z": int(dyn.supercell[2]),
"velocity_estimate_km_s": float(v_kms)
if v_kms is not None
else -1.0,
"velocity_method": str(xinfo.get("method", "unknown"))
if isinstance(xinfo, dict)
else "unknown",
}
)
writer.close()
result["trajectory_file"] = writer.filename.as_posix()
return result
__all__ = [
"WaveExcitation",
"DynamicsConfig",
"simulate_plane_wave_mvp",
]