Commit 4e3e7502 by nzy

split vllm

parent eb972b4c
from functools import partial
import os

import numpy as np
from vllm import LLM, SamplingParams

from codecritic.data.utils import SPLITTER


def generate_worker(cuda_device, prompts, model_path, sampling_params):
    # Pin this worker process to its assigned GPUs before the model is loaded.
    os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(cuda_device)
    llm = LLM(
        model=model_path,
        seed=42,
        max_model_len=8 * 1024,
        swap_space=16,
        tensor_parallel_size=len(cuda_device),
    )
    tokenizer = llm.get_tokenizer()
    stop_tokens = [tokenizer.eos_token_id]
    print(f"SUCCESS: load llm {model_path} on cuda {cuda_device}")

    vllm_sampling_params = SamplingParams(
        n=sampling_params["n"],
        temperature=sampling_params["temperature"],
        top_p=0.95,
        max_tokens=sampling_params["max_tokens"],
        stop_token_ids=stop_tokens,
    )
    print("Sampling params:", vllm_sampling_params)

    def messages_to_text(messages):
        text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        # If the last message carries a partially written assistant turn,
        # keep only the text before the SPLITTER marker as the prompt prefix.
        if SPLITTER in text:
            text = text.split(SPLITTER)[0]
        return text

    text_prompts = [messages_to_text(item["messages"]) for item in prompts]
    outputs = llm.generate(text_prompts, sampling_params=vllm_sampling_params, use_tqdm=True)

    results = []
    for item, output in zip(prompts, outputs):
        for response in output.outputs:
            generated_text = response.text
            messages = item["messages"].copy()
            if SPLITTER in messages[-1]["content"]:
                # Continue the partially written assistant message.
                message = messages.pop()
                raw_content = message["content"].split(SPLITTER)[0]
                message["content"] = raw_content + generated_text
            else:
                message = {"role": "assistant", "content": generated_text}
            messages.append(message)
            results.append({**item, "messages": messages})
    return results
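
# Illustrative call (assumed paths and values, not part of this commit):
# generate_worker expects the raw device-id strings, a list of {"messages": ...}
# items, and a plain dict whose keys are forwarded into vllm.SamplingParams.
#
# results = generate_worker(
#     cuda_device=("0", "1"),                # two GPUs -> tensor_parallel_size=2
#     prompts=[{"messages": [{"role": "user", "content": "Write hello world in Python."}]}],
#     model_path="/path/to/model",           # hypothetical checkpoint path
#     sampling_params={"n": 4, "temperature": 0.8, "max_tokens": 512},
# )
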
def score_worker(cuda_device, prompts, model_path, score_token):
    def compute_score_onetoken(logprob):
        positive_token = score_token[0]
        positive_logprob = logprob.get(positive_token)
        positive_prob = np.exp(positive_logprob.logprob) if positive_logprob else 0
        return {"score": positive_prob}

    def compute_score_twotoken(logprob):
        positive_token, negative_token = score_token[0], score_token[1]
        positive_logprob = logprob.get(positive_token)
        positive_prob = np.exp(positive_logprob.logprob) if positive_logprob else 0
        negative_logprob = logprob.get(negative_token)
        negative_prob = np.exp(negative_logprob.logprob) if negative_logprob else 0
        # Normalize over the two candidate tokens; whatever probability mass
        # falls outside them is reported as uncertainty.
        return {
            "score": positive_prob / (positive_prob + negative_prob),
            "uncertainty": 1 - (positive_prob + negative_prob),
        }

    if len(score_token) == 1:
        compute_score = compute_score_onetoken
    elif len(score_token) == 2:
        compute_score = compute_score_twotoken
    else:
        raise NotImplementedError("param: score_token length should be 1 or 2")

    os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(cuda_device)
    llm = LLM(
        model=model_path,
        seed=42,
        max_model_len=8 * 1024,
        swap_space=16,
        tensor_parallel_size=len(cuda_device),
    )
    tokenizer = llm.get_tokenizer()
    print(f"SUCCESS: load llm {model_path} on cuda {cuda_device}")

    # Greedy decoding; only the logprobs of the first generated token are scored.
    vllm_sampling_params = SamplingParams(
        n=1,
        temperature=0,
        max_tokens=5,
        logprobs=20
    )

    text_prompts = [
        tokenizer.apply_chat_template(item["messages"], tokenize=False, add_generation_prompt=True)
        for item in prompts
    ]
    outputs = llm.generate(text_prompts, sampling_params=vllm_sampling_params, use_tqdm=False)

    results = []
    for item, output in zip(prompts, outputs):
        for response in output.outputs:
            # response.logprobs: list[dict[int, Logprob]]
            # https://github.com/vllm-project/vllm/blob/main/vllm/sequence.py
            scores = compute_score(response.logprobs[0])
            text = response.text
            results.append({**item, **scores, "critic_text": text})
    return results
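
# Worked example for the two-token score (hypothetical numbers): with
# score_token = (yes_id, no_id) and first-token logprobs of -0.223 for the
# "yes" id and -1.609 for the "no" id,
#   positive_prob ~= exp(-0.223) ~= 0.80, negative_prob ~= exp(-1.609) ~= 0.20,
#   score = 0.80 / (0.80 + 0.20) = 0.80, uncertainty = 1 - (0.80 + 0.20) ~= 0.
# Note the implicit assumption that at least one of the two token ids appears
# in the top-20 logprobs; if both are missing, both probabilities are 0 and the
# two-token score divides by zero.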
from vllm import LLM, SamplingParams
import os
from concurrent.futures import ProcessPoolExecutor
from itertools import chain, combinations
from functools import partial
import subprocess
import numpy as np
from codecritic.data.utils import SPLITTER
def get_distance(connection_type):
    if connection_type.startswith("NV"):
@@ -37,13 +33,17 @@ def get_gpu_topology():
    Get the GPU topology using `nvidia-smi topo -m` and return a distance matrix.
    """
    try:
-        result = subprocess.run(['nvidia-smi', 'topo', '-m'], stdout=subprocess.PIPE, text=True)
+        result = subprocess.run(
+            ["nvidia-smi", "topo", "-m"], stdout=subprocess.PIPE, text=True
+        )
        topo_output = result.stdout
    except FileNotFoundError:
-        raise RuntimeError("nvidia-smi not found. Make sure NVIDIA drivers are installed and nvidia-smi is in PATH.")
+        raise RuntimeError(
+            "nvidia-smi not found. Make sure NVIDIA drivers are installed and nvidia-smi is in PATH."
+        )
    # Parse the topology matrix
-    matrix_str = topo_output.split('\n\n')[0]
+    matrix_str = topo_output.split("\n\n")[0]
    lines = matrix_str.splitlines()
    header = lines[0].split()
@@ -53,9 +53,9 @@ def get_gpu_topology():
        assert header[idx].endswith(f"GPU{idx}"), header[idx]
    matrix = []
-    for idx, line in enumerate(lines[1:1 + gpu_num]):
+    for idx, line in enumerate(lines[1 : 1 + gpu_num]):
        assert line.startswith(f"GPU{idx}")
-        matrix.append(line.split()[1:1 + gpu_num])
+        matrix.append(line.split()[1 : 1 + gpu_num])
    # Convert to a numeric distance matrix (lower is better)
    distance_matrix = [[get_distance(e) for e in r] for r in matrix]
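
# Rough shape of the text being parsed (abridged two-GPU example, assuming the
# usual `nvidia-smi topo -m` layout: a header row naming GPU0..GPUn and one row
# per GPU whose entries are connection types such as X, NV1/NV2, PIX, PHB, SYS):
#
#           GPU0    GPU1
#   GPU0     X      NV2
#   GPU1    NV2      X
#
# get_distance() maps each connection type to a numeric cost, so closely linked
# GPUs (e.g. NVLink) form cheap groups for allocate_gpu() below.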
@@ -70,7 +70,7 @@ def comb_group(n, k):
            yield groups.copy()
        else:
            head, *rest = lst
-            for group in combinations(rest, k-1):
+            for group in combinations(rest, k - 1):
                groups.append((head,) + group)
                yield from helper([x for x in rest if x not in group])
                groups.pop()
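
# Example (assuming the recursive helper is seeded with list(range(n))):
# comb_group(4, 2) enumerates every partition of {0, 1, 2, 3} into unordered
# groups of two:
#   [(0, 1), (2, 3)], [(0, 2), (1, 3)], [(0, 3), (1, 2)]
# allocate_gpu() below scores each partition by topology distance and keeps
# the cheapest one.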
@@ -79,7 +79,7 @@ def comb_group(n, k):

def allocate_gpu(model_required_gpus):
-    cuda_devices = os.environ["CUDA_VISIBLE_DEVICES"].split(',')
+    cuda_devices = os.environ["CUDA_VISIBLE_DEVICES"].split(",")
    gpu_num = len(cuda_devices)
    assert gpu_num % model_required_gpus == 0, "gpus must be n * tensor_parallel"
@@ -92,7 +92,7 @@ def allocate_gpu(model_required_gpus):
        indices = list(group)
        cost_memory[group] = np.sum(m[indices][:, indices])
-    min_cost, min_groups = float('inf'), []
+    min_cost, min_groups = float("inf"), []
    for groups in comb_group(gpu_num, model_required_gpus):
        cost = sum(cost_memory[group] for group in groups)
        if cost < min_cost:
@@ -110,141 +110,25 @@ def split_data(data, num):
    """
    groups = [[] for _ in range(num)]
    for i, item in enumerate(data):
+        item["__index__"] = i
        groups[i % num].append(item)
    return groups


-def vmap(worker, data, model_required_gpus):
+def sort_data(lst):
+    lst.sort(key=lambda x: x["__index__"])
+    for item in lst:
+        item.pop("__index__")
+    return lst
+
+
+def model_map(worker, data, model_required_gpus):
    cuda_devices = allocate_gpu(model_required_gpus)
    group_num = len(cuda_devices)
    data_groups = split_data(data, group_num)
    args = list(zip(cuda_devices, data_groups))
    with ProcessPoolExecutor() as executor:
-        nested_results = list(executor.map(worker, *zip(*args)))  # It's a magic
-    return list(chain(*nested_results))
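
# How the new dispatch round-trips (hypothetical 5-item run with two worker
# groups): split_data tags each item with "__index__" and deals them out
# round-robin -> [[d0, d2, d4], [d1, d3]]; *zip(*args) then transposes the
# (device_group, data_chunk) pairs into the two parallel iterables that
# executor.map expects (the "magic" referred to in the comment); finally
# sort_data(chain(...)) restores the original order and strips "__index__",
# so model_map's output lines up with its input.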
-def generate_worker(cuda_device, prompts, model_path, sampling_params):
-    os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(cuda_device)
-    llm = LLM(model=model_path,
-              seed=42,
-              max_model_len=8 * 1024,
-              swap_space=16,
-              tensor_parallel_size=len(cuda_device))
-    tokenizer = llm.get_tokenizer()
-    stop_tokens = [tokenizer.eos_token_id]
-    print(f"SUCCESS: load llm {model_path} on cuda {cuda_device}")
-
-    vllm_sampling_params = SamplingParams(
-        n=sampling_params['n'],
-        temperature=sampling_params['temperature'],
-        top_p=0.95,
-        max_tokens=sampling_params['max_tokens'],
-        stop_token_ids=stop_tokens
-    )
-    print("Sampling params:", vllm_sampling_params)
-
-    def messages_to_text(messages):
-        text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
-        if SPLITTER in text:
-            text = text.split(SPLITTER)[0]
-        return text
-
-    text_prompts = [messages_to_text(item["messages"]) for item in prompts]
-    outputs = llm.generate(text_prompts, sampling_params=vllm_sampling_params, use_tqdm=True)
-
-    results = []
-    for item, output in zip(prompts, outputs):
-        for response in output.outputs:
-            generated_text = response.text
-            messages = item["messages"].copy()
-            if SPLITTER in messages[-1]["content"]:
-                message = messages.pop()
-                raw_content = message["content"].split(SPLITTER)[0]
-                message["content"] = raw_content + generated_text
-            else:
-                message = {"role": "assistant", "content": generated_text}
-            messages.append(message)
-            results.append({**item, "messages": messages})
-    return results
-
-
-def score_worker(cuda_device, prompts, model_path, score_token):
-    def compute_score_onetoken(logprob):
-        positive_token = score_token[0]
-        positive_logprob = logprob.get(positive_token)
-        positive_prob = np.exp(positive_logprob.logprob) if positive_logprob else 0
-        return {"score": positive_prob}
-
-    def compute_score_twotoken(logprob):
-        positive_token, negative_token = score_token[0], score_token[1]
-        positive_logprob = logprob.get(positive_token)
-        positive_prob = np.exp(positive_logprob.logprob) if positive_logprob else 0
-        negative_logprob = logprob.get(negative_token)
-        negative_prob = np.exp(negative_logprob.logprob) if negative_logprob else 0
-        return {
-            "score": positive_prob / (positive_prob + negative_prob),
-            "uncertainty": 1 - (positive_prob + negative_prob)
-        }
-
-    if len(score_token) == 1:
-        compute_score = compute_score_onetoken
-    elif len(score_token) == 2:
-        compute_score = compute_score_twotoken
-    else:
-        raise NotImplementedError("param: score_token length should be 1 or 2")
-
-    os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(cuda_device)
-    llm = LLM(model=model_path,
-              seed=42,
-              max_model_len=8 * 1024,
-              swap_space=16,
-              tensor_parallel_size=len(cuda_device))
-    tokenizer = llm.get_tokenizer()
-    print(f"SUCCESS: load llm {model_path} on cuda {cuda_device}")
-
-    vllm_sampling_params = SamplingParams(
-        n=1,
-        temperature=0,
-        max_tokens=5,
-        logprobs=20
-    )
-
-    text_prompts = [tokenizer.apply_chat_template(item["messages"], tokenize=False, add_generation_prompt=True) for item in prompts]
-    outputs = llm.generate(text_prompts, sampling_params=vllm_sampling_params, use_tqdm=False)
-
-    results = []
-    for item, output in zip(prompts, outputs):
-        for response in output.outputs:
-            # response.logprobs: list[dict[int, Logprob]] https://github.com/vllm-project/vllm/blob/main/vllm/sequence.py
-            scores = compute_score(response.logprobs[0])
-            text = response.text
-            results.append({**item, **scores, "critic_text": text})
-    return results
-
-
-def vllm_chatcomplete(model_path, prompts, sampling_params, model_required_gpus=1):
-    worker = partial(generate_worker, model_path=model_path, sampling_params=sampling_params)
-    return vmap(worker, prompts, model_required_gpus)
-
-
-def vllm_score(model_path, prompts, score_token, model_required_gpus=1):
-    worker = partial(score_worker, model_path=model_path, score_token=score_token)
-    return vmap(worker, prompts, model_required_gpus)
+        nested_results = list(executor.map(worker, *zip(*args)))  # It's a magic
+    return sort_data(list(chain(*nested_results)))
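
# End-to-end sketch (assumes the split-out workers are importable and uses a
# hypothetical model path; not part of this diff): bind the per-model arguments
# with functools.partial and let model_map fan the work out, two GPUs per replica.
#
# worker = partial(
#     generate_worker,
#     model_path="/path/to/model",
#     sampling_params={"n": 1, "temperature": 0.0, "max_tokens": 256},
# )
# results = model_map(worker, prompts, model_required_gpus=2)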