-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsegment_editor.py
More file actions
407 lines (337 loc) · 14.2 KB
/
segment_editor.py
File metadata and controls
407 lines (337 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
#!/usr/bin/env python3
"""
听写视频分段变速剪辑工具 v2
- 韩语发音段:保持原速(1x)+ 扩音器
- 写字过程:加速1.2-1.4x + 扩音器(笔尖摩擦声保留)
- 自动按目标时长(如12-15秒)分组生成短视频
"""
import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
def log(msg, level="info"):
    """Print *msg* to stdout, prefixed with a fixed-width tag for *level*.

    Recognised levels are ``info``, ``warn``, ``error``, ``ok`` and ``step``;
    any other value falls back to the ``[INFO]`` tag.
    """
    tags = {
        "info": "[INFO]",
        "warn": "[WARN]",
        "error": "[ERR ]",
        "ok": "[ OK ]",
        "step": "[STEP]",
    }
    tag = tags[level] if level in tags else "[INFO]"
    print(f"{tag} {msg}")
def run(cmd, desc=""):
    """Run an external command and return its ``CompletedProcess``.

    Output is captured as UTF-8 text (undecodable bytes are replaced). When
    *desc* is non-empty it is logged before the command starts. On a non-zero
    exit status, the non-blank lines among the last three lines of stderr are
    logged at error level; the result is returned either way.
    """
    if desc:
        log(desc)

    proc = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    if proc.returncode == 0:
        return proc

    # Only the tail of stderr is shown to keep failures readable.
    tail = (proc.stderr or "").strip().split("\n")[-3:]
    for raw in tail:
        stripped = raw.strip()
        if stripped:
            log(f" {stripped}", "error")
    return proc
def get_duration(path):
    """Return the container duration of *path* in seconds, via ffprobe.

    Returns ``0.0`` when ffprobe exits with a non-zero status, when its output
    is not valid JSON, or when the ``format.duration`` field is missing or not
    a number.
    """
    probe_cmd = [
        "ffprobe", "-v", "quiet",
        "-print_format", "json",
        "-show_format", str(path),
    ]
    probe = run(probe_cmd)
    if probe.returncode != 0:
        return 0.0

    try:
        info = json.loads(probe.stdout)
        raw_duration = info["format"]["duration"]
        return float(raw_duration)
    except (KeyError, ValueError, json.JSONDecodeError):
        return 0.0
def format_time(seconds):
    """Format a duration as ``M分S秒``, or just ``S秒`` when under a minute.

    The value is truncated to whole seconds with ``int()`` first.
    """
    whole = int(seconds)
    minutes = whole // 60
    secs = whole % 60
    if minutes:
        return f"{minutes}分{secs}秒"
    return f"{secs}秒"
def detect_raw_sound_segments(input_path, threshold_db=-30, min_dur=0.15):
    """Detect the audible (non-silent) spans of a media file.

    Runs FFmpeg's ``silencedetect`` filter, reads the ``silence_start`` /
    ``silence_end`` markers from stderr and inverts them into sound spans.

    Args:
        input_path: Media file to analyse.
        threshold_db: Noise floor passed to ``silencedetect`` as ``noise=<n>dB``.
        min_dur: Minimum silence length in seconds (``d=`` option).

    Returns:
        ``(segments, total_dur)`` where ``segments`` is a list of
        ``(start, end)`` tuples in seconds and ``total_dur`` is the value
        reported by ``get_duration`` (``0.0`` if probing failed).
    """
    # The decoded output is discarded; only the filter's log on stderr is used.
    r = run([
        "ffmpeg", "-i", str(input_path),
        "-af", f"silencedetect=noise={threshold_db}dB:d={min_dur}",
        "-f", "null", "NUL",
    ])
    stderr = r.stderr or ""

    # Timestamps may be slightly negative (input that opens with silence) or
    # printed in exponent notation. A plain ``[\d.]+`` pattern skips or
    # truncates those, which would make the leading silence count as sound.
    number = r"(-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)"
    silence_starts = [
        max(0.0, float(x))
        for x in re.findall(r"silence_start:\s*" + number, stderr)
    ]
    silence_ends = [
        max(0.0, float(x))
        for x in re.findall(r"silence_end:\s*" + number, stderr)
    ]
    total_dur = get_duration(input_path)

    events = [(s, "silence_begin") for s in silence_starts]
    events += [(e, "silence_end") for e in silence_ends]
    # Stable sort on time only: at equal timestamps a begin precedes an end.
    events.sort(key=lambda x: x[0])

    segments = []
    cur_start = 0.0
    in_sound = True  # the file is assumed to start in a sound span
    for t, evt in events:
        if evt == "silence_begin" and in_sound:
            # Ignore sound slivers of 30 ms or less.
            if t > cur_start + 0.03:
                segments.append((cur_start, t))
            in_sound = False
        elif evt == "silence_end" and not in_sound:
            cur_start = t
            in_sound = True

    # Close the trailing sound span, if the file does not end in silence.
    if in_sound and total_dur > cur_start + 0.03:
        segments.append((cur_start, total_dur))
    return segments, total_dur
def merge_nearby_segments(segments, merge_gap=0.5):
    """Fuse sound spans whose gap to the previous span is at most *merge_gap*.

    *segments* is a sequence of ``(start, end)`` pairs, processed in the order
    given. Returns a new list of ``(start, end)`` tuples; an empty input gives
    an empty list.
    """
    if not segments:
        return []

    result = []
    cur_start, cur_end = segments[0]
    for nxt_start, nxt_end in segments[1:]:
        if nxt_start - cur_end <= merge_gap:
            # Close enough: extend the running span (never shrink it).
            if nxt_end > cur_end:
                cur_end = nxt_end
        else:
            result.append((cur_start, cur_end))
            cur_start, cur_end = nxt_start, nxt_end
    result.append((cur_start, cur_end))
    return result
def classify_and_build_timeline(merged_segments, total_dur, min_pron=0.3):
    """Turn merged sound spans into a gap-free pronounce/write timeline.

    Each entry is ``{"type": "pronounce" | "write", "start": t, "end": t}``:

    - a sound span lasting at least *min_pron* seconds is ``pronounce``;
    - a shorter sound span (treated as pen noise) is ``write``;
    - a gap of more than 0.05 s before a span, or between the last span and
      *total_dur*, is ``write``.
    """
    def entry(kind, start, end):
        return {"type": kind, "start": start, "end": end}

    timeline = []
    cursor = 0.0
    for seg_start, seg_end in merged_segments:
        # Anything between the previous span and this one is writing time.
        if seg_start > cursor + 0.05:
            timeline.append(entry("write", cursor, seg_start))

        kind = "pronounce" if seg_end - seg_start >= min_pron else "write"
        timeline.append(entry(kind, seg_start, seg_end))
        cursor = seg_end

    if cursor < total_dur - 0.05:
        timeline.append(entry("write", cursor, total_dur))
    return timeline
def build_word_units(timeline):
    """Group a pronounce/write timeline into one unit per spoken word.

    Consecutive entries of the same type are first collapsed into one entry
    (on copies; *timeline* is not modified). Each ``pronounce`` entry then
    yields a dict with ``pronounce_start``, ``pronounce_end``, ``write_start``
    and ``write_end``. The write span runs from the end of this pronunciation
    to the start of the next one; the last unit gets an empty write span.
    """
    collapsed = []
    for entry in timeline:
        previous = collapsed[-1] if collapsed else None
        if previous is not None and previous["type"] == entry["type"]:
            previous["end"] = entry["end"]
        else:
            collapsed.append(dict(entry))

    spoken = [e for e in collapsed if e["type"] == "pronounce"]
    if not spoken:
        return []

    # Write span of word k ends where word k+1 starts; the final word has
    # nothing after it, so its write span collapses to zero length.
    write_ends = [following["start"] for following in spoken[1:]]
    write_ends.append(spoken[-1]["end"])

    return [
        {
            "pronounce_start": word["start"],
            "pronounce_end": word["end"],
            "write_start": word["end"],
            "write_end": write_end,
        }
        for word, write_end in zip(spoken, write_ends)
    ]
def extract_segment(input_path, output_path, start, end, speed=1.0,
                    megaphone=True, loudness=-8):
    """Cut ``[start, end)`` out of *input_path* and re-encode it with ffmpeg.

    Video timestamps are divided by *speed*; audio gets a matching ``atempo``
    filter when *speed* is not 1.0. With *megaphone* enabled, a band-pass,
    compressor and two EQ boosts are applied, and ``loudnorm`` targeting
    *loudness* always runs last.

    Returns ``False`` without invoking ffmpeg when the span is shorter than
    0.03 s; otherwise ``True`` only if ffmpeg exits 0 and the output exists.
    """
    span = end - start
    if span < 0.03:
        return False

    tempo_chain = [f"atempo={speed}"] if speed != 1.0 else []
    megaphone_chain = [
        "highpass=f=300",
        "lowpass=f=4000",
        "acompressor=threshold=-15dB:ratio=10:attack=1:release=50",
        "equalizer=f=800:t=q:w=0.5:g=8",
        "equalizer=f=2000:t=q:w=0.5:g=3",
    ] if megaphone else []
    loudnorm_chain = [f"loudnorm=I={loudness}:TP=-1:LRA=13"]

    audio_filter = ",".join(tempo_chain + megaphone_chain + loudnorm_chain)
    video_filter = f"setpts=PTS/{speed}"

    command = ["ffmpeg"]
    command += ["-ss", f"{start:.4f}", "-t", f"{span:.4f}"]
    command += ["-i", str(input_path)]
    command += ["-vf", video_filter, "-af", audio_filter]
    command += ["-c:v", "libx264", "-preset", "ultrafast", "-crf", "18"]
    command += ["-c:a", "aac", "-b:a", "64k", "-ar", "44100"]
    command += ["-avoid_negative_ts", "make_zero"]
    command += ["-y", str(output_path)]

    proc = run(command)
    if proc.returncode != 0:
        return False
    return Path(output_path).exists()
def concat_segments(segment_paths, output_path):
    """Join encoded segment files into *output_path* without re-encoding.

    Args:
        segment_paths: Segment files, in playback order.
        output_path: Destination file (``str`` or ``Path``).

    Returns:
        ``False`` for an empty list. ``True`` when a single segment with the
        same suffix as the output was copied. Otherwise ``True`` only if
        ffmpeg's concat demuxer exits 0 and the output file exists.
    """
    if not segment_paths:
        return False

    output_path = Path(output_path)

    # A plain byte copy is only correct when the container is unchanged. A
    # lone ``.ts`` segment copied to an ``.mp4`` name would leave MPEG-TS data
    # under the wrong extension, so that case goes through ffmpeg below.
    if len(segment_paths) == 1:
        only = Path(segment_paths[0])
        if only.suffix.lower() == output_path.suffix.lower():
            shutil.copy2(str(only), str(output_path))
            return True

    concat_list = output_path.parent / "concat.txt"
    try:
        with open(concat_list, "w", encoding="utf-8") as f:
            for seg in segment_paths:
                # Forward slashes keep Windows paths free of backslashes;
                # a literal single quote must be written as '\'' inside the
                # single-quoted ``file`` directive.
                entry = str(seg).replace(chr(92), "/").replace("'", "'\\''")
                f.write(f"file '{entry}'\n")
        r = run([
            "ffmpeg", "-f", "concat", "-safe", "0",
            "-i", str(concat_list),
            "-c", "copy",
            "-y", str(output_path),
        ])
    finally:
        # Best-effort removal of the scratch list so it does not linger next
        # to the generated clips; a failure here must not mask the result.
        try:
            concat_list.unlink()
        except OSError:
            pass
    return r.returncode == 0 and output_path.exists()
def estimate_duration(unit, write_speed):
    """Estimate a word unit's length in seconds after speed-up.

    The pronounce span is counted at its original length; the write span is
    divided by *write_speed*.
    """
    spoken = unit["pronounce_end"] - unit["pronounce_start"]
    written = unit["write_end"] - unit["write_start"]
    sped_up_written = written / write_speed
    return spoken + sped_up_written
def group_into_clips(units, write_speed, target_min=12, target_max=15):
    """Greedily pack word units into clips of roughly the target length.

    Units are taken in order. A clip is closed as soon as its estimated
    length reaches *target_min*, or when adding the next unit would push it
    past ``target_max * 1.1``. Every clip holds at least one unit, so a single
    over-long unit still forms its own clip.

    Returns a list of ``(clip_units, estimated_duration)`` tuples.
    """
    overshoot_limit = target_max * 1.1
    clips = []
    bucket = []
    total = 0.0

    for unit in units:
        unit_dur = estimate_duration(unit, write_speed)
        if bucket:
            clip_is_full = not total < target_min
            if clip_is_full or total + unit_dur > overshoot_limit:
                clips.append((bucket, total))
                bucket = []
        if bucket:
            bucket.append(unit)
            total += unit_dur
        else:
            bucket = [unit]
            total = unit_dur

    if bucket:
        clips.append((bucket, total))
    return clips
def process_clip(input_path, output_path, clip_units, write_speed,
                 megaphone=True, loudness=-8):
    """Render one clip: cut each unit's sub-segments, then concatenate them.

    For every unit, the pronounce span is extracted at 1.0x and the write
    span at *write_speed*, each into a ``.ts`` file inside a temporary
    directory. Spans of 0.03 s or less are skipped, and a span whose
    extraction fails is left out. The surviving pieces are joined into
    *output_path*.

    Returns ``(True, duration)`` on success, where *duration* comes from
    ``get_duration(output_path)``, and ``(False, 0)`` when nothing was
    extracted or concatenation failed.
    """
    with tempfile.TemporaryDirectory(prefix="veditor_seg_") as workdir:
        work = Path(workdir)
        pieces = []

        def cut(filename, start, end, speed):
            """Extract one span into the work dir; record it on success."""
            target = work / filename
            done = extract_segment(input_path, target, start, end,
                                   speed=speed, megaphone=megaphone,
                                   loudness=loudness)
            if done:
                pieces.append(target)
            return done

        for idx, unit in enumerate(clip_units):
            # Pronunciation: always kept at original speed.
            p_start = unit["pronounce_start"]
            p_end = unit["pronounce_end"]
            p_dur = p_end - p_start
            if p_dur > 0.03 and cut(f"p_{idx:04d}.ts", p_start, p_end, 1.0):
                log(f" 发音 {p_start:.2f}-{p_end:.2f}s ({p_dur:.2f}s, 1.0x)")

            # Writing: sped up by write_speed.
            w_start = unit["write_start"]
            w_end = unit["write_end"]
            w_dur = w_end - w_start
            if w_dur > 0.03 and cut(f"w_{idx:04d}.ts", w_start, w_end, write_speed):
                log(f" 写字 {w_start:.2f}-{w_end:.2f}s ({w_dur:.2f}s, {write_speed}x)")

        if not pieces:
            return False, 0
        if not concat_segments(pieces, output_path):
            return False, 0
        return True, get_duration(output_path)
def main():
    """Command-line entry point: detect speech, build word units, render clips.

    The pipeline mirrors the four logged steps:

    1. detect audible spans with ``detect_raw_sound_segments``;
    2. merge nearby spans and classify them into pronounce/write entries;
    3. turn the timeline into word units;
    4. group the units into clips and render each as ``clip_NNN.mp4`` in the
       output directory.

    Exits with status 1 when the input file does not exist or no clip could
    be produced. A malformed ``--target-duration`` or a non-positive
    ``--write-speed`` is reported through ``parser.error`` (status 2).
    """
    parser = argparse.ArgumentParser(description="听写视频分段变速剪辑工具 v2")
    parser.add_argument("input", help="输入视频文件")
    parser.add_argument("-o", "--output-dir", default="./clips", help="输出目录 (默认: ./clips)")
    parser.add_argument("--target-duration", default="12-15",
                        help="目标每段时长范围,如 '12-15' (默认: 12-15)")
    parser.add_argument("--write-speed", type=float, default=1.3,
                        help="写字过程倍速 (默认: 1.3)")
    parser.add_argument("--silence-db", type=int, default=-30,
                        help="静音阈值 dB, 越小越敏感 (默认: -30)")
    parser.add_argument("--merge-gap", type=float, default=0.5,
                        help="发音段合并间隔/秒 (默认: 0.5)")
    parser.add_argument("--min-pron", type=float, default=0.3,
                        help="最小发音段时长/秒 (默认: 0.3)")
    parser.add_argument("--loudness", type=int, default=-8,
                        help="目标响度 LUFS (默认: -8)")
    parser.add_argument("--no-megaphone", action="store_true",
                        help="禁用扩音器音效")
    args = parser.parse_args()

    input_path = Path(args.input).resolve()
    if not input_path.exists():
        log(f"文件不存在: {input_path}", "error")
        sys.exit(1)

    # Parse --target-duration: either "N" (min == max) or "MIN-MAX".
    parts = args.target_duration.split("-")
    try:
        target_min = float(parts[0])
        target_max = float(parts[1]) if len(parts) > 1 else target_min
    except ValueError:
        # Report bad input as a usage error instead of a raw traceback.
        parser.error(
            f"--target-duration 格式无效: {args.target_duration!r} (应为 '12' 或 '12-15')"
        )
    # Written as a negated chain so NaN is rejected as well.
    if not (0 <= target_min <= target_max):
        parser.error("--target-duration 范围无效: 最小值不能大于最大值")
    # The write span is divided by this value, so zero or negative is unusable.
    if not (args.write_speed > 0):
        parser.error("--write-speed 必须大于 0")

    megaphone = not args.no_megaphone
    output_dir = Path(args.output_dir).resolve()

    print("=" * 55)
    print(" 听写视频分段变速剪辑工具 v2")
    print("=" * 55)
    print()
    log(f"输入: {input_path.name}")
    log(f"参数: 目标{target_min:.0f}-{target_max:.0f}s/段, 写字{args.write_speed}x, "
        f"静音阈值{args.silence_db}dB, 合并间隔{args.merge_gap}s, 最小发音{args.min_pron}s")
    log(f"扩音器: {'关' if not megaphone else '开'}")
    print()

    # Step 1: detect the raw audible spans.
    log("步骤 1/4: 检测音频段", "step")
    raw_segments, total_dur = detect_raw_sound_segments(
        input_path, threshold_db=args.silence_db, min_dur=0.15
    )
    log(f" 原始有声段: {len(raw_segments)} 个, 总时长 {format_time(total_dur)}")

    # Step 2: merge nearby spans, then classify them.
    log("步骤 2/4: 识别发音组", "step")
    merged = merge_nearby_segments(raw_segments, merge_gap=args.merge_gap)
    timeline = classify_and_build_timeline(merged, total_dur, min_pron=args.min_pron)
    pron_count = sum(1 for t in timeline if t["type"] == "pronounce")
    write_count = sum(1 for t in timeline if t["type"] == "write")
    log(f" 合并后: {len(merged)} 段 → 发音 {pron_count} 个, 写字段 {write_count} 个")

    # Step 3: build one unit per spoken word.
    log("步骤 3/4: 构建单词单元", "step")
    units = build_word_units(timeline)
    log(f" 识别出 {len(units)} 个单词单元:")
    for idx, u in enumerate(units):
        p_dur = u["pronounce_end"] - u["pronounce_start"]
        w_dur = u["write_end"] - u["write_start"]
        est = estimate_duration(u, args.write_speed)
        log(f" 词{idx+1:2d}: 发音 {u['pronounce_start']:6.1f}-{u['pronounce_end']:6.1f}s "
            f"({p_dur:.1f}s), 写字 {u['write_start']:6.1f}-{u['write_end']:6.1f}s "
            f"({w_dur:.1f}s) → 预计 {est:.1f}s")

    # Step 4: group the units into clips and render each clip.
    log("步骤 4/4: 生成分段视频", "step")
    clips = group_into_clips(units, args.write_speed, target_min, target_max)
    log(f" 分组结果: {len(clips)} 个片段")

    results = []
    for clip_idx, (clip_units, est_dur) in enumerate(clips):
        clip_name = f"clip_{clip_idx + 1:03d}"
        log(f"\n 片段 {clip_idx + 1}/{len(clips)}: {clip_name} "
            f"({len(clip_units)}词, 预计{est_dur:.1f}s)")
        output_path = output_dir / f"{clip_name}.mp4"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        ok, dur = process_clip(
            input_path, output_path, clip_units,
            write_speed=args.write_speed,
            megaphone=megaphone,
            loudness=args.loudness,
        )
        if ok:
            log(f" -> {clip_name}.mp4 ({dur:.1f}s)", "ok")
            results.append({"name": clip_name, "path": str(output_path), "duration": dur})
        else:
            log(f" -> 处理失败", "error")

    # Summary.
    print()
    print("=" * 55)
    if results:
        log(f"处理完成! 共生成 {len(results)} 个片段:", "ok")
        for r in results:
            log(f" {r['name']}: {r['duration']:.1f}s → {r['path']}")
        log(f"\n所有片段已保存至: {output_dir}")
    else:
        log("未能生成任何片段", "error")
        sys.exit(1)


if __name__ == "__main__":
    main()