diff --git a/src/pipecat/serializers/vonage.py b/src/pipecat/serializers/vonage.py index 7d82a717ca..908a069c64 100644 --- a/src/pipecat/serializers/vonage.py +++ b/src/pipecat/serializers/vonage.py @@ -6,9 +6,13 @@ """Vonage Audio Connector WebSocket serializer for Pipecat.""" +import base64 import json +from enum import StrEnum +from typing import Any, Literal from loguru import logger +from pydantic import Field from pipecat.audio.dtmf.types import KeypadEntry from pipecat.audio.utils import create_stream_resampler @@ -25,6 +29,13 @@ from pipecat.serializers.base_serializer import FrameSerializer +class VonageAudioTransport(StrEnum): + """Audio transport formats supported by Vonage Audio Connector.""" + + BINARY = "binary" + JSON = "json" + + class VonageFrameSerializer(FrameSerializer): """Serializer for Vonage Video API Audio Connector WebSocket protocol. @@ -42,11 +53,26 @@ class InputParams(FrameSerializer.InputParams): vonage_sample_rate: Sample rate used by Vonage, defaults to 16000 Hz. Common values: 8000, 16000, 24000 Hz. sample_rate: Optional override for pipeline input sample rate. + audio_transport: Audio transport selected in the Vonage Audio Connector + configuration. ``binary`` sends raw PCM bytes. ``json`` sends JSON + text messages with base64-encoded PCM in the configured audio field. + encoding: Encoding used by the JSON audio transport. Currently only + ``base64`` is supported. + audio_field: Outbound JSON field name for base64-encoded audio. + receive_audio_field: Optional inbound JSON field name for base64-encoded + audio. Defaults to ``audio_field``. + static_fields: Additional fields to include in every outbound JSON audio + message. ignore_rtvi_messages: Inherited from base FrameSerializer, defaults to True. """ vonage_sample_rate: int = 16000 sample_rate: int | None = None + audio_transport: VonageAudioTransport = VonageAudioTransport.BINARY + encoding: Literal["base64"] = "base64" + audio_field: str = "audio" + receive_audio_field: str | None = None + static_fields: dict[str, Any] = Field(default_factory=dict) def __init__(self, params: InputParams | None = None): """Initialize the VonageFrameSerializer. @@ -60,6 +86,7 @@ def __init__(self, params: InputParams | None = None): self._vonage_sample_rate = self._params.vonage_sample_rate self._sample_rate = 0 # Pipeline input rate + self._receive_audio_field = self._params.receive_audio_field or self._params.audio_field self._input_resampler = create_stream_resampler() self._output_resampler = create_stream_resampler() @@ -81,7 +108,8 @@ async def serialize(self, frame: Frame) -> str | bytes | None: frame: The Pipecat frame to serialize. Returns: - Serialized data as string (JSON commands) or bytes (audio), or None if the frame isn't handled. + Serialized data as string (JSON commands) or bytes (audio), or None if the + frame isn't handled. """ if isinstance(frame, InterruptionFrame): # Clear the audio buffer to stop playback immediately @@ -98,7 +126,10 @@ async def serialize(self, frame: Frame) -> str | bytes | None: # Ignoring in case we don't have audio return None - # Vonage expects raw binary PCM data (not base64 encoded) + if self._params.audio_transport == VonageAudioTransport.JSON: + return self._serialize_json_audio(serialized_data) + + # By default, Vonage expects raw binary PCM data (not base64 encoded). return serialized_data elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)): if self.should_ignore_frame(frame): @@ -124,28 +155,15 @@ async def deserialize(self, data: str | bytes) -> Frame | None: # Check if this is binary audio data if isinstance(data, bytes): # Binary message = audio data (16-bit linear PCM) - payload = data - - # Input: Convert Vonage's PCM audio to pipeline sample rate - deserialized_data = await self._input_resampler.resample( - payload, - self._vonage_sample_rate, - self._sample_rate, - ) - if deserialized_data is None or len(deserialized_data) == 0: - # Ignoring in case we don't have audio - return None - - audio_frame = InputAudioRawFrame( - audio=deserialized_data, - num_channels=1, # Vonage uses mono audio - sample_rate=self._sample_rate, # Use the configured pipeline input rate - ) - return audio_frame + return await self._deserialize_audio(data) else: # Text message = JSON event try: message = json.loads(data) + audio_payload = message.get(self._receive_audio_field) + if audio_payload is not None: + return await self._deserialize_json_audio(audio_payload) + event = message.get("event") # Handle different event types @@ -182,3 +200,42 @@ async def deserialize(self, data: str | bytes) -> Frame | None: except json.JSONDecodeError: logger.warning(f"Failed to parse JSON message from Vonage: {data}") return None + + def _serialize_json_audio(self, data: bytes) -> str: + payload = base64.b64encode(data).decode("ascii") + message = { + **self._params.static_fields, + self._params.audio_field: payload, + } + return json.dumps(message, separators=(",", ":")) + + async def _deserialize_json_audio(self, audio_payload: Any) -> Frame | None: + if not isinstance(audio_payload, str): + logger.warning(f"Vonage JSON audio field '{self._receive_audio_field}' is not a string") + return None + + try: + payload = base64.b64decode(audio_payload) + except Exception as e: + logger.warning(f"Failed to decode Vonage base64 audio payload: {e}") + return None + + return await self._deserialize_audio(payload) + + async def _deserialize_audio(self, payload: bytes) -> Frame | None: + # Input: Convert Vonage's PCM audio to pipeline sample rate + deserialized_data = await self._input_resampler.resample( + payload, + self._vonage_sample_rate, + self._sample_rate, + ) + if deserialized_data is None or len(deserialized_data) == 0: + # Ignoring in case we don't have audio + return None + + audio_frame = InputAudioRawFrame( + audio=deserialized_data, + num_channels=1, # Vonage uses mono audio + sample_rate=self._sample_rate, # Use the configured pipeline input rate + ) + return audio_frame diff --git a/tests/test_vonage_serializer.py b/tests/test_vonage_serializer.py new file mode 100644 index 0000000000..e533f9b475 --- /dev/null +++ b/tests/test_vonage_serializer.py @@ -0,0 +1,88 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import base64 +import json +import unittest + +from pipecat.frames.frames import InputAudioRawFrame, OutputAudioRawFrame, StartFrame +from pipecat.serializers.vonage import VonageAudioTransport, VonageFrameSerializer + + +class TestVonageFrameSerializer(unittest.IsolatedAsyncioTestCase): + async def test_default_binary_audio_transport(self): + serializer = VonageFrameSerializer() + await serializer.setup(StartFrame(audio_in_sample_rate=16000)) + + audio = b"\x01\x00\x02\x00" + frame = OutputAudioRawFrame(audio=audio, sample_rate=16000, num_channels=1) + + payload = await serializer.serialize(frame) + + self.assertEqual(payload, audio) + + async def test_json_audio_transport_serializes_base64_audio(self): + serializer = VonageFrameSerializer( + VonageFrameSerializer.InputParams( + audio_transport=VonageAudioTransport.JSON, + static_fields={"kind": "media"}, + ) + ) + await serializer.setup(StartFrame(audio_in_sample_rate=16000)) + + audio = b"\x01\x00\x02\x00" + frame = OutputAudioRawFrame(audio=audio, sample_rate=16000, num_channels=1) + + payload = await serializer.serialize(frame) + + self.assertIsInstance(payload, str) + message = json.loads(payload) + self.assertEqual(message["kind"], "media") + self.assertEqual(base64.b64decode(message["audio"]), audio) + + async def test_json_audio_transport_deserializes_base64_audio(self): + serializer = VonageFrameSerializer( + VonageFrameSerializer.InputParams(audio_transport=VonageAudioTransport.JSON) + ) + await serializer.setup(StartFrame(audio_in_sample_rate=16000)) + + audio = b"\x01\x00\x02\x00" + payload = json.dumps({"audio": base64.b64encode(audio).decode("ascii")}) + + frame = await serializer.deserialize(payload) + + self.assertIsInstance(frame, InputAudioRawFrame) + self.assertEqual(frame.audio, audio) + self.assertEqual(frame.sample_rate, 16000) + self.assertEqual(frame.num_channels, 1) + + async def test_json_audio_transport_uses_custom_audio_fields(self): + serializer = VonageFrameSerializer( + VonageFrameSerializer.InputParams( + audio_transport="json", + audio_field="outbound_audio", + receive_audio_field="inbound_audio", + ) + ) + await serializer.setup(StartFrame(audio_in_sample_rate=16000)) + + audio = b"\x01\x00\x02\x00" + output_frame = OutputAudioRawFrame(audio=audio, sample_rate=16000, num_channels=1) + + payload = await serializer.serialize(output_frame) + message = json.loads(payload) + self.assertIn("outbound_audio", message) + self.assertNotIn("audio", message) + + input_payload = json.dumps({"inbound_audio": base64.b64encode(audio).decode("ascii")}) + input_frame = await serializer.deserialize(input_payload) + + self.assertIsInstance(input_frame, InputAudioRawFrame) + self.assertEqual(input_frame.audio, audio) + + +if __name__ == "__main__": + unittest.main()