Morgan Funtowicz
feat(embeddings): do not tokenize twice
5e1abf0
raw
history blame
5.77 kB
import platform
from functools import reduce
from operator import itemgetter
from typing import Generator, Tuple
import torch
from hfendpoints.openai import Context, run
from hfendpoints.openai.embeddings import Embedding, EmbeddingEndpoint, EmbeddingRequest, EmbeddingResponse, Usage
from intel_extension_for_pytorch.cpu.runtime import pin
from loguru import logger
from hfendpoints import EndpointConfig, Handler, __version__
from sentence_transformers import SentenceTransformer
from torch.nn import Module
from torch.backends.mkldnn import VERBOSE_ON_CREATION, VERBOSE_OFF
# Not used for now
SUPPORTED_AMP_DTYPES = {torch.float32, torch.bfloat16}
def has_bf16_support() -> bool:
"""
Helper to detect if the hardware supports bfloat16
Note:
Intel libraries, such as oneDNN, provide emulation for bfloat16 even if the underlying hardware does not support it.
This means CPU ISA with AVX512 will work, even if not with the same performances as one could expect from CPU ISA with AVX512_BF16.
Also, AMX_BF16 is implicitly assumed true when AVX512_BF16 is true (that's the case on Intel Sapphire Rapids).
:return: True if the hardware supports (or can emulate) bfloat16, False otherwise
"""
return torch.cpu._is_avx512_bf16_supported() or torch.cpu._is_avx512_supported()
def get_cores_pinning_strategy() -> "CPUPool":
import intel_extension_for_pytorch as ipex
# Retrieve the number of nodes
num_nodes = ipex.cpu.runtime.runtime_utils.get_num_nodes()
cpu_cores_id = [ipex.cpu.runtime.runtime_utils.get_core_list_of_node_id(node_id) for node_id in range(num_nodes)]
if num_nodes == 1:
pinned_cpu_cores_id = cpu_cores_id[0]
else:
pinned_cpu_cores_id = [core_id for node in cpu_cores_id for core_id in node]
logger.info(f"Pinning CPU cores to {pinned_cpu_cores_id}")
return ipex.cpu.runtime.CPUPool(pinned_cpu_cores_id)
# return ipex.cpu.runtime.CPUPool(node_id=0)
def get_usage(mask: torch.IntTensor) -> Usage:
"""
Compute the number of processed tokens and return as Usage object matching OpenAI
:param mask: Attention mask tensor, as returned by the model
:return: Usage object matching OpenAI specifications
"""
num_tokens = sum(m.sum().item() for m in mask)
return Usage(prompt_tokens=num_tokens, total_tokens=num_tokens)
class SentenceTransformerWithUsage(Module):
__slots__ = ("_model", )
def __init__(self, model: SentenceTransformer):
super().__init__()
self._model = model
def forward(self, sentences: list[str]) -> Tuple[Generator[torch.Tensor], Generator[torch.Tensor]]:
vectors = self._model.encode(sentences, output_value=None)
return map(itemgetter('attention_mask'), vectors), map(itemgetter('sentence_embedding'), vectors)
class SentenceTransformerHandler(Handler):
__slots__ = ("_config", "_dtype", "_model", "_model_name", "_pinned_cores", "_use_amp")
def __init__(self, config: EndpointConfig):
self._config = config
self._dtype = torch.float32
self._model_name = config.model_id
self._allocate_model()
def _allocate_model(self):
# Denormal number is used to store tiny numbers that are close to 0.
# Computations with denormal numbers are remarkably slower than normalized number.
torch.set_flush_denormal(True)
dtype = torch.bfloat16 if has_bf16_support() else torch.float32
model = SentenceTransformer(self._config.model_id, device="cpu", model_kwargs={"torch_dtype": dtype})
if platform.machine() == "x86_64":
import intel_extension_for_pytorch as ipex
logger.info(f"x64 platform detected: {platform.processor()}")
# Retrieve all the physical cores ID for all the CPU nodes
self._pinned_cores = get_cores_pinning_strategy()
# Optimize the model for inference
with torch.inference_mode():
model = model.eval()
model = model.to(memory_format=torch.channels_last)
# Apply IPEx optimizations
model = ipex.optimize(model, dtype=dtype, weights_prepack=True, graph_mode=True, concat_linear=True)
model = torch.compile(model, dynamic=True, backend="ipex")
# model = ipex.cpu.runtime.MultiStreamModule(SentenceTransformerWithUsage(model), num_streams=1)
else:
model = torch.compile(model)
self._dtype = dtype
self._use_amp = dtype in SUPPORTED_AMP_DTYPES
self._model = SentenceTransformerWithUsage(model)
async def __call__(self, request: EmbeddingRequest, ctx: Context) -> EmbeddingResponse:
with torch.backends.mkldnn.verbose(VERBOSE_ON_CREATION if self._config.is_debug else VERBOSE_OFF):
with torch.inference_mode(), torch.amp.autocast("cpu", dtype=self._dtype, enabled=self._use_amp):
with pin(self._pinned_cores):
mask, vectors = self._model(request.input if request.is_batched else [request.input])
embeddings = [None] * len(request)
for (index, embedding) in enumerate(vectors):
embedding = Embedding(index=index, embedding=embedding.tolist())
embeddings[index] = embedding
usage = get_usage(mask)
return EmbeddingResponse(model=self._model_name, embeddings=embeddings, usage=usage)
def entrypoint():
config = EndpointConfig.from_env()
logger.info(f"[Hugging Face Endpoint v{__version__}] Serving: {config.model_id}")
endpoint = EmbeddingEndpoint(SentenceTransformerHandler(config))
run(endpoint, config.interface, config.port)
if __name__ == "__main__":
entrypoint()