comfyui-关于自定义插件-音频转文本
itomcoil 2024-12-23 11:08 36 浏览
效果图
实现方式:
工作流说明:
视频下载:通过上传url从网上下载视频
路径分支:下载后的视频分两个路径完成流转。
一个是视频转图片
一个是音频转文本
主要的节点类,如下图目录结构:
源码分析
音频转文本.py
import os
import torch
import whisper
import subprocess
import json
from datetime import timedelta
import time
import sys
class AudioTranscriber:
def __init__(self):
print("\n[初始化] 音频转文本节点被创建")
self.execution_count = 0
# 创建输出目录
self.output_dir = "transcribed_text"
os.makedirs(self.output_dir, exist_ok=True)
# 检查CUDA可用性
self.device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[初始化] 使用设备: {self.device}")
# 加载whisper模型
self.model = None
self.current_model_size = None
def save_text_file(self, text, video_name):
"""保存转录文本到文件"""
try:
# 生成文件名
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = os.path.join(self.output_dir, f"{video_name}_{timestamp}.txt")
# 使用UTF-8编码保存文件
with open(filename, 'w', encoding='utf-8') as f:
f.write(text)
print(f"[保存] 文本已保存至: {filename}")
return filename
except Exception as e:
print(f"[错误] 保存文件失败: {str(e)}")
raise ValueError(f"保存文件失败: {str(e)}")
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"视频路径_音频": ("STRING", {"forceInput": True, "multiline": False}),
"模型大小": (["tiny", "base", "small", "medium", "large"], {
"default": "base"
}),
},
"optional": {
"语言": (["auto", "Chinese", "English", "Japanese", "Korean"], {
"default": "Chinese"
}),
"时间戳": ("BOOLEAN", {"default": True}),
}
}
RETURN_TYPES = ("STRING",)
RETURN_NAMES = ("文本文件路径",)
FUNCTION = "transcribe_audio"
CATEGORY = "视频转文章"
OUTPUT_NODE = True
def verify_video_file(self, video_path):
"""验证视频文件"""
if not video_path:
raise ValueError("视频路径为空")
if not os.path.exists(video_path):
raise ValueError(f"视频文件不存在: {video_path}")
if not os.path.isfile(video_path):
raise ValueError(f"不是文件: {video_path}")
try:
size = os.path.getsize(video_path)
print(f"[验证] 视频文件大小: {size} 字节")
with open(video_path, 'rb') as f:
print("[验证] 视频文件可访问")
except Exception as e:
raise ValueError(f"视频文件访问失败: {str(e)}")
def extract_audio(self, video_path):
"""从视频中提取音频"""
try:
# 生成音频文件名
video_name = os.path.splitext(os.path.basename(video_path))[0]
# timestamp = time.strftime("%Y%m%d_%H%M%S")
audio_path = os.path.join(self.output_dir, f"{video_name}.wav")
print(f"[音频] 输出路径: {audio_path}")
# 构建FFmpeg命令
command = [
'ffmpeg', '-i', video_path,
'-vn', # 不处理视频
'-acodec', 'pcm_s16le', # 设置音频编码
'-ar', '16000', # 设置采样率
'-ac', '1', # 设置为单声道
'-y', # 覆盖已存在的文件
audio_path
]
print("[音频] 执行FFmpeg命令...")
print(f"[音频] 命令: {' '.join(command)}")
# 修改subprocess调用,指定编码
startupinfo = None
if os.name == 'nt': # Windows系统
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
# 执行命令
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
startupinfo=startupinfo,
encoding='utf-8',
errors='replace' # 处理无法解码的字符
)
# 获取输出
stdout, stderr = process.communicate()
# 检查结果
if process.returncode == 0 and os.path.exists(audio_path):
size = os.path.getsize(audio_path)
print(f"[音频] 提取成功,大小: {size} 字节")
if stderr:
print(f"[警告] FFmpeg输出: {stderr}")
return audio_path
else:
print(f"[错误] FFmpeg返回码: {process.returncode}")
print(f"[错误] FFmpeg输出: {stderr}")
raise ValueError("音频提取失败")
except Exception as e:
print(f"[错误] 音频提取失败: {str(e)}")
print(f"[错误] 异常类型: {type(e)}")
print(f"[错误] 堆栈信息: ", sys.exc_info())
raise ValueError(f"音频提取失败: {str(e)}")
def load_model(self, model_size):
"""加载模型并处理设备选择"""
try:
print(f"[模型] 开始加载 {model_size} 模型...")
model = whisper.load_model(model_size)
# 如果有GPU,将模型移到GPU
if self.device == "cuda":
print("[模型] 将模型移至GPU")
model = model.to(self.device)
# 使用半精度以提高性能
if torch.cuda.is_available():
model = model.half()
else:
print("[模型] 在CPU上运行模型")
model = model.float() # 确保在CPU上使用FP32
print(f"[模型] 加载完成,使用设备: {self.device}")
return model
except Exception as e:
print(f"[错误] 模型加载失败: {str(e)}")
raise ValueError(f"模型加载失败: {str(e)}")
def transcribe_audio(self, 视频路径_音频, 模型大小, 语言="Chinese", 时间戳=True):
self.execution_count += 1
try:
print("\n========== 音频转文本开始 ==========")
print(f"[执行] 第 {self.execution_count} 次执行")
print(f"[时间] {time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"[设备] 使用 {self.device.upper()}")
# 验证输入
print("\n[输入参数]")
print(f"视频路径: {视频路径_音频}")
print(f"模型大小: {模型大小}")
print(f"语言: {语言}")
print(f"时间戳: {时间戳}")
if not 视频路径_音频:
raise ValueError("未接收到视频路径")
# 标准化路径
video_path = os.path.abspath(视频路径_音频)
print(f"[路径] 标准化后: {video_path}")
# 验证视频文件
print("\n[视频验证]")
self.verify_video_file(video_path)
# 提取音频
print("\n[音频提取]")
audio_path = self.extract_audio(video_path)
# 加载模型
print("\n[模型加载]")
if self.model is None or self.current_model_size != 模型大小:
self.model = self.load_model(模型大小)
self.current_model_size = 模型大小
else:
print("[模型] 使用已加载的模型")
# 转录音频
print("\n[转录]")
transcribe_options = {
"language": 语言 if 语言 != "auto" else None,
"task": "transcribe",
"fp16": torch.cuda.is_available() and self.device == "cuda"
}
# 添加性能监控
start_time = time.time()
result = self.model.transcribe(audio_path, **transcribe_options)
end_time = time.time()
print(f"[性能] 转录耗时: {end_time - start_time:.2f} 秒")
if self.device == "cuda":
print(f"[性能] GPU内存使用: {torch.cuda.max_memory_allocated() / 1024**2:.2f} MB")
# 处理结果
print("\n[处理结果]")
if 时间戳:
text_content = []
for segment in result["segments"]:
start = str(timedelta(seconds=round(segment["start"])))
end = str(timedelta(seconds=round(segment["end"])))
text_content.append(f"[{start} -> {end}] {segment['text']}")
text_content = "\n".join(text_content)
else:
text_content = result["text"]
# 保存文本到文件
video_name = os.path.splitext(os.path.basename(video_path))[0]
text_file_path = self.save_text_file(text_content, video_name)
print("\n[完成]")
print(f"音频文件: {audio_path}")
print(f"文本文件: {text_file_path}")
print("========== 音频转文本完成 ==========\n")
return (text_file_path,) # 返回文本文件路径
except Exception as e:
print("\n[异常信息]")
print(f"类型: {type(e)}")
print(f"描述: {str(e)}")
print("========== 音频转文本异常终止 ==========\n")
raise ValueError(f"音频转录失败: {str(e)}")
@classmethod
def IS_CHANGED(cls, 视频路径_音频, *args):
"""确保节点每次都执行"""
print(f"[IS_CHANGED] 输入: {视频路径_音频}")
import random
return random.random()
文本预览.py
# -*- coding: utf-8 -*-
import os
import sys
import time
import json
class TextPreview:
def __init__(self):
print("\n[初始化] 文本预览节点被创建")
print(f"[初始化] 时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"[初始化] 工作目录: {os.getcwd()}")
self.execution_count = 0
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"文本文件路径": ("STRING", {"forceInput": True}),
},
"optional": {
"预览长度": ("INT", {
"default": 1000,
"min": 100,
"max": 5000,
"step": 500
}),
}
}
RETURN_TYPES = ()
FUNCTION = "preview_text"
CATEGORY = "视频转文章"
OUTPUT_NODE = True
def format_text(self, text, length):
"""格式化文本内容"""
try:
print(f"\n[格式化] 开始格式化文本")
print(f"[格式化] 原始文本长度: {len(text)}")
print(f"[格式化] 目标预览长度: {length}")
# 截取指定长度
preview = text[:length]
# 添加基本信息
info = [
"=== 文本预览 ===",
f"总字数: {len(text)}",
f"预览字数: {min(length, len(text))}",
f"文件: {os.path.basename(self.current_file)}",
"=" * 50,
preview,
"=" * 50
]
if len(text) > length:
info.append("(更多内容请查看原文件)")
formatted = "\n".join(info)
print(f"[格式化] 完成,格式化后长度: {len(formatted)}")
return formatted
except Exception as e:
print(f"[错误] 格式化失败: {str(e)}")
print(f"[错误] 异常类型: {type(e)}")
print(f"[错误] 堆栈信息: ", sys.exc_info())
return f"格式化失败: {str(e)}"
def preview_text(self, 文本文件路径, 预览长度=1000):
self.execution_count += 1
try:
print("\n========== 文本预览开始 ==========")
print(f"[执行] 第 {self.execution_count} 次执行")
print(f"[时间] {time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"[输入] 文件路径: {文本文件路径}")
print(f"[输入] 预览长度: {预览长度}")
# 验证文件
if not os.path.exists(文本文件路径):
raise ValueError(f"文件不存在: {文本文件路径}")
# 保存当前文件路径
self.current_file = 文本文件路径
print(f"[状态] 当前处理文件: {os.path.basename(self.current_file)}")
# 读取文件
print("[流程] 开始读取文件...")
with open(文本文件路径, 'r', encoding='utf-8') as f:
text = f.read()
file_size = os.path.getsize(文本文件路径)
print(f"[文件] 大小: {file_size} 字节")
print(f"[文件] 字符数: {len(text)}")
# 格式化文本
print("[流程] 开始格式化文本...")
formatted_text = self.format_text(text, 预览长度)
print("[流程] 格式化完成")
print("\n[预览输出]")
print("-" * 50)
print(formatted_text)
print("-" * 50)
print("========== 文本预览完成 ==========\n")
# 返回预览内容
return {
"ui": {
"value": formatted_text,
"type": "text",
"multiline": True,
"scroll": True,
"height": "500px"
}
}
except Exception as e:
error_msg = f"预览失败: {str(e)}"
print("\n[异常信息]")
print(f"[错误] 类型: {type(e)}")
print(f"[错误] 描述: {str(e)}")
print(f"[错误] 堆栈: ", sys.exc_info())
print("========== 文本预览异常终止 ==========\n")
return {
"ui": {
"value": error_msg,
"type": "text",
"multiline": True,
"color": "red"
}
}
@classmethod
def IS_CHANGED(cls, 文本文件路径, *args):
"""确保节点每次都执行"""
print(f"[IS_CHANGED] 被调用: {文本文件路径}")
import random
return random.random()
有不清楚的,在评论里留言,有时间我就会关注的。
分享我的bilibili视频:
【双色球自动采集发送到微信coze工作流使用教程】 https://www.bilibili.com/video/BV1YVqCYCEjV/?share_source=copy_web&vd_source=4f96fddc82e951a8caff5c703881b8da
相关推荐
- selenium(WEB自动化工具)
-
定义解释Selenium是一个用于Web应用程序测试的工具。Selenium测试直接运行在浏览器中,就像真正的用户在操作一样。支持的浏览器包括IE(7,8,9,10,11),MozillaF...
- 开发利器丨如何使用ELK设计微服务中的日志收集方案?
-
【摘要】微服务各个组件的相关实践会涉及到工具,本文将会介绍微服务日常开发的一些利器,这些工具帮助我们构建更加健壮的微服务系统,并帮助排查解决微服务系统中的问题与性能瓶颈等。我们将重点介绍微服务架构中...
- 高并发系统设计:应对每秒数万QPS的架构策略
-
当面试官问及"如何应对每秒几万QPS(QueriesPerSecond)"时,大概率是想知道你对高并发系统设计的理解有多少。本文将深入探讨从基础设施到应用层面的解决方案。01、理解...
- 2025 年每个 JavaScript 开发者都应该了解的功能
-
大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发。1.Iteratorhelpers开发者...
- JavaScript Array 对象
-
Array对象Array对象用于在变量中存储多个值:varcars=["Saab","Volvo","BMW"];第一个数组元素的索引值为0,第二个索引值为1,以此类推。更多有...
- Gemini 2.5编程全球霸榜,谷歌重回AI王座,神秘模型曝光,奥特曼迎战
-
刚刚,Gemini2.5Pro编程登顶,6美元性价比碾压Claude3.7Sonnet。不仅如此,谷歌还暗藏着更强的编程模型Dragontail,这次是要彻底翻盘了。谷歌,彻底打了一场漂亮的翻...
- 动力节点最新JavaScript教程(高级篇),深入学习JavaScript
-
JavaScript是一种运行在浏览器中的解释型编程语言,它的解释器被称为JavaScript引擎,是浏览器的一部分,JavaScript广泛用于浏览器客户端编程,通常JavaScript脚本是通过嵌...
- 一文看懂Kiro,其 Spec工作流秒杀Cursor,可移植至Claude Code
-
当Cursor的“即兴编程”开始拖累项目质量,AWS新晋IDEKiro以Spec工作流打出“先规范后编码”的系统工程思维:需求-设计-任务三件套一次生成,文档与代码同步落地,复杂项目不...
- 「晚安·好梦」努力只能及格,拼命才能优秀
-
欢迎光临,浏览之前点击上面的音乐放松一下心情吧!喜欢的话给小编一个关注呀!Effortscanonlypass,anddesperatelycanbeexcellent.努力只能及格...
- JavaScript 中 some 与 every 方法的区别是什么?
-
大家好,很高兴又见面了,我是姜茶的编程笔记,我们一起学习前端相关领域技术,共同进步,也欢迎大家关注、点赞、收藏、转发,您的支持是我不断创作的动力在JavaScript中,Array.protot...
- 10个高效的Python爬虫框架,你用过几个?
-
小型爬虫需求,requests库+bs4库就能解决;大型爬虫数据,尤其涉及异步抓取、内容管理及后续扩展等功能时,就需要用到爬虫框架了。下面介绍了10个爬虫框架,大家可以学习使用!1.Scrapysc...
- 12个高效的Python爬虫框架,你用过几个?
-
实现爬虫技术的编程环境有很多种,Java、Python、C++等都可以用来爬虫。但很多人选择Python来写爬虫,为什么呢?因为Python确实很适合做爬虫,丰富的第三方库十分强大,简单几行代码便可实...
- pip3 install pyspider报错问题解决
-
运行如下命令报错:>>>pip3installpyspider观察上面的报错问题,需要安装pycurl。是到这个网址:http://www.lfd.uci.edu/~gohlke...
- PySpider框架的使用
-
PysiderPysider是一个国人用Python编写的、带有强大的WebUI的网络爬虫系统,它支持多种数据库、任务监控、项目管理、结果查看、URL去重等强大的功能。安装pip3inst...
- 「机器学习」神经网络的激活函数、并通过python实现激活函数
-
神经网络的激活函数、并通过python实现whatis激活函数感知机的网络结构如下:左图中,偏置b没有被画出来,如果要表示出b,可以像右图那样做。用数学式来表示感知机:上面这个数学式子可以被改写:...
- 一周热门
- 最近发表
- 标签列表
-
- ps图案在哪里 (33)
- super().__init__ (33)
- python 获取日期 (34)
- 0xa (36)
- super().__init__()详解 (33)
- python安装包在哪里找 (33)
- linux查看python版本信息 (35)
- python怎么改成中文 (35)
- php文件怎么在浏览器运行 (33)
- eval在python中的意思 (33)
- python安装opencv库 (35)
- python div (34)
- sticky css (33)
- python中random.randint()函数 (34)
- python去掉字符串中的指定字符 (33)
- python入门经典100题 (34)
- anaconda安装路径 (34)
- yield和return的区别 (33)
- 1到10的阶乘之和是多少 (35)
- python安装sklearn库 (33)
- dom和bom区别 (33)
- js 替换指定位置的字符 (33)
- python判断元素是否存在 (33)
- sorted key (33)
- shutil.copy() (33)