104 lines
3.0 KiB
Python
104 lines
3.0 KiB
Python
# Meissa - A trainable and simple text to speech server
|
|
#
|
|
# Copyright (c) 2023 Sameer Rahmani <lxsameer@gnu.org>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, version 2.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
import asyncio
|
|
from pathlib import Path
|
|
|
|
# pylint: disable=redefined-outer-name, unused-argument
|
|
import numpy as np
|
|
import simpleaudio as sa
|
|
|
|
from TTS.utils.manage import ModelManager
|
|
from TTS.utils.synthesizer import Synthesizer
|
|
|
|
from meissa import utils
|
|
|
|
|
|
def to_wav_data(wav):
    """Normalize a waveform and convert it to 16-bit integer samples.

    The samples are scaled so that the loudest one maps to 32767. The peak
    is floored at 0.01 so that (near-)silent input is not amplified without
    bound and an all-zero waveform does not divide by zero.

    Args:
        wav: Sequence or array of numeric audio samples.

    Returns:
        A numpy array of dtype ``int16`` with the same number of samples.
        An empty input yields an empty ``int16`` array.
    """
    samples = np.asarray(wav)

    # np.max() raises ValueError on a zero-size array, so short-circuit.
    if samples.size == 0:
        return samples.astype(np.int16)

    peak = max(0.01, np.max(np.abs(samples)))
    return (samples * (32767 / peak)).astype(np.int16)
|
|
|
|
|
|
def play(wav, sample_rate=22050):
    """Start playing a waveform and return the playback handle.

    Args:
        wav: Audio samples, converted to int16 via ``to_wav_data``.
        sample_rate: Playback sample rate in Hz. Defaults to 22050, the
            value that used to be hard-coded; pass the synthesizer's
            actual output rate when using a model with a different one.

    Returns:
        Whatever ``simpleaudio.play_buffer`` returns (the object the caller
        uses to poll and stop the playback).
    """
    # Positional arguments after the buffer: 1 channel, 2 bytes per
    # sample (matching the int16 data produced by to_wav_data).
    return sa.play_buffer(to_wav_data(wav), 1, 2, sample_rate)
|
|
|
|
|
|
def create_synth(ctx):
    """Download the configured TTS model and build a ``Synthesizer`` for it.

    The model name is read from the ``model_name`` key of the
    configuration returned by ``utils.config(ctx)`` and falls back to
    ``tts_models/en/ljspeech/tacotron2-DDC``. The model's default vocoder
    is downloaded as well when the model declares one.

    Args:
        ctx: Context object handed to ``utils.config``.

    Returns:
        A ``Synthesizer`` set up with the downloaded model (and vocoder,
        if any), without speaker/language files or an encoder, and with
        CUDA disabled.
    """
    models_file = Path(__file__).parent / "speaker.models.json"
    manager = ModelManager(models_file)

    model_name = utils.config(ctx).get(
        "model_name", "tts_models/en/ljspeech/tacotron2-DDC"
    )
    model_path, config_path, model_item = manager.download_model(model_name)

    # Not every model declares a default vocoder; only download one when
    # it is present instead of failing on a missing or null entry.
    vocoder_path = None
    vocoder_config_path = None
    vocoder_name = model_item.get("default_vocoder")
    if vocoder_name is not None:
        vocoder_path, vocoder_config_path, _ = manager.download_model(
            vocoder_name
        )

    # Keyword arguments keep this call independent of the constructor's
    # positional parameter order.
    return Synthesizer(
        tts_checkpoint=model_path,
        tts_config_path=config_path,
        tts_speakers_file=None,
        tts_languages_file=None,
        vocoder_checkpoint=vocoder_path,
        vocoder_config=vocoder_config_path,
        encoder_checkpoint=None,
        encoder_config=None,
        use_cuda=False,
    )
|
|
|
|
|
|
async def worker(ctx, job_queue, worker_stop, stop_event):
    """Consume speech jobs from the queue, synthesize them and play them.

    The loop runs until ``worker_stop`` is set. Each job is a mapping with
    a ``"text"`` entry and an optional ``"speaker"`` entry; when the
    latter is absent the ``SPEAKER_IDX`` configuration value is used.
    Playback of the current job is interrupted as soon as ``stop_event``
    is set, and the event is cleared again afterwards.

    Args:
        ctx: Context object handed to ``utils.config`` and
            ``create_synth``.
        job_queue: Queue of pending jobs (presumably an
            ``asyncio.Queue`` -- confirm against the caller).
        worker_stop: Event-like object; once set, the worker exits after
            the current iteration.
        stop_event: Event-like object; once set, the current playback is
            stopped.
    """
    utils.info("Spawning a worker...")
    synthesizer = create_synth(ctx)
    speaker_idx = utils.config(ctx).get("SPEAKER_IDX")

    while not worker_stop.is_set():
        try:
            # get_nowait() is the call that raises QueueEmpty; an awaited
            # get() would block forever on an empty queue and never give
            # the loop a chance to re-check worker_stop.
            job = job_queue.get_nowait()
        except asyncio.QueueEmpty:
            await asyncio.sleep(0.1)
            continue

        try:
            utils.info(f"Running job: {job}")

            txt = job["text"]
            speaker = job.get("speaker", speaker_idx)
            # NOTE(review): the third argument is the *string* "None",
            # not the None object -- confirm this is intentional.
            wav = synthesizer.tts(txt, speaker, "None", None)

            # uncomment for debugging
            # synthesizer.save_wav(wav, "/tmp/blah.wav")

            utils.info("Playing...")

            # Start playback outside the inner try so that a failure in
            # play() is reported as-is instead of being masked by an
            # unbound `player` in the finally clause.
            player = play(wav)
            try:
                # Wait until the audio finishes playing or the stop
                # event is set by the server, e.g. via a command from a
                # client. is_playing is a method and has to be called; a
                # bare reference to it is always truthy.
                while not stop_event.is_set() and player.is_playing():
                    await asyncio.sleep(0.05)
            finally:
                player.stop()
                stop_event.clear()
        finally:
            # Always balance the successful get, even when the job
            # failed, so that anything waiting on the queue to drain is
            # not left hanging.
            job_queue.task_done()

    utils.info("Worker stopped")
|