Commit 030769db by nanziyuan

add multi-gpu support

parent 97aca067
import multiprocessing
import os
import subprocess
from functools import partial
from itertools import chain, combinations

import numpy as np
from vllm import LLM, SamplingParams

from codecritic.data.utils import SPLITTER


def get_distance(connection_type):
    """Map an `nvidia-smi topo -m` connection type to a rough distance (lower is closer)."""
    if connection_type.startswith("NV"):   # NV# = bonded NVLink connection
        return 1
    elif connection_type == "X":           # same device
        return 0
    elif connection_type == "PIX":         # at most a single PCIe bridge
        return 2
    elif connection_type == "PXB":         # multiple PCIe bridges
        return 3
    elif connection_type == "PHB":         # PCIe host bridge
        return 4
    elif connection_type == "NODE":        # interconnect between host bridges within a NUMA node
        return 5
    elif connection_type == "SYS":         # SMP interconnect between NUMA nodes
        return 6
    else:
        raise RuntimeError(f"Unknown connection type: {connection_type}")


def get_gpu_topology():
    """
    Get the GPU topology using `nvidia-smi topo -m` and return a distance matrix.
    """
    try:
        result = subprocess.run(['nvidia-smi', 'topo', '-m'], stdout=subprocess.PIPE, text=True)
        topo_output = result.stdout
    except FileNotFoundError:
        raise RuntimeError("nvidia-smi not found. Make sure NVIDIA drivers are installed and nvidia-smi is in PATH.")

    # Parse the topology matrix
    matrix_str = topo_output.split('\n\n')[0]
    lines = matrix_str.splitlines()

    header = lines[0].split()
    gpu_num = sum(x.startswith("GPU") for x in header)
    for idx in range(gpu_num):
        assert header[idx].endswith(f"GPU{idx}"), header[idx]

    matrix = []
    for idx, line in enumerate(lines[1:1 + gpu_num]):
        assert line.startswith(f"GPU{idx}")
        matrix.append(line.split()[1:1 + gpu_num])

    # Convert to a numeric distance matrix (lower is better)
    distance_matrix = [[get_distance(e) for e in r] for r in matrix]
    return np.array(distance_matrix)
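
# Hypothetical example (not real `nvidia-smi` output, shown only to illustrate the
# parsing): for a 2-GPU machine whose matrix section looks roughly like
#
#           GPU0    GPU1    CPU Affinity
#   GPU0     X      NV2     0-31
#   GPU1    NV2      X      0-31
#
# get_gpu_topology() would return array([[0, 1], [1, 0]]),
# since "X" maps to distance 0 and "NV#" to distance 1.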


def comb_group(n, k):
    """
    Enumerate partitions of range(n) into groups of size k (n must be divisible by k).
    Each yielded value is a list of n // k disjoint k-tuples.
    """
    comb_stack, lst_stack = [], []

    def comb_pivot(lst):
        """
        lst should be sorted. Yield the k-combinations of lst that contain its first
        element, so each partition is produced in only one group order.
        """
        pivot = lst[0]
        for other in combinations(lst[1:], k - 1):
            yield (pivot,) + other

    def fill_stack():
        lst = [x for x in range(n) if x not in list(chain(*lst_stack))]
        stack_len = len(comb_stack)
        for _ in range((n // k) - stack_len):
            new_comb = comb_pivot(lst)
            new_group = next(new_comb)
            comb_stack.append(new_comb)
            lst_stack.append(new_group)
            lst = [x for x in lst if x not in new_group]

    fill_stack()
    yield lst_stack.copy()
    while len(comb_stack) > 0:
        new_group = next(comb_stack[-1], None)
        if new_group is not None:
            lst_stack[-1] = new_group
            fill_stack()
            yield lst_stack.copy()
        else:
            comb_stack.pop()
            lst_stack.pop()
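
# Worked example: each partition into k-sized groups is produced exactly once, e.g.
#   list(comb_group(4, 2)) == [[(0, 1), (2, 3)], [(0, 2), (1, 3)], [(0, 3), (1, 2)]]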


def get_optimal_groups(matrix, index, k):
    """
    Partition the GPUs listed in `index` into groups of size k, minimizing the
    total pairwise distance within each group.
    """
    m = matrix[index][:, index]

    # Pre-compute the intra-group cost of every possible k-sized group.
    cost_memory = dict()
    for group in combinations(range(len(m)), k):
        indices = list(group)
        cost_memory[group] = np.sum(m[indices][:, indices])

    # Exhaustively search all partitions and keep the cheapest one.
    min_cost = float('inf')
    min_groups = []
    for groups in comb_group(len(m), k):
        cost = sum(cost_memory[group] for group in groups)
        if cost < min_cost:
            min_cost = cost
            min_groups = groups

    return [[str(index[x]) for x in group] for group in min_groups]
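
# Hypothetical example: with 4 GPUs where (0, 1) and (2, 3) are NVLink pairs
# (distance 1) and every other pair only shares the SMP interconnect (distance 6),
# get_optimal_groups(matrix, [0, 1, 2, 3], 2) returns [['0', '1'], ['2', '3']],
# i.e. each NVLink pair is kept together.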


def allocate_gpu(model_required_gpus):
    cuda_devices = os.environ["CUDA_VISIBLE_DEVICES"].split(',')
    print(cuda_devices)
    assert len(cuda_devices) % model_required_gpus == 0, \
        "the number of visible GPUs must be a multiple of model_required_gpus (tensor parallel size)"
    if model_required_gpus > 1:
        # Group the visible GPUs so each tensor-parallel replica gets topologically close devices.
        matrix = get_gpu_topology()
        index = [int(x) for x in cuda_devices]
        cuda_devices = get_optimal_groups(matrix, index, model_required_gpus)
    else:
        cuda_devices = [[x] for x in cuda_devices]
    return cuda_devices
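
# Hypothetical usage: with CUDA_VISIBLE_DEVICES="0,1,2,3", allocate_gpu(2) might
# return [['0', '1'], ['2', '3']] (the exact grouping depends on the reported
# topology), while allocate_gpu(1) returns [['0'], ['1'], ['2'], ['3']].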


def data_split(data, num):
    groups = [[] for _ in range(num)]
    for i, item in enumerate(data):
        groups[i % num].append(item)
    return groups
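
# Example: the split is round-robin, so data_split(list(range(5)), 2) == [[0, 2, 4], [1, 3]]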


def generate_worker(cuda_device, prompts, model_path, sampling_params):
    os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(cuda_device)
    llm = LLM(model=model_path,
              seed=42,
              max_model_len=8 * 1024,
              swap_space=16,
              tensor_parallel_size=len(cuda_device))
    tokenizer = llm.get_tokenizer()
    stop_tokens = [tokenizer.eos_token_id, tokenizer.convert_tokens_to_ids("<|eot_id|>")]
    print(f"SUCCESS: load llm {model_path} on cuda {cuda_device}")

    vllm_sampling_params = SamplingParams(
        n=sampling_params['n'],
        temperature=sampling_params['temperature'],
        top_p=0.95,
        max_tokens=sampling_params['max_tokens'],
        stop_token_ids=stop_tokens
    )

    def messages_to_text(messages):
        text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        # If the last message contains SPLITTER, keep only the text before it so the
        # model continues the partial assistant message instead of starting a new turn.
        if SPLITTER in text:
            text = text.split(SPLITTER)[0]
        return text

    text_prompts = [messages_to_text(item["messages"]) for item in prompts]
    outputs = llm.generate(text_prompts, sampling_params=vllm_sampling_params, use_tqdm=True)

    results = []
    for item, output in zip(prompts, outputs):
        for response in output.outputs:
            generated_text = response.text
            messages = item["messages"].copy()
            if SPLITTER in messages[-1]["content"]:
                # Merge the generated continuation into the partial assistant message.
                # Build a new dict rather than mutating the shared prompt in place.
                last = messages.pop()
                raw_content = last["content"].split(SPLITTER)[0]
                message = {**last, "content": raw_content + generated_text}
            else:
                message = {"role": "assistant", "content": generated_text}
            messages.append(message)
            results.append({**item, "messages": messages})
    return results


def score_worker(cuda_device, prompts, model_path, score_token):
    os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(cuda_device)
    llm = LLM(model=model_path,
              seed=42,
              max_model_len=8 * 1024,
              swap_space=16,
              tensor_parallel_size=len(cuda_device))
    tokenizer = llm.get_tokenizer()
    print(f"SUCCESS: load llm {model_path} on cuda {cuda_device}")

    vllm_sampling_params = SamplingParams(
        n=1,
        temperature=0,
        max_tokens=5,
        logprobs=20
    )

    text_prompts = [tokenizer.apply_chat_template(item["messages"], tokenize=False, add_generation_prompt=True) for item in prompts]
    outputs = llm.generate(text_prompts, sampling_params=vllm_sampling_params, use_tqdm=False)

    results = []
    for item, output in zip(prompts, outputs):
        for response in output.outputs:
            # response.logprobs: list[dict[int, Logprob]] https://github.com/vllm-project/vllm/blob/main/vllm/sequence.py
            # Score = probability assigned to `score_token` (a token id) at the first generated position.
            sample_logprobs = response.logprobs
            logprob = sample_logprobs[0].get(score_token)
            score = np.exp(logprob.logprob) if logprob else 0
            text = response.text
            results.append({**item, "score": score, "critic_text": text})
    return results


def vllm_chatcomplete(model_path, prompts, sampling_params, model_required_gpus=1):
    cuda_devices = allocate_gpu(model_required_gpus)
    group_num = len(cuda_devices)
    data_groups = data_split(prompts, group_num)
    args = list(zip(cuda_devices, data_groups))
    worker_llm = partial(generate_worker, model_path=model_path, sampling_params=sampling_params)
    with multiprocessing.Pool(group_num) as pool:
        nested_results = pool.starmap(worker_llm, args)
    return list(chain(*nested_results))


def vllm_score(model_path, prompts, score_token, model_required_gpus=1):
    cuda_devices = allocate_gpu(model_required_gpus)
    group_num = len(cuda_devices)
    data_groups = data_split(prompts, group_num)
    args = list(zip(cuda_devices, data_groups))
    worker_llm = partial(score_worker, model_path=model_path, score_token=score_token)
    with multiprocessing.Pool(group_num) as pool:
        nested_results = pool.starmap(worker_llm, args)
    return list(chain(*nested_results))
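
# Usage sketch (illustrative only; the model paths, sampling parameters and token id
# below are hypothetical placeholders, not part of this commit):
#
#   prompts = [{"messages": [{"role": "user", "content": "Write hello world in Python."}]}]
#   sampling = {"n": 4, "temperature": 0.8, "max_tokens": 1024}
#   completions = vllm_chatcomplete("path/to/model", prompts, sampling, model_required_gpus=2)
#
#   # vllm_score returns each item with an added "score": the probability assigned to
#   # `score_token` (a token id) at the first generated position.
#   scored = vllm_score("path/to/critic", prompts, score_token=1234, model_required_gpus=2)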