Skip to content

Commit e24faf2

Browse files
author
Lukas Pühringer
authored
Merge pull request #2481 from lukpueh/signing-status
Metadata API: add get_verification_result method
2 parents 87f9f91 + a557563 commit e24faf2

2 files changed

Lines changed: 169 additions & 14 deletions

File tree

tests/test_api.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
TargetFile,
4848
Targets,
4949
Timestamp,
50+
VerificationResult,
5051
)
5152
from tuf.api.serialization import DeserializationError, SerializationError
5253
from tuf.api.serialization.json import JSONSerializer
@@ -470,6 +471,96 @@ def test_signed_verify_delegate(self) -> None:
470471
Snapshot.type, snapshot_md.signed_bytes, snapshot_md.signatures
471472
)
472473

474+
def test_signed_get_verification_result(self) -> None:
475+
# Setup: Load test metadata and keys
476+
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
477+
root = Metadata[Root].from_file(root_path)
478+
initial_root_keyids = root.signed.roles[Root.type].keyids
479+
self.assertEqual(len(initial_root_keyids), 1)
480+
key1_id = initial_root_keyids[0]
481+
key2 = self.keystore[Timestamp.type]
482+
key2_id = key2["keyid"]
483+
key3_id = "123456789abcdefg"
484+
key4 = self.keystore[Snapshot.type]
485+
key4_id = key4["keyid"]
486+
487+
# Test: 1 authorized key, 1 valid signature
488+
result = root.signed.get_verification_result(
489+
Root.type, root.signed_bytes, root.signatures
490+
)
491+
self.assertTrue(result.verified)
492+
self.assertEqual(result.signed, {key1_id})
493+
self.assertEqual(result.unsigned, set())
494+
495+
# Test: 2 authorized keys, 1 invalid signature
496+
# Adding a key, i.e. metadata change, invalidates existing signature
497+
root.signed.add_key(
498+
SSlibKey.from_securesystemslib_key(key2),
499+
Root.type,
500+
)
501+
result = root.signed.get_verification_result(
502+
Root.type, root.signed_bytes, root.signatures
503+
)
504+
self.assertFalse(result.verified)
505+
self.assertEqual(result.signed, set())
506+
self.assertEqual(result.unsigned, {key1_id, key2_id})
507+
508+
# Test: 3 authorized keys, 1 invalid signature, 1 key missing key data
509+
# Adding a keyid w/o key, fails verification the same as no signature
510+
# or an invalid signature for that key
511+
root.signed.roles[Root.type].keyids.append(key3_id)
512+
result = root.signed.get_verification_result(
513+
Root.type, root.signed_bytes, root.signatures
514+
)
515+
self.assertFalse(result.verified)
516+
self.assertEqual(result.signed, set())
517+
self.assertEqual(result.unsigned, {key1_id, key2_id, key3_id})
518+
519+
# Test: 3 authorized keys, 1 valid signature, 1 invalid signature, 1
520+
# key missing key data
521+
root.sign(SSlibSigner(key2), append=True)
522+
result = root.signed.get_verification_result(
523+
Root.type, root.signed_bytes, root.signatures
524+
)
525+
self.assertTrue(result.verified)
526+
self.assertEqual(result.signed, {key2_id})
527+
self.assertEqual(result.unsigned, {key1_id, key3_id})
528+
529+
# Test: 3 authorized keys, 1 valid signature, 1 invalid signature, 1
530+
# key missing key data, 1 ignored unrelated signature
531+
root.sign(SSlibSigner(key4), append=True)
532+
self.assertEqual(
533+
set(root.signatures.keys()), {key1_id, key2_id, key4_id}
534+
)
535+
self.assertTrue(result.verified)
536+
self.assertEqual(result.signed, {key2_id})
537+
self.assertEqual(result.unsigned, {key1_id, key3_id})
538+
539+
# See test_signed_verify_delegate for more related tests ...
540+
541+
def test_signed_verification_result_union(self) -> None:
542+
# Test all possible "unions" (AND) of "verified" field
543+
data = [
544+
(True, True, True),
545+
(True, False, False),
546+
(False, True, False),
547+
(False, False, False),
548+
]
549+
550+
for a_part, b_part, ab_part in data:
551+
self.assertEqual(
552+
VerificationResult(a_part, set(), set()).union(
553+
VerificationResult(b_part, set(), set())
554+
),
555+
VerificationResult(ab_part, set(), set()),
556+
)
557+
558+
# Test exemplary union (|) of "signed" and "unsigned" fields
559+
a = VerificationResult(True, {"1"}, {"2"})
560+
b = VerificationResult(True, {"3"}, {"4"})
561+
ab = VerificationResult(True, {"1", "3"}, {"2", "4"})
562+
self.assertEqual(a.union(b), ab)
563+
473564
def test_key_class(self) -> None:
474565
# Test if from_securesystemslib_key removes the private key from keyval
475566
# of a securesystemslib key dictionary.

tuf/api/metadata.py

Lines changed: 78 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io
3434
import logging
3535
import tempfile
36+
from dataclasses import dataclass
3637
from datetime import datetime
3738
from typing import (
3839
IO,
@@ -43,6 +44,7 @@
4344
Iterator,
4445
List,
4546
Optional,
47+
Set,
4648
Tuple,
4749
Type,
4850
TypeVar,
@@ -646,6 +648,37 @@ def to_dict(self) -> Dict[str, Any]:
646648
}
647649

648650

651+
@dataclass
652+
class VerificationResult:
653+
"""Signature verification result for delegated role metadata.
654+
655+
Attributes:
656+
verified: True, if threshold of signatures is met.
657+
signed: Set of delegated keyids, which validly signed.
658+
unsigned: Set of delegated keyids, which did not validly sign.
659+
660+
"""
661+
662+
verified: bool
663+
signed: Set[str]
664+
unsigned: Set[str]
665+
666+
def __bool__(self) -> bool:
667+
return self.verified
668+
669+
def union(self, other: "VerificationResult") -> "VerificationResult":
670+
"""Combine two verification results.
671+
672+
Can be used to verify, if root metadata is signed by the threshold of
673+
keys of previous root and the threshold of keys of itself.
674+
"""
675+
return VerificationResult(
676+
self.verified and other.verified,
677+
self.signed | other.signed,
678+
self.unsigned | other.unsigned,
679+
)
680+
681+
649682
class _DelegatorMixin(metaclass=abc.ABCMeta):
650683
"""Class that implements verify_delegate() for Root and Targets"""
651684

@@ -665,54 +698,85 @@ def get_key(self, keyid: str) -> Key:
665698
"""
666699
raise NotImplementedError
667700

668-
def verify_delegate(
701+
def get_verification_result(
669702
self,
670703
delegated_role: str,
671704
payload: bytes,
672705
signatures: Dict[str, Signature],
673-
) -> None:
674-
"""Verify signature threshold for delegated role.
706+
) -> VerificationResult:
707+
"""Return signature threshold verification result for delegated role.
675708
676-
Verify that there are enough valid ``signatures`` over ``payload``, to
677-
meet the threshold of keys for ``delegated_role``, as defined by the
678-
delegator (``self``).
709+
NOTE: Unlike `verify_delegate()` this method does not raise, if the
710+
role metadata is not fully verified.
679711
680712
Args:
681713
delegated_role: Name of the delegated role to verify
682714
payload: Signed payload bytes for the delegated role
683715
signatures: Signatures over payload bytes
684716
685717
Raises:
686-
UnsignedMetadataError: ``delegated_role`` was not signed with
687-
required threshold of keys for ``role_name``.
688718
ValueError: no delegation was found for ``delegated_role``.
689719
"""
690720
role = self.get_delegated_role(delegated_role)
691721

692-
# verify that delegated_metadata is signed by threshold of unique keys
693-
signing_keys = set()
722+
signed = set()
723+
unsigned = set()
724+
694725
for keyid in role.keyids:
695726
try:
696727
key = self.get_key(keyid)
697728
except ValueError:
729+
unsigned.add(keyid)
698730
logger.info("No key for keyid %s", keyid)
699731
continue
700732

701733
if keyid not in signatures:
734+
unsigned.add(keyid)
702735
logger.info("No signature for keyid %s", keyid)
703736
continue
704737

705738
sig = signatures[keyid]
706739
try:
707740
key.verify_signature(sig, payload)
708-
signing_keys.add(keyid)
741+
signed.add(keyid)
709742
except sslib_exceptions.UnverifiedSignatureError:
743+
unsigned.add(keyid)
710744
logger.info("Key %s failed to verify %s", keyid, delegated_role)
711745

712-
if len(signing_keys) < role.threshold:
746+
return VerificationResult(
747+
len(signed) >= role.threshold, signed, unsigned
748+
)
749+
750+
def verify_delegate(
751+
self,
752+
delegated_role: str,
753+
payload: bytes,
754+
signatures: Dict[str, Signature],
755+
) -> None:
756+
"""Verify signature threshold for delegated role.
757+
758+
Verify that there are enough valid ``signatures`` over ``payload``, to
759+
meet the threshold of keys for ``delegated_role``, as defined by the
760+
delegator (``self``).
761+
762+
Args:
763+
delegated_role: Name of the delegated role to verify
764+
payload: Signed payload bytes for the delegated role
765+
signatures: Signatures over payload bytes
766+
767+
Raises:
768+
UnsignedMetadataError: ``delegated_role`` was not signed with
769+
required threshold of keys for ``role_name``.
770+
ValueError: no delegation was found for ``delegated_role``.
771+
"""
772+
result = self.get_verification_result(
773+
delegated_role, payload, signatures
774+
)
775+
if not result:
776+
role = self.get_delegated_role(delegated_role)
713777
raise UnsignedMetadataError(
714-
f"{delegated_role} was signed by {len(signing_keys)}/"
715-
f"{role.threshold} keys",
778+
f"{delegated_role} was signed by {len(result.signed)}/"
779+
f"{role.threshold} keys"
716780
)
717781

718782

0 commit comments

Comments
 (0)