diff --git a/vsb/cmdline_args.py b/vsb/cmdline_args.py index 5428d5e..d19c74b 100644 --- a/vsb/cmdline_args.py +++ b/vsb/cmdline_args.py @@ -316,6 +316,12 @@ def add_vsb_cmdline_args( default=1, help="Number of replicas for dedicated read nodes. Default is %(default)s.", ) + pinecone_group.add_argument( + "--pinecone_multi_namespace", + action=argparse.BooleanOptionalAction, + default=False, + help="Enable benchmarking across all namespaces in an existing index. Auto-discovers all populated namespaces and distributes requests evenly. Requires --skip_populate and an existing index. Default is %(default)s.", + ) opensearch_group = parser.add_argument_group( "Options specific to OpenSearch database" @@ -513,6 +519,27 @@ def validate_parsed_args( "The following arguments must be specified when --database is " "'pinecone'" + formatter.format_help(), ) + # Validate multi-namespace mode requirements + if getattr(args, "pinecone_multi_namespace", False): + if not getattr(args, "skip_populate", False): + parser.error( + "multi_namespace mode requires --skip_populate. Multi-namespace benchmarking only works with existing populated indexes." + ) + if ( + getattr(args, "pinecone_namespace_name", "__default__") + != "__default__" + ): + parser.error( + "multi_namespace mode does not support custom namespace names. Use --pinecone_multi_namespace to auto-discover all namespaces." + ) + if getattr(args, "overwrite", False): + parser.error( + "--overwrite is not applicable with --pinecone_multi_namespace. Multi-namespace mode only benchmarks existing indexes." + ) + if not getattr(args, "pinecone_index_name", None): + parser.error( + "multi_namespace mode requires --pinecone_index_name to be specified. Cannot auto-generate index name in multi-namespace mode." 
+ ) case "opensearch": pass case "pgvector": diff --git a/vsb/databases/pinecone/README.md b/vsb/databases/pinecone/README.md index d5eda87..74f4265 100644 --- a/vsb/databases/pinecone/README.md +++ b/vsb/databases/pinecone/README.md @@ -57,3 +57,34 @@ Available dedicated read node options: > [!TIP] > The API key and/or index name can also be passed via environment variables > (`VSB__PINECONE_API_KEY` and `VSB__PINECONE_INDEX_NAME` respectively). + +## Multi-Namespace Benchmarking + +VSB supports benchmarking across multiple namespaces in an existing Pinecone index. When `--pinecone_multi_namespace` is set, VSB automatically discovers all populated namespaces in the index and distributes requests evenly across them. + +**Requirements:** +- The index must already exist and be populated +- Must use `--skip_populate` (multi-namespace mode only benchmarks existing indexes) +- Must specify `--pinecone_index_name` (cannot auto-generate index name) +- Cannot use `--overwrite` or custom `--pinecone_namespace_name` with multi-namespace mode + +**Usage:** + +```shell +vsb --database=pinecone --workload=mnist-test \ + --pinecone_api_key=<YOUR_API_KEY> \ + --pinecone_index_name=<YOUR_INDEX_NAME> \ + --pinecone_multi_namespace \ + --skip_populate +``` + +**How it works:** +- VSB automatically discovers all namespaces in the index that have records (`record_count > 0`) +- Namespaces are distributed across users/workers using a round-robin algorithm (the number of users must not exceed the number of populated namespaces) +- Requests from each user are distributed evenly across their assigned namespaces +- Metrics are aggregated across all namespaces (same display format as single-namespace mode) + +**Example:** If an index has 3 namespaces (ns1, ns2, ns3) and you run with 2 users: +- User 0 will handle requests to ns1 and ns3 +- User 1 will handle requests to ns2 +- Requests are distributed evenly across the assigned namespaces for each user diff --git a/vsb/databases/pinecone/pinecone.py b/vsb/databases/pinecone/pinecone.py index bb5fc3b..98d4855 100644 --- 
a/vsb/databases/pinecone/pinecone.py +++ b/vsb/databases/pinecone/pinecone.py @@ -168,6 +168,7 @@ def __init__( self.overwrite = config["overwrite"] self.index_name = config["pinecone_index_name"] self.namespace = config["pinecone_namespace_name"] + self.multi_namespace = config.get("pinecone_multi_namespace", False) self.use_dedicated_read_nodes = config.get( "pinecone_dedicated_read_nodes", False ) @@ -192,6 +193,13 @@ def __init__( ) raise StopUser() except NotFoundException: + # Check if multi-namespace mode is enabled - if so, don't create index + if self.multi_namespace: + logger.critical( + f"PineconeDB: Index '{self.index_name}' does not exist. Multi-namespace mode requires an existing populated index. Please create the index first or use single-namespace mode." + ) + raise StopUser() + logger.info( f"PineconeDB: Specified index '{self.index_name}' was not found, or the " f"specified API key cannot access it. Creating new index '{self.index_name}'." @@ -232,6 +240,28 @@ def __init__( f"PineconeDB index '{self.index_name}' has incorrect metric - expected:{metric.value}, found:{index_metric}" ) + # Initialize namespaces list (empty for both modes to avoid AttributeError) + self.namespaces = [] + + # Multi-namespace mode: discover and validate namespaces + if self.multi_namespace: + if self.created_index: + logger.critical( + f"PineconeDB: Cannot use multi_namespace mode with a newly created index. Index '{self.index_name}' must already exist and be populated." + ) + raise StopUser() + + self.namespaces = self._discover_all_namespaces() + if not self.namespaces: + logger.critical( + f"PineconeDB: No populated namespaces found in index '{self.index_name}'. Multi-namespace mode requires at least one namespace with records." 
+ ) + raise StopUser() + + logger.info( + f"PineconeDB: Discovered {len(self.namespaces)} namespaces: {', '.join(self.namespaces)}" + ) + def close(self): self.index.close() @@ -260,7 +290,16 @@ def get_batch_size(self, sample_record: Record) -> int: return batch_size def get_namespace(self, namespace: str) -> Namespace: - return PineconeNamespace(self.index, self.namespace) + if self.multi_namespace: + # Validate namespace exists in discovered namespaces + if namespace not in self.namespaces: + raise ValueError( + f"PineconeDB: Namespace '{namespace}' not found in discovered namespaces. Available namespaces: {', '.join(self.namespaces)}" + ) + return PineconeNamespace(self.index, namespace) + else: + # Single namespace mode: ignore parameter, use configured namespace + return PineconeNamespace(self.index, self.namespace) def initialize_population(self): # If the namespace already existed before VSB (we didn't create it) and @@ -322,24 +361,60 @@ def get_record_count(self) -> int: self.index.describe_namespace(namespace=self.namespace)["record_count"] ) - def check_namespace_exists(self, namespace: str) -> bool: - """Check if a namespace exists inside the current index using list_namespaces generator.""" + def _discover_all_namespaces(self) -> list[str]: + """Discover all populated namespaces in the index.""" + populated_namespaces = [] try: # list_namespaces returns a generator of dicts with 'name' and 'record_count' for ns in self.index.list_namespaces(): - if ns["name"] == namespace: - logger.info( - f"PineconeDB: Namespace '{namespace}' exists in index '{self.index_name}'." 
+ # Convert record_count to int (API may return string) + record_count = int(ns["record_count"]) + if record_count > 0: + populated_namespaces.append(ns["name"]) + logger.debug( + f"PineconeDB: Namespace '{ns['name']}' has {record_count} records" ) - return True + except PineconeException as e: + logger.error( + f"PineconeDB: Error listing namespaces in index '{self.index_name}': {e}" + ) + raise ValueError( + f"Failed to list namespaces in index '{self.index_name}': {e}" + ) from e + + if not populated_namespaces: + raise ValueError( + f"No populated namespaces found in index '{self.index_name}'. Multi-namespace mode requires at least one namespace with records." + ) + + return sorted(populated_namespaces) + def _get_namespaces_for_user(self, user_id: int, num_users: int) -> list[str]: + """Distribute namespaces across users using round-robin algorithm.""" + if num_users > len(self.namespaces): + raise ValueError( + f"Cannot distribute {num_users} users across {len(self.namespaces)} namespaces. Number of users must be <= number of namespaces." + ) + return [ + self.namespaces[i] for i in range(user_id, len(self.namespaces), num_users) + ] + + def check_namespace_exists(self, namespace: str) -> bool: + """Check if a namespace exists inside the current index.""" + try: + # Use describe_namespace which is a direct API call - much faster than listing all namespaces + self.index.describe_namespace(namespace=namespace) + logger.info( + f"PineconeDB: Namespace '{namespace}' exists in index '{self.index_name}'." + ) + return True + except NotFoundException: logger.info( f"PineconeDB: Namespace '{namespace}' does not exist in index '{self.index_name}'." 
) return False - except PineconeException as e: logger.error( - f"PineconeDB: Error while listing namespaces in index '{self.index_name}' - {e}" + f"PineconeDB: Error while checking namespace '{namespace}' in index '{self.index_name}' - {e}" ) return False diff --git a/vsb/users.py b/vsb/users.py index 24ee12b..8fca396 100644 --- a/vsb/users.py +++ b/vsb/users.py @@ -251,6 +251,24 @@ def __init__(self, environment): ) self.query_iter = None + # Multi-namespace mode: get namespaces assigned to this user + if hasattr(self.database, "multi_namespace") and self.database.multi_namespace: + self.user_namespaces = self.database._get_namespaces_for_user( + self.user_id, self.users_total + ) + if not self.user_namespaces: + logger.critical( + f"User {self.user_id} has no namespaces assigned. This should not happen - check namespace distribution logic." + ) + raise StopUser() + self.namespace_index = 0 + logger.debug( + f"RunUser id:{self.user_id} assigned namespaces: {', '.join(self.user_namespaces)}" + ) + else: + self.user_namespaces = None + self.namespace_index = None + @task def request(self): match self.state: @@ -270,10 +288,30 @@ def wait_time(self): def do_run(self): if not self.query_iter: batch_size = self.database.get_batch_size(self.workload.get_sample_record()) - self.query_iter = self.workload.get_query_iter( + base_iter = self.workload.get_query_iter( self.users_total, self.user_id, batch_size ) + # Multi-namespace mode: wrap iterator to inject namespace names + if ( + hasattr(self.database, "multi_namespace") + and self.database.multi_namespace + ): + + def namespace_wrapper(): + for tenant, request in base_iter: + # Select next namespace using round-robin + namespace = self.user_namespaces[ + self.namespace_index % len(self.user_namespaces) + ] + self.namespace_index += 1 + yield (namespace, request) + + self.query_iter = namespace_wrapper() + else: + # Single namespace mode: use iterator as-is + self.query_iter = base_iter + tenant: str = None request: 
QueryRequest = None try: