简介
FunASR是一个基础的端到端语音识别工具包,旨在架起语音识别学术研究和工业应用之间的桥梁。它支持工业级语音识别模型的训练和微调,方便研究人员和开发者更便捷地进行语音识别模型的研究和生产,促进语音识别生态的发展。其目标是让语音识别变得更有趣(ASR for Fun)!FunASR 提供了语音识别 (ASR)、语音活动检测 (VAD)、标点恢复、语言模型、说话人识别、说话人分段以及多说话人 ASR 等多种功能。并提供方便的脚本和教程,支持预训练模型的推理和微调。
安装
安装funasr之前,确保已经安装了下面依赖环境:
python>=3.8
torch>=1.13
torchaudio
在树莓派中,可参考以下命令顺序进行安装:
pip install jieba -i https://pypi.tuna.tsinghua.edu.cn/simple --break-system-packages
(注意:上面这条命令容易下载失败,建议手动下载 whl 文件后按下一条命令本地安装)
pip install /home/work_lianxi/test_project/jieba-0.42.1-py3-none-any.whl --break-system-packages
pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --break-system-packages
pip install torchaudio -i https://pypi.tuna.tsinghua.edu.cn/simple --break-system-packages
pip install --ignore-installed funasr -i https://pypi.tuna.tsinghua.edu.cn/simple --break-system-packages
funasr 开源仓库地址:https://gitcode.com/gh_mirrors/fun/FunASR
实时语音识别测试
开源仓库里面有个示例,可以直接复制代码运行测试
from funasr import AutoModel

# Streaming chunk configuration: [0, 10, 5] -> 600 ms chunks, [0, 8, 4] -> 480 ms.
chunk_size = [0, 10, 5]
encoder_chunk_look_back = 4  # number of chunks to look back for encoder self-attention
decoder_chunk_look_back = 1  # number of encoder chunks to look back for decoder cross-attention

model = AutoModel(model="paraformer-zh-streaming")

import soundfile
import os

# Use the example audio shipped with the downloaded model.
wav_file = os.path.join(model.model_path, "example/asr_example.wav")
speech, sample_rate = soundfile.read(wav_file)
chunk_stride = chunk_size[1] * 960  # samples per chunk: 10 * 960 = 9600 -> 600 ms @ 16 kHz

cache = {}  # streaming cache, shared across chunks of the same utterance
# BUG FIX: the original wrote int(len((speech)-1)/chunk_stride+1); for a numpy
# array len((speech)-1) == len(speech), so an extra empty chunk was produced
# whenever the audio length was an exact multiple of chunk_stride. Use proper
# ceiling division instead.
total_chunk_num = (len(speech) + chunk_stride - 1) // chunk_stride
for i in range(total_chunk_num):
    speech_chunk = speech[i * chunk_stride:(i + 1) * chunk_stride]
    is_final = i == total_chunk_num - 1  # flush the decoder on the last chunk
    res = model.generate(
        input=speech_chunk,
        cache=cache,
        is_final=is_final,
        chunk_size=chunk_size,
        encoder_chunk_look_back=encoder_chunk_look_back,
        decoder_chunk_look_back=decoder_chunk_look_back,
    )
    print(res)
整合 pyaudio+edge_tts 实现语音对话
这里还需要安装:edge-tts,可参考另一篇文章:https://blog.csdn.net/qq_32502511/article/details/147575690
最后整合代码便可以实现一个语音对话机器人,效果如下图所示:
完整的代码如下:
from funasr import AutoModel
import numpy as np
import pyaudio
import queue
import threading
import urllib.parse
import requests
import asyncio
import pygame
import edge_tts
def play_mp3(file_name):
    """Play an audio file through pygame and block until playback finishes."""
    # Make sure pygame's core and audio subsystems are initialized.
    pygame.init()
    pygame.mixer.init()
    # Load the clip and play it at full volume.
    clip = pygame.mixer.Sound(file_name)
    clip.set_volume(1)  # 100% volume
    clip.play()
    # Poll roughly 10 times per second until the mixer goes idle.
    while pygame.mixer.get_busy():
        pygame.time.Clock().tick(10)
def get_robot_message(msg):
    """Query the free Qingyunke chatbot API and return its reply text.

    :param msg: user utterance to send to the bot (URL-quoted before sending)
    :return: the bot's reply — the "content" field of the JSON response
    """
    url = 'http://api.qingyunke.com/api.php?key=free&appid=0&msg={}'.format(
        urllib.parse.quote(msg))
    # BUG FIX: requests.get() without a timeout can hang forever and freeze
    # the recognition thread; bound the wait explicitly.
    resp = requests.get(url, timeout=10)
    return resp.json()["content"]
def generate_audio(text: str, voice: str, output_file: str) -> None:
    """Synthesize speech for the given text and save it to an audio file.

    :param text: Chinese text to synthesize
    :param voice: voice name, e.g. 'zh-CN-XiaoyiNeural'
    :param output_file: path of the audio file to write
    """
    async def _synthesize() -> None:
        # edge-tts exposes an async API; wrap it so callers stay synchronous.
        await edge_tts.Communicate(text, voice).save(output_file)

    asyncio.run(_synthesize())
def speak_text(text):
    """Convert text to speech and play it through the speakers."""
    mp3_path = "测试语音.mp3"  # temp file, overwritten for every utterance
    generate_audio(text, "zh-CN-XiaoyiNeural", mp3_path)
    play_mp3(mp3_path)
class FunasrManager(object):
    """Records microphone audio in fixed-length segments, recognizes each
    segment with FunASR, then answers via the chatbot + TTS pipeline."""

    def __init__(self):
        self.engine = pyaudio.PyAudio()
        # Streaming Paraformer model; replace the name to use another model.
        self.model = AutoModel(model="paraformer-zh-streaming")

    def stream_decode(self, data_queue):
        """Consumer loop: pull audio segments off the queue and recognize them.

        Runs forever on a daemon thread. Each queue item is one complete
        recording segment, so a fresh cache is passed per call.
        """
        while True:
            audio = data_queue.get()
            res = self.model.generate(input=audio, cache={})
            if res[0]["text"]:
                print('识别结果:')
                msg = res[0]["text"]
                print("麦克风输入信息:", msg)
                rebot_msg = get_robot_message(msg)
                print("机器人回答信息:", rebot_msg)
                speak_text(rebot_msg)

    def start(self, chunk=1024, channels=1, rate=16000, format=pyaudio.paInt16, save_record_seconds=10):
        """Continuously record from the microphone and queue each segment for recognition.

        FIX: the original docstring documented parameters that do not exist
        (save_file / record_seconds); the real ones are documented below.

        :param chunk: frames read per stream.read() call
        :param channels: number of input channels (1 = mono)
        :param rate: sample rate in Hz
        :param format: pyaudio sample format (16-bit signed int)
        :param save_record_seconds: length in seconds of each queued segment
        """
        stream = self.engine.open(format=format,       # wav-style PCM stream
                                  channels=channels,   # mono
                                  rate=rate,           # 16000 Hz sample rate
                                  input=True,
                                  frames_per_buffer=chunk)
        data_queue = queue.Queue()
        t = threading.Thread(target=self.stream_decode, args=(data_queue,))
        t.daemon = True  # don't keep the process alive for the decoder thread
        t.start()
        try:
            # Producer loop: record save_record_seconds of audio, normalize
            # int16 PCM to float32 in [-1, 1), and hand it to the decoder.
            while True:
                frames = []  # raw byte chunks of the current segment
                for _ in range(0, int(rate / chunk * save_record_seconds)):
                    data = stream.read(chunk, exception_on_overflow=False)
                    frames.append(data)
                print('音频流长度:{}'.format(len(frames)))
                audio = np.frombuffer(b''.join(frames), np.int16).flatten().astype(np.float32) / 32768.0
                data_queue.put(audio)
        finally:
            # FIX: the original never released the input stream; close it on
            # any exit path (e.g. KeyboardInterrupt).
            stream.stop_stream()
            stream.close()

    def close(self):
        """Release the PyAudio engine."""
        self.engine.terminate()
if __name__ == '__main__':
    # Entry point: build the manager and start the record/recognize loop.
    manager = FunasrManager()
    manager.start()
暂无评论内容