Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 77 additions & 20 deletions src/pipecat/serializers/vonage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

"""Vonage Audio Connector WebSocket serializer for Pipecat."""

import base64
import json
from enum import StrEnum
from typing import Any, Literal

from loguru import logger
from pydantic import Field

from pipecat.audio.dtmf.types import KeypadEntry
from pipecat.audio.utils import create_stream_resampler
Expand All @@ -25,6 +29,13 @@
from pipecat.serializers.base_serializer import FrameSerializer


class VonageAudioTransport(StrEnum):
"""Audio transport formats supported by Vonage Audio Connector."""

BINARY = "binary"
JSON = "json"


class VonageFrameSerializer(FrameSerializer):
"""Serializer for Vonage Video API Audio Connector WebSocket protocol.

Expand All @@ -42,11 +53,26 @@ class InputParams(FrameSerializer.InputParams):
vonage_sample_rate: Sample rate used by Vonage, defaults to 16000 Hz.
Common values: 8000, 16000, 24000 Hz.
sample_rate: Optional override for pipeline input sample rate.
audio_transport: Audio transport selected in the Vonage Audio Connector
configuration. ``binary`` sends raw PCM bytes. ``json`` sends JSON
text messages with base64-encoded PCM in the configured audio field.
encoding: Encoding used by the JSON audio transport. Currently only
``base64`` is supported.
audio_field: Outbound JSON field name for base64-encoded audio.
receive_audio_field: Optional inbound JSON field name for base64-encoded
audio. Defaults to ``audio_field``.
static_fields: Additional fields to include in every outbound JSON audio
message.
ignore_rtvi_messages: Inherited from base FrameSerializer, defaults to True.
"""

vonage_sample_rate: int = 16000
sample_rate: int | None = None
audio_transport: VonageAudioTransport = VonageAudioTransport.BINARY
encoding: Literal["base64"] = "base64"
audio_field: str = "audio"
receive_audio_field: str | None = None
static_fields: dict[str, Any] = Field(default_factory=dict)

def __init__(self, params: InputParams | None = None):
"""Initialize the VonageFrameSerializer.
Expand All @@ -60,6 +86,7 @@ def __init__(self, params: InputParams | None = None):

self._vonage_sample_rate = self._params.vonage_sample_rate
self._sample_rate = 0 # Pipeline input rate
self._receive_audio_field = self._params.receive_audio_field or self._params.audio_field

self._input_resampler = create_stream_resampler()
self._output_resampler = create_stream_resampler()
Expand All @@ -81,7 +108,8 @@ async def serialize(self, frame: Frame) -> str | bytes | None:
frame: The Pipecat frame to serialize.

Returns:
Serialized data as string (JSON commands) or bytes (audio), or None if the frame isn't handled.
Serialized data as string (JSON commands) or bytes (audio), or None if the
frame isn't handled.
"""
if isinstance(frame, InterruptionFrame):
# Clear the audio buffer to stop playback immediately
Expand All @@ -98,7 +126,10 @@ async def serialize(self, frame: Frame) -> str | bytes | None:
# Ignoring in case we don't have audio
return None

# Vonage expects raw binary PCM data (not base64 encoded)
if self._params.audio_transport == VonageAudioTransport.JSON:
return self._serialize_json_audio(serialized_data)

# By default, Vonage expects raw binary PCM data (not base64 encoded).
return serialized_data
elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
if self.should_ignore_frame(frame):
Expand All @@ -124,28 +155,15 @@ async def deserialize(self, data: str | bytes) -> Frame | None:
# Check if this is binary audio data
if isinstance(data, bytes):
# Binary message = audio data (16-bit linear PCM)
payload = data

# Input: Convert Vonage's PCM audio to pipeline sample rate
deserialized_data = await self._input_resampler.resample(
payload,
self._vonage_sample_rate,
self._sample_rate,
)
if deserialized_data is None or len(deserialized_data) == 0:
# Ignoring in case we don't have audio
return None

audio_frame = InputAudioRawFrame(
audio=deserialized_data,
num_channels=1, # Vonage uses mono audio
sample_rate=self._sample_rate, # Use the configured pipeline input rate
)
return audio_frame
return await self._deserialize_audio(data)
else:
# Text message = JSON event
try:
message = json.loads(data)
audio_payload = message.get(self._receive_audio_field)
if audio_payload is not None:
return await self._deserialize_json_audio(audio_payload)

event = message.get("event")

# Handle different event types
Expand Down Expand Up @@ -182,3 +200,42 @@ async def deserialize(self, data: str | bytes) -> Frame | None:
except json.JSONDecodeError:
logger.warning(f"Failed to parse JSON message from Vonage: {data}")
return None

def _serialize_json_audio(self, data: bytes) -> str:
payload = base64.b64encode(data).decode("ascii")
message = {
**self._params.static_fields,
self._params.audio_field: payload,
}
return json.dumps(message, separators=(",", ":"))

async def _deserialize_json_audio(self, audio_payload: Any) -> Frame | None:
if not isinstance(audio_payload, str):
logger.warning(f"Vonage JSON audio field '{self._receive_audio_field}' is not a string")
return None

try:
payload = base64.b64decode(audio_payload)
except Exception as e:
logger.warning(f"Failed to decode Vonage base64 audio payload: {e}")
return None

return await self._deserialize_audio(payload)

async def _deserialize_audio(self, payload: bytes) -> Frame | None:
# Input: Convert Vonage's PCM audio to pipeline sample rate
deserialized_data = await self._input_resampler.resample(
payload,
self._vonage_sample_rate,
self._sample_rate,
)
if deserialized_data is None or len(deserialized_data) == 0:
# Ignoring in case we don't have audio
return None

audio_frame = InputAudioRawFrame(
audio=deserialized_data,
num_channels=1, # Vonage uses mono audio
sample_rate=self._sample_rate, # Use the configured pipeline input rate
)
return audio_frame
88 changes: 88 additions & 0 deletions tests/test_vonage_serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import base64
import json
import unittest

from pipecat.frames.frames import InputAudioRawFrame, OutputAudioRawFrame, StartFrame
from pipecat.serializers.vonage import VonageAudioTransport, VonageFrameSerializer


class TestVonageFrameSerializer(unittest.IsolatedAsyncioTestCase):
async def test_default_binary_audio_transport(self):
serializer = VonageFrameSerializer()
await serializer.setup(StartFrame(audio_in_sample_rate=16000))

audio = b"\x01\x00\x02\x00"
frame = OutputAudioRawFrame(audio=audio, sample_rate=16000, num_channels=1)

payload = await serializer.serialize(frame)

self.assertEqual(payload, audio)

async def test_json_audio_transport_serializes_base64_audio(self):
serializer = VonageFrameSerializer(
VonageFrameSerializer.InputParams(
audio_transport=VonageAudioTransport.JSON,
static_fields={"kind": "media"},
)
)
await serializer.setup(StartFrame(audio_in_sample_rate=16000))

audio = b"\x01\x00\x02\x00"
frame = OutputAudioRawFrame(audio=audio, sample_rate=16000, num_channels=1)

payload = await serializer.serialize(frame)

self.assertIsInstance(payload, str)
message = json.loads(payload)
self.assertEqual(message["kind"], "media")
self.assertEqual(base64.b64decode(message["audio"]), audio)

async def test_json_audio_transport_deserializes_base64_audio(self):
serializer = VonageFrameSerializer(
VonageFrameSerializer.InputParams(audio_transport=VonageAudioTransport.JSON)
)
await serializer.setup(StartFrame(audio_in_sample_rate=16000))

audio = b"\x01\x00\x02\x00"
payload = json.dumps({"audio": base64.b64encode(audio).decode("ascii")})

frame = await serializer.deserialize(payload)

self.assertIsInstance(frame, InputAudioRawFrame)
self.assertEqual(frame.audio, audio)
self.assertEqual(frame.sample_rate, 16000)
self.assertEqual(frame.num_channels, 1)

async def test_json_audio_transport_uses_custom_audio_fields(self):
serializer = VonageFrameSerializer(
VonageFrameSerializer.InputParams(
audio_transport="json",
audio_field="outbound_audio",
receive_audio_field="inbound_audio",
)
)
await serializer.setup(StartFrame(audio_in_sample_rate=16000))

audio = b"\x01\x00\x02\x00"
output_frame = OutputAudioRawFrame(audio=audio, sample_rate=16000, num_channels=1)

payload = await serializer.serialize(output_frame)
message = json.loads(payload)
self.assertIn("outbound_audio", message)
self.assertNotIn("audio", message)

input_payload = json.dumps({"inbound_audio": base64.b64encode(audio).decode("ascii")})
input_frame = await serializer.deserialize(input_payload)

self.assertIsInstance(input_frame, InputAudioRawFrame)
self.assertEqual(input_frame.audio, audio)


if __name__ == "__main__":
unittest.main()
Loading