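# test/main.py: latency/load test client for a Triton Inference Server
# deployment of the "flux" image-generation model, driven over gRPC streaming.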
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import time
import json
import tritonclient.grpc as grpcclient
from tritonclient.utils import *
import queue
from functools import partial
import random
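# Shared state across requests: resp_list collects per-request result dicts
# and scode_breakup counts status codes for the final report.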
run_multiple_tests = False
resp_list = []
scode_breakup = {}
# Mapping from numpy scalar types to Triton server dtype strings.
_NP_TO_SERVER_DTYPE = {
    np.bool_: "BOOL",
    np.int8: "INT8",
    np.int16: "INT16",
    np.int32: "INT32",
    np.int64: "INT64",
    np.uint8: "UINT8",
    np.uint16: "UINT16",
    np.uint32: "UINT32",
    np.uint64: "UINT64",
    np.float16: "FP16",
    np.float32: "FP32",
    np.float64: "FP64",
}
def np_to_server_dtype(np_dtype):
    """Map a numpy dtype to its Triton server dtype string (None if unsupported)."""
    if np_dtype == np.object_ or np_dtype.type == np.bytes_:
        return "BYTES"
    return _NP_TO_SERVER_DTYPE.get(np_dtype.type)
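# Triton's gRPC streaming API is callback-driven: completed results (or
# errors) are pushed onto a thread-safe queue so the caller can block on
# queue.get() until the inference response arrives.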
class UserData:
def __init__(self):
self._completed_requests = queue.Queue()
def callback(user_data, result, error):
if error:
user_data._completed_requests.put(error)
else:
user_data._completed_requests.put(result)
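# Wrap a numpy array as a named Triton InferInput, inferring the server dtype.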
def prepare_tensor(name: str, data: np.ndarray):
server_input = grpcclient.InferInput(name=name, shape=data.shape,
datatype=np_to_server_dtype(data.dtype))
server_input.set_data_from_numpy(data)
return server_input
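# Build a single inference request from a payload dict, stream it to the
# server, and return a dict of timings, output image URLs, and any error.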
def process_and_send_request(sample_request):
    # Unpack the payload, falling back to client-side defaults.
    prompt = sample_request['prompt']
    negative_prompt = sample_request.get('negative_prompt')
    height = sample_request.get('height')
    width = sample_request.get('width')
    num_images_per_prompt = sample_request.get('num_images_per_prompt', 1)
    num_inference_steps = sample_request.get('num_inference_steps', 20)
    image = sample_request.get('image')
    mask_image = sample_request.get('mask_image')
    control_images = sample_request.get('control_images')
    control_weightages = sample_request.get('control_weightages')
    control_modes = sample_request.get('control_modes')
    seed = sample_request.get('seed', -1)
    guidance_scale = sample_request.get('guidance_scale', 7.5)
    strength = sample_request.get('strength', 1)
    scheduler = sample_request.get('scheduler', "EULER-A")
    model_type = sample_request.get('model_type')
    lora_weights = sample_request.get('lora_weights')
    control_guidance_start = sample_request.get('control_guidance_start')
    control_guidance_end = sample_request.get('control_guidance_end')
    # "prompt" is mandatory; every other tensor is attached only when present.
    inputs = [prepare_tensor("prompt", np.array([prompt], dtype=np.object_))]
    optional_inputs = [
        ("negative_prompt", negative_prompt, np.object_),
        ("height", height, np.int32),
        ("width", width, np.int32),
        ("num_images_per_prompt", num_images_per_prompt, np.int32),
        ("num_inference_steps", num_inference_steps, np.int32),
        ("image", image, np.object_),
        ("mask_image", mask_image, np.object_),
        ("seed", seed, np.int64),
        ("guidance_scale", guidance_scale, np.float32),
        ("model_type", model_type, np.object_),
        ("strength", strength, np.float32),
        ("scheduler", scheduler, np.object_),
        ("control_images", control_images, np.object_),
        ("control_weightages", control_weightages, np.float32),
        ("control_modes", control_modes, np.int32),
        ("lora_weights", lora_weights, np.object_),
        ("control_guidance_start", control_guidance_start, np.float32),
        ("control_guidance_end", control_guidance_end, np.float32),
    ]
    for name, value, dtype in optional_inputs:
        if value is not None:
            inputs.append(prepare_tensor(name, np.array([value], dtype=dtype)))
outputs = [
grpcclient.InferRequestedOutput("response_id"),
grpcclient.InferRequestedOutput("time_taken"),
grpcclient.InferRequestedOutput("load_lora"),
grpcclient.InferRequestedOutput("output_image_urls"),
grpcclient.InferRequestedOutput("error"),
# grpcclient.InferRequestedOutput("mega_pixel")
]
    user_data = UserData()
    url = "localhost:8002"
    st = time.time()
    with grpcclient.InferenceServerClient(url=url, ssl=False) as triton_client:
        triton_client.start_stream(callback=partial(callback, user_data))
        triton_client.async_stream_infer(
            model_name="flux",
            inputs=inputs,
            outputs=outputs,
        )
        # async_stream_infer returns immediately; block on the queue until the
        # callback fires so that et - st measures the full round trip.
        response = user_data._completed_requests.get()
    et = time.time()
print(response)
    # The stream callback enqueues an InferenceServerException on failure.
    if isinstance(response, InferenceServerException):
        print(f"Server error: {response}")
        output_image_urls = []
        inference_time = 0
        lora_time = 0
        response_id = None
        mega_pixel = 0
        error = str(response)
        sCode = 500
    else:
        # Successful response: pull each output tensor, tolerating absent ones.
        try:
            inference_time = response.as_numpy("time_taken").item()
            lora_time = response.as_numpy("load_lora").item()
            response_id_raw = response.as_numpy("response_id").item()
            response_id = response_id_raw.decode() if response_id_raw else None
            urls_tensor = response.as_numpy("output_image_urls")
            output_image_urls = urls_tensor.tolist() if urls_tensor is not None else []
            mp_tensor = response.as_numpy("mega_pixel")
            mega_pixel = mp_tensor.item().decode() if mp_tensor is not None else "0"
            error_tensor = response.as_numpy("error")
            error = error_tensor.item().decode() if error_tensor is not None and error_tensor.item() else None
            sCode = 200
except Exception as e:
print(f"Error processing response: {e}")
output_image_urls = []
inference_time = 0
lora_time = 0
response_id = None
mega_pixel = 0
error = str(e)
sCode = 500
    # Tally the status code for the scode_breakup summary.
    scode_breakup[sCode] = scode_breakup.get(sCode, 0) + 1
    results = {
        "response_id": response_id,
        "total_time_taken": et - st,
        "inference_time_taken": inference_time,
        "loading_lora_time": lora_time,
        "output_image_urls": output_image_urls,
        "error": error,
        "mega_pixel": 0 if mega_pixel is None else mega_pixel
    }
    print(results)
    if not output_image_urls:
        print("No images generated")
        results["error"] = "No images generated"
    return results
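# Replay a JSON file of warmup requests so model weights and LoRAs are
# resident before any timed runs.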
def warmup_and_load_lora(warmup_json_path):
if warmup_json_path is None:
return False
with open(warmup_json_path, 'r') as f:
warmup_data = json.load(f)
st = time.time()
for request in warmup_data:
process_and_send_request(request)
resp_time = time.time()-st
print(f"Warmup and load lora done in {resp_time:.3f} seconds")
return True
def generate_jitter_window():
    """Sleep a random think time: 50% chance of 1-5 s, 25% of 10-15 s, 25% of 20-30 s."""
    percent_bucket = random.randint(1, 100)
    if 1 <= percent_bucket <= 50:
        jitter_window = [1, 5]
    elif 51 <= percent_bucket <= 75:
        jitter_window = [10, 15]
    else:
        jitter_window = [20, 30]
    time.sleep(random.randint(jitter_window[0], jitter_window[1]))
    return True
def predict(requests_data):
    # Pick a random payload, wait out a jitter window, then send the request.
    random_request = random.choice(requests_data)
    sample_request = random_request['payload']
    generate_jitter_window()
    return process_and_send_request(sample_request)
def run_single_test(requests_data):
    return predict(requests_data)
number_of_users = 1 #change here for concurrent users
duration_minutes = 2
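# Closed-loop load generator: keeps number_of_users requests in flight for
# duration_minutes, then reports latency percentiles over all completions.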
def run_concurrent_tests_cont(number_of_users, duration_minutes):
    start_time = time.time()
    end_time = start_time + duration_minutes * 60
    results = []
    with ThreadPoolExecutor(max_workers=number_of_users) as executor:
        future_to_start_time = {}
        while time.time() < end_time:
            # Keep number_of_users requests in flight at all times.
            if len(future_to_start_time) < number_of_users:
                future = executor.submit(run_single_test, requests_data)
                future_to_start_time[future] = time.time()
            # Harvest finished requests so their worker slots can be reused.
            done_futures = [f for f in future_to_start_time if f.done()]
            for future in done_futures:
                result = future.result()
                resp_list.append(result)
                # Percentiles are taken over wall-clock latencies, not the
                # full result dicts.
                results.append(result["total_time_taken"])
                del future_to_start_time[future]
            # Brief pause so the polling loop does not busy-spin a core.
            time.sleep(0.1)
        # Drain requests still in flight when the test window closes.
        for future in future_to_start_time:
            result = future.result()
            resp_list.append(result)
            results.append(result["total_time_taken"])
    p25 = np.percentile(results, 25)
    p50 = np.percentile(results, 50)
    p90 = np.percentile(results, 90)
    p99 = np.percentile(results, 99)
    avg = sum(results) / len(results)
    with open(f"result_dump_{number_of_users}_{duration_minutes}.json", "w") as f:
        f.write(json.dumps(resp_list, indent=4))
    return p25, p50, p90, p99, avg
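# Driver: either a timed multi-user run (run_multiple_tests = True) or a
# single smoke-test request using the payload below.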
if run_multiple_tests:
    # Assumed input layout: a JSON file containing a list of
    # {"payload": {...}} request dicts. The path below is a placeholder;
    # point it at your own requests file.
    requests_json_path = "requests.json"
    with open(requests_json_path, 'r') as f:
        requests_data = json.load(f)
    # Warm the model and load LoRAs before any timed traffic.
    warmup_and_load_lora(requests_json_path)
    p25_result, p50_result, p90_result, p99_result, avg = run_concurrent_tests_cont(number_of_users, duration_minutes)
    print(f"25th Percentile: {p25_result:.3f} seconds")
    print(f"50th Percentile: {p50_result:.3f} seconds")
    print(f"90th Percentile: {p90_result:.3f} seconds")
    print(f"99th Percentile: {p99_result:.3f} seconds")
    print(f"Average Response Time: {avg:.3f} seconds")
    with open("test_results.json", "w") as f:
        f.write(
            json.dumps({
                "p25": p25_result,
                "p50": p50_result,
                "p90": p90_result,
                "p99": p99_result,
                "avg": avg,
                "sCode_breakup": scode_breakup
            }, indent=4)
        )
else:
payload = {
"prompt": "A girl in city, 25 years old, cool, futuristic <lora:https://huggingface.co/XLabs-AI/flux-lora-collection/resolve/main/art_lora.safetensors:0.5>",
"negative_prompt": "blurry, low quality, distorted",
"height": 1024,
"width": 1024,
"num_images_per_prompt": 1,
"num_inference_steps": 20,
"seed": 42424243,
"guidance_scale": 7.0,
"model_type": "txt2img"
}
result = process_and_send_request(payload)
print(result)
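    # A hypothetical img2img-style payload exercising the optional inputs;
    # the field values, URL, and accepted model_type strings are assumptions,
    # not confirmed server behavior:
    # payload = {
    #     "prompt": "A girl in city, photorealistic",
    #     "image": "https://example.com/input.png",
    #     "strength": 0.6,
    #     "model_type": "img2img",
    # }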