Skip to content

Commit

Permalink
Merge branch 'master' into issue/rllib-linear-missing-rnn
Browse files Browse the repository at this point in the history
  • Loading branch information
brieyla1 committed Sep 19, 2024
2 parents f0d4918 + 1e48a03 commit 0ad19d0
Show file tree
Hide file tree
Showing 28 changed files with 611 additions and 471 deletions.
3 changes: 3 additions & 0 deletions .buildkite/linux_aarch64.rayci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ steps:
- "3.9"
- "3.10"
- "3.11"
- "3.12"
cuda:
- "11.7.1-cudnn8"
- "11.8.0-cudnn8"
Expand All @@ -47,6 +48,7 @@ steps:
- "3.9"
- "3.10"
- "3.11"
- "3.12"
instance_type: builder-arm64
env:
PYTHON_VERSION: "{{matrix}}"
Expand Down Expand Up @@ -91,6 +93,7 @@ steps:
- "3.9"
- "3.10"
- "3.11"
- "3.12"

- label: ":ray: core: wheel-aarch64 tests"
tags: linux_wheels
Expand Down
14 changes: 12 additions & 2 deletions doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,22 @@ export REDIS_POD=$(kubectl get pods --selector=app=redis -o custom-columns=POD:m
kubectl exec -it $REDIS_POD -- redis-cli -a "5241590000000000"

# Step 6.4: Check the keys in Redis.
# Note: the schema changed in Ray 2.38.0. Previously we use a single HASH table,
# now we use multiple HASH tables with a common prefix.

KEYS *
# [Example output]:
# 1) "864b004c-6305-42e3-ac46-adfa8eb6f752"
# 1) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@INTERNAL_CONFIG"
# 2) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@KV"
# 3) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE"
# [Example output Before Ray 2.38.0]:
# 2) "864b004c-6305-42e3-ac46-adfa8eb6f752"
#

# Step 6.5: Check the value of the key.
HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752
HGETALL RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE
# Before Ray 2.38.0:
# HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752
```

In [ray-cluster.external-redis.yaml](https://github.com/ray-project/kuberay/blob/v1.0.0/ray-operator/config/samples/ray-cluster.external-redis.yaml), the `ray.io/external-storage-namespace` annotation isn't set for the RayCluster.
Expand Down
10 changes: 6 additions & 4 deletions python/ray/_private/gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def cleanup_redis_storage(
storage_namespace: The namespace of the storage to be deleted.
"""

from ray._raylet import del_key_from_storage # type: ignore
from ray._raylet import del_key_prefix_from_storage # type: ignore

if not isinstance(host, str):
raise ValueError("Host must be a string")
Expand All @@ -142,6 +142,8 @@ def cleanup_redis_storage(
if not isinstance(storage_namespace, str):
raise ValueError("storage namespace must be a string")

# Right now, GCS store all data into a hash set key by storage_namespace.
# So we only need to delete the specific key to cleanup the cluster.
return del_key_from_storage(host, port, password, use_ssl, storage_namespace)
# Right now, GCS stores all data into multiple hashes with keys prefixed by
# storage_namespace. So we only need to delete the specific key prefix to cleanup
# the cluster.
# Note this deletes all keys with prefix `RAY{key_prefix}@`, not `{key_prefix}`.
return del_key_prefix_from_storage(host, port, password, use_ssl, storage_namespace)
11 changes: 8 additions & 3 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ from ray.includes.libcoreworker cimport (

from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor
from ray.includes.global_state_accessor cimport RedisDelKeySync, RedisGetKeySync
from ray.includes.global_state_accessor cimport (
RedisDelKeyPrefixSync,
RedisGetKeySync
)
from ray.includes.optional cimport (
optional, nullopt
)
Expand Down Expand Up @@ -5176,8 +5179,10 @@ cdef void async_callback(shared_ptr[CRayObject] obj,
cpython.Py_DECREF(user_callback)


def del_key_from_storage(host, port, password, use_ssl, key):
return RedisDelKeySync(host, port, password, use_ssl, key)
# Note this deletes keys with prefix `RAY{key_prefix}@`
# Example: with key_prefix = `default`, we remove all `RAYdefault@...` keys.
def del_key_prefix_from_storage(host, port, password, use_ssl, key_prefix):
return RedisDelKeyPrefixSync(host, port, password, use_ssl, key_prefix)


def get_session_key_from_storage(host, port, password, use_ssl, config, key):
Expand Down
9 changes: 4 additions & 5 deletions python/ray/dashboard/modules/job/job_log_storage_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from collections import deque
from typing import Iterator, List, Tuple
from typing import AsyncIterator, List, Tuple

import ray
from ray.dashboard.modules.job.common import JOB_LOGS_PATH_TEMPLATE
Expand All @@ -25,10 +25,10 @@ def get_logs(self, job_id: str) -> str:
except FileNotFoundError:
return ""

def tail_logs(self, job_id: str) -> Iterator[List[str]]:
def tail_logs(self, job_id: str) -> AsyncIterator[List[str]]:
return file_tail_iterator(self.get_log_file_path(job_id))

def get_last_n_log_lines(
async def get_last_n_log_lines(
self, job_id: str, num_log_lines=NUM_LOG_LINES_ON_ERROR
) -> str:
"""
Expand All @@ -39,9 +39,8 @@ def get_last_n_log_lines(
job_id: The id of the job whose logs we want to return
num_log_lines: The number of lines to return.
"""
log_tail_iter = self.tail_logs(job_id)
log_tail_deque = deque(maxlen=num_log_lines)
for lines in log_tail_iter:
async for lines in self.tail_logs(job_id):
if lines is None:
break
else:
Expand Down
6 changes: 3 additions & 3 deletions python/ray/dashboard/modules/job/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import string
import time
import traceback
from typing import Any, Dict, Iterator, Optional, Union
from typing import Any, AsyncIterator, Dict, Optional, Union

import ray
import ray._private.ray_constants as ray_constants
Expand Down Expand Up @@ -619,12 +619,12 @@ def get_job_logs(self, job_id: str) -> str:
"""Get all logs produced by a job."""
return self._log_client.get_logs(job_id)

async def tail_job_logs(self, job_id: str) -> Iterator[str]:
async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
"""Return an iterator following the logs of a job."""
if await self.get_job_status(job_id) is None:
raise RuntimeError(f"Job '{job_id}' does not exist.")

for lines in self._log_client.tail_logs(job_id):
async for lines in self._log_client.tail_logs(job_id):
if lines is None:
# Return if the job has exited and there are no new log lines.
status = await self.get_job_status(job_id)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/dashboard/modules/job/job_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ async def run(
driver_exit_code=return_code,
)
else:
log_tail = self._log_client.get_last_n_log_lines(self._job_id)
log_tail = await self._log_client.get_last_n_log_lines(self._job_id)
if log_tail is not None and log_tail != "":
message = (
"Job entrypoint command "
Expand Down
4 changes: 2 additions & 2 deletions python/ray/dashboard/modules/job/sdk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dataclasses
import logging
from typing import Any, Dict, Iterator, List, Optional, Union
from typing import Any, AsyncIterator, Dict, List, Optional, Union

import packaging.version

Expand Down Expand Up @@ -449,7 +449,7 @@ def get_job_logs(self, job_id: str) -> str:
self._raise_error(r)

@PublicAPI(stability="stable")
async def tail_job_logs(self, job_id: str) -> Iterator[str]:
async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
"""Get an iterator that follows the logs of a job.
Example:
Expand Down
87 changes: 53 additions & 34 deletions python/ray/dashboard/modules/job/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@
)


# Polyfill anext() function for Python 3.9 compatibility
# May raise StopAsyncIteration.
async def anext_polyfill(iterator):
return await iterator.__anext__()


# Use the built-in anext() for Python 3.10+, otherwise use our polyfilled function
if sys.version_info < (3, 10):
anext = anext_polyfill


@pytest.fixture
def tmp():
with NamedTemporaryFile() as f:
Expand Down Expand Up @@ -80,32 +91,36 @@ async def test_forward_compatibility(self):


class TestIterLine:
def test_invalid_type(self):
@pytest.mark.asyncio
async def test_invalid_type(self):
with pytest.raises(TypeError, match="path must be a string"):
next(file_tail_iterator(1))
await anext(file_tail_iterator(1))

def test_file_not_created(self, tmp):
@pytest.mark.asyncio
async def test_file_not_created(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None
f = open(tmp, "w")
f.write("hi\n")
f.flush()
assert next(it) is not None
assert await anext(it) is not None

def test_wait_for_newline(self, tmp):
@pytest.mark.asyncio
async def test_wait_for_newline(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")
f.write("no_newline_yet")
assert next(it) is None
assert await anext(it) is None
f.write("\n")
f.flush()
assert next(it) == ["no_newline_yet\n"]
assert await anext(it) == ["no_newline_yet\n"]

def test_multiple_lines(self, tmp):
@pytest.mark.asyncio
async def test_multiple_lines(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")

Expand All @@ -114,13 +129,14 @@ def test_multiple_lines(self, tmp):
s = f"{i}\n"
f.write(s)
f.flush()
assert next(it) == [s]
assert await anext(it) == [s]

assert next(it) is None
assert await anext(it) is None

def test_batching(self, tmp):
@pytest.mark.asyncio
async def test_batching(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")

Expand All @@ -131,13 +147,14 @@ def test_batching(self, tmp):
f.write(f"{i}\n")
f.flush()

assert next(it) == [f"{i}\n" for i in range(10)]
assert await anext(it) == [f"{i}\n" for i in range(10)]

assert next(it) is None
assert await anext(it) is None

def test_max_line_batching(self, tmp):
@pytest.mark.asyncio
async def test_max_line_batching(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")

Expand All @@ -148,17 +165,18 @@ def test_max_line_batching(self, tmp):
f.write(f"{i}\n")
f.flush()

assert next(it) == [f"{i}\n" for i in range(10)]
assert next(it) == [f"{i}\n" for i in range(10, 20)]
assert next(it) == [f"{i}\n" for i in range(20, 30)]
assert next(it) == [f"{i}\n" for i in range(30, 40)]
assert next(it) == [f"{i}\n" for i in range(40, 50)]
assert await anext(it) == [f"{i}\n" for i in range(10)]
assert await anext(it) == [f"{i}\n" for i in range(10, 20)]
assert await anext(it) == [f"{i}\n" for i in range(20, 30)]
assert await anext(it) == [f"{i}\n" for i in range(30, 40)]
assert await anext(it) == [f"{i}\n" for i in range(40, 50)]

assert next(it) is None
assert await anext(it) is None

def test_max_char_batching(self, tmp):
@pytest.mark.asyncio
async def test_max_char_batching(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")

Expand All @@ -170,31 +188,32 @@ def test_max_char_batching(self, tmp):
f.flush()

# First line will come in a batch of its own
assert next(it) == [f"{'1234567890' * 6000}\n"]
assert await anext(it) == [f"{'1234567890' * 6000}\n"]
# Other 4 lines will be batched together
assert (
next(it)
await anext(it)
== [
f"{'1234567890' * 500}\n",
]
* 4
)
assert next(it) is None
assert await anext(it) is None

def test_delete_file(self):
@pytest.mark.asyncio
async def test_delete_file(self):
with NamedTemporaryFile() as tmp:
it = file_tail_iterator(tmp.name)
f = open(tmp.name, "w")

assert next(it) is None
assert await anext(it) is None

f.write("hi\n")
f.flush()

assert next(it) == ["hi\n"]
assert await anext(it) == ["hi\n"]

# Calls should continue returning None after file deleted.
assert next(it) is None
assert await anext(it) is None


if __name__ == "__main__":
Expand Down
7 changes: 3 additions & 4 deletions python/ray/dashboard/modules/job/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
import logging
import os
import re
import time
import traceback
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Union

from ray._private import ray_constants
from ray._private.gcs_utils import GcsAioClient
Expand Down Expand Up @@ -60,7 +59,7 @@ def redact_url_password(url: str) -> str:
return url


def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]:
async def file_tail_iterator(path: str) -> AsyncIterator[Optional[List[str]]]:
"""Yield lines from a file as it's written.
Returns lines in batches of up to 10 lines or 20000 characters,
Expand Down Expand Up @@ -114,7 +113,7 @@ def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]:
chunk_char_count += len(curr_line)
else:
# If EOF is reached sleep for 1s before continuing
time.sleep(1)
await asyncio.sleep(1)


async def parse_and_validate_request(
Expand Down
Loading

0 comments on commit 0ad19d0

Please sign in to comment.