Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions vsb/cmdline_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@ def add_vsb_cmdline_args(
default=1,
help="Number of replicas for dedicated read nodes. Default is %(default)s.",
)
pinecone_group.add_argument(
"--pinecone_multi_namespace",
action=argparse.BooleanOptionalAction,
default=False,
help="Enable benchmarking across all namespaces in an existing index. Auto-discovers all populated namespaces and distributes requests evenly. Requires --skip_populate and an existing index. Default is %(default)s.",
)

opensearch_group = parser.add_argument_group(
"Options specific to OpenSearch database"
Expand Down Expand Up @@ -513,6 +519,27 @@ def validate_parsed_args(
"The following arguments must be specified when --database is "
"'pinecone'" + formatter.format_help(),
)
# Validate multi-namespace mode requirements
if getattr(args, "pinecone_multi_namespace", False):
if not getattr(args, "skip_populate", False):
parser.error(
"multi_namespace mode requires --skip_populate. Multi-namespace benchmarking only works with existing populated indexes."
)
if (
getattr(args, "pinecone_namespace_name", "__default__")
!= "__default__"
):
parser.error(
"multi_namespace mode does not support custom namespace names. Use --pinecone_multi_namespace to auto-discover all namespaces."
)
if getattr(args, "overwrite", False):
parser.error(
"--overwrite is not applicable with --pinecone_multi_namespace. Multi-namespace mode only benchmarks existing indexes."
)
if not getattr(args, "pinecone_index_name", None):
parser.error(
"multi_namespace mode requires --pinecone_index_name to be specified. Cannot auto-generate index name in multi-namespace mode."
)
case "opensearch":
pass
case "pgvector":
Expand Down
31 changes: 31 additions & 0 deletions vsb/databases/pinecone/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,34 @@ Available dedicated read node options:
> [!TIP]
> The API key and/or index name can also be passed via environment variables
> (`VSB__PINECONE_API_KEY` and `VSB__PINECONE_INDEX_NAME` respectively).

## Multi-Namespace Benchmarking

VSB supports benchmarking across multiple namespaces in an existing Pinecone index. When the `--pinecone_multi_namespace` flag is passed, VSB automatically discovers all populated namespaces in the index and distributes requests evenly across them.

**Requirements:**
- The index must already exist and be populated
- Must use `--skip_populate` (multi-namespace mode only benchmarks existing indexes)
- Must specify `--pinecone_index_name` (cannot auto-generate index name)
- Cannot use `--overwrite` or custom `--pinecone_namespace_name` with multi-namespace mode

**Usage:**

```shell
vsb --database=pinecone --workload=mnist-test \
--pinecone_api_key=<YOUR_API_KEY> \
--pinecone_index_name=<EXISTING_INDEX> \
--pinecone_multi_namespace \
--skip_populate
```

**How it works:**
- VSB automatically discovers all namespaces in the index that have records (`record_count > 0`)
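- Namespaces are distributed across users/workers using a round-robin algorithm; the number of users must not exceed the number of populated namespaces, otherwise the run fails with an error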
- Requests from each user are distributed evenly across their assigned namespaces
- Metrics are aggregated across all namespaces (same display format as single-namespace mode)

**Example:** If an index has 3 namespaces (ns1, ns2, ns3) and you run with 2 users:
- User 0 will handle requests to ns1 and ns3
- User 1 will handle requests to ns2
- Requests are distributed evenly across the assigned namespaces for each user
93 changes: 84 additions & 9 deletions vsb/databases/pinecone/pinecone.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def __init__(
self.overwrite = config["overwrite"]
self.index_name = config["pinecone_index_name"]
self.namespace = config["pinecone_namespace_name"]
self.multi_namespace = config.get("pinecone_multi_namespace", False)
self.use_dedicated_read_nodes = config.get(
"pinecone_dedicated_read_nodes", False
)
Expand All @@ -192,6 +193,13 @@ def __init__(
)
raise StopUser()
except NotFoundException:
# Check if multi-namespace mode is enabled - if so, don't create index
if self.multi_namespace:
logger.critical(
f"PineconeDB: Index '{self.index_name}' does not exist. Multi-namespace mode requires an existing populated index. Please create the index first or use single-namespace mode."
)
raise StopUser()

logger.info(
f"PineconeDB: Specified index '{self.index_name}' was not found, or the "
f"specified API key cannot access it. Creating new index '{self.index_name}'."
Expand Down Expand Up @@ -232,6 +240,28 @@ def __init__(
f"PineconeDB index '{self.index_name}' has incorrect metric - expected:{metric.value}, found:{index_metric}"
)

# Initialize namespaces list (empty for both modes to avoid AttributeError)
self.namespaces = []

# Multi-namespace mode: discover and validate namespaces
if self.multi_namespace:
if self.created_index:
logger.critical(
f"PineconeDB: Cannot use multi_namespace mode with a newly created index. Index '{self.index_name}' must already exist and be populated."
)
raise StopUser()

self.namespaces = self._discover_all_namespaces()
if not self.namespaces:
logger.critical(
f"PineconeDB: No populated namespaces found in index '{self.index_name}'. Multi-namespace mode requires at least one namespace with records."
)
raise StopUser()

logger.info(
f"PineconeDB: Discovered {len(self.namespaces)} namespaces: {', '.join(self.namespaces)}"
)

def close(self):
    """Close the underlying index handle held in ``self.index``."""
    index_handle = self.index
    index_handle.close()

Expand Down Expand Up @@ -260,7 +290,16 @@ def get_batch_size(self, sample_record: Record) -> int:
return batch_size

def get_namespace(self, namespace: str) -> Namespace:
    """Return the namespace object that requests should be issued against.

    In single-namespace mode the ``namespace`` argument is ignored and the
    configured ``self.namespace`` is used. In multi-namespace mode the
    argument selects one of the namespaces listed in ``self.namespaces``.

    Args:
        namespace: Name of the namespace requested by the caller.

    Returns:
        A ``PineconeNamespace`` bound to ``self.index`` and the selected
        namespace name.

    Raises:
        ValueError: If multi-namespace mode is enabled and ``namespace`` is
            not one of ``self.namespaces``.
    """
    if not self.multi_namespace:
        # Single namespace mode: ignore parameter, use configured namespace
        return PineconeNamespace(self.index, self.namespace)

    # Multi-namespace mode: only namespaces recorded in self.namespaces are
    # valid targets, so reject anything else before building the wrapper.
    if namespace not in self.namespaces:
        raise ValueError(
            f"PineconeDB: Namespace '{namespace}' not found in discovered namespaces. Available namespaces: {', '.join(self.namespaces)}"
        )
    return PineconeNamespace(self.index, namespace)

def initialize_population(self):
# If the namespace already existed before VSB (we didn't create it) and
Expand Down Expand Up @@ -322,24 +361,60 @@ def get_record_count(self) -> int:
self.index.describe_namespace(namespace=self.namespace)["record_count"]
)

def check_namespace_exists(self, namespace: str) -> bool:
"""Check if a namespace exists inside the current index using list_namespaces generator."""
def _discover_all_namespaces(self) -> list[str]:
    """Discover all populated namespaces in the index.

    Iterates over ``self.index.list_namespaces()`` and keeps every namespace
    whose ``record_count`` is greater than zero.

    Returns:
        The names of the populated namespaces, sorted so that every caller
        sees the same ordering.

    Raises:
        ValueError: If listing the namespaces fails with a
            ``PineconeException`` (chained as the cause), or if no namespace
            with records is found.
    """
    populated_namespaces = []
    try:
        # list_namespaces returns a generator of dicts with 'name' and 'record_count'
        for ns in self.index.list_namespaces():
            # Convert record_count to int (API may return string)
            record_count = int(ns["record_count"])
            if record_count > 0:
                populated_namespaces.append(ns["name"])
                logger.debug(
                    f"PineconeDB: Namespace '{ns['name']}' has {record_count} records"
                )
    except PineconeException as e:
        logger.error(
            f"PineconeDB: Error listing namespaces in index '{self.index_name}': {e}"
        )
        raise ValueError(
            f"Failed to list namespaces in index '{self.index_name}': {e}"
        ) from e

    if not populated_namespaces:
        raise ValueError(
            f"No populated namespaces found in index '{self.index_name}'. Multi-namespace mode requires at least one namespace with records."
        )

    return sorted(populated_namespaces)

def _get_namespaces_for_user(self, user_id: int, num_users: int) -> list[str]:
    """Select the namespaces handled by one user, round-robin style.

    Starting at position ``user_id``, every ``num_users``-th entry of
    ``self.namespaces`` is assigned to the user.

    Args:
        user_id: Index of the user the assignment is computed for.
        num_users: Total number of users sharing the namespaces.

    Returns:
        The namespace names assigned to ``user_id``, in ``self.namespaces``
        order.

    Raises:
        ValueError: If ``num_users`` exceeds the number of namespaces.
    """
    total_namespaces = len(self.namespaces)
    if total_namespaces < num_users:
        raise ValueError(
            f"Cannot distribute {num_users} users across {len(self.namespaces)} namespaces. Number of users must be <= number of namespaces."
        )

    assigned = []
    for position in range(user_id, total_namespaces, num_users):
        assigned.append(self.namespaces[position])
    return assigned

def check_namespace_exists(self, namespace: str) -> bool:
    """Check if a namespace exists inside the current index.

    Args:
        namespace: Name of the namespace to look up.

    Returns:
        True when ``describe_namespace`` succeeds for ``namespace``. False
        when it raises ``NotFoundException``, and also when it raises any
        other ``PineconeException`` (the error is logged, not re-raised).
    """
    try:
        # Use describe_namespace, a direct lookup of a single namespace,
        # instead of iterating over every namespace in the index.
        self.index.describe_namespace(namespace=namespace)
    except NotFoundException:
        logger.info(
            f"PineconeDB: Namespace '{namespace}' does not exist in index '{self.index_name}'."
        )
        return False
    except PineconeException as e:
        # Best effort: an API failure is reported as "does not exist".
        logger.error(
            f"PineconeDB: Error while checking namespace '{namespace}' in index '{self.index_name}' - {e}"
        )
        return False

    logger.info(
        f"PineconeDB: Namespace '{namespace}' exists in index '{self.index_name}'."
    )
    return True
40 changes: 39 additions & 1 deletion vsb/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,24 @@ def __init__(self, environment):
)
self.query_iter = None

# Multi-namespace mode: get namespaces assigned to this user
if hasattr(self.database, "multi_namespace") and self.database.multi_namespace:
self.user_namespaces = self.database._get_namespaces_for_user(
self.user_id, self.users_total
)
if not self.user_namespaces:
logger.critical(
f"User {self.user_id} has no namespaces assigned. This should not happen - check namespace distribution logic."
)
raise StopUser()
self.namespace_index = 0
logger.debug(
f"RunUser id:{self.user_id} assigned namespaces: {', '.join(self.user_namespaces)}"
)
else:
self.user_namespaces = None
self.namespace_index = None

@task
def request(self):
match self.state:
Expand All @@ -270,10 +288,30 @@ def wait_time(self):
def do_run(self):
if not self.query_iter:
batch_size = self.database.get_batch_size(self.workload.get_sample_record())
self.query_iter = self.workload.get_query_iter(
base_iter = self.workload.get_query_iter(
self.users_total, self.user_id, batch_size
)

# Multi-namespace mode: wrap iterator to inject namespace names
if (
hasattr(self.database, "multi_namespace")
and self.database.multi_namespace
):

def namespace_wrapper():
for tenant, request in base_iter:
# Select next namespace using round-robin
namespace = self.user_namespaces[
self.namespace_index % len(self.user_namespaces)
]
self.namespace_index += 1
yield (namespace, request)

self.query_iter = namespace_wrapper()
else:
# Single namespace mode: use iterator as-is
self.query_iter = base_iter

tenant: str = None
request: QueryRequest = None
try:
Expand Down
Loading