[serve] Multi GPU Inferencing with vLLM issue - max_concurrent_workers is not supported yet #47746

Open
sureshmoligi opened this issue Sep 19, 2024 · 0 comments
Labels
bug (Something that is supposed to be working; but isn't), triage (Needs triage, e.g. priority, bug/not-bug, and owning component)

sureshmoligi commented Sep 19, 2024

What happened + What you expected to happen

I am trying to run vLLM inference with Ray Serve on a GCP GKE instance with 4 x L4 GPUs. Here is my Ray config file:

apiVersion: ray.io/v1
kind: RayService
metadata:
  name: text2sql
spec:
  serveConfigV2: |
    applications:
    - name: text2sql
      route_prefix: /
      import_path: serve:model
      deployments:
      - name: VLLMDeployment
        max_ongoing_requests: 100
        autoscaling_config:
          target_ongoing_requests: 1
          min_replicas: 1
          max_replicas: 4
        ray_actor_options:
          num_cpus: 1.0
          num_gpus: 2.0
      runtime_env:
        setup_timeout_seconds: 1800
        pip: ["vllm==0.5.5"]
        env_vars:
          MODEL_LOCAL_PATH: "Final Model"
          TENSOR_PARALLELISM: "2"
  rayClusterConfig:
    rayVersion: '2.24.0'
    enableInTreeAutoscaling: true
    autoscalerOptions:
      upscalingMode: Default
      imagePullPolicy: IfNotPresent
      idleTimeoutSeconds: 300
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        metadata:
          labels:
            ai.gke.io: rayserve
        spec:
          containers:
          - name: ray-head
            image: public.ecr.aws/XXXXXXXX:latest
            resources:
              limits:
                cpu: "2"
                memory: "8Gi"
              requests:
                cpu: "2"
                memory: "8Gi"
            ports:
            - containerPort: 6379
              name: gcs-server
            - containerPort: 8265
              name: dashboard
            - containerPort: 10001
              name: client
            - containerPort: 8000
              name: serve          
    workerGroupSpecs:
    - replicas: 1
      groupName: gpu-group
      rayStartParams: {}
      template:
        metadata:
          labels:
            ai.gke.io: rayserve
        spec:
          containers:
          - name: text2opsql
            image: public.ecr.aws/XXXXXXXX:latest
            resources:
              limits:
                cpu: "8"
                memory: "16Gi"
                nvidia.com/gpu: "2"
              requests:
                cpu: "8"
                memory: "16Gi"
                nvidia.com/gpu: "2"
          nodeSelector:
            cloud.google.com/gke-accelerator: nvidia-l4

When I apply this YAML file, I get the error below.

INFO 2024-09-19 08:23:22,415 controller 528 proxy_state.py:758 - Removing drained proxy on node 'd877a59f7f2701f2eb42c80be8074cd75ff9bc6c8a575ca0974daffe'.
INFO 2024-09-19 08:23:42,507 controller 528 deployment_state.py:1844 - Adding 1 replica to Deployment(name='VLLMDeployment', app='text2opsql').
INFO 2024-09-19 08:23:42,508 controller 528 deployment_state.py:401 - Starting Replica(id='352yge88', deployment='VLLMDeployment', app='text2opsql').
INFO 2024-09-19 08:23:53,748 controller 528 proxy_state.py:151 - Starting proxy on node 'd877a59f7f2701f2eb42c80be8074cd75ff9bc6c8a575ca0974daffe' listening on '0.0.0.0:8000'.
ERROR 2024-09-19 08:23:55,732 controller 528 deployment_state.py:681 - Exception in Replica(id='352yge88', deployment='VLLMDeployment', app='text2opsql'), the replica will be stopped.
Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/serve/_private/deployment_state.py", line 677, in check_ready
    _, self._version, self._initialization_latency_s = ray.get(
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/worker.py", line 2613, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/worker.py", line 861, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(RuntimeError): ray::ServeReplica:text2opsql:VLLMDeployment.initialize_and_get_metadata() (pid=12819, ip=10.20.3.31, actor_id=b1d2e8c1218640a96c989fca01000000, repr=<ray.serve._private.replica.ServeReplica:text2opsql:VLLMDeployment object at 0x7c67cebcf760>)
  File "/home/ray/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/home/ray/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/serve/_private/replica.py", line 630, in initialize_and_get_metadata
    raise RuntimeError(traceback.format_exc()) from None
RuntimeError: Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/serve/_private/replica.py", line 608, in initialize_and_get_metadata
    await self._user_callable_wrapper.initialize_callable()
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/serve/_private/replica.py", line 888, in initialize_callable
    await self._call_func_or_gen(
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/serve/_private/replica.py", line 854, in _call_func_or_gen
    result = callable(*args, **kwargs)
  File "/src/serve.py", line 68, in __init__
    self.engine = AsyncLLMEngine.from_engine_args(args)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/engine/async_llm_engine.py", line 740, in from_engine_args
    engine = cls(
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/engine/async_llm_engine.py", line 636, in __init__
    self.engine = self._init_engine(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/engine/async_llm_engine.py", line 840, in _init_engine
    return engine_class(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/engine/async_llm_engine.py", line 272, in __init__
    super().__init__(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/engine/llm_engine.py", line 270, in __init__
    self.model_executor = executor_class(
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/executor/multiproc_gpu_executor.py", line 215, in __init__
    super().__init__(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/executor/distributed_gpu_executor.py", line 25, in __init__
    super().__init__(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/executor/executor_base.py", line 46, in __init__
    self._init_executor()
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/executor/multiproc_gpu_executor.py", line 138, in _init_executor
    self._run_workers("load_model",
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/executor/multiproc_gpu_executor.py", line 175, in _run_workers
    raise NotImplementedError(
NotImplementedError: max_concurrent_workers is not supported yet.
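
For context, here is a minimal, self-contained illustration of the guard implied by the traceback (paraphrased from the frames above; not a copy of vLLM's source). With tensor_parallel_size > 1 the multiprocessing GPU executor is used, and max_parallel_loading_workers appears to be forwarded to its worker launcher as max_concurrent_workers, which is rejected outright:

# Paraphrased illustration of the guard hit in
# vllm/executor/multiproc_gpu_executor.py::_run_workers (see traceback above);
# any truthy max_concurrent_workers is rejected before the model is even loaded.
def run_workers(method: str, max_concurrent_workers=None):
    if max_concurrent_workers:
        raise NotImplementedError("max_concurrent_workers is not supported yet.")
    print(f"would run {method!r} on all workers")

run_workers("load_model")                            # fine
run_workers("load_model", max_concurrent_workers=2)  # raises, as in the log above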

Versions / Dependencies

vLLM version: 0.5.5
Ray version: 2.24.0

Reproduction script

import json
import logging
import os
import time
from typing import AsyncGenerator

from fastapi import BackgroundTasks
from ray import serve
from starlette.requests import Request
from starlette.responses import StreamingResponse, Response, JSONResponse
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.sampling_params import SamplingParams
from vllm.utils import random_uuid
# from vllm.utils import iterate_with_cancellation

# Environment and configuration setup
logger = logging.getLogger("ray.serve")


@serve.deployment(name="VLLMDeployment")
class VLLMDeployment:
    def __init__(self, **kwargs):

        logger.info("Started Loading the model")
        args = AsyncEngineArgs(
            model=str(os.getenv("MODEL_LOCAL_PATH", "Final Model")),
            # Model identifier from Hugging Face Hub or local path.
            dtype=str(os.getenv("MODEL_DTYPE", "auto")),
            # Automatically determine the data type (e.g., float16 or float32) for model weights and computations.
            gpu_memory_utilization=float(os.getenv("GPU_MEMORY_UTILIZATION", "0.9")),
            # Percentage of GPU memory to utilize, reserving some for overhead.
            max_model_len=int(os.getenv("MAX_MODEL_LEN", "4096")),
            # Maximum sequence length (in tokens) the model can handle, including both input and output tokens.
            max_num_seqs=int(os.getenv("MAX_NUM_SEQ", "512")),
            # Maximum number of sequences (requests) to process in parallel.
            max_num_batched_tokens=int(os.getenv("MAX_NUM_BATCHED_TOKENS", "32768")),
            # Maximum number of tokens processed in a single batch across all sequences (max_model_len * max_num_seqs).
            trust_remote_code=True,  # Allow execution of untrusted code from the model repository (use with caution).
            enable_chunked_prefill=False,  # Disable chunked prefill to avoid compatibility issues with prefix caching.
            max_parallel_loading_workers=int(os.getenv("PARALLEL_LOADING_WORKERS", 2)),
            # Number of parallel workers to load the model concurrently.
            pipeline_parallel_size=int(os.getenv("PIPELINE_PARALLELISM", 1)),
            # Number of pipeline parallelism stages; typically set to 1 unless using model parallelism.
            tensor_parallel_size=int(os.getenv("TENSOR_PARALLELISM", 1)),
            # Number of GPUs to shard the model across with tensor parallelism; 1 disables tensor parallelism.
            enable_prefix_caching=True,  # Enable prefix caching to improve performance for similar prompt prefixes.
            quantization=os.getenv("QUANTIZATION", None),  # Model Quantization
            enforce_eager=True,
            disable_log_requests=True,
        )

        self.engine = AsyncLLMEngine.from_engine_args(args)
        self.max_model_len = args.max_model_len
        logger.info("Loaded the VLLM Model")
        logger.info(f"VLLM Engine initialized with max_model_len: {self.max_model_len}")

    async def stream_results(self, results_generator) -> AsyncGenerator[bytes, None]:
        num_returned = 0
        async for request_output in results_generator:
            text_outputs = [output.text for output in request_output.outputs]
            assert len(text_outputs) == 1
            text_output = text_outputs[0][num_returned:]
            ret = {"text": text_output}
            yield (json.dumps(ret) + "\n").encode("utf-8")
            num_returned += len(text_output)

    async def may_abort_request(self, request_id) -> None:
        await self.engine.abort(request_id)

    async def __call__(self, request: Request) -> Response:
        try:
            request_dict = await request.json()
        except json.JSONDecodeError:
            return JSONResponse(status_code=400, content={"error": "Invalid JSON in request body"})

        context_length = request_dict.pop("context_length", 8192)  # Default to 8k

        # Ensure context length is either 8k or 32k
        if context_length not in [8192, 32768]:
            context_length = 8192  # Default to 8k if invalid
        text = request_dict.pop("text")
        stream = request_dict.pop("stream", False)

        default_sampling_params = {
            "temperature": 0,
            "max_tokens": 256,
            "stop": ['}'],
            'include_stop_str_in_output': True
        }

        # Get model config and tokenizer
        model_config = await self.engine.get_model_config()
        tokenizer = await self.engine.get_tokenizer()

        # input_token_ids = tokenizer.encode(prompt)
        # input_tokens = len(input_token_ids)
        # max_possible_new_tokens = min(context_length, model_config.max_model_len) - input_tokens
        # max_new_tokens = min(request_dict.get("max_tokens", 8192), max_possible_new_tokens)

        sampling_params = {**default_sampling_params, **request_dict}
        sampling_params = SamplingParams(**sampling_params)

        request_id = random_uuid()
        start_time = time.time()
        logger.info('Started processing request with id: {} and text: {}'.format(request_id, text))


        results_generator = self.engine.generate(text, sampling_params, request_id)
        # results_generator = iterate_with_cancellation(
        #     results_generator, is_cancelled=request.is_disconnected)

        if stream:
            background_tasks = BackgroundTasks()
            # Using background_tasks to abort the request
            # if the client disconnects.
            background_tasks.add_task(self.may_abort_request, request_id)
            return StreamingResponse(
                self.stream_results(results_generator), background=background_tasks
            )

        # Non-streaming case
        final_output = None
        async for request_output in results_generator:
            if await request.is_disconnected():
                # Abort the request if the client disconnects.
                await self.engine.abort(request_id)
                logger.warning(f"Client disconnected for request {request_id}")
                return Response(status_code=499)
            final_output = request_output

        assert final_output is not None
        ret = {"results": json.loads(final_output)}
        ret = {"results": json.loads(final_output)}
        logger.info('Completed processing request with id: {} in {} secs'.format(
            request_id,
            time.time() - start_time
        ))

        return Response(content=json.dumps(ret))


model = VLLMDeployment.bind()

The same setup works with TENSOR_PARALLELISM = 1 and num_gpus: 1.0, but the issue appears as soon as I increase tensor parallelism above 1 and num_gpus above 1.0.
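
To make the toggle concrete, here is a minimal sketch of the two engine-arg combinations (assuming the same model path and vllm==0.5.5 as in the reproduction script; nothing else changes):

# Minimal sketch of the toggle described above; parameter values mirror
# the reproduction script, nothing else is changed.
from vllm.engine.arg_utils import AsyncEngineArgs

working_args = AsyncEngineArgs(
    model="Final Model",
    tensor_parallel_size=1,           # works together with num_gpus: 1.0
    max_parallel_loading_workers=2,
    enforce_eager=True,
)

failing_args = AsyncEngineArgs(
    model="Final Model",
    tensor_parallel_size=2,           # fails together with num_gpus: 2.0
    max_parallel_loading_workers=2,   # seemingly forwarded as max_concurrent_workers
    enforce_eager=True,
)

# AsyncLLMEngine.from_engine_args(failing_args) raises
# NotImplementedError: max_concurrent_workers is not supported yet.

My guess, which I have not verified, is that dropping max_parallel_loading_workers (or forcing distributed_executor_backend="ray") would avoid this code path, but multi-GPU loading with the default executor should presumably still work.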

Issue Severity

High: It blocks me from completing my task.

sureshmoligi added the bug and triage labels on Sep 19, 2024