diff --git a/tests/utils/autotailor_integration_test.sh b/tests/utils/autotailor_integration_test.sh index 5e096f20d4..118efdf1d5 100755 --- a/tests/utils/autotailor_integration_test.sh +++ b/tests/utils/autotailor_integration_test.sh @@ -114,6 +114,17 @@ assert_exists 1 '/Benchmark/TestResult/rule-result[@idref="xccdf_com.example.www assert_exists 1 '/Benchmark/TestResult/rule-result[@idref="xccdf_com.example.www_rule_R3"]/result[text()="notselected"]' assert_exists 1 '/Benchmark/TestResult/rule-result[@idref="xccdf_com.example.www_rule_R4"]/result[text()="notselected"]' +# invalid selector for V1 should fail with a descriptive error +! python3 $autotailor --id-namespace "com.example.www" --var-select V1=invalid $ds $original_profile 2>$stdout +grep "Selector 'invalid' does not exist" $stdout + +# invalid selector for V2 should fail with available selectors listed +! python3 $autotailor --id-namespace "com.example.www" --var-select V2=invalid $ds $original_profile 2>$stdout +grep "Available selectors" $stdout + +# --no-validate bypasses selector validation +python3 $autotailor --id-namespace "com.example.www" --no-validate --var-select V1=invalid $ds $original_profile > $tailoring + # use JSON tailoring python3 $autotailor $ds --id-namespace "com.example.www" --json-tailoring $json_tailoring > $tailoring $OSCAP xccdf eval --profile JSON_P1 --progress --tailoring-file $tailoring --results $result $ds diff --git a/tests/utils/test_autotailor.py b/tests/utils/test_autotailor.py index 2b9a8ce15f..9cb3941eb0 100644 --- a/tests/utils/test_autotailor.py +++ b/tests/utils/test_autotailor.py @@ -1,4 +1,5 @@ import importlib +import pathlib import pytest NS = "http://checklists.nist.gov/xccdf/1.2" @@ -120,3 +121,146 @@ def test_get_datastream_uri(): uri = t._get_datastream_uri() assert uri.startswith("file://") assert "relative/path/to/ds.xml" in uri + + +def test_datastream_validator(): + """Test that DataStreamValidator properly validates IDs.""" + ds_path = pathlib.Path(__file__).parent.joinpath("data_stream.xml") + validator = autotailor.DataStreamValidator(str(ds_path)) + + # Test valid profile validation + validator.validate_profile("xccdf_com.example.www_profile_P1") + + # Test valid value validation + validator.validate_value("xccdf_com.example.www_value_V1") + validator.validate_value("xccdf_com.example.www_value_V2") + + # Test valid rule validation + validator.validate_rule("xccdf_com.example.www_rule_R1") + validator.validate_rule("xccdf_com.example.www_rule_R2") + validator.validate_rule("xccdf_com.example.www_rule_R3") + validator.validate_rule("xccdf_com.example.www_rule_R4") + + # Test valid group validation + validator.validate_group("xccdf_com.example.www_group_G34") + + # Test invalid profile + with pytest.raises(ValueError) as e: + validator.validate_profile("xccdf_com.example.www_profile_INVALID") + assert "Profile ID 'xccdf_com.example.www_profile_INVALID' does not exist" in str(e.value) + + # Test invalid value with suggestion + with pytest.raises(ValueError) as e: + validator.validate_value("xccdf_com.example.www_value_V3") + assert "Value ID 'xccdf_com.example.www_value_V3' does not exist" in str(e.value) + + # Test invalid rule with suggestion + with pytest.raises(ValueError) as e: + validator.validate_rule("xccdf_com.example.www_rule_R5") + assert "Rule ID 'xccdf_com.example.www_rule_R5' does not exist" in str(e.value) + + # Test invalid group + with pytest.raises(ValueError) as e: + validator.validate_group("xccdf_com.example.www_group_INVALID") + assert "Group ID 'xccdf_com.example.www_group_INVALID' does not exist" in str(e.value) + + +def test_profile_with_validator(): + """Test that Tailoring uses validator to check IDs.""" + ds_path = pathlib.Path(__file__).parent.joinpath("data_stream.xml") + validator = autotailor.DataStreamValidator(str(ds_path)) + + p = autotailor.Tailoring(validator=validator) + p.reverse_dns = "com.example.www" + + # Test valid variable change works + p.add_value_change("V1", "30") + + # Test invalid variable name fails + with pytest.raises(ValueError) as e: + p.add_value_change("INVALID_VAR", "test") + assert "Value ID 'xccdf_com.example.www_value_INVALID_VAR' does not exist" in str(e.value) + + # Test valid rule selection works + p.select_rule("R1") + + # Test invalid rule selection fails + with pytest.raises(ValueError) as e: + p.select_rule("INVALID_RULE") + assert "Rule ID 'xccdf_com.example.www_rule_INVALID_RULE' does not exist" in str(e.value) + + # Test valid base profile validation + p.validate_base_profile("P1") + + # Test invalid base profile fails + with pytest.raises(ValueError) as e: + p.validate_base_profile("INVALID_PROFILE") + assert "Profile ID 'xccdf_com.example.www_profile_INVALID_PROFILE' does not exist" in str(e.value) + + +def test_validator_suggestions(): + """Test that validator provides helpful suggestions for typos.""" + ds_path = pathlib.Path(__file__).parent.joinpath("data_stream.xml") + validator = autotailor.DataStreamValidator(str(ds_path)) + + # Test suggestion for value with typo (V11 instead of V1) + with pytest.raises(ValueError) as e: + validator.validate_value("xccdf_com.example.www_value_V11") + error_msg = str(e.value) + assert "Did you mean one of these?" in error_msg + assert "xccdf_com.example.www_value_V1" in error_msg + + # Test suggestion for rule with typo (R11 instead of R1) + with pytest.raises(ValueError) as e: + validator.validate_rule("xccdf_com.example.www_rule_R11") + error_msg = str(e.value) + assert "Did you mean one of these?" in error_msg + assert "xccdf_com.example.www_rule_R1" in error_msg + + +def test_validate_selector(): + """Test that validate_selector rejects selectors not present in the data stream.""" + ds_path = pathlib.Path(__file__).parent.joinpath("data_stream.xml") + validator = autotailor.DataStreamValidator(str(ds_path)) + + # V1 has selector "thirty"; V2 has "some" and "other" + validator.validate_selector("xccdf_com.example.www_value_V1", "thirty") + validator.validate_selector("xccdf_com.example.www_value_V2", "some") + validator.validate_selector("xccdf_com.example.www_value_V2", "other") + + # Invalid selector for V1 + with pytest.raises(ValueError) as e: + validator.validate_selector("xccdf_com.example.www_value_V1", "invalid") + error_msg = str(e.value) + assert "Selector 'invalid' does not exist for Value 'xccdf_com.example.www_value_V1'" in error_msg + assert "thirty" in error_msg + + # Invalid selector for V2 with a close-enough typo triggers a suggestion + with pytest.raises(ValueError) as e: + validator.validate_selector("xccdf_com.example.www_value_V2", "ther") + error_msg = str(e.value) + assert "Selector 'ther' does not exist for Value 'xccdf_com.example.www_value_V2'" in error_msg + assert "other" in error_msg + + +def test_profile_selector_validation(): + """Test that Tailoring validates selectors via -V/--var-select through refine_value.""" + ds_path = pathlib.Path(__file__).parent.joinpath("data_stream.xml") + validator = autotailor.DataStreamValidator(str(ds_path)) + + p = autotailor.Tailoring(validator=validator) + p.reverse_dns = "com.example.www" + + # Valid selector passes + p.change_selectors(["V1=thirty"]) + p.change_selectors(["V2=some"]) + + # Invalid selector raises + with pytest.raises(ValueError) as e: + p.change_selectors(["V1=invalid"]) + assert "Selector 'invalid' does not exist for Value 'xccdf_com.example.www_value_V1'" in str(e.value) + + # Invalid value ID still raises before reaching selector check + with pytest.raises(ValueError) as e: + p.change_selectors(["NONEXISTENT=thirty"]) + assert "Value ID 'xccdf_com.example.www_value_NONEXISTENT' does not exist" in str(e.value) diff --git a/utils/autotailor b/utils/autotailor index 43a2c1b3e9..6ebfacdc0a 100755 --- a/utils/autotailor +++ b/utils/autotailor @@ -26,16 +26,23 @@ import pathlib import xml.etree.ElementTree as ET import xml.dom.minidom import json +import difflib NS = "http://checklists.nist.gov/xccdf/1.2" NS_PREFIX = "xccdf-1.2" +DS_NS = "http://scap.nist.gov/schema/scap/source/1.2" DEFAULT_PROFILE_SUFFIX = "_customized" DEFAULT_REVERSE_DNS = "org.ssgproject.content" ROLES = ["full", "unscored", "unchecked"] SEVERITIES = ["unknown", "info", "low", "medium", "high"] ATTRIBUTES = ["role", "severity"] +DS_NAMESPACES = { + 'ds': DS_NS, + 'xccdf': NS +} + def quote(string): return "\"" + string + "\"" @@ -53,8 +60,121 @@ def is_valid_xccdf_id(string): string) is not None +class DataStreamValidator: + """Validates IDs against the SCAP data stream.""" + + def __init__(self, datastream_path): + self.datastream_path = datastream_path + self.profile_ids = set() + self.value_ids = set() + self.value_selectors = {} + self.rule_ids = set() + self.group_ids = set() + self._parse_datastream() + + def _extract_profiles(self, benchmark): + for profile in benchmark.findall('.//xccdf:Profile', DS_NAMESPACES): + profile_id = profile.get('id') + if profile_id: + self.profile_ids.add(profile_id) + + def _extract_values(self, benchmark): + for value in benchmark.findall('.//xccdf:Value', DS_NAMESPACES): + value_id = value.get('id') + if not value_id: + continue + self.value_ids.add(value_id) + selectors = set() + for val_elem in value.findall('xccdf:value', DS_NAMESPACES): + selector = val_elem.get('selector') + if selector: + selectors.add(selector) + self.value_selectors[value_id] = selectors + + def _extract_rules(self, benchmark): + for rule in benchmark.findall('.//xccdf:Rule', DS_NAMESPACES): + rule_id = rule.get('id') + if rule_id: + self.rule_ids.add(rule_id) + + def _extract_groups(self, benchmark): + for group in benchmark.findall('.//xccdf:Group', DS_NAMESPACES): + group_id = group.get('id') + if group_id: + self.group_ids.add(group_id) + + def _parse_datastream(self): + try: + tree = ET.parse(self.datastream_path) + root = tree.getroot() + benchmarks = root.findall('.//xccdf:Benchmark', DS_NAMESPACES) + for benchmark in benchmarks: + self._extract_profiles(benchmark) + self._extract_values(benchmark) + self._extract_rules(benchmark) + self._extract_groups(benchmark) + except ET.ParseError as e: + raise ValueError(f"Failed to parse data stream '{self.datastream_path}': {e}") + except FileNotFoundError: + raise ValueError(f"Data stream file not found: '{self.datastream_path}'") + + def _suggest_similar(self, invalid_id, valid_ids, n=3): + """Suggest similar valid IDs using fuzzy matching.""" + if not valid_ids: + return [] + matches = difflib.get_close_matches(invalid_id, valid_ids, n=n, cutoff=0.6) + return matches + + def _create_validation_error(self, id_type, invalid_id, valid_ids): + msg = f"{id_type} ID '{invalid_id}' does not exist in the data stream." + + suggestions = self._suggest_similar(invalid_id, valid_ids) + if suggestions: + msg += "\n\nDid you mean one of these?" + for suggestion in suggestions: + msg += f"\n - {suggestion}" + + if not valid_ids: + msg += f"\n\nNo {id_type.lower()}s found in the data stream." + else: + msg += f"\n\nAvailable {id_type.lower()}s can be listed by examining the data stream file." + + return msg + + def validate_profile(self, profile_id): + if profile_id not in self.profile_ids: + raise ValueError(self._create_validation_error("Profile", profile_id, self.profile_ids)) + + def validate_value(self, value_id): + if value_id not in self.value_ids: + raise ValueError(self._create_validation_error("Value", value_id, self.value_ids)) + + def validate_selector(self, value_id, selector): + selectors = self.value_selectors.get(value_id, set()) + if selector not in selectors: + msg = f"Selector '{selector}' does not exist for Value '{value_id}' in the data stream." + if selectors: + suggestions = self._suggest_similar(selector, selectors) + if suggestions: + msg += "\n\nDid you mean one of these?" + for suggestion in suggestions: + msg += f"\n - {suggestion}" + msg += f"\n\nAvailable selectors: {', '.join(sorted(selectors))}" + else: + msg += "\n\nNo selectors found for this value in the data stream." + raise ValueError(msg) + + def validate_rule(self, rule_id): + if rule_id not in self.rule_ids: + raise ValueError(self._create_validation_error("Rule", rule_id, self.rule_ids)) + + def validate_group(self, group_id): + if group_id not in self.group_ids: + raise ValueError(self._create_validation_error("Group", group_id, self.group_ids)) + + class Tailoring: - def __init__(self): + def __init__(self, validator=None): self.id = "xccdf_auto_tailoring_default" self.reverse_dns = DEFAULT_REVERSE_DNS self.version = 1 @@ -62,6 +182,7 @@ class Tailoring: self.extends = "" self.original_ds_filename = "" self.profile_title = "" + self.validator = validator self.value_changes = [] self.rules_to_select = [] @@ -141,11 +262,17 @@ class Tailoring: def refine_rule(self, rule_id, attribute, value): Tailoring._validate_rule_refinement_params(rule_id, attribute, value) + if self.validator: + self.validator.validate_rule(rule_id) self._prevent_duplicate_rule_refinement(attribute, rule_id, value) self._rule_refinements[rule_id][attribute] = value def refine_value(self, value_id, attribute, value): Tailoring._validate_value_refinement_params(value_id, attribute, value) + if self.validator: + self.validator.validate_value(value_id) + if attribute == 'selector': + self.validator.validate_selector(value_id, value) self._prevent_duplicate_value_refinement(attribute, value_id, value) self._value_refinements[value_id][attribute] = value @@ -178,6 +305,36 @@ class Tailoring: varname, selector = assignment_to_tuple(change) self.change_value_attribute(varname, "selector", selector) + def select_rule(self, rule_id): + full_rule_id = self._full_rule_id(rule_id) + if self.validator: + self.validator.validate_rule(full_rule_id) + self.rules_to_select.append(rule_id) + + def unselect_rule(self, rule_id): + full_rule_id = self._full_rule_id(rule_id) + if self.validator: + self.validator.validate_rule(full_rule_id) + self.rules_to_unselect.append(rule_id) + + def select_group(self, group_id): + full_group_id = self._full_group_id(group_id) + if self.validator: + self.validator.validate_group(full_group_id) + self.groups_to_select.append(group_id) + + def unselect_group(self, group_id): + full_group_id = self._full_group_id(group_id) + if self.validator: + self.validator.validate_group(full_group_id) + self.groups_to_unselect.append(group_id) + + def validate_base_profile(self, profile_id): + if not profile_id or not self.validator: + return + full_profile_id = self._full_profile_id(profile_id) + self.validator.validate_profile(full_profile_id) + def _full_id(self, string, el_type): if is_valid_xccdf_id(string): return string @@ -196,6 +353,9 @@ class Tailoring: return self._full_id(string, "group") def add_value_change(self, varname, value): + full_var_id = self._full_var_id(varname) + if self.validator: + self.validator.validate_value(full_var_id) self.value_changes.append((varname, value)) def _add_group_select_operations(self, container_element): @@ -280,9 +440,9 @@ class Tailoring: for group_id, props in tailoring["groups"].items(): if "evaluate" in props: if props["evaluate"]: - self.groups_to_select.append(group_id) + self.select_group(group_id) else: - self.groups_to_unselect.append(group_id) + self.unselect_group(group_id) def _import_variables_from_tailoring(self, tailoring): if "variables" in tailoring: @@ -297,9 +457,9 @@ class Tailoring: for rule_id, props in tailoring["rules"].items(): if "evaluate" in props: if props["evaluate"]: - self.rules_to_select.append(rule_id) + self.select_rule(rule_id) else: - self.rules_to_unselect.append(rule_id) + self.unselect_rule(rule_id) for attr in ATTRIBUTES: if attr in props: self.change_rule_attribute(rule_id, attr, props[attr]) @@ -426,6 +586,11 @@ def get_parser(): "--local-path", action="store_true", help="Use local path for the benchmark href instead of absolute file:// URI. " "Absolute paths are converted to basename, relative paths are preserved.") + parser.add_argument( + "--no-validate", action="store_true", + help="Skip validation of IDs against the data stream. This significantly speeds up " + "execution on large data streams but may produce invalid tailoring files if incorrect " + "IDs are provided. Use with caution.") return parser @@ -439,27 +604,51 @@ if __name__ == "__main__": parser.error("one of the following arguments has to be provided: " "BASE_PROFILE_ID or --json-tailoring JSON_TAILORING_FILENAME") - t = Tailoring() + validator = None + if not args.no_validate: + try: + validator = DataStreamValidator(args.datastream) + except (ValueError, FileNotFoundError) as e: + print(f"Error loading data stream: {e}", file=sys.stderr) + sys.exit(1) + + t = Tailoring(validator=validator) t.original_ds_filename = args.datastream t.reverse_dns = args.id_namespace t.use_local_path = args.local_path if args.json_tailoring: - t.import_json_tailoring(args.json_tailoring) + try: + t.import_json_tailoring(args.json_tailoring) + except ValueError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) if args.profile: + try: + t.validate_base_profile(args.profile) + except ValueError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) t.extends = args.profile + if args.new_profile_id: t.profile_id = args.new_profile_id if args.title: t.profile_title = args.title - t.rules_to_select.extend(args.select) - t.rules_to_unselect.extend(args.unselect) - - t.change_values(args.var_value) - t.change_selectors(args.var_select) - t.change_roles(args.rule_role) - t.change_severities(args.rule_severity) + try: + for rule_id in args.select: + t.select_rule(rule_id) + for rule_id in args.unselect: + t.unselect_rule(rule_id) + + t.change_values(args.var_value) + t.change_selectors(args.var_select) + t.change_roles(args.rule_role) + t.change_severities(args.rule_severity) + except ValueError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) t.to_xml(args.output) diff --git a/utils/autotailor.8 b/utils/autotailor.8 index 0fd01c4db0..44d080f02e 100644 --- a/utils/autotailor.8 +++ b/utils/autotailor.8 @@ -8,8 +8,8 @@ A tailoring file adds a new profile, which is supposed to extend a profile that Tailoring can add, remove or refine rules, and it also can redefine contents of XCCDF variables. The tool requires data stream location and ID of the base profile as inputs. -Note however, that the referenced data stream is not opened, and the validity of tailoring is not checked against it. -The tool doesn't prevent you from extending non-existent profiles, selecting non-existent rules, and so on. +By default, the tool parses the referenced data stream and validates provided IDs (profiles, rules, values, groups, and selectors) against it. +Validation can be skipped using the \fB--no-validate\fR option, which significantly speeds up execution on large data streams at the cost of no longer catching invalid IDs. .SH SYNOPSIS autotailor [OPTION...] DATASTREAM_FILE [BASE_PROFILE_ID] @@ -75,6 +75,11 @@ Use local path for the benchmark href instead of absolute file:// URI. This opti When this option is specified, absolute paths are converted to basename only, while relative paths are preserved as provided. By default, the tool uses absolute file:// URIs for backward compatibility. .RE +.TP +\fB--no-validate\fR +.RS +Skip validation of IDs against the data stream. This significantly speeds up execution on large data streams but may produce invalid tailoring files if incorrect IDs are provided. Use with caution. +.RE .SH USAGE .SS Modify a variable value