-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathclient_verification.py
More file actions
179 lines (144 loc) · 6.33 KB
/
client_verification.py
File metadata and controls
179 lines (144 loc) · 6.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/env python3
"""Example client verification workflow for validating signed schemas."""
import json
import sys
from pathlib import Path
from unittest.mock import patch
# Add the parent directory to the path so we can import schemapin
sys.path.insert(0, str(Path(__file__).parent.parent))
from schemapin.utils import SchemaVerificationWorkflow
def mock_well_known_server(domain: str):
"""Mock .well-known server response for demonstration."""
# In a real scenario, this would be fetched from the actual domain
if domain == "example.com":
# Load the demo well-known response if it exists
try:
with open("demo_well_known.json") as f:
return json.load(f)
except FileNotFoundError:
return {
"schema_version": "1.0",
"developer_name": "Example Tool Developer",
"public_key_pem": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE...\n-----END PUBLIC KEY-----",
"contact": "[email protected]"
}
return None
def main():
"""Demonstrate client verification workflow."""
print("SchemaPin Client Verification Example")
print("=" * 45)
# Check if we have demo files from tool developer example
schema_file = Path("demo_schema_signed.json")
well_known_file = Path("demo_well_known.json")
if not schema_file.exists():
print("❌ demo_schema_signed.json not found!")
print("Please run tool_developer.py first to generate demo files.")
return
# Load signed schema
print("\n1. Loading signed schema...")
with open(schema_file) as f:
schema_data = json.load(f)
schema = schema_data["schema"]
signature = schema_data["signature"]
print("✓ Signed schema loaded")
print(f"Schema: {schema['name']} - {schema['description']}")
print(f"Signature: {signature[:32]}...")
# Load well-known response for mocking
well_known_data = None
if well_known_file.exists():
with open(well_known_file) as f:
well_known_data = json.load(f)
# Step 2: Initialize verification workflow
print("\n2. Initializing verification workflow...")
verification_workflow = SchemaVerificationWorkflow()
print("✓ Verification workflow initialized")
# Step 3: Mock the discovery service for demonstration
print("\n3. Simulating public key discovery...")
def mock_get_public_key_pem(domain, timeout=10):
if domain == "example.com" and well_known_data:
return well_known_data["public_key_pem"]
return None
def mock_get_developer_info(domain, timeout=10):
if domain == "example.com" and well_known_data:
return {
"developer_name": well_known_data.get("developer_name", "Unknown"),
"schema_version": well_known_data.get("schema_version", "1.0"),
"contact": well_known_data.get("contact", ""),
}
return None
# Patch the discovery methods
with patch.object(verification_workflow.discovery, 'get_public_key_pem', mock_get_public_key_pem), \
patch.object(verification_workflow.discovery, 'get_developer_info', mock_get_developer_info):
# Step 4: First-time verification (key pinning)
print("\n4. First-time verification (TOFU - Trust On First Use)...")
result = verification_workflow.verify_schema(
schema=schema,
signature_b64=signature,
tool_id="example.com/calculate_sum",
domain="example.com",
auto_pin=True
)
print(f"✓ Verification result: {result}")
if result['valid']:
print("✅ Schema signature is VALID")
if result['first_use']:
print("🔑 Key pinned for future use")
if result['developer_info']:
dev_info = result['developer_info']
print(f"📋 Developer: {dev_info['developer_name']}")
print(f"📧 Contact: {dev_info['contact']}")
else:
print("❌ Schema signature is INVALID")
if result['error']:
print(f"Error: {result['error']}")
# Step 5: Subsequent verification (using pinned key)
print("\n5. Subsequent verification (using pinned key)...")
result2 = verification_workflow.verify_schema(
schema=schema,
signature_b64=signature,
tool_id="example.com/calculate_sum",
domain="example.com"
)
print(f"✓ Verification result: {result2}")
if result2['valid']:
print("✅ Schema signature is VALID (using pinned key)")
print("🔒 Key was already pinned - no network request needed")
else:
print("❌ Schema signature is INVALID")
# Step 6: Show pinned keys
print("\n6. Listing pinned keys...")
pinned_keys = verification_workflow.pinning.list_pinned_keys()
if pinned_keys:
print("✓ Pinned keys:")
for key_info in pinned_keys:
print(f" - Tool: {key_info['tool_id']}")
print(f" Domain: {key_info['domain']}")
print(f" Developer: {key_info['developer_name']}")
print(f" Pinned: {key_info['pinned_at']}")
else:
print("No keys pinned yet")
# Step 7: Demonstrate invalid signature detection
print("\n7. Testing invalid signature detection...")
# Modify the signature to make it invalid
invalid_signature = signature[:-4] + "XXXX"
result3 = verification_workflow.verify_schema(
schema=schema,
signature_b64=invalid_signature,
tool_id="example.com/calculate_sum",
domain="example.com"
)
if not result3['valid']:
print("✅ Invalid signature correctly detected")
print("🛡️ SchemaPin successfully prevented use of tampered schema")
else:
print("❌ Invalid signature was not detected (this should not happen)")
print("\n" + "=" * 45)
print("Client verification workflow complete!")
print("\nKey takeaways:")
print("✓ Valid signatures are accepted")
print("✓ Invalid signatures are rejected")
print("✓ Keys are pinned on first use (TOFU)")
print("✓ Subsequent verifications use pinned keys")
print("✓ Network requests only needed for new tools")
if __name__ == "__main__":
main()