Skip to content

Commit be55b87

Browse files
authored
Merge pull request #2551 from jku/improve-verification-result
Improve verification results
2 parents 3ab89c5 + 14edf3d commit be55b87

3 files changed

Lines changed: 304 additions & 87 deletions

File tree

examples/repository/_simplerepo.py

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import logging
99
from collections import defaultdict
1010
from datetime import datetime, timedelta
11-
from typing import Dict, List
11+
from typing import Dict, List, Union
1212

1313
from securesystemslib import keys
1414
from securesystemslib.signer import Key, Signer, SSlibKey, SSlibSigner
@@ -20,10 +20,13 @@
2020
Metadata,
2121
MetaFile,
2222
Root,
23+
RootVerificationResult,
24+
Signed,
2325
Snapshot,
2426
TargetFile,
2527
Targets,
2628
Timestamp,
29+
VerificationResult,
2730
)
2831
from tuf.repository import Repository
2932

@@ -89,6 +92,25 @@ def targets_infos(self) -> Dict[str, MetaFile]:
8992
def snapshot_info(self) -> MetaFile:
9093
return self._snapshot_info
9194

95+
def _get_verification_result(
96+
self, role: str, md: Metadata
97+
) -> Union[VerificationResult, RootVerificationResult]:
98+
"""Verify roles metadata using the existing repository metadata"""
99+
if role == Root.type:
100+
assert isinstance(md.signed, Root)
101+
root = self.root()
102+
previous = root if root.version > 0 else None
103+
return md.signed.get_root_verification_result(
104+
previous, md.signed_bytes, md.signatures
105+
)
106+
if role in [Timestamp.type, Snapshot.type, Targets.type]:
107+
delegator: Signed = self.root()
108+
else:
109+
delegator = self.targets()
110+
return delegator.get_verification_result(
111+
role, md.signed_bytes, md.signatures
112+
)
113+
92114
def open(self, role: str) -> Metadata:
93115
"""Return current Metadata for role from 'storage' (or create a new one)"""
94116

@@ -112,6 +134,14 @@ def close(self, role: str, md: Metadata) -> None:
112134
for signer in self.signer_cache[role]:
113135
md.sign(signer, append=True)
114136

137+
# Double check that we only write verified metadata
138+
vr = self._get_verification_result(role, md)
139+
if not vr:
140+
raise ValueError(f"Role {role} failed to verify")
141+
keyids = [keyid[:7] for keyid in vr.signed]
142+
verify_str = f"verified with keys [{', '.join(keyids)}]"
143+
logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str)
144+
115145
# store new metadata version, update version caches
116146
self.role_cache[role].append(md)
117147
if role == "snapshot":
@@ -130,8 +160,6 @@ def add_target(self, path: str, content: str) -> None:
130160
with self.edit_targets() as targets:
131161
targets.targets[path] = TargetFile.from_data(path, data)
132162

133-
logger.debug("Targets v%d", targets.version)
134-
135163
# update snapshot, timestamp
136164
self.do_snapshot()
137165
self.do_timestamp()
@@ -157,8 +185,6 @@ def submit_delegation(self, rolename: str, data: bytes) -> bool:
157185
logger.info("Failed to add delegation for %s: %s", rolename, e)
158186
return False
159187

160-
logger.debug("Targets v%d", targets.version)
161-
162188
# update snapshot, timestamp
163189
self.do_snapshot()
164190
self.do_timestamp()
@@ -177,19 +203,26 @@ def submit_role(self, role: str, data: bytes) -> bool:
177203
if not targetpath.startswith(f"{role}/"):
178204
raise ValueError(f"targets allowed under {role}/ only")
179205

180-
self.targets().verify_delegate(role, md.signed_bytes, md.signatures)
181-
182206
if md.signed.version != self.targets(role).version + 1:
183207
raise ValueError("Invalid version {md.signed.version}")
184208

185209
except (RepositoryError, ValueError) as e:
186210
logger.info("Failed to add new version for %s: %s", role, e)
187211
return False
188212

213+
# Check that we only write verified metadata
214+
vr = self._get_verification_result(role, md)
215+
if not vr:
216+
logger.info("Role %s failed to verify", role)
217+
return False
218+
219+
keyids = [keyid[:7] for keyid in vr.signed]
220+
verify_str = f"verified with keys [{', '.join(keyids)}]"
221+
logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str)
222+
189223
# Checks passed: Add new delegated role version
190224
self.role_cache[role].append(md)
191225
self._targets_infos[f"{role}.json"].version = md.signed.version
192-
logger.debug("%s v%d", role, md.signed.version)
193226

194227
# To keep it simple, target content is generated from targetpath
195228
for targetpath in md.signed.targets:

tests/test_api.py

Lines changed: 162 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import sys
1414
import tempfile
1515
import unittest
16-
from copy import copy
16+
from copy import copy, deepcopy
1717
from datetime import datetime, timedelta
1818
from typing import Any, ClassVar, Dict, Optional
1919

@@ -41,6 +41,7 @@
4141
Metadata,
4242
MetaFile,
4343
Root,
44+
RootVerificationResult,
4445
Signature,
4546
Snapshot,
4647
SuccinctRoles,
@@ -55,7 +56,7 @@
5556
logger = logging.getLogger(__name__)
5657

5758

58-
# pylint: disable=too-many-public-methods
59+
# pylint: disable=too-many-public-methods,too-many-statements
5960
class TestMetadata(unittest.TestCase):
6061
"""Tests for public API of all classes in 'tuf/api/metadata.py'."""
6162

@@ -471,95 +472,205 @@ def test_signed_verify_delegate(self) -> None:
471472
Snapshot.type, snapshot_md.signed_bytes, snapshot_md.signatures
472473
)
473474

475+
def test_verification_result(self) -> None:
476+
vr = VerificationResult(3, {"a": None}, {"b": None})
477+
self.assertEqual(vr.missing, 2)
478+
self.assertFalse(vr.verified)
479+
self.assertFalse(vr)
480+
481+
# Add a signature
482+
vr.signed["c"] = None
483+
self.assertEqual(vr.missing, 1)
484+
self.assertFalse(vr.verified)
485+
self.assertFalse(vr)
486+
487+
# Add last missing signature
488+
vr.signed["d"] = None
489+
self.assertEqual(vr.missing, 0)
490+
self.assertTrue(vr.verified)
491+
self.assertTrue(vr)
492+
493+
# Add one more signature
494+
vr.signed["e"] = None
495+
self.assertEqual(vr.missing, 0)
496+
self.assertTrue(vr.verified)
497+
self.assertTrue(vr)
498+
499+
def test_root_verification_result(self) -> None:
500+
vr1 = VerificationResult(3, {"a": None}, {"b": None})
501+
vr2 = VerificationResult(1, {"c": None}, {"b": None})
502+
503+
vr = RootVerificationResult(vr1, vr2)
504+
self.assertEqual(vr.signed, {"a": None, "c": None})
505+
self.assertEqual(vr.unsigned, {"b": None})
506+
self.assertFalse(vr.verified)
507+
self.assertFalse(vr)
508+
509+
vr1.signed["c"] = None
510+
vr1.signed["f"] = None
511+
self.assertEqual(vr.signed, {"a": None, "c": None, "f": None})
512+
self.assertEqual(vr.unsigned, {"b": None})
513+
self.assertTrue(vr.verified)
514+
self.assertTrue(vr)
515+
474516
def test_signed_get_verification_result(self) -> None:
475517
# Setup: Load test metadata and keys
476518
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
477519
root = Metadata[Root].from_file(root_path)
478-
initial_root_keyids = root.signed.roles[Root.type].keyids
479-
self.assertEqual(len(initial_root_keyids), 1)
480-
key1_id = initial_root_keyids[0]
481-
key2 = self.keystore[Timestamp.type]
482-
key2_id = key2["keyid"]
520+
521+
key1_id = root.signed.roles[Root.type].keyids[0]
522+
key1 = root.signed.get_key(key1_id)
523+
524+
key2_id = root.signed.roles[Timestamp.type].keyids[0]
525+
key2 = root.signed.get_key(key2_id)
526+
priv_key2 = self.keystore[Timestamp.type]
527+
483528
key3_id = "123456789abcdefg"
484-
key4 = self.keystore[Snapshot.type]
485-
key4_id = key4["keyid"]
529+
priv_key4 = self.keystore[Snapshot.type]
530+
key4_id = priv_key4["keyid"]
486531

487532
# Test: 1 authorized key, 1 valid signature
488533
result = root.signed.get_verification_result(
489534
Root.type, root.signed_bytes, root.signatures
490535
)
491-
self.assertTrue(result.verified)
492-
self.assertEqual(result.signed, {key1_id})
493-
self.assertEqual(result.unsigned, set())
536+
self.assertTrue(result)
537+
self.assertEqual(result.signed, {key1_id: key1})
538+
self.assertEqual(result.unsigned, {})
494539

495540
# Test: 2 authorized keys, 1 invalid signature
496541
# Adding a key, i.e. metadata change, invalidates existing signature
497-
root.signed.add_key(
498-
SSlibKey.from_securesystemslib_key(key2),
499-
Root.type,
500-
)
542+
root.signed.add_key(key2, Root.type)
501543
result = root.signed.get_verification_result(
502544
Root.type, root.signed_bytes, root.signatures
503545
)
504-
self.assertFalse(result.verified)
505-
self.assertEqual(result.signed, set())
506-
self.assertEqual(result.unsigned, {key1_id, key2_id})
546+
self.assertFalse(result)
547+
self.assertEqual(result.signed, {})
548+
self.assertEqual(result.unsigned, {key1_id: key1, key2_id: key2})
507549

508550
# Test: 3 authorized keys, 1 invalid signature, 1 key missing key data
509-
# Adding a keyid w/o key, fails verification the same as no signature
510-
# or an invalid signature for that key
551+
# Adding a keyid w/o key, fails verification but this key is not listed
552+
# in unsigned
511553
root.signed.roles[Root.type].keyids.append(key3_id)
512554
result = root.signed.get_verification_result(
513555
Root.type, root.signed_bytes, root.signatures
514556
)
515-
self.assertFalse(result.verified)
516-
self.assertEqual(result.signed, set())
517-
self.assertEqual(result.unsigned, {key1_id, key2_id, key3_id})
557+
self.assertFalse(result)
558+
self.assertEqual(result.signed, {})
559+
self.assertEqual(result.unsigned, {key1_id: key1, key2_id: key2})
518560

519561
# Test: 3 authorized keys, 1 valid signature, 1 invalid signature, 1
520562
# key missing key data
521-
root.sign(SSlibSigner(key2), append=True)
563+
root.sign(SSlibSigner(priv_key2), append=True)
522564
result = root.signed.get_verification_result(
523565
Root.type, root.signed_bytes, root.signatures
524566
)
525-
self.assertTrue(result.verified)
526-
self.assertEqual(result.signed, {key2_id})
527-
self.assertEqual(result.unsigned, {key1_id, key3_id})
567+
self.assertTrue(result)
568+
self.assertEqual(result.signed, {key2_id: key2})
569+
self.assertEqual(result.unsigned, {key1_id: key1})
528570

529571
# Test: 3 authorized keys, 1 valid signature, 1 invalid signature, 1
530572
# key missing key data, 1 ignored unrelated signature
531-
root.sign(SSlibSigner(key4), append=True)
573+
root.sign(SSlibSigner(priv_key4), append=True)
532574
self.assertEqual(
533575
set(root.signatures.keys()), {key1_id, key2_id, key4_id}
534576
)
535-
self.assertTrue(result.verified)
536-
self.assertEqual(result.signed, {key2_id})
537-
self.assertEqual(result.unsigned, {key1_id, key3_id})
577+
self.assertTrue(result)
578+
self.assertEqual(result.signed, {key2_id: key2})
579+
self.assertEqual(result.unsigned, {key1_id: key1})
538580

539581
# See test_signed_verify_delegate for more related tests ...
540582

541-
def test_signed_verification_result_union(self) -> None:
542-
# Test all possible "unions" (AND) of "verified" field
543-
data = [
544-
(True, True, True),
545-
(True, False, False),
546-
(False, True, False),
547-
(False, False, False),
548-
]
583+
def test_root_get_root_verification_result(self) -> None:
584+
# Setup: Load test metadata and keys
585+
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
586+
root = Metadata[Root].from_file(root_path)
587+
588+
key1_id = root.signed.roles[Root.type].keyids[0]
589+
key1 = root.signed.get_key(key1_id)
590+
591+
key2_id = root.signed.roles[Timestamp.type].keyids[0]
592+
key2 = root.signed.get_key(key2_id)
593+
priv_key2 = self.keystore[Timestamp.type]
549594

550-
for a_part, b_part, ab_part in data:
551-
self.assertEqual(
552-
VerificationResult(a_part, set(), set()).union(
553-
VerificationResult(b_part, set(), set())
554-
),
555-
VerificationResult(ab_part, set(), set()),
595+
priv_key4 = self.keystore[Snapshot.type]
596+
597+
# Test: Verify with no previous root version
598+
result = root.signed.get_root_verification_result(
599+
None, root.signed_bytes, root.signatures
600+
)
601+
self.assertTrue(result)
602+
self.assertEqual(result.signed, {key1_id: key1})
603+
self.assertEqual(result.unsigned, {})
604+
605+
# Test: Verify with other root that is not version N-1
606+
prev_root: Metadata[Root] = deepcopy(root)
607+
with self.assertRaises(ValueError):
608+
result = root.signed.get_root_verification_result(
609+
prev_root.signed, root.signed_bytes, root.signatures
556610
)
557611

558-
# Test exemplary union (|) of "signed" and "unsigned" fields
559-
a = VerificationResult(True, {"1"}, {"2"})
560-
b = VerificationResult(True, {"3"}, {"4"})
561-
ab = VerificationResult(True, {"1", "3"}, {"2", "4"})
562-
self.assertEqual(a.union(b), ab)
612+
# Test: Verify with previous root
613+
prev_root.signed.version -= 1
614+
result = root.signed.get_root_verification_result(
615+
prev_root.signed, root.signed_bytes, root.signatures
616+
)
617+
self.assertTrue(result)
618+
self.assertEqual(result.signed, {key1_id: key1})
619+
self.assertEqual(result.unsigned, {})
620+
621+
# Test: Add a signer to previous root (threshold still 1)
622+
prev_root.signed.add_key(key2, Root.type)
623+
result = root.signed.get_root_verification_result(
624+
prev_root.signed, root.signed_bytes, root.signatures
625+
)
626+
self.assertTrue(result)
627+
self.assertEqual(result.signed, {key1_id: key1})
628+
self.assertEqual(result.unsigned, {key2_id: key2})
629+
630+
# Test: Increase threshold in previous root
631+
prev_root.signed.roles[Root.type].threshold += 1
632+
result = root.signed.get_root_verification_result(
633+
prev_root.signed, root.signed_bytes, root.signatures
634+
)
635+
self.assertFalse(result)
636+
self.assertEqual(result.signed, {key1_id: key1})
637+
self.assertEqual(result.unsigned, {key2_id: key2})
638+
639+
# Test: Sign root with both keys
640+
root.sign(SSlibSigner(priv_key2), append=True)
641+
result = root.signed.get_root_verification_result(
642+
prev_root.signed, root.signed_bytes, root.signatures
643+
)
644+
self.assertTrue(result)
645+
self.assertEqual(result.signed, {key1_id: key1, key2_id: key2})
646+
self.assertEqual(result.unsigned, {})
647+
648+
# Test: Sign root with an unrelated key
649+
root.sign(SSlibSigner(priv_key4), append=True)
650+
result = root.signed.get_root_verification_result(
651+
prev_root.signed, root.signed_bytes, root.signatures
652+
)
653+
self.assertTrue(result)
654+
self.assertEqual(result.signed, {key1_id: key1, key2_id: key2})
655+
self.assertEqual(result.unsigned, {})
656+
657+
# Test: Remove key1 from previous root
658+
prev_root.signed.revoke_key(key1_id, Root.type)
659+
result = root.signed.get_root_verification_result(
660+
prev_root.signed, root.signed_bytes, root.signatures
661+
)
662+
self.assertFalse(result)
663+
self.assertEqual(result.signed, {key1_id: key1, key2_id: key2})
664+
self.assertEqual(result.unsigned, {})
665+
666+
# Test: Lower threshold in previous root
667+
prev_root.signed.roles[Root.type].threshold -= 1
668+
result = root.signed.get_root_verification_result(
669+
prev_root.signed, root.signed_bytes, root.signatures
670+
)
671+
self.assertTrue(result)
672+
self.assertEqual(result.signed, {key1_id: key1, key2_id: key2})
673+
self.assertEqual(result.unsigned, {})
563674

564675
def test_key_class(self) -> None:
565676
# Test if from_securesystemslib_key removes the private key from keyval

0 commit comments

Comments
 (0)