Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Yelp/paasta into MLCOMPUTE-1203_fix_spark_driver_resources
Browse files Browse the repository at this point in the history
  • Loading branch information
Sameer Sharma committed Jul 2, 2024
2 parents 7de6ee6 + a974b7f commit 8163fda
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 5 deletions.
53 changes: 52 additions & 1 deletion paasta_tools/cli/cmds/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
from typing import Type
from typing import Union

import a_sync
import isodate
import nats
import pytz
from dateutil import tz

Expand Down Expand Up @@ -1166,19 +1168,26 @@ def cluster_to_scribe_env(self, cluster: str) -> str:

@register_log_reader("vector-logs")
class VectorLogsReader(LogReader):
SUPPORTS_TAILING = True
SUPPORTS_TIME = True

def __init__(self, cluster_map: Mapping[str, Any]) -> None:
    def __init__(
        self, cluster_map: Mapping[str, Any], nats_endpoint_map: Mapping[str, Any]
    ) -> None:
        """Log reader backed by S3 (historical reads) and NATS (live tailing).

        :param cluster_map: maps cluster name -> superregion; consumed by
            ``get_superregion_for_cluster`` for S3 log lookups.
        :param nats_endpoint_map: maps cluster name -> NATS endpoint host;
            consumed by ``get_nats_endpoint_for_cluster``. Clusters absent
            from this map cannot be tailed.
        :raises Exception: if the optional yelp_clog dependency is missing
            (``S3LogsReader`` failed to import).
        """
        super().__init__()

        if S3LogsReader is None:
            raise Exception("yelp_clog package must be available to use S3LogsReader")

        self.cluster_map = cluster_map
        self.nats_endpoint_map = nats_endpoint_map

def get_superregion_for_cluster(self, cluster: str) -> Optional[str]:
return self.cluster_map.get(cluster, None)

def get_nats_endpoint_for_cluster(self, cluster: str) -> Optional[str]:
return self.nats_endpoint_map.get(cluster, None)

def print_logs_by_time(
self,
service,
Expand Down Expand Up @@ -1230,6 +1239,48 @@ def print_logs_by_time(
for line in aggregated_logs:
print_log(line["raw_line"], levels, raw_mode, strip_headers)

def tail_logs(
self,
service: str,
levels: Sequence[str],
components: Iterable[str],
clusters: Sequence[str],
instances: List[str],
pods: Iterable[str] = None,
raw_mode: bool = False,
strip_headers: bool = False,
) -> None:
stream_name = get_log_name_for_service(service, prefix="app_output")
endpoint = self.get_nats_endpoint_for_cluster(clusters[0])
if not endpoint:
raise NotImplementedError(
"Tailing logs is not supported in this cluster yet, sorry"
)

async def tail_logs_from_nats() -> None:
nc = await nats.connect(f"nats://{endpoint}")
sub = await nc.subscribe(stream_name)

while True:
# Wait indefinitely for a new message (no timeout)
msg = await sub.next_msg(timeout=None)
decoded_data = msg.data.decode("utf-8")

if paasta_log_line_passes_filter(
decoded_data,
levels,
service,
components,
clusters,
instances,
pods,
):
await a_sync.run(
print_log, decoded_data, levels, raw_mode, strip_headers
)

a_sync.block(tail_logs_from_nats)


def scribe_env_to_locations(scribe_env) -> Mapping[str, Any]:
"""Converts a scribe environment to a dictionary of locations. The
Expand Down
6 changes: 5 additions & 1 deletion paasta_tools/cli/cmds/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@
int,
]

EKS_DEPLOYMENT_CONFIGS = [EksDeploymentConfig, FlinkEksDeploymentConfig]
# Deployment-config classes treated as EKS deployments by the status command.
# VitessDeploymentConfig was added alongside the Eks/FlinkEks configs.
EKS_DEPLOYMENT_CONFIGS = [
    EksDeploymentConfig,
    FlinkEksDeploymentConfig,
    VitessDeploymentConfig,
]
# Flink instances may be configured with either config type (plain or EKS).
FLINK_DEPLOYMENT_CONFIGS = [FlinkDeploymentConfig, FlinkEksDeploymentConfig]


Expand Down
1 change: 1 addition & 0 deletions requirements-minimal.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ kubernetes >= 18.20.0, < 22.0.0
ldap3
manhole
mypy-extensions >= 0.3.0
nats-py
nulltype
objgraph
ply
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ monotonic==1.4
msgpack-python==0.5.6
multidict==4.7.6
mypy-extensions==0.4.1
nats-py==2.8.0
nulltype==2.3.1
oauthlib==3.1.0
objgraph==3.4.0
Expand Down
6 changes: 3 additions & 3 deletions tests/cli/test_cmds_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ def test_vector_logs_read_logs_empty_clusters():
with mock.patch("paasta_tools.cli.cmds.logs.log", autospec=True), mock.patch(
"paasta_tools.cli.cmds.logs.S3LogsReader", autospec=None
), pytest.raises(IndexError) as e:
logs.VectorLogsReader(cluster_map={}).print_logs_by_time(
logs.VectorLogsReader(cluster_map={}, nats_endpoint_map={}).print_logs_by_time(
service,
start_time,
end_time,
Expand Down Expand Up @@ -942,7 +942,7 @@ def test_vector_logs_print_logs_by_time():
start_time = pytz.utc.localize(isodate.parse_datetime("2016-06-08T06:00"))
end_time = pytz.utc.localize(isodate.parse_datetime("2016-06-08T07:00"))

logs.VectorLogsReader(cluster_map={}).print_logs_by_time(
logs.VectorLogsReader(cluster_map={}, nats_endpoint_map={}).print_logs_by_time(
service,
start_time,
end_time,
Expand Down Expand Up @@ -1045,7 +1045,7 @@ def test_get_log_reader():
},
{
"driver": "vector-logs",
"options": {"cluster_map": {}},
"options": {"cluster_map": {}, "nats_endpoint_map": {}},
"components": ["stdout", "stderr"],
},
]
Expand Down

0 comments on commit 8163fda

Please sign in to comment.