-
Notifications
You must be signed in to change notification settings - Fork 518
/
Copy pathtest_packages.py
117 lines (93 loc) · 4.59 KB
/
test_packages.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Test that the packages are built correctly."""
import unittest
import uuid
from semver import Version
from marshmallow import ValidationError
from detection_rules import rule_loader
from detection_rules.schemas.registry_package import (RegistryPackageManifestV1,
RegistryPackageManifestV3)
from detection_rules.packaging import PACKAGE_FILE, Package
from tests.base import BaseRuleTest
package_configs = Package.load_configs()
class TestPackages(BaseRuleTest):
"""Test package building and saving."""
@staticmethod
def get_test_rule(version=1, count=1):
def get_rule_contents():
contents = {
"author": ["Elastic"],
"description": "test description",
"language": "kuery",
"license": "Elastic License v2",
"name": "test rule",
"query": "process.name:test.query",
"risk_score": 21,
"rule_id": str(uuid.uuid4()),
"severity": "low",
"type": "query"
}
return contents
rules = [rule_loader.TOMLRule('test.toml', get_rule_contents()) for i in range(count)]
version_info = {
rule.id: {
'rule_name': rule.name,
'sha256': rule.contents.sha256(),
'version': version
} for rule in rules
}
return rules, version_info
def test_package_loader_production_config(self):
"""Test that packages are loading correctly."""
@unittest.skipIf(rule_loader.RULES_CONFIG.bypass_version_lock, 'Version lock bypassed')
def test_package_loader_default_configs(self):
"""Test configs in detection_rules/etc/packages.yaml."""
Package.from_config(rule_collection=self.rc, config=package_configs)
@unittest.skipIf(rule_loader.RULES_CONFIG.bypass_version_lock, 'Version lock bypassed')
def test_package_summary(self):
"""Test the generation of the package summary."""
rules = self.rc
package = Package(rules, 'test-package')
package.generate_summary_and_changelog(package.changed_ids, package.new_ids, package.removed_ids)
@unittest.skipIf(rule_loader.RULES_CONFIG.bypass_version_lock, 'Version lock bypassed')
def test_rule_versioning(self):
"""Test that all rules are properly versioned and tracked"""
self.maxDiff = None
rules = self.rc
original_hashes = []
post_bump_hashes = []
# test that no rules have versions defined
for rule in rules:
self.assertGreaterEqual(rule.contents.autobumped_version, 1, '{} - {}: version is not being set in package')
original_hashes.append(rule.contents.sha256())
package = Package(rules, 'test-package')
# test that all rules have versions defined
# package.bump_versions(save_changes=False)
for rule in package.rules:
self.assertGreaterEqual(rule.contents.autobumped_version, 1, '{} - {}: version is not being set in package')
# test that rules validate with version
for rule in package.rules:
post_bump_hashes.append(rule.contents.sha256())
# test that no hashes changed as a result of the version bumps
self.assertListEqual(original_hashes, post_bump_hashes, 'Version bumping modified the hash of a rule')
class TestRegistryPackage(unittest.TestCase):
"""Test the OOB registry package."""
@classmethod
def setUpClass(cls) -> None:
assert 'registry_data' in package_configs, f'Missing registry_data in {PACKAGE_FILE}'
cls.registry_config = package_configs['registry_data']
stack_version = Version.parse(cls.registry_config['conditions']['kibana.version'].strip("^"),
optional_minor_and_patch=True)
if stack_version >= Version.parse("8.12.0"):
RegistryPackageManifestV3.from_dict(cls.registry_config)
else:
RegistryPackageManifestV1.from_dict(cls.registry_config)
def test_registry_package_config(self):
"""Test that the registry package is validating properly."""
registry_config = self.registry_config.copy()
registry_config['version'] += '7.1.1.'
with self.assertRaises(ValidationError):
RegistryPackageManifestV1.from_dict(registry_config)