chroma / chromadb /ingest /impl /simple_policy.py
badalsahani's picture
feat: chroma initial deploy
287a0bc
from typing import Sequence
from uuid import UUID
from overrides import overrides
from chromadb.config import System
from chromadb.ingest import CollectionAssignmentPolicy
from chromadb.ingest.impl.utils import create_topic_name
class SimpleAssignmentPolicy(CollectionAssignmentPolicy):
"""Simple assignment policy that assigns a 1 collection to 1 topic based on the
id of the collection."""
_tenant_id: str
_topic_ns: str
def __init__(self, system: System):
self._tenant_id = system.settings.tenant_id
self._topic_ns = system.settings.topic_namespace
super().__init__(system)
def _topic(self, collection_id: UUID) -> str:
return create_topic_name(self._tenant_id, self._topic_ns, str(collection_id))
@overrides
def assign_collection(self, collection_id: UUID) -> str:
return self._topic(collection_id)
@overrides
def get_topics(self) -> Sequence[str]:
raise NotImplementedError(
"SimpleAssignmentPolicy does not support get_topics, each collection has its own topic"
)
class RendezvousHashingAssignmentPolicy(CollectionAssignmentPolicy):
"""The rendezvous hashing assignment policy assigns a collection to a topic based on the
rendezvous hashing algorithm. This is not actually used in the python sysdb. It is only used in the
go sysdb. However, it is useful here in order to provide a way to get the topic list used for the whole system.
"""
_tenant_id: str
_topic_ns: str
def __init__(self, system: System):
self._tenant_id = system.settings.tenant_id
self._topic_ns = system.settings.topic_namespace
super().__init__(system)
@overrides
def assign_collection(self, collection_id: UUID) -> str:
raise NotImplementedError(
"RendezvousHashingAssignmentPolicy is not implemented"
)
@overrides
def get_topics(self) -> Sequence[str]:
# Mirrors go/coordinator/internal/coordinator/assignment_policy.go
return [
f"persistent://{self._tenant_id}/{self._topic_ns}/chroma_log_{i}"
for i in range(16)
]