Whisperで文字起こしした電話内容を、OpenAI APIの gpt-3.5-turbo
モデルで要約させている。
会議議事録の要約などは、プロンプトの指定次第で高精度で行えるが、電話の場合、文章の体裁を取らず、会話内容もまとまりがなかったりで、精度が低い。
特に、話者がどちらかの情報が、Whisperでの文字起こし結果には含まれないため、双方の話している内容がまとめられたりと、改善要望が多い。
ちょうど gpt-4
がOpenAI APIで一般解禁されたので試してみたところ、入力テキストやプロンプトをいじらなくても精度向上したが、コストが gpt-3.5-turbo
の20倍ということで、稟議が通らなかった。
精度向上のため、Speaker Diarisation(話者ダイアリゼーション、話者識別、話者分離、話者分割)できないか、場合によってはWhisperからの乗り換えも検討しつつ調べたところ、Whisperの解析結果を元に、Google Colab上でそれっぽいことができたのでメモ。
方法の調査
調べると、Pythonを使った例が出てきた。 spkrec-ecapa-voxceleb
、 Pyannote
、 WhisperX
あたりが名前として出てくる。
spkrec-ecapa-voxceleb
spkrec-ecapa-voxceleb
は、SpeechBrainが公開しているツール。トレーニング済みのECAPA-TDNNモデルで話者の検証を行うのに必要なツールが含まれているとのこと。
huggingface.co
VoxCeleb
はYoutubeを元にしたデータセットらしい。ECAPA-TDNNはモデルの模様。
Pyannote
今回のユースケースのように、音声に対して使用するのは pyannote.audio
。他にも動画用の pyannote.video
などあった。
github.com
PyTorchベースの、speaker diarization用ツールとのこと。
READMEに記載があるが、使用時に認証トークンが必要。その発行のためには、HuggingFaceのアカウントが必要となる。
WhisperX
Whisper + Pyannoteによる文字起こし&話者分離が広く使われているようで、それらをあらかじめ合わせてあるのがWhisperX。
github.com
Whisperの高速化モデルである、Faster-Whisperを使用しており、オリジナルのWhisperを使っているのではない模様。
Whisper APIを使用できたらよかったのだが、現時点では未対応の模様。また、Pyannoteの認証トークンはやはり必要となる。
話者ダイアリゼーション可能なサービス
Whisperから、話者ダイアリゼーションをしてくれるサービスへの乗り換えも検討。
GCPのSpeech-to-Textや、LINEのCLOVA Note、Nootaというサービスもあった。
方法の選定
PyannoteのためのHuggingFaceアカウント作成が、「(決済者が)聞いたことのないWebサイトだから」ということで稟議が通らず、外部サービスについてもそれぞれの理由でお見送りとなったため、消去法でspkrec-ecapa-voxcelebに決定。
サービスの中でも、特にNootaは要約やSalesforce連携(要約結果をカスタムオブジェクトに保存している)など、自前で実装したことに対応しており、かつ話者識別もできるということで乗り換えたかったが、解析対象となる電話の総時間がビジネスプランの最大月12,000分でも足りず、月額30,000円以上のエンタープライズプランになるため見送りとなった。
CLOVA Noteはオープンベータらしく、利用料は無料でスマートフォンアプリ経由であれば無制限に使用可能だが、ファイルアップロード形式だと月300分まで、データ利用を許可しても+300分のトータル月600分までなので見送り。
Speech-To-Textは軽く試してみたが、60秒以上の音声ファイルはGoogle Cloud Storageに保存する必要があるのと、単純にエラーが多かった(時折変換失敗するファイルが存在し、リトライしても失敗し続けてしまう)。また、精度もWhisperより低かったように感じる。
spkrec-ecapa-voxcelebによる話者ダイアリゼーションのGoogle Colaboratoryでの実装
以下で、話者のサンプル音声をもとにした話者ダイアリゼーションを行っている。
qiita.com
ざっくりでいいので、サンプル音声なしの例はないかと調べたところ、以下に例があった。
huggingface.co
これらを参考に実装してみる。
音声ファイルとWhisper APIによる解析結果の用意
事前準備として、音声ファイルとそれをWhisper APIのtranscribeにかけた結果のJSONを保存しておく。
注意点として、transcribeする際のオプションとして response_format="verbose_json"
を指定しておく。発話ごとに音声ファイルを分割する必要があるが、 verbose_json
でないと、発話の開始・終了時間が結果に含まれない。
今回は、それぞれ sample.mp3
、 sample.json
という名前でColab ノートブックにアップロード。音声ファイルは約25分、128Kbpsで容量が25MB弱のものを使用。
speechbrain
, pydub
, scikit-learn
をインストールする。
!pip install speechbrain pydub scikit-learn
Embeddingの計算
speechbrainのサンプルコードだと以下のようにしている。
import torchaudio
from speechbrain.pretrained import EncoderClassifier
classifier = EncoderClassifier.from_hparams(
source="speechbrain/spkrec-ecapa-voxceleb",
run_opts={"device": "cuda"}
)
signal, fs = torchaudio.load("sample.mp3")
ただ、今回使用している音声ファイルを読み込ませると、 OutOfMemoryError: CUDA out of memory.
が発生。
以下のように、Whisperの解析結果をもとに、ファイルを分割して読み込ませる必要があった。
!mkdir audio-segments
import json
import torchaudio
from pydub import AudioSegment
from speechbrain.pretrained import EncoderClassifier
def audio_segmentation(audio, segments, format):
"""
音声ファイルをセグメントに分割して保存し、保存先のファイルパスを配列で返します。
"""
segment_file_names = []
for segment in segments:
segment_start = segment["start"] * 1000
segment_end = segment["end"] * 1000
audio_segment = audio[segment_start:segment_end]
segment_file_name = f"audio-segments/sample_{segment['id']:03}.mp3"
audio_segment.export(segment_file_name, format=format)
segment_file_names.append(segment_file_name)
return segment_file_names
def segments_to_embeddings(classifier, segment_file_names, format):
"""
音声ファイルをEmbeddingに変換します。
"""
embeddings = []
for segment_file_name in segment_file_names:
signal, _ = torchaudio.load(segment_file_name, format=format)
embeddings.append(classifier.encode_batch(signal))
return embeddings
def main():
audio = AudioSegment.from_mp3("sample.mp3")
format = "mp3"
classifier = EncoderClassifier.from_hparams(
source="speechbrain/spkrec-ecapa-voxceleb",
run_opts={"device": "cuda"},
)
with open("sample.json") as f:
whisper_result = json.load(f)
segments = whisper_result["segments"]
segment_file_names = audio_segmentation(audio, segments, format)
embeddings = segments_to_embeddings(classifier, segment_file_names, format)
main()
初回実行時は、モデルのダウンロードが実行される。
2回目以降では、25分程度のファイルに対して、実行時間は30秒程度。
ただ、処理時間のほとんどはファイル分割および保存によるもので、Embeddingでは3.5秒程度しかかからなかった。ファイルに保存せず、オンメモリで処理できないか調べたが、 torchaudio.load はファイルパスしか受け付けない模様。
続いて、クラスタリングを行う。
階層的クラスタリングによる教師なし次元削減として、 AgglomerativeClustering や FeatureAgglomeration が使える。今回は AgglomerativeClustering
を用いて実装。
import json
import torchaudio
from sklearn.cluster import AgglomerativeClustering
from speechbrain.pretrained import EncoderClassifier
def get_segment_file_names(segments):
segment_file_names = []
for segment in segments:
segment_file_name = f"audio-segments/sample_{segment['id']:03}.mp3"
segment_file_names.append(segment_file_name)
return segment_file_names
def segments_to_embeddings(classifier, segment_file_names, format):
"""
音声ファイルをEmbeddingに変換します。
"""
embeddings = []
for segment_file_name in segment_file_names:
signal, _ = torchaudio.load(segment_file_name, format=format)
embedding = classifier.encode_batch(signal).detach().cpu().numpy()
embeddings.append(embedding.reshape(192,))
return embeddings
def main():
format = "mp3"
classifier = EncoderClassifier.from_hparams(
source="speechbrain/spkrec-ecapa-voxceleb",
run_opts={"device": "cuda"},
)
with open("sample.json") as f:
whisper_result = json.load(f)
segments = whisper_result["segments"]
segment_file_names = get_segment_file_names(segments)
embeddings = segments_to_embeddings(classifier, segment_file_names, format)
clustering = AgglomerativeClustering().fit(embeddings)
with open("sample-speaker.txt", mode="w", encoding="utf-8") as f:
current_speaker = clustering.labels_[0]
spoken_words = []
for label, segment in zip(clustering.labels_, segments):
if label != current_speaker:
f.write(f"[話者{current_speaker + 1}] {''.join(spoken_words)}\n")
spoken_words = []
current_speaker = label
spoken_words.append(segment["text"])
main()
クラスタリングは一瞬で終わる。
コードをまとめる
後ほど使うことを想定し、作業ディレクトリ名の変数化、作業ディレクトリの作成/削除を追加などを行い、まとめてみる。
import json
import logging
import math
import os
import re
import shutil
import torchaudio
from pydub import AudioSegment
from sklearn.cluster import AgglomerativeClustering
from speechbrain.pretrained import EncoderClassifier
from time import perf_counter
work_dir = "audio-segments"
classifier = EncoderClassifier.from_hparams(
source="speechbrain/spkrec-ecapa-voxceleb",
run_opts={"device": "cuda"},
)
def audio_segmentation(audio, segments, format):
"""
音声ファイルをセグメントに分割して保存し、保存先のファイルパスを配列で返します。
"""
start_time = perf_counter()
duration = audio.duration_seconds * 1000
print(f"duration: {duration}")
segment_file_names = []
for segment in segments:
segment_start = segment["start"] * 1000
segment_end = segment["end"] * 1000
audio_segment = audio[segment_start:segment_end]
segment_file_name = f"{work_dir}/segment-{segment['id']:03}.{format}"
audio_segment.export(segment_file_name, format=format)
segment_file_names.append(segment_file_name)
print(segment_file_names[-1])
return segment_file_names
def segments_to_embeddings(segment_file_names, format):
"""
音声ファイルをEmbeddingに変換します。
"""
start_time = perf_counter()
embeddings = []
for segment_file_name in segment_file_names:
signal, _ = torchaudio.load(segment_file_name, format=format)
embedding = classifier.encode_batch(signal).detach().cpu().numpy()
embeddings.append(embedding.reshape(192,))
return embeddings
def speaker_diarization(embeddings, segments):
start_time = perf_counter()
clustering = AgglomerativeClustering().fit(embeddings)
speakers_file_path = "speakers.txt"
with open("debug.txt", mode="w", encoding="utf-8") as f:
current_speaker = clustering.labels_[0]
statements_by_speaker = []
speaker_dialyzed_conversations = []
def join_statements():
joined_statement = '。'.join(
statements_by_speaker
).replace('。。', '。').replace('、。', '、').replace('?。', '? ')
return f"話者{current_speaker + 1}:{joined_statement}"
for label, segment in zip(clustering.labels_, segments):
f.write(f"{label}: {segment['text']}\n")
if label != current_speaker:
statement = "。".join(statements_by_speaker).replace("。。", "。")
speaker_dialyzed_conversations.append(join_statements())
statements_by_speaker = []
current_speaker = label
statements_by_speaker.append(segment["text"])
if statements_by_speaker:
speaker_dialyzed_conversations.append(join_statements())
with open(speakers_file_path, mode="w", encoding="utf-8") as f:
f.write("\n".join(speaker_dialyzed_conversations) + "\n")
return speakers_file_path
def speaker_diarization_by_whisper_json(whisper_json_file_path, audio_file_path):
with open(whisper_json_file_path) as f:
whisper_json = json.load(f)
segments = whisper_json["segments"]
audio_format = audio_file_path.split(".")[-1]
audio = AudioSegment.from_file(audio_file_path, audio_format)
segment_file_names = audio_segmentation(audio, segments, audio_format)
embeddings = segments_to_embeddings(segment_file_names, audio_format)
speakers_file_path = speaker_diarization(embeddings, segments)
print(f"speakers_file_path: {speakers_file_path}")
def main(whisper_json_file_path, audio_file_path):
speaker_diarization_by_whisper_json(whisper_json_file_path, audio_file_path)
if os.path.isdir(work_dir):
shutil.rmtree(work_dir)
os.mkdir(work_dir)
main("sample.json", "sample.mp3")
振り返り
わからないなりに何とかなったが、さて実際に運用に乗せるとなると、どんな構成がいいか。
AWS上で動かす前提だが、EC2のGPUインスタンスを動かすと結構高いのと、CUDAなどの環境構築をしたことがない。
SageMakerが使えれば安くなりそうだが、あれは自分で学習させたモデルを使うものだしなぁ。