import os
from unittest.mock import patch

import torch
import torch.distributed as dist
from filelock import FileLock

from lmdeploy import GenerationConfig, TurbomindEngineConfig, pipeline
def lock_print(*values, **print_kwargs):
    """Print atomically across processes.

    Every torchrun worker serializes its output through the shared
    ``print.lock`` file, so lines from concurrent ranks are not interleaved.
    Output is always flushed so it appears promptly in the launcher's log.
    """
    with FileLock('print.lock'):
        print(*values, **print_kwargs, flush=True)
rank = int(os.environ['LOCAL_RANK'])
world_size = int(os.environ['LOCAL_WORLD_SIZE'])

# Remap CUDA_VISIBLE_DEVICES so this rank sees the two GPUs its tp=2
# pipeline will use.  The env var must be set BEFORE torch.cuda is
# initialized and before dist.init_process_group().
assert not torch.cuda.is_initialized()
assert not dist.is_initialized()
if 'CUDA_VISIBLE_DEVICES' in os.environ:
    # BUG FIX: the parsed device list was previously a dead store,
    # unconditionally overwritten by range(world_size); honor the
    # user-provided CUDA_VISIBLE_DEVICES when it is set.
    ids = list(map(int, os.environ['CUDA_VISIBLE_DEVICES'].split(',')))
else:
    ids = list(range(world_size))
# NOTE(review): consecutive ranks overlap here (rank r gets ids[r], ids[r+1],
# rank r+1 gets ids[r+1], ids[r+2]) — confirm this sharing is intended rather
# than the disjoint mapping ids[2*r], ids[2*r+1].
os.environ['CUDA_VISIBLE_DEVICES'] = f'{ids[rank]},{ids[rank+1]}'

dist.init_process_group(backend='nccl')
lock_print(f'rank={dist.get_rank()}, world_size={dist.get_world_size()}')

# Patch the envs consulted by mmengine's logger (it reads LOCAL_RANK and
# CUDA_VISIBLE_DEVICES), so every worker builds its pipeline as if it were
# local rank 0.
lmdeploy_logger_patch = patch.dict(os.environ, {'LOCAL_RANK': '0'})
with lmdeploy_logger_patch:
    lock_print('CUDA_VISIBLE_DEVICES', os.environ['CUDA_VISIBLE_DEVICES'],
               torch.cuda.device_count())
    pipe = pipeline('Qwen/Qwen2.5-7B-Instruct',
                    backend_config=TurbomindEngineConfig(tp=2))
    out = pipe('tell me a joke', gen_config=GenerationConfig(do_sample=True))
    # BUG FIX: dropped the undefined loop variable `i` from the log line.
    lock_print(f'rank={rank}, res={out.text}')

# Release pipeline memory before exercising the collectives below
# (implements the previously empty "release pipeline memory" step).
del pipe, out

# Check that dist still works after the pipeline run.  Since we rewrote
# CUDA_VISIBLE_DEVICES on some ranks, GPUs must be addressed by their
# *local* device id, not the global rank.
for src_rank in range(world_size):
    # NOTE(review): this maps even ranks to local device 0 and odd ranks to
    # local device `rank` — confirm that is the intended placement policy.
    local_device_id = rank if rank % 2 != 0 else rank % 2
    tensor = torch.randint(0, 100, (5,), device=local_device_id)
    # BUG FIX: snapshot the tensor before the broadcast overwrites it,
    # so the `before` value logged below is actually defined.
    before = tensor.clone()
    dist.broadcast(tensor, src=src_rank)
    lock_print(f'src={src_rank}')
    lock_print(f'rank={rank}, before={before}, after={tensor}')

dist.destroy_process_group()
if __name__ == '__main__':
    # Launch example:
    # CUDA_VISIBLE_DEVICES=0,1,4,5,6,7 torchrun --nproc_per_node 6 /home/chenxin/ws3/topk/offline.py
    # BUG FIX: the guard previously had a comment-only body, which is a
    # syntax error.  All of the script's work runs at import time above,
    # so the guard body is intentionally empty.
    pass