Whisperで文字起こしした電話内容を、OpenAI APIの gpt-3.5-turbo
モデルで要約させている。
会議議事録の要約などは、プロンプトの指定次第で高精度で行えるが、電話の場合、文章の体裁を取らず、会話内容もまとまりがなかったりで、精度が低い。
特に、話者がどちらかの情報が、Whisperでの文字起こし結果には含まれないため、双方の話している内容がまとめられたりと、改善要望が多い。
ちょうど gpt-4
がOpenAI APIで一般解禁されたので試してみたところ、入力テキストやプロンプトをいじらなくても精度向上したが、コストが gpt-3.5-turbo
の20倍ということで、稟議が通らなかった。
精度向上のため、Speaker Diarisation(話者ダイアリゼーション、話者識別、話者分離、話者分割)できないか、場合によってはWhisperからの乗り換えも検討しつつ調べたところ、Whisperの解析結果を元に、Google Colab上でそれっぽいことができたのでメモ。
方法の調査
調べると、Pythonを使った例が出てきた。 spkrec-ecapa-voxceleb
、 Pyannote
、 WhisperX
あたりが名前として出てくる。
spkrec-ecapa-voxceleb
spkrec-ecapa-voxceleb
は、SpeechBrainが公開しているツール。トレーニング済みのECAPA-TDNNモデルで話者の検証を行うのに必要なツールが含まれているとのこと。
VoxCeleb
はYoutubeを元にしたデータセットらしい。ECAPA-TDNNはモデルの模様。
Pyannote
今回のユースケースのように、音声に対して使用するのは pyannote.audio
。他にも動画用の pyannote.video
などあった。
PyTorchベースの、speaker diarization用ツールとのこと。
READMEに記載があるが、使用時に認証トークンが必要。その発行のためには、HuggingFaceのアカウントが必要となる。
WhisperX
Whisper + Pyannoteによる文字起こし&話者分離が広く使われているようで、それらをあらかじめ合わせてあるのがWhisperX。
Whisperの高速化モデルである、Faster-Whisperを使用しており、オリジナルのWhisperを使っているのではない模様。
Whisper APIを使用できたらよかったのだが、現時点では未対応の模様。また、Pyannoteの認証トークンはやはり必要となる。
話者ダイアリゼーション可能なサービス
Whisperから、話者ダイアリゼーションをしてくれるサービスへの乗り換えも検討。
GCPのSpeech-to-Textや、LINEのCLOVA Note、Nootaというサービスもあった。
方法の選定
PyannoteのためのHuggingFaceアカウント作成が、「(決済者が)聞いたことのないWebサイトだから」ということで稟議が通らず、外部サービスについてもそれぞれの理由でお見送りとなったため、消去法でspkrec-ecapa-voxcelebに決定。
サービスの中でも、特にNootaは要約やSalesforce連携(要約結果をカスタムオブジェクトに保存している)など、自前で実装したことに対応しており、かつ話者識別もできるということで乗り換えたかったが、解析対象となる電話の総時間がビジネスプランの最大月12,000分でも足りず、月額30,000円以上のエンタープライズプランになるため見送りとなった。
CLOVA Noteはオープンベータらしく、利用料は無料でスマートフォンアプリ経由であれば無制限に使用可能だが、ファイルアップロード形式だと月300分まで、データ利用を許可しても+300分のトータル月600分までなので見送り。
Speech-To-Textは軽く試してみたが、60秒以上の音声ファイルはGoogle Cloud Storageに保存する必要があるのと、単純にエラーが多かった(時折変換失敗するファイルが存在し、リトライしても失敗し続けてしまう)。また、精度もWhisperより低かったように感じる。
spkrec-ecapa-voxcelebによる話者ダイアリゼーションのGoogle Colaboratoryでの実装
以下で、話者のサンプル音声をもとにした話者ダイアリゼーションを行っている。
ざっくりでいいので、サンプル音声なしの例はないかと調べたところ、以下に例があった。
これらを参考に実装してみる。
音声ファイルとWhisper APIによる解析結果の用意
事前準備として、音声ファイルとそれをWhisper APIのtranscribeにかけた結果のJSONを保存しておく。
注意点として、transcribeする際のオプションとして response_format="verbose_json"
を指定しておく。発話ごとに音声ファイルを分割する必要があるが、 verbose_json
でないと、発話の開始・終了時間が結果に含まれない。
今回は、それぞれ sample.mp3
、 sample.json
という名前でColab ノートブックにアップロード。音声ファイルは約25分、128Kbpsで容量が25MB弱のものを使用。
Pythonパッケージのインストール
speechbrain
, pydub
, scikit-learn
をインストールする。
!pip install speechbrain pydub scikit-learn
Embeddingの計算
speechbrainのサンプルコードだと以下のようにしている。
import torchaudio from speechbrain.pretrained import EncoderClassifier classifier = EncoderClassifier.from_hparams( source="speechbrain/spkrec-ecapa-voxceleb", # GPUを使用するよう設定、デフォルトではCPUが使われる run_opts={"device": "cuda"} ) signal, fs = torchaudio.load("sample.mp3")
ただ、今回使用している音声ファイルを読み込ませると、 OutOfMemoryError: CUDA out of memory.
が発生。
以下のように、Whisperの解析結果をもとに、ファイルを分割して読み込ませる必要があった。
!mkdir audio-segments import json import torchaudio from pydub import AudioSegment from speechbrain.pretrained import EncoderClassifier def audio_segmentation(audio, segments, format): """ 音声ファイルをセグメントに分割して保存し、保存先のファイルパスを配列で返します。 """ segment_file_names = [] for segment in segments: # ミリ秒で指定する必要があるため1000倍 segment_start = segment["start"] * 1000 segment_end = segment["end"] * 1000 audio_segment = audio[segment_start:segment_end] segment_file_name = f"audio-segments/sample_{segment['id']:03}.mp3" audio_segment.export(segment_file_name, format=format) segment_file_names.append(segment_file_name) return segment_file_names def segments_to_embeddings(classifier, segment_file_names, format): """ 音声ファイルをEmbeddingに変換します。 """ embeddings = [] for segment_file_name in segment_file_names: signal, _ = torchaudio.load(segment_file_name, format=format) embeddings.append(classifier.encode_batch(signal)) return embeddings def main(): audio = AudioSegment.from_mp3("sample.mp3") format = "mp3" classifier = EncoderClassifier.from_hparams( source="speechbrain/spkrec-ecapa-voxceleb", run_opts={"device": "cuda"}, ) # Whisper解析結果の読み込み with open("sample.json") as f: whisper_result = json.load(f) segments = whisper_result["segments"] segment_file_names = audio_segmentation(audio, segments, format) embeddings = segments_to_embeddings(classifier, segment_file_names, format) main()
初回実行時は、モデルのダウンロードが実行される。
2回目以降では、25分程度のファイルに対して、実行時間は30秒程度。
ただ、処理時間のほとんどはファイル分割および保存によるもので、Embeddingでは3.5秒程度しかかからなかった。ファイルに保存せず、オンメモリで処理できないか調べたが、 torchaudio.load はファイルパスしか受け付けない模様。
クラスタリングの実施
続いて、クラスタリングを行う。
階層的クラスタリングによる教師なし次元削減として、 AgglomerativeClustering や FeatureAgglomeration が使える。今回は AgglomerativeClustering
を用いて実装。
import json import torchaudio from sklearn.cluster import AgglomerativeClustering from speechbrain.pretrained import EncoderClassifier def get_segment_file_names(segments): segment_file_names = [] for segment in segments: segment_file_name = f"audio-segments/sample_{segment['id']:03}.mp3" segment_file_names.append(segment_file_name) return segment_file_names def segments_to_embeddings(classifier, segment_file_names, format): """ 音声ファイルをEmbeddingに変換します。 """ embeddings = [] for segment_file_name in segment_file_names: signal, _ = torchaudio.load(segment_file_name, format=format) embedding = classifier.encode_batch(signal).detach().cpu().numpy() # embedding は(1, 1, 192) の3次元配列 embeddings.append(embedding.reshape(192,)) return embeddings def main(): format = "mp3" classifier = EncoderClassifier.from_hparams( source="speechbrain/spkrec-ecapa-voxceleb", run_opts={"device": "cuda"}, ) # Whisper解析結果の読み込み with open("sample.json") as f: whisper_result = json.load(f) segments = whisper_result["segments"] segment_file_names = get_segment_file_names(segments) embeddings = segments_to_embeddings(classifier, segment_file_names, format) # コンストラクタにクラスタ数を渡すこともできる。デフォルトは2 clustering = AgglomerativeClustering().fit(embeddings) with open("sample-speaker.txt", mode="w", encoding="utf-8") as f: current_speaker = clustering.labels_[0] spoken_words = [] for label, segment in zip(clustering.labels_, segments): if label != current_speaker: f.write(f"[話者{current_speaker + 1}] {''.join(spoken_words)}\n") spoken_words = [] current_speaker = label spoken_words.append(segment["text"]) main()
クラスタリングは一瞬で終わる。
コードをまとめる
後ほど使うことを想定し、作業ディレクトリ名の変数化、作業ディレクトリの作成/削除を追加などを行い、まとめてみる。
import json import logging import math import os import re import shutil import torchaudio from pydub import AudioSegment from sklearn.cluster import AgglomerativeClustering from speechbrain.pretrained import EncoderClassifier from time import perf_counter work_dir = "audio-segments" classifier = EncoderClassifier.from_hparams( source="speechbrain/spkrec-ecapa-voxceleb", run_opts={"device": "cuda"}, ) def audio_segmentation(audio, segments, format): """ 音声ファイルをセグメントに分割して保存し、保存先のファイルパスを配列で返します。 """ start_time = perf_counter() duration = audio.duration_seconds * 1000 print(f"duration: {duration}") segment_file_names = [] for segment in segments: segment_start = segment["start"] * 1000 segment_end = segment["end"] * 1000 audio_segment = audio[segment_start:segment_end] segment_file_name = f"{work_dir}/segment-{segment['id']:03}.{format}" audio_segment.export(segment_file_name, format=format) segment_file_names.append(segment_file_name) print(segment_file_names[-1]) return segment_file_names def segments_to_embeddings(segment_file_names, format): """ 音声ファイルをEmbeddingに変換します。 """ start_time = perf_counter() embeddings = [] for segment_file_name in segment_file_names: signal, _ = torchaudio.load(segment_file_name, format=format) embedding = classifier.encode_batch(signal).detach().cpu().numpy() embeddings.append(embedding.reshape(192,)) return embeddings def speaker_diarization(embeddings, segments): start_time = perf_counter() clustering = AgglomerativeClustering().fit(embeddings) speakers_file_path = "speakers.txt" with open("debug.txt", mode="w", encoding="utf-8") as f: current_speaker = clustering.labels_[0] statements_by_speaker = [] # 話者が連続して発言した内容 speaker_dialyzed_conversations = [] # 話者ダイアリゼーションされた会話 def join_statements(): # 「。」で結合。ファイルによっては「。、?」で text が終わるので、それぞれ調整 joined_statement = '。'.join( statements_by_speaker ).replace('。。', '。').replace('、。', '、').replace('?。', '? ') return f"話者{current_speaker + 1}:{joined_statement}" for label, segment in zip(clustering.labels_, segments): f.write(f"{label}: {segment['text']}\n") # 話者が交代した場合 if label != current_speaker: statement = "。".join(statements_by_speaker).replace("。。", "。") speaker_dialyzed_conversations.append(join_statements()) statements_by_speaker = [] current_speaker = label statements_by_speaker.append(segment["text"]) if statements_by_speaker: speaker_dialyzed_conversations.append(join_statements()) with open(speakers_file_path, mode="w", encoding="utf-8") as f: f.write("\n".join(speaker_dialyzed_conversations) + "\n") return speakers_file_path def speaker_diarization_by_whisper_json(whisper_json_file_path, audio_file_path): with open(whisper_json_file_path) as f: whisper_json = json.load(f) segments = whisper_json["segments"] audio_format = audio_file_path.split(".")[-1] audio = AudioSegment.from_file(audio_file_path, audio_format) segment_file_names = audio_segmentation(audio, segments, audio_format) embeddings = segments_to_embeddings(segment_file_names, audio_format) speakers_file_path = speaker_diarization(embeddings, segments) print(f"speakers_file_path: {speakers_file_path}") def main(whisper_json_file_path, audio_file_path): speaker_diarization_by_whisper_json(whisper_json_file_path, audio_file_path) if os.path.isdir(work_dir): shutil.rmtree(work_dir) os.mkdir(work_dir) main("sample.json", "sample.mp3")
振り返り
わからないなりに何とかなったが、さて実際に運用に乗せるとなると、どんな構成がいいか。
AWS上で動かす前提だが、EC2のGPUインスタンスを動かすと結構高いのと、CUDAなどの環境構築をしたことがない。
SageMakerが使えれば安くなりそうだが、あれは自分で学習させたモデルを使うものだしなぁ。