Skip to content

Commit

Permalink
Merge pull request #981 from Yelp/revert-980-u/jfong/revert_to_2.1.1
Browse files Browse the repository at this point in the history
Revert "Reverts all changes back through 2.1.1 and retains urgent unprocessedkeys fix"
  • Loading branch information
EmanElsaban authored Jun 20, 2024
2 parents 7e48dc8 + d94356d commit 2547395
Show file tree
Hide file tree
Showing 13 changed files with 121 additions and 9 deletions.
2 changes: 1 addition & 1 deletion requirements-minimal.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pytimeparse
pytz
PyYAML>=5.1
requests
task_processing[mesos_executor,k8s]
task_processing[mesos_executor,k8s]>=1.1.0
Twisted>=19.7.0
urllib3>=1.24.2
Werkzeug>=0.15.3
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ setuptools==65.5.1
six==1.15.0
sshpubkeys==3.1.0
stack-data==0.6.2
task-processing==0.14.0
task-processing==1.1.0
traitlets==5.0.0
Twisted==22.10.0
typing-extensions==4.5.0
Expand Down
14 changes: 14 additions & 0 deletions tests/config/config_parse_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ def make_master_jobs():
items=(schema.ConfigSecretVolumeItem(key="secret1", path="abcd", mode="777"),),
),
),
projected_sa_volumes=(
schema.ConfigProjectedSAVolume(
container_path="/var/secrets/whatever",
audience="foo.bar",
expiration_seconds=1800,
),
),
node_selectors={"yelp.com/pool": "default"},
node_affinities=(
ConfigNodeAffinity(
Expand Down Expand Up @@ -451,6 +458,13 @@ class ConfigTestCase(TestCase):
],
),
],
projected_sa_volumes=[
dict(
container_path="/var/secrets/whatever",
audience="foo.bar",
expiration_seconds=1800,
),
],
cap_add=["KILL"],
cap_drop=["CHOWN", "KILL"],
node_selectors={"yelp.com/pool": "default"},
Expand Down
1 change: 1 addition & 0 deletions tests/core/actionrun_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1894,6 +1894,7 @@ def test_recover(self, mock_cluster_repo, mock_filehandler, mock_k8s_action_run)
serializer=serializer,
volumes=mock_k8s_action_run.command_config.extra_volumes,
secret_volumes=mock_k8s_action_run.command_config.secret_volumes,
projected_sa_volumes=mock_k8s_action_run.command_config.projected_sa_volumes,
cap_add=mock_k8s_action_run.command_config.cap_add,
cap_drop=mock_k8s_action_run.command_config.cap_drop,
task_id=last_attempt.kubernetes_task_id,
Expand Down
8 changes: 8 additions & 0 deletions tests/kubernetes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from task_processing.plugins.kubernetes.task_config import KubernetesTaskConfig

from tron.config.schema import ConfigFieldSelectorSource
from tron.config.schema import ConfigProjectedSAVolume
from tron.config.schema import ConfigSecretSource
from tron.config.schema import ConfigSecretVolume
from tron.config.schema import ConfigSecretVolumeItem
Expand Down Expand Up @@ -464,6 +465,7 @@ def test_create_task_disabled():
env={},
secret_env={},
secret_volumes=[],
projected_sa_volumes=[],
field_selector_env={},
volumes=[],
cap_add=[],
Expand Down Expand Up @@ -493,6 +495,7 @@ def test_create_task(mock_kubernetes_cluster):
env={},
secret_env={},
secret_volumes=[],
projected_sa_volumes=[],
field_selector_env={},
volumes=[],
cap_add=[],
Expand Down Expand Up @@ -523,6 +526,7 @@ def test_create_task_with_task_id(mock_kubernetes_cluster):
env={},
secret_env={},
secret_volumes=[],
projected_sa_volumes=[],
field_selector_env={},
volumes=[],
cap_add=[],
Expand Down Expand Up @@ -556,6 +560,7 @@ def test_create_task_with_invalid_task_id(mock_kubernetes_cluster):
env={},
secret_env={},
secret_volumes=[],
projected_sa_volumes=[],
field_selector_env={},
volumes=[],
cap_add=[],
Expand Down Expand Up @@ -590,6 +595,7 @@ def test_create_task_with_config(mock_kubernetes_cluster):
]
config_secrets = {"TEST_SECRET": ConfigSecretSource(secret_name="tron-secret-test-secret--A", key="secret_A")}
config_field_selector = {"POD_IP": ConfigFieldSelectorSource(field_path="status.podIP")}
config_sa_volumes = [ConfigProjectedSAVolume(audience="for.bar.com", container_path="/var/run/secrets/whatever")]

expected_args = {
"name": mock.ANY,
Expand All @@ -601,6 +607,7 @@ def test_create_task_with_config(mock_kubernetes_cluster):
"environment": {"TEST_ENV": "foo"},
"secret_environment": {k: v._asdict() for k, v in config_secrets.items()},
"secret_volumes": [v._asdict() for v in config_secret_volumes],
"projected_sa_volumes": [v._asdict() for v in config_sa_volumes],
"field_selector_environment": {k: v._asdict() for k, v in config_field_selector.items()},
"volumes": [v._asdict() for v in default_volumes + config_volumes],
"cap_add": ["KILL"],
Expand All @@ -625,6 +632,7 @@ def test_create_task_with_config(mock_kubernetes_cluster):
env=expected_args["environment"],
secret_env=config_secrets,
secret_volumes=config_secret_volumes,
projected_sa_volumes=config_sa_volumes,
field_selector_env=config_field_selector,
volumes=config_volumes,
cap_add=["KILL"],
Expand Down
45 changes: 45 additions & 0 deletions tests/utils/scribereader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def test_read_log_stream_for_action_run_yelp_clog():
), mock.patch("tron.config.static_config.build_configuration_watcher", autospec=True,), mock.patch(
"tron.config.static_config.load_yaml_file",
autospec=True,
), mock.patch(
"tron.utils.scribereader.get_ecosystem", autospec=True, return_value="fake"
), mock.patch(
"tron.utils.scribereader.get_superregion", autospec=True, return_value="fake"
), mock.patch(
"tron.utils.scribereader.S3LogsReader", autospec=True
) as mock_s3_reader:
Expand Down Expand Up @@ -80,6 +84,47 @@ def test_read_log_stream_for_action_run_yelp_clog():
assert output == ["line 1", "line 2"]


@pytest.mark.parametrize(
"local_datetime, expected_date",
[
(
datetime.datetime(2024, 2, 29, 23, 59, 59, tzinfo=datetime.timezone(datetime.timedelta(hours=+3))),
datetime.date(2024, 2, 29),
),
(
datetime.datetime(2024, 2, 29, 23, 59, 59, tzinfo=datetime.timezone(datetime.timedelta(hours=-3))),
datetime.date(2024, 3, 1),
),
],
)
def test_read_log_stream_for_action_run_yelp_clog_tz(local_datetime, expected_date):
with mock.patch(
"staticconf.read",
autospec=True,
side_effect=static_conf_patch({"logging.use_s3_reader": True, "logging.max_lines_to_display": 1000}),
), mock.patch("tron.config.static_config.build_configuration_watcher", autospec=True,), mock.patch(
"tron.config.static_config.load_yaml_file",
autospec=True,
), mock.patch(
"tron.utils.scribereader.get_ecosystem", autospec=True, return_value="fake"
), mock.patch(
"tron.utils.scribereader.get_superregion", autospec=True, return_value="fake"
), mock.patch(
"tron.utils.scribereader.S3LogsReader", autospec=True
) as mock_s3_log_reader:

read_log_stream_for_action_run(
"namespace.job.1234.action",
component="stdout",
min_date=local_datetime,
max_date=local_datetime,
paasta_cluster="fake",
)
mock_s3_log_reader.return_value.get_log_reader.assert_called_once_with(
log_name=mock.ANY, min_date=expected_date, max_date=expected_date
)


def test_read_log_stream_for_action_run_min_date_and_max_date_today():
# NOTE: these tests don't actually depend on the current time apart from
# today vs not-today and the args are forwarded to scribereader anyway
Expand Down
21 changes: 21 additions & 0 deletions tron/config/config_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from tron.config.schema import ConfigMesos
from tron.config.schema import ConfigNodeAffinity
from tron.config.schema import ConfigParameter
from tron.config.schema import ConfigProjectedSAVolume
from tron.config.schema import ConfigSecretSource
from tron.config.schema import ConfigSecretVolume
from tron.config.schema import ConfigSecretVolumeItem
Expand Down Expand Up @@ -345,6 +346,22 @@ def post_validation(self, valid_input, config_context):
valid_secret_volume = ValidateSecretVolume()


class ValidateProjectedSAVolume(Validator):
config_class = ConfigProjectedSAVolume
optional = True
defaults = {
"expiration_seconds": 1800,
}
validators = {
"container_path": valid_string,
"audience": valid_string,
"expiration_seconds": valid_int,
}


valid_projected_sa_volume = ValidateProjectedSAVolume()


class ValidateFieldSelectorSource(Validator):
config_class = ConfigFieldSelectorSource
validators = {
Expand Down Expand Up @@ -536,6 +553,7 @@ class ValidateAction(Validator):
"env": None,
"secret_env": None,
"secret_volumes": None,
"projected_sa_volumes": None,
"field_selector_env": None,
"extra_volumes": None,
"trigger_downstreams": None,
Expand Down Expand Up @@ -576,6 +594,7 @@ class ValidateAction(Validator):
"env": valid_dict,
"secret_env": build_dict_value_validator(valid_secret_source),
"secret_volumes": build_list_of_type_validator(valid_secret_volume, allow_empty=True),
"projected_sa_volumes": build_list_of_type_validator(valid_projected_sa_volume, allow_empty=True),
"field_selector_env": build_dict_value_validator(valid_field_selector_source),
"extra_volumes": build_list_of_type_validator(valid_volume, allow_empty=True),
"trigger_downstreams": valid_trigger_downstreams,
Expand Down Expand Up @@ -625,6 +644,7 @@ class ValidateCleanupAction(Validator):
"env": None,
"secret_env": None,
"secret_volumes": None,
"projected_sa_volumes": None,
"field_selector_env": None,
"extra_volumes": None,
"trigger_downstreams": None,
Expand Down Expand Up @@ -660,6 +680,7 @@ class ValidateCleanupAction(Validator):
"env": valid_dict,
"secret_env": build_dict_value_validator(valid_secret_source),
"secret_volumes": build_list_of_type_validator(valid_secret_volume, allow_empty=True),
"projected_sa_volumes": build_list_of_type_validator(valid_projected_sa_volume, allow_empty=True),
"field_selector_env": build_dict_value_validator(valid_field_selector_source),
"extra_volumes": build_list_of_type_validator(valid_volume, allow_empty=True),
"trigger_downstreams": valid_trigger_downstreams,
Expand Down
9 changes: 8 additions & 1 deletion tron/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def config_object_factory(name, required=None, optional=None):
"env", # dict
"secret_env", # dict of str, ConfigSecretSource
"secret_volumes", # List of ConfigSecretVolume
"projected_sa_volumes", # List of ConfigProjectedSAVolume
"field_selector_env", # dict of str, ConfigFieldSelectorSource
"extra_volumes", # List of ConfigVolume
"expected_runtime", # datetime.Timedelta
Expand Down Expand Up @@ -209,6 +210,7 @@ def config_object_factory(name, required=None, optional=None):
"env", # dict
"secret_env", # dict of str, ConfigSecretSource
"secret_volumes", # List of ConfigSecretVolume
"projected_sa_volumes", # List of ConfigProjectedSAVolume
"field_selector_env", # dict of str, ConfigFieldSelectorSource
"extra_volumes", # List of ConfigVolume
"trigger_downstreams", # None, bool or dict
Expand Down Expand Up @@ -253,7 +255,6 @@ def config_object_factory(name, required=None, optional=None):
optional=["mode"],
)


_ConfigSecretVolume = config_object_factory(
name="ConfigSecretVolume",
required=["secret_volume_name", "secret_name", "container_path"],
Expand Down Expand Up @@ -284,6 +285,12 @@ def _asdict(self) -> dict:
optional=[],
)

ConfigProjectedSAVolume = config_object_factory(
name="ConfigProjectedSAVolume",
required=["container_path", "audience"],
optional=["expiration_seconds"],
)

ConfigFieldSelectorSource = config_object_factory(
name="ConfigFieldSelectorSource",
required=["field_path"],
Expand Down
3 changes: 3 additions & 0 deletions tron/core/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from tron.config.schema import CLEANUP_ACTION_NAME
from tron.config.schema import ConfigAction
from tron.config.schema import ConfigNodeAffinity
from tron.config.schema import ConfigProjectedSAVolume
from tron.config.schema import ConfigSecretVolume

log = logging.getLogger(__name__)
Expand All @@ -33,6 +34,7 @@ class ActionCommandConfig:
env: dict = field(default_factory=dict)
secret_env: dict = field(default_factory=dict)
secret_volumes: List[ConfigSecretVolume] = field(default_factory=list)
projected_sa_volumes: List[ConfigProjectedSAVolume] = field(default_factory=list)
field_selector_env: dict = field(default_factory=dict)
extra_volumes: set = field(default_factory=set)
node_selectors: dict = field(default_factory=dict)
Expand Down Expand Up @@ -90,6 +92,7 @@ def from_config(cls, config: ConfigAction) -> "Action":
env=config.env or {},
secret_env=config.secret_env or {},
secret_volumes=config.secret_volumes or [],
projected_sa_volumes=config.projected_sa_volumes or [],
field_selector_env=config.field_selector_env or {},
cap_add=config.cap_add or [],
cap_drop=config.cap_drop or [],
Expand Down
2 changes: 2 additions & 0 deletions tron/core/actionrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,7 @@ def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]:
env=build_environment(original_env=attempt.command_config.env, run_id=self.id),
secret_env=attempt.command_config.secret_env,
secret_volumes=attempt.command_config.secret_volumes,
projected_sa_volumes=attempt.command_config.projected_sa_volumes,
field_selector_env=attempt.command_config.field_selector_env,
serializer=filehandler.OutputStreamSerializer(self.output_path),
volumes=attempt.command_config.extra_volumes,
Expand Down Expand Up @@ -1239,6 +1240,7 @@ def recover(self) -> Optional[KubernetesTask]:
field_selector_env=last_attempt.command_config.field_selector_env,
serializer=filehandler.OutputStreamSerializer(self.output_path),
secret_volumes=last_attempt.command_config.secret_volumes,
projected_sa_volumes=last_attempt.command_config.projected_sa_volumes,
volumes=last_attempt.command_config.extra_volumes,
cap_add=last_attempt.command_config.cap_add,
cap_drop=last_attempt.command_config.cap_drop,
Expand Down
3 changes: 3 additions & 0 deletions tron/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from tron.config.schema import ConfigFieldSelectorSource
from tron.config.schema import ConfigKubernetes
from tron.config.schema import ConfigNodeAffinity
from tron.config.schema import ConfigProjectedSAVolume
from tron.config.schema import ConfigSecretSource
from tron.config.schema import ConfigSecretVolume
from tron.config.schema import ConfigVolume
Expand Down Expand Up @@ -477,6 +478,7 @@ def create_task(
env: Dict[str, str],
secret_env: Dict[str, ConfigSecretSource],
secret_volumes: Collection[ConfigSecretVolume],
projected_sa_volumes: Collection[ConfigProjectedSAVolume],
field_selector_env: Dict[str, ConfigFieldSelectorSource],
volumes: Collection[ConfigVolume],
cap_add: Collection[str],
Expand Down Expand Up @@ -512,6 +514,7 @@ def create_task(
environment=env,
secret_environment={k: v._asdict() for k, v in secret_env.items()},
secret_volumes=[volume._asdict() for volume in secret_volumes],
projected_sa_volumes=[volume._asdict() for volume in projected_sa_volumes],
field_selector_environment={k: v._asdict() for k, v in field_selector_env.items()},
cap_add=cap_add,
cap_drop=cap_drop,
Expand Down
18 changes: 13 additions & 5 deletions tron/utils/scribereader.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,19 +171,27 @@ def read_log_stream_for_action_run(

paasta_logs = PaaSTALogs(component, paasta_cluster, action_run_id)
stream_name = paasta_logs.stream_name

today = datetime.date.today()
start_date = min_date.date()
end_date: Optional[datetime.date]

# yelp_clog S3LogsReader is a newer reader that is supposed to replace scribe readers eventually.
if use_s3_reader:
end_date = max_date.date() if max_date else today
# S3 reader uses UTC as a standard timezone
# if min_date and max_date timezone is missing, astimezone() will assume local timezone and convert it to UTC
start_date = min_date.astimezone(datetime.timezone.utc).date()
end_date = (
max_date.astimezone(datetime.timezone.utc).date()
if max_date
else datetime.datetime.now().astimezone(datetime.timezone.utc).date()
)

log.debug("Using S3LogsReader to retrieve logs")
s3_reader = S3LogsReader(ecosystem).get_log_reader(log_name=stream_name, min_date=start_date, max_date=end_date)
s3_reader = S3LogsReader(ecosystem, superregion).get_log_reader(
log_name=stream_name, min_date=start_date, max_date=end_date
)
paasta_logs.fetch(s3_reader, max_lines)
else:
today = datetime.date.today()
start_date = min_date.date()
end_date = max_date.date() if max_date else None
use_tailer = today in {start_date, end_date}
use_reader = start_date != today and end_date is not None
Expand Down
2 changes: 1 addition & 1 deletion yelp_package/extra_requirements_yelp.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ srv-configs==1.3.4 # required by monk
tenacity==8.2.3 # required by yelp-logging
thriftpy2==0.4.20 # required by monk
yelp-cgeom==1.3.1 # required by geogrid
yelp-clog==7.1.1 # scribereader dependency
yelp-clog==7.1.2 # scribereader dependency
yelp-logging==4.17.0 # scribereader dependency
yelp-meteorite==2.1.1 # used by task-processing to emit metrics, clusterman-metrics dependency

0 comments on commit 2547395

Please sign in to comment.