Skip to content

Commit a871f64

Browse files
author
Lukas Pühringer
authored
Merge pull request #2378 from jku/move-verify-delegate-v2
Move verify_delegate() to Root/Targets
2 parents e6f397d + 15dd931 commit a871f64

File tree

4 files changed

+205
-91
lines changed

4 files changed

+205
-91
lines changed

examples/repository/_simplerepo.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,7 @@ def submit_role(self, role: str, data: bytes) -> bool:
177177
if not targetpath.startswith(f"{role}/"):
178178
raise ValueError(f"targets allowed under {role}/ only")
179179

180-
targets_md = self.open("targets")
181-
targets_md.verify_delegate(role, md)
180+
self.targets().verify_delegate(role, md.signed_bytes, md.signatures)
182181

183182
if md.signed.version != self.targets(role).version + 1:
184183
raise ValueError("Invalid version {md.signed.version}")

tests/test_api.py

Lines changed: 90 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
Timestamp,
5050
)
5151
from tuf.api.serialization import DeserializationError, SerializationError
52-
from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer
52+
from tuf.api.serialization.json import JSONSerializer
5353

5454
logger = logging.getLogger(__name__)
5555

@@ -208,7 +208,7 @@ def test_sign_verify(self) -> None:
208208
# Load sample metadata (targets) and assert ...
209209
md_obj = Metadata.from_file(os.path.join(path, "targets.json"))
210210
sig = md_obj.signatures[targets_keyid]
211-
data = CanonicalJSONSerializer().serialize(md_obj.signed)
211+
data = md_obj.signed_bytes
212212

213213
# ... it has a single existing signature,
214214
self.assertEqual(len(md_obj.signatures), 1)
@@ -274,7 +274,7 @@ def test_key_verify_failures(self) -> None:
274274
path = os.path.join(self.repo_dir, "metadata", "timestamp.json")
275275
md_obj = Metadata.from_file(path)
276276
sig = md_obj.signatures[timestamp_keyid]
277-
data = CanonicalJSONSerializer().serialize(md_obj.signed)
277+
data = md_obj.signed_bytes
278278

279279
# Test failure on unknown scheme (securesystemslib
280280
# UnsupportedAlgorithmError)
@@ -362,44 +362,113 @@ def test_metadata_verify_delegate(self) -> None:
362362
with self.assertRaises(ValueError):
363363
role2.verify_delegate("role1", role1)
364364

365+
def test_signed_verify_delegate(self) -> None:
366+
# pylint: disable=too-many-locals,too-many-statements
367+
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
368+
root_md = Metadata[Root].from_file(root_path)
369+
root = root_md.signed
370+
snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json")
371+
snapshot_md = Metadata[Snapshot].from_file(snapshot_path)
372+
snapshot = snapshot_md.signed
373+
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
374+
targets_md = Metadata[Targets].from_file(targets_path)
375+
targets = targets_md.signed
376+
role1_path = os.path.join(self.repo_dir, "metadata", "role1.json")
377+
role1_md = Metadata[Targets].from_file(role1_path)
378+
role1 = role1_md.signed
379+
role2_path = os.path.join(self.repo_dir, "metadata", "role2.json")
380+
role2_md = Metadata[Targets].from_file(role2_path)
381+
role2 = role2_md.signed
382+
383+
# test the expected delegation tree
384+
root.verify_delegate(
385+
Root.type, root_md.signed_bytes, root_md.signatures
386+
)
387+
root.verify_delegate(
388+
Snapshot.type, snapshot_md.signed_bytes, snapshot_md.signatures
389+
)
390+
root.verify_delegate(
391+
Targets.type, targets_md.signed_bytes, targets_md.signatures
392+
)
393+
targets.verify_delegate(
394+
"role1", role1_md.signed_bytes, role1_md.signatures
395+
)
396+
role1.verify_delegate(
397+
"role2", role2_md.signed_bytes, role2_md.signatures
398+
)
399+
400+
# only root and targets can verify delegates
401+
with self.assertRaises(AttributeError):
402+
snapshot.verify_delegate(
403+
Snapshot.type, snapshot_md.signed_bytes, snapshot_md.signatures
404+
)
405+
# verify fails for roles that are not delegated by delegator
406+
with self.assertRaises(ValueError):
407+
root.verify_delegate(
408+
"role1", role1_md.signed_bytes, role1_md.signatures
409+
)
410+
with self.assertRaises(ValueError):
411+
targets.verify_delegate(
412+
Targets.type, targets_md.signed_bytes, targets_md.signatures
413+
)
414+
# verify fails when delegator has no delegations
415+
with self.assertRaises(ValueError):
416+
role2.verify_delegate(
417+
"role1", role1_md.signed_bytes, role1_md.signatures
418+
)
419+
365420
# verify fails when delegate content is modified
366-
expires = snapshot.signed.expires
367-
snapshot.signed.expires = expires + timedelta(days=1)
421+
expires = snapshot.expires
422+
snapshot.expires = expires + timedelta(days=1)
368423
with self.assertRaises(exceptions.UnsignedMetadataError):
369-
root.verify_delegate(Snapshot.type, snapshot)
370-
snapshot.signed.expires = expires
424+
root.verify_delegate(
425+
Snapshot.type, snapshot_md.signed_bytes, snapshot_md.signatures
426+
)
427+
snapshot.expires = expires
371428

372429
# verify fails if sslib verify fails with VerificationError
373430
# (in this case signature is malformed)
374-
keyid = next(iter(root.signed.roles[Snapshot.type].keyids))
375-
good_sig = snapshot.signatures[keyid].signature
376-
snapshot.signatures[keyid].signature = "foo"
431+
keyid = next(iter(root.roles[Snapshot.type].keyids))
432+
good_sig = snapshot_md.signatures[keyid].signature
433+
snapshot_md.signatures[keyid].signature = "foo"
377434
with self.assertRaises(exceptions.UnsignedMetadataError):
378-
root.verify_delegate(Snapshot.type, snapshot)
379-
snapshot.signatures[keyid].signature = good_sig
435+
root.verify_delegate(
436+
Snapshot.type, snapshot_md.signed_bytes, snapshot_md.signatures
437+
)
438+
snapshot_md.signatures[keyid].signature = good_sig
380439

381440
# verify fails if roles keys do not sign the metadata
382441
with self.assertRaises(exceptions.UnsignedMetadataError):
383-
root.verify_delegate(Timestamp.type, snapshot)
442+
root.verify_delegate(
443+
Timestamp.type, snapshot_md.signed_bytes, snapshot_md.signatures
444+
)
384445

385446
# Add a key to snapshot role, make sure the new sig fails to verify
386-
ts_keyid = next(iter(root.signed.roles[Timestamp.type].keyids))
387-
root.signed.add_key(root.signed.keys[ts_keyid], Snapshot.type)
388-
snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64)
447+
ts_keyid = next(iter(root.roles[Timestamp.type].keyids))
448+
root.add_key(root.keys[ts_keyid], Snapshot.type)
449+
snapshot_md.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64)
389450

390451
# verify succeeds if threshold is reached even if some signatures
391452
# fail to verify
392-
root.verify_delegate(Snapshot.type, snapshot)
453+
root.verify_delegate(
454+
Snapshot.type, snapshot_md.signed_bytes, snapshot_md.signatures
455+
)
393456

394457
# verify fails if threshold of signatures is not reached
395-
root.signed.roles[Snapshot.type].threshold = 2
458+
root.roles[Snapshot.type].threshold = 2
396459
with self.assertRaises(exceptions.UnsignedMetadataError):
397-
root.verify_delegate(Snapshot.type, snapshot)
460+
root.verify_delegate(
461+
Snapshot.type, snapshot_md.signed_bytes, snapshot_md.signatures
462+
)
398463

399464
# verify succeeds when we correct the new signature and reach the
400465
# threshold of 2 keys
401-
snapshot.sign(SSlibSigner(self.keystore[Timestamp.type]), append=True)
402-
root.verify_delegate(Snapshot.type, snapshot)
466+
snapshot_md.sign(
467+
SSlibSigner(self.keystore[Timestamp.type]), append=True
468+
)
469+
root.verify_delegate(
470+
Snapshot.type, snapshot_md.signed_bytes, snapshot_md.signatures
471+
)
403472

404473
def test_key_class(self) -> None:
405474
# Test if from_securesystemslib_key removes the private key from keyval

tuf/api/metadata.py

Lines changed: 96 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,16 @@ def __eq__(self, other: Any) -> bool:
150150
and self.unrecognized_fields == other.unrecognized_fields
151151
)
152152

153+
@property
154+
def signed_bytes(self) -> bytes:
155+
"""Default canonical json byte representation of ``self.signed``."""
156+
157+
# Use local scope import to avoid circular import errors
158+
# pylint: disable=import-outside-toplevel
159+
from tuf.api.serialization.json import CanonicalJSONSerializer
160+
161+
return CanonicalJSONSerializer().serialize(self.signed)
162+
153163
@classmethod
154164
def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]":
155165
"""Create ``Metadata`` object from its json/dict representation.
@@ -366,13 +376,9 @@ def sign(
366376
"""
367377

368378
if signed_serializer is None:
369-
# Use local scope import to avoid circular import errors
370-
# pylint: disable=import-outside-toplevel
371-
from tuf.api.serialization.json import CanonicalJSONSerializer
372-
373-
signed_serializer = CanonicalJSONSerializer()
374-
375-
bytes_data = signed_serializer.serialize(self.signed)
379+
bytes_data = self.signed_bytes
380+
else:
381+
bytes_data = signed_serializer.serialize(self.signed)
376382

377383
try:
378384
signature = signer.sign(bytes_data)
@@ -393,58 +399,24 @@ def verify_delegate(
393399
signed_serializer: Optional[SignedSerializer] = None,
394400
) -> None:
395401
"""Verify that ``delegated_metadata`` is signed with the required
396-
threshold of keys for the delegated role ``delegated_role``.
402+
threshold of keys for ``delegated_role``.
397403
398-
Args:
399-
delegated_role: Name of the delegated role to verify
400-
delegated_metadata: ``Metadata`` object for the delegated role
401-
signed_serializer: Serializer used for delegate
402-
serialization. Default is ``CanonicalJSONSerializer``.
403-
404-
Raises:
405-
UnsignedMetadataError: ``delegated_role`` was not signed with
406-
required threshold of keys for ``role_name``.
407-
ValueError: no delegation was found for ``delegated_role``.
408-
TypeError: called this function on non-delegating metadata class.
404+
.. deprecated:: 3.1.0
405+
Please use ``Root.verify_delegate()`` or ``Targets.verify_delegate()``.
409406
"""
410407

411408
if self.signed.type not in ["root", "targets"]:
412409
raise TypeError("Call is valid only on delegator metadata")
413410

414411
if signed_serializer is None:
415-
# pylint: disable=import-outside-toplevel
416-
from tuf.api.serialization.json import CanonicalJSONSerializer
417-
418-
signed_serializer = CanonicalJSONSerializer()
419-
420-
data = signed_serializer.serialize(delegated_metadata.signed)
421-
role = self.signed.get_delegated_role(delegated_role)
422-
423-
# verify that delegated_metadata is signed by threshold of unique keys
424-
signing_keys = set()
425-
for keyid in role.keyids:
426-
try:
427-
key = self.signed.get_key(keyid)
428-
except ValueError:
429-
logger.info("No key for keyid %s", keyid)
430-
continue
412+
payload = delegated_metadata.signed_bytes
431413

432-
if keyid not in delegated_metadata.signatures:
433-
logger.info("No signature for keyid %s", keyid)
434-
continue
435-
436-
sig = delegated_metadata.signatures[keyid]
437-
try:
438-
key.verify_signature(sig, data)
439-
signing_keys.add(keyid)
440-
except sslib_exceptions.UnverifiedSignatureError:
441-
logger.info("Key %s failed to verify %s", keyid, delegated_role)
414+
else:
415+
payload = signed_serializer.serialize(delegated_metadata.signed)
442416

443-
if len(signing_keys) < role.threshold:
444-
raise UnsignedMetadataError(
445-
f"{delegated_role} was signed by {len(signing_keys)}/"
446-
f"{role.threshold} keys",
447-
)
417+
self.signed.verify_delegate(
418+
delegated_role, payload, delegated_metadata.signatures
419+
)
448420

449421

450422
class Signed(metaclass=abc.ABCMeta):
@@ -674,7 +646,77 @@ def to_dict(self) -> Dict[str, Any]:
674646
}
675647

676648

677-
class Root(Signed):
649+
class _DelegatorMixin(metaclass=abc.ABCMeta):
650+
"""Class that implements verify_delegate() for Root and Targets"""
651+
652+
@abc.abstractmethod
653+
def get_delegated_role(self, delegated_role: str) -> Role:
654+
"""Return the role object for the given delegated role.
655+
656+
Raises ValueError if delegated_role is not actually delegated.
657+
"""
658+
raise NotImplementedError
659+
660+
@abc.abstractmethod
661+
def get_key(self, keyid: str) -> Key:
662+
"""Return the key object for the given keyid.
663+
664+
Raises ValueError if key is not found.
665+
"""
666+
raise NotImplementedError
667+
668+
def verify_delegate(
669+
self,
670+
delegated_role: str,
671+
payload: bytes,
672+
signatures: Dict[str, Signature],
673+
) -> None:
674+
"""Verify signature threshold for delegated role.
675+
676+
Verify that there are enough valid ``signatures`` over ``payload``, to
677+
meet the threshold of keys for ``delegated_role``, as defined by the
678+
delegator (``self``).
679+
680+
Args:
681+
delegated_role: Name of the delegated role to verify
682+
payload: Signed payload bytes for the delegated role
683+
signatures: Signatures over payload bytes
684+
685+
Raises:
686+
UnsignedMetadataError: ``delegated_role`` was not signed with
687+
required threshold of keys for ``role_name``.
688+
ValueError: no delegation was found for ``delegated_role``.
689+
"""
690+
role = self.get_delegated_role(delegated_role)
691+
692+
# verify that delegated_metadata is signed by threshold of unique keys
693+
signing_keys = set()
694+
for keyid in role.keyids:
695+
try:
696+
key = self.get_key(keyid)
697+
except ValueError:
698+
logger.info("No key for keyid %s", keyid)
699+
continue
700+
701+
if keyid not in signatures:
702+
logger.info("No signature for keyid %s", keyid)
703+
continue
704+
705+
sig = signatures[keyid]
706+
try:
707+
key.verify_signature(sig, payload)
708+
signing_keys.add(keyid)
709+
except sslib_exceptions.UnverifiedSignatureError:
710+
logger.info("Key %s failed to verify %s", keyid, delegated_role)
711+
712+
if len(signing_keys) < role.threshold:
713+
raise UnsignedMetadataError(
714+
f"{delegated_role} was signed by {len(signing_keys)}/"
715+
f"{role.threshold} keys",
716+
)
717+
718+
719+
class Root(Signed, _DelegatorMixin):
678720
"""A container for the signed part of root metadata.
679721
680722
Parameters listed below are also instance attributes.
@@ -823,11 +865,7 @@ def get_delegated_role(self, delegated_role: str) -> Role:
823865

824866
return self.roles[delegated_role]
825867

826-
def get_key(self, keyid: str) -> Key:
827-
"""Return the key object for the given keyid.
828-
829-
Raises ValueError if key is not found.
830-
"""
868+
def get_key(self, keyid: str) -> Key: # noqa: D102
831869
if keyid not in self.keys:
832870
raise ValueError(f"Key {keyid} not found")
833871

@@ -1778,7 +1816,7 @@ def get_prefixed_paths(self) -> List[str]:
17781816
return paths
17791817

17801818

1781-
class Targets(Signed):
1819+
class Targets(Signed, _DelegatorMixin):
17821820
"""A container for the signed part of targets metadata.
17831821
17841822
Targets contains verifying information about target files and also
@@ -1952,11 +1990,7 @@ def get_delegated_role(self, delegated_role: str) -> Role:
19521990

19531991
return role
19541992

1955-
def get_key(self, keyid: str) -> Key:
1956-
"""Return the key object for the given keyid.
1957-
1958-
Raises ValueError if keyid is not found.
1959-
"""
1993+
def get_key(self, keyid: str) -> Key: # noqa: D102
19601994
if self.delegations is None:
19611995
raise ValueError("No delegations found")
19621996
if keyid not in self.delegations.keys:

0 commit comments

Comments
 (0)