Whisper APIで解析した結果から、Google Colabで話者ダイアリゼーション(話者の識別)を行う

Whisperで文字起こしした電話内容を、OpenAI APIgpt-3.5-turbo モデルで要約させている。

会議議事録の要約などは、プロンプトの指定次第で高精度で行えるが、電話の場合、文章の体裁を取らず、会話内容もまとまりがなかったりで、精度が低い。

特に、話者がどちらかの情報が、Whisperでの文字起こし結果には含まれないため、双方の話している内容がまとめられたりと、改善要望が多い。

ちょうど gpt-4 がOpenAI APIで一般解禁されたので試してみたところ、入力テキストやプロンプトをいじらなくても精度向上したが、コストが gpt-3.5-turbo の20倍ということで、稟議が通らなかった。

精度向上のため、Speaker Diarisation(話者ダイアリゼーション、話者識別、話者分離、話者分割)できないか、場合によってはWhisperからの乗り換えも検討しつつ調べたところ、Whisperの解析結果を元に、Google Colab上でそれっぽいことができたのでメモ。

方法の調査

調べると、Pythonを使った例が出てきた。 spkrec-ecapa-voxcelebPyannoteWhisperX あたりが名前として出てくる。

spkrec-ecapa-voxceleb

spkrec-ecapa-voxceleb は、SpeechBrainが公開しているツール。トレーニング済みのECAPA-TDNNモデルで話者の検証を行うのに必要なツールが含まれているとのこと。

huggingface.co

VoxCelebYoutubeを元にしたデータセットらしい。ECAPA-TDNNはモデルの模様。

Pyannote

今回のユースケースのように、音声に対して使用するのは pyannote.audio 。他にも動画用の pyannote.video などあった。

github.com

PyTorchベースの、speaker diarization用ツールとのこと。

READMEに記載があるが、使用時に認証トークンが必要。その発行のためには、HuggingFaceのアカウントが必要となる。

WhisperX

Whisper + Pyannoteによる文字起こし&話者分離が広く使われているようで、それらをあらかじめ合わせてあるのがWhisperX。

github.com

Whisperの高速化モデルである、Faster-Whisperを使用しており、オリジナルのWhisperを使っているのではない模様。

Whisper APIを使用できたらよかったのだが、現時点では未対応の模様。また、Pyannoteの認証トークンはやはり必要となる。

話者ダイアリゼーション可能なサービス

Whisperから、話者ダイアリゼーションをしてくれるサービスへの乗り換えも検討。

GCPSpeech-to-Textや、LINEのCLOVA NoteNootaというサービスもあった。

方法の選定

PyannoteのためのHuggingFaceアカウント作成が、「(決済者が)聞いたことのないWebサイトだから」ということで稟議が通らず、外部サービスについてもそれぞれの理由でお見送りとなったため、消去法でspkrec-ecapa-voxcelebに決定。

サービスの中でも、特にNootaは要約やSalesforce連携(要約結果をカスタムオブジェクトに保存している)など、自前で実装したことに対応しており、かつ話者識別もできるということで乗り換えたかったが、解析対象となる電話の総時間がビジネスプランの最大月12,000分でも足りず、月額30,000円以上のエンタープライズプランになるため見送りとなった。

CLOVA Noteはオープンベータらしく、利用料は無料でスマートフォンアプリ経由であれば無制限に使用可能だが、ファイルアップロード形式だと月300分まで、データ利用を許可しても+300分のトータル月600分までなので見送り。

Speech-To-Textは軽く試してみたが、60秒以上の音声ファイルはGoogle Cloud Storageに保存する必要があるのと、単純にエラーが多かった(時折変換失敗するファイルが存在し、リトライしても失敗し続けてしまう)。また、精度もWhisperより低かったように感じる。

spkrec-ecapa-voxcelebによる話者ダイアリゼーションのGoogle Colaboratoryでの実装

以下で、話者のサンプル音声をもとにした話者ダイアリゼーションを行っている。

qiita.com

ざっくりでいいので、サンプル音声なしの例はないかと調べたところ、以下に例があった。

huggingface.co

これらを参考に実装してみる。

音声ファイルとWhisper APIによる解析結果の用意

事前準備として、音声ファイルとそれをWhisper APIのtranscribeにかけた結果のJSONを保存しておく。

注意点として、transcribeする際のオプションとして response_format="verbose_json" を指定しておく。発話ごとに音声ファイルを分割する必要があるが、 verbose_json でないと、発話の開始・終了時間が結果に含まれない。

今回は、それぞれ sample.mp3sample.json という名前でColab ノートブックにアップロード。音声ファイルは約25分、128Kbpsで容量が25MB弱のものを使用。

Pythonパッケージのインストール

speechbrain , pydub , scikit-learn をインストールする。

!pip install speechbrain pydub scikit-learn

Embeddingの計算

speechbrainのサンプルコードだと以下のようにしている。

import torchaudio
from speechbrain.pretrained import EncoderClassifier

classifier = EncoderClassifier.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb",
    # GPUを使用するよう設定、デフォルトではCPUが使われる
    run_opts={"device": "cuda"}
)

signal, fs = torchaudio.load("sample.mp3")

ただ、今回使用している音声ファイルを読み込ませると、 OutOfMemoryError: CUDA out of memory. が発生。

以下のように、Whisperの解析結果をもとに、ファイルを分割して読み込ませる必要があった。

!mkdir audio-segments

import json
import torchaudio

from pydub import AudioSegment
from speechbrain.pretrained import EncoderClassifier

def audio_segmentation(audio, segments, format):
    """
    音声ファイルをセグメントに分割して保存し、保存先のファイルパスを配列で返します。
    """
    segment_file_names = []

    for segment in segments:
        # ミリ秒で指定する必要があるため1000倍
        segment_start = segment["start"] * 1000
        segment_end = segment["end"] * 1000

        audio_segment = audio[segment_start:segment_end]
        segment_file_name = f"audio-segments/sample_{segment['id']:03}.mp3"
        audio_segment.export(segment_file_name, format=format)
        segment_file_names.append(segment_file_name)

    return segment_file_names


def segments_to_embeddings(classifier, segment_file_names, format):
    """
    音声ファイルをEmbeddingに変換します。
    """
    embeddings = []

    for segment_file_name in segment_file_names:
        signal, _ = torchaudio.load(segment_file_name, format=format)
        embeddings.append(classifier.encode_batch(signal))

    return embeddings


def main():
    audio = AudioSegment.from_mp3("sample.mp3")
    format = "mp3"
    classifier = EncoderClassifier.from_hparams(
        source="speechbrain/spkrec-ecapa-voxceleb",
        run_opts={"device": "cuda"},
    )

    # Whisper解析結果の読み込み
    with open("sample.json") as f:
        whisper_result = json.load(f)
        segments = whisper_result["segments"]

        segment_file_names = audio_segmentation(audio, segments, format)
        embeddings = segments_to_embeddings(classifier, segment_file_names, format)

main()

初回実行時は、モデルのダウンロードが実行される。

2回目以降では、25分程度のファイルに対して、実行時間は30秒程度。

ただ、処理時間のほとんどはファイル分割および保存によるもので、Embeddingでは3.5秒程度しかかからなかった。ファイルに保存せず、オンメモリで処理できないか調べたが、 torchaudio.load はファイルパスしか受け付けない模様。

クラスタリングの実施

続いて、クラスタリングを行う。

階層的クラスタリングによる教師なし次元削減として、 AgglomerativeClusteringFeatureAgglomeration が使える。今回は AgglomerativeClustering を用いて実装。

import json
import torchaudio

from sklearn.cluster import AgglomerativeClustering
from speechbrain.pretrained import EncoderClassifier


def get_segment_file_names(segments):
    segment_file_names = []

    for segment in segments:
        segment_file_name = f"audio-segments/sample_{segment['id']:03}.mp3"
        segment_file_names.append(segment_file_name)

    return segment_file_names


def segments_to_embeddings(classifier, segment_file_names, format):
    """
    音声ファイルをEmbeddingに変換します。
    """
    embeddings = []

    for segment_file_name in segment_file_names:
        signal, _ = torchaudio.load(segment_file_name, format=format)
        embedding = classifier.encode_batch(signal).detach().cpu().numpy()
        # embedding は(1, 1, 192) の3次元配列
        embeddings.append(embedding.reshape(192,))

    return embeddings


def main():
    format = "mp3"
    classifier = EncoderClassifier.from_hparams(
        source="speechbrain/spkrec-ecapa-voxceleb",
        run_opts={"device": "cuda"},
    )

    # Whisper解析結果の読み込み
    with open("sample.json") as f:
        whisper_result = json.load(f)
        segments = whisper_result["segments"]

        segment_file_names = get_segment_file_names(segments)
        embeddings = segments_to_embeddings(classifier, segment_file_names, format)

        # コンストラクタにクラスタ数を渡すこともできる。デフォルトは2
        clustering = AgglomerativeClustering().fit(embeddings)

        with open("sample-speaker.txt", mode="w", encoding="utf-8") as f:
            current_speaker = clustering.labels_[0]
            spoken_words = []

            for label, segment in zip(clustering.labels_, segments):
                if label != current_speaker:
                    f.write(f"[話者{current_speaker + 1}] {''.join(spoken_words)}\n")
                    spoken_words = []
                    current_speaker = label

                spoken_words.append(segment["text"])

main()

クラスタリングは一瞬で終わる。

コードをまとめる

後ほど使うことを想定し、作業ディレクトリ名の変数化、作業ディレクトリの作成/削除を追加などを行い、まとめてみる。

import json
import logging
import math
import os
import re
import shutil
import torchaudio

from pydub import AudioSegment
from sklearn.cluster import AgglomerativeClustering
from speechbrain.pretrained import EncoderClassifier
from time import perf_counter


work_dir = "audio-segments"

classifier = EncoderClassifier.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb",
    run_opts={"device": "cuda"},
)


def audio_segmentation(audio, segments, format):
    """
    音声ファイルをセグメントに分割して保存し、保存先のファイルパスを配列で返します。
    """
    start_time = perf_counter()

    duration = audio.duration_seconds * 1000
    print(f"duration: {duration}")
    segment_file_names = []

    for segment in segments:
        segment_start = segment["start"] * 1000
        segment_end = segment["end"] * 1000

        audio_segment = audio[segment_start:segment_end]
        segment_file_name = f"{work_dir}/segment-{segment['id']:03}.{format}"
        audio_segment.export(segment_file_name, format=format)
        segment_file_names.append(segment_file_name)

    print(segment_file_names[-1])
    return segment_file_names


def segments_to_embeddings(segment_file_names, format):
    """
    音声ファイルをEmbeddingに変換します。
    """
    start_time = perf_counter()

    embeddings = []

    for segment_file_name in segment_file_names:
        signal, _ = torchaudio.load(segment_file_name, format=format)
        embedding = classifier.encode_batch(signal).detach().cpu().numpy()
        embeddings.append(embedding.reshape(192,))

    return embeddings


def speaker_diarization(embeddings, segments):
    start_time = perf_counter()

    clustering = AgglomerativeClustering().fit(embeddings)

    speakers_file_path = "speakers.txt"

    with open("debug.txt", mode="w", encoding="utf-8") as f:
        current_speaker = clustering.labels_[0]
        statements_by_speaker = [] # 話者が連続して発言した内容
        speaker_dialyzed_conversations = [] # 話者ダイアリゼーションされた会話

        def join_statements():
            # 「。」で結合。ファイルによっては「。、?」で text が終わるので、それぞれ調整
            joined_statement = '。'.join(
                statements_by_speaker
            ).replace('。。', '。').replace('、。', '、').replace('?。', '? ')
            return f"話者{current_speaker + 1}:{joined_statement}"

        for label, segment in zip(clustering.labels_, segments):
            f.write(f"{label}: {segment['text']}\n")

            # 話者が交代した場合
            if label != current_speaker:
                statement = "。".join(statements_by_speaker).replace("。。", "。")
                speaker_dialyzed_conversations.append(join_statements())
                statements_by_speaker = []
                current_speaker = label

            statements_by_speaker.append(segment["text"])

        if statements_by_speaker:
            speaker_dialyzed_conversations.append(join_statements())

    with open(speakers_file_path, mode="w", encoding="utf-8") as f:
        f.write("\n".join(speaker_dialyzed_conversations) + "\n")

    return speakers_file_path


def speaker_diarization_by_whisper_json(whisper_json_file_path, audio_file_path):
    with open(whisper_json_file_path) as f:
        whisper_json = json.load(f)
        segments = whisper_json["segments"]

        audio_format = audio_file_path.split(".")[-1]
        audio = AudioSegment.from_file(audio_file_path, audio_format)

        segment_file_names = audio_segmentation(audio, segments, audio_format)
        embeddings = segments_to_embeddings(segment_file_names, audio_format)
        speakers_file_path = speaker_diarization(embeddings, segments)
        print(f"speakers_file_path: {speakers_file_path}")


def main(whisper_json_file_path, audio_file_path):
    speaker_diarization_by_whisper_json(whisper_json_file_path, audio_file_path)


if os.path.isdir(work_dir):
    shutil.rmtree(work_dir)

os.mkdir(work_dir)
main("sample.json", "sample.mp3")

振り返り

わからないなりに何とかなったが、さて実際に運用に乗せるとなると、どんな構成がいいか。

AWS上で動かす前提だが、EC2のGPUインスタンスを動かすと結構高いのと、CUDAなどの環境構築をしたことがない。

SageMakerが使えれば安くなりそうだが、あれは自分で学習させたモデルを使うものだしなぁ。