百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

comfyui-关于自定义插件-音频转文本

itomcoil 2024-12-23 11:08 30 浏览

效果图


实现方式:

工作流说明:

视频下载:通过上传url从网上下载视频

路径分支:下载后的视频分两个路径完成流转。

一个是视频转图片

一个是音频转文本

主要的节点类,如下图目录结构:


源码分析

音频转文本.py

import os
import torch
import whisper
import subprocess
import json
from datetime import timedelta
import time
import sys

class AudioTranscriber:
    def __init__(self):
        print("\n[初始化] 音频转文本节点被创建")
        self.execution_count = 0
        
        # 创建输出目录
        self.output_dir = "transcribed_text"
        os.makedirs(self.output_dir, exist_ok=True)
        
        # 检查CUDA可用性
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"[初始化] 使用设备: {self.device}")
        
        # 加载whisper模型
        self.model = None
        self.current_model_size = None

    def save_text_file(self, text, video_name):
        """保存转录文本到文件"""
        try:
            # 生成文件名
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = os.path.join(self.output_dir, f"{video_name}_{timestamp}.txt")
            
            # 使用UTF-8编码保存文件
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(text)
            
            print(f"[保存] 文本已保存至: {filename}")
            return filename
        except Exception as e:
            print(f"[错误] 保存文件失败: {str(e)}")
            raise ValueError(f"保存文件失败: {str(e)}")

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "视频路径_音频": ("STRING", {"forceInput": True, "multiline": False}),
                "模型大小": (["tiny", "base", "small", "medium", "large"], {
                    "default": "base"
                }),
            },
            "optional": {
                "语言": (["auto", "Chinese", "English", "Japanese", "Korean"], {
                    "default": "Chinese"
                }),
                "时间戳": ("BOOLEAN", {"default": True}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("文本文件路径",)
    FUNCTION = "transcribe_audio"
    CATEGORY = "视频转文章"
    OUTPUT_NODE = True

    def verify_video_file(self, video_path):
        """验证视频文件"""
        if not video_path:
            raise ValueError("视频路径为空")
        if not os.path.exists(video_path):
            raise ValueError(f"视频文件不存在: {video_path}")
        if not os.path.isfile(video_path):
            raise ValueError(f"不是文件: {video_path}")
        try:
            size = os.path.getsize(video_path)
            print(f"[验证] 视频文件大小: {size} 字节")
            with open(video_path, 'rb') as f:
                print("[验证] 视频文件可访问")
        except Exception as e:
            raise ValueError(f"视频文件访问失败: {str(e)}")

    def extract_audio(self, video_path):
        """从视频中提取音频"""
        try:
            # 生成音频文件名
            video_name = os.path.splitext(os.path.basename(video_path))[0]
            # timestamp = time.strftime("%Y%m%d_%H%M%S")
            audio_path = os.path.join(self.output_dir, f"{video_name}.wav")
            
            print(f"[音频] 输出路径: {audio_path}")
            
            # 构建FFmpeg命令
            command = [
                'ffmpeg', '-i', video_path,
                '-vn',  # 不处理视频
                '-acodec', 'pcm_s16le',  # 设置音频编码
                '-ar', '16000',  # 设置采样率
                '-ac', '1',  # 设置为单声道
                '-y',  # 覆盖已存在的文件
                audio_path
            ]
            
            print("[音频] 执行FFmpeg命令...")
            print(f"[音频] 命令: {' '.join(command)}")
            
            # 修改subprocess调用,指定编码
            startupinfo = None
            if os.name == 'nt':  # Windows系统
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            
            # 执行命令
            process = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                startupinfo=startupinfo,
                encoding='utf-8',
                errors='replace'  # 处理无法解码的字符
            )
            
            # 获取输出
            stdout, stderr = process.communicate()
            
            # 检查结果
            if process.returncode == 0 and os.path.exists(audio_path):
                size = os.path.getsize(audio_path)
                print(f"[音频] 提取成功,大小: {size} 字节")
                if stderr:
                    print(f"[警告] FFmpeg输出: {stderr}")
                return audio_path
            else:
                print(f"[错误] FFmpeg返回码: {process.returncode}")
                print(f"[错误] FFmpeg输出: {stderr}")
                raise ValueError("音频提取失败")
                
        except Exception as e:
            print(f"[错误] 音频提取失败: {str(e)}")
            print(f"[错误] 异常类型: {type(e)}")
            print(f"[错误] 堆栈信息: ", sys.exc_info())
            raise ValueError(f"音频提取失败: {str(e)}")

    def load_model(self, model_size):
        """加载模型并处理设备选择"""
        try:
            print(f"[模型] 开始加载 {model_size} 模型...")
            model = whisper.load_model(model_size)
            
            # 如果有GPU,将模型移到GPU
            if self.device == "cuda":
                print("[模型] 将模型移至GPU")
                model = model.to(self.device)
                # 使用半精度以提高性能
                if torch.cuda.is_available():
                    model = model.half()
            else:
                print("[模型] 在CPU上运行模型")
                model = model.float()  # 确保在CPU上使用FP32
            
            print(f"[模型] 加载完成,使用设备: {self.device}")
            return model
            
        except Exception as e:
            print(f"[错误] 模型加载失败: {str(e)}")
            raise ValueError(f"模型加载失败: {str(e)}")

    def transcribe_audio(self, 视频路径_音频, 模型大小, 语言="Chinese", 时间戳=True):
        self.execution_count += 1
        try:
            print("\n========== 音频转文本开始 ==========")
            print(f"[执行] 第 {self.execution_count} 次执行")
            print(f"[时间] {time.strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"[设备] 使用 {self.device.upper()}")
            
            # 验证输入
            print("\n[输入参数]")
            print(f"视频路径: {视频路径_音频}")
            print(f"模型大小: {模型大小}")
            print(f"语言: {语言}")
            print(f"时间戳: {时间戳}")
            
            if not 视频路径_音频:
                raise ValueError("未接收到视频路径")

            # 标准化路径
            video_path = os.path.abspath(视频路径_音频)
            print(f"[路径] 标准化后: {video_path}")
            
            # 验证视频文件
            print("\n[视频验证]")
            self.verify_video_file(video_path)
            
            # 提取音频
            print("\n[音频提取]")
            audio_path = self.extract_audio(video_path)
            
            # 加载模型
            print("\n[模型加载]")
            if self.model is None or self.current_model_size != 模型大小:
                self.model = self.load_model(模型大小)
                self.current_model_size = 模型大小
            else:
                print("[模型] 使用已加载的模型")
            
            # 转录音频
            print("\n[转录]")
            transcribe_options = {
                "language": 语言 if 语言 != "auto" else None,
                "task": "transcribe",
                "fp16": torch.cuda.is_available() and self.device == "cuda"
            }
            
            # 添加性能监控
            start_time = time.time()
            result = self.model.transcribe(audio_path, **transcribe_options)
            end_time = time.time()
            
            print(f"[性能] 转录耗时: {end_time - start_time:.2f} 秒")
            if self.device == "cuda":
                print(f"[性能] GPU内存使用: {torch.cuda.max_memory_allocated() / 1024**2:.2f} MB")
            
            # 处理结果
            print("\n[处理结果]")
            if 时间戳:
                text_content = []
                for segment in result["segments"]:
                    start = str(timedelta(seconds=round(segment["start"])))
                    end = str(timedelta(seconds=round(segment["end"])))
                    text_content.append(f"[{start} -> {end}] {segment['text']}")
                text_content = "\n".join(text_content)
            else:
                text_content = result["text"]
            
            # 保存文本到文件
            video_name = os.path.splitext(os.path.basename(video_path))[0]
            text_file_path = self.save_text_file(text_content, video_name)
            
            print("\n[完成]")
            print(f"音频文件: {audio_path}")
            print(f"文本文件: {text_file_path}")
            print("========== 音频转文本完成 ==========\n")
            
            return (text_file_path,)  # 返回文本文件路径

        except Exception as e:
            print("\n[异常信息]")
            print(f"类型: {type(e)}")
            print(f"描述: {str(e)}")
            print("========== 音频转文本异常终止 ==========\n")
            raise ValueError(f"音频转录失败: {str(e)}")

    @classmethod
    def IS_CHANGED(cls, 视频路径_音频, *args):
        """确保节点每次都执行"""
        print(f"[IS_CHANGED] 输入: {视频路径_音频}")
        import random
        return random.random()
 

文本预览.py

# -*- coding: utf-8 -*-
import os
import sys
import time
import json

class TextPreview:
    def __init__(self):
        print("\n[初始化] 文本预览节点被创建")
        print(f"[初始化] 时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"[初始化] 工作目录: {os.getcwd()}")
        self.execution_count = 0

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "文本文件路径": ("STRING", {"forceInput": True}),
            },
            "optional": {
                "预览长度": ("INT", {
                    "default": 1000,
                    "min": 100,
                    "max": 5000,
                    "step": 500
                }),
            }
        }

    RETURN_TYPES = ()
    FUNCTION = "preview_text"
    CATEGORY = "视频转文章"
    OUTPUT_NODE = True

    def format_text(self, text, length):
        """格式化文本内容"""
        try:
            print(f"\n[格式化] 开始格式化文本")
            print(f"[格式化] 原始文本长度: {len(text)}")
            print(f"[格式化] 目标预览长度: {length}")
            
            # 截取指定长度
            preview = text[:length]
            
            # 添加基本信息
            info = [
                "=== 文本预览 ===",
                f"总字数: {len(text)}",
                f"预览字数: {min(length, len(text))}",
                f"文件: {os.path.basename(self.current_file)}",
                "=" * 50,
                preview,
                "=" * 50
            ]
            
            if len(text) > length:
                info.append("(更多内容请查看原文件)")
            
            formatted = "\n".join(info)
            print(f"[格式化] 完成,格式化后长度: {len(formatted)}")
            return formatted
            
        except Exception as e:
            print(f"[错误] 格式化失败: {str(e)}")
            print(f"[错误] 异常类型: {type(e)}")
            print(f"[错误] 堆栈信息: ", sys.exc_info())
            return f"格式化失败: {str(e)}"

    def preview_text(self, 文本文件路径, 预览长度=1000):
        self.execution_count += 1
        try:
            print("\n========== 文本预览开始 ==========")
            print(f"[执行] 第 {self.execution_count} 次执行")
            print(f"[时间] {time.strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"[输入] 文件路径: {文本文件路径}")
            print(f"[输入] 预览长度: {预览长度}")
            
            # 验证文件
            if not os.path.exists(文本文件路径):
                raise ValueError(f"文件不存在: {文本文件路径}")
            
            # 保存当前文件路径
            self.current_file = 文本文件路径
            print(f"[状态] 当前处理文件: {os.path.basename(self.current_file)}")
            
            # 读取文件
            print("[流程] 开始读取文件...")
            with open(文本文件路径, 'r', encoding='utf-8') as f:
                text = f.read()
            file_size = os.path.getsize(文本文件路径)
            print(f"[文件] 大小: {file_size} 字节")
            print(f"[文件] 字符数: {len(text)}")
            
            # 格式化文本
            print("[流程] 开始格式化文本...")
            formatted_text = self.format_text(text, 预览长度)
            print("[流程] 格式化完成")
            
            print("\n[预览输出]")
            print("-" * 50)
            print(formatted_text)
            print("-" * 50)
            
            print("========== 文本预览完成 ==========\n")
            
            # 返回预览内容
            return {
                "ui": {
                    "value": formatted_text,
                    "type": "text",
                    "multiline": True,
                    "scroll": True,
                    "height": "500px"
                }
            }

        except Exception as e:
            error_msg = f"预览失败: {str(e)}"
            print("\n[异常信息]")
            print(f"[错误] 类型: {type(e)}")
            print(f"[错误] 描述: {str(e)}")
            print(f"[错误] 堆栈: ", sys.exc_info())
            print("========== 文本预览异常终止 ==========\n")
            return {
                "ui": {
                    "value": error_msg,
                    "type": "text",
                    "multiline": True,
                    "color": "red"
                }
            }

    @classmethod
    def IS_CHANGED(cls, 文本文件路径, *args):
        """确保节点每次都执行"""
        print(f"[IS_CHANGED] 被调用: {文本文件路径}")
        import random
        return random.random()

有不清楚的,在评论里留言,有时间我就会关注的。

分享我的bilibili视频:

【双色球自动采集发送到微信coze工作流使用教程】 https://www.bilibili.com/video/BV1YVqCYCEjV/?share_source=copy_web&vd_source=4f96fddc82e951a8caff5c703881b8da

相关推荐

Python Qt GUI设计:将UI文件转换Python文件三种妙招(基础篇—2)

在开始本文之前提醒各位朋友,Python记得安装PyQt5库文件,Python语言功能很强,但是Python自带的GUI开发库Tkinter功能很弱,难以开发出专业的GUI。好在Python语言的开放...

Connect 2.0来了,还有Nuke和Maya新集成

ftrackConnect2.0现在可以下载了--重新设计的桌面应用程序,使用户能够将ftrackStudio与创意应用程序集成,发布资产等。这个新版本的发布中还有两个Nuke和Maya新集成,...

Magicgui:不会GUI编程也能轻松构建Python GUI应用

什么是MagicguiMagicgui是一个Python库,它允许开发者仅凭简单的类型注解就能快速构建图形用户界面(GUI)应用程序。这个库基于Napari项目,利用了Python的强大类型系统,使得...

Python入坑系列:桌面GUI开发之Pyside6

阅读本章之后,你可以掌握这些内容:Pyside6的SignalsandSlots、Envents的作用,如何使用?PySide6的Window、DialogsandAlerts、Widgets...

Python入坑系列-一起认识Pyside6 designer可拖拽桌面GUI

通过本文章,你可以了解一下内容:如何安装和使用Pyside6designerdesigner有哪些的特性通过designer如何转成python代码以前以为Pyside6designer需要在下载...

pyside2的基础界面(pyside2显示图片)

今天我们来学习pyside2的基础界面没有安装过pyside2的小伙伴可以看主页代码效果...

Python GUI开发:打包PySide2应用(python 打包pyc)

之前的文章我们介绍了怎么使用PySide2来开发一个简单PythonGUI应用。这次我们来将上次完成的代码打包。我们使用pyinstaller。注意,pyinstaller默认会将所有安装的pack...

使用PySide2做窗体,到底是怎么个事?看这个能不能搞懂

PySide2是Qt框架的Python绑定,允许你使用Python创建功能强大的跨平台GUI应用程序。PySide2的基本使用方法:安装PySide2pipinstallPy...

pycharm中conda解释器无法配置(pycharm安装的解释器不能用)

之前用的好好的pycharm正常配置解释器突然不能用了?可以显示有这个环境然后确认后可以conda正在配置解释器,但是进度条结束后还是不成功!!试过了pycharm重启,pycharm重装,anaco...

Conda使用指南:从基础操作到Llama-Factory大模型微调环境搭建

Conda虚拟环境在Linux下的全面使用指南:从基础操作到Llama-Factory大模型微调环境搭建在当今的AI开发与数据分析领域,conda虚拟环境已成为Linux系统下管理项目依赖的标配工具。...

Python操作系统资源管理与监控(python调用资源管理器)

在现代计算环境中,对操作系统资源的有效管理和监控是确保应用程序性能和系统稳定性的关键。Python凭借其丰富的标准库和第三方扩展,提供了强大的工具来实现这一目标。本文将探讨Python在操作系统资源管...

本地部署开源版Manus+DeepSeek创建自己的AI智能体

1、下载安装Anaconda,设置conda环境变量,并使用conda创建python3.12虚拟环境。2、从OpenManus仓库下载代码,并安装需要的依赖。3、使用Ollama加载本地DeepSe...

一文教会你,搭建AI模型训练与微调环境,包学会的!

一、硬件要求显卡配置:需要Nvidia显卡,至少配备8G显存,且专用显存与共享显存之和需大于20G。二、环境搭建步骤1.设置文件存储路径非系统盘存储:建议将非安装版的环境文件均存放在非系统盘(如E盘...

使用scikit-learn为PyTorch 模型进行超参数网格搜索

scikit-learn是Python中最好的机器学习库,而PyTorch又为我们构建模型提供了方便的操作,能否将它们的优点整合起来呢?在本文中,我们将介绍如何使用scikit-learn中的网格搜...

如何Keras自动编码器给极端罕见事件分类

全文共7940字,预计学习时长30分钟或更长本文将以一家造纸厂的生产为例,介绍如何使用自动编码器构建罕见事件分类器。现实生活中罕见事件的数据集:背景1.什么是极端罕见事件?在罕见事件问题中,数据集是...