forked from demisto/content
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvalidate_index.py
More file actions
182 lines (141 loc) · 7.87 KB
/
validate_index.py
File metadata and controls
182 lines (141 loc) · 7.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
"""Run validation on the index.json file.
Check index.json file inside the index.zip archive in the cloud.
Validate no missing ids are found and that all packs have a non negative price.
Validate commit hash is in master's history.
"""
import argparse
import logging
import os
import sys
from pprint import pformat
from typing import Optional, Tuple

from Tests.Marketplace.marketplace_services import init_storage_client, load_json, get_content_git_client
from Tests.Marketplace.upload_packs import download_and_extract_index
from Tests.Marketplace.marketplace_constants import GCPConfig, CONTENT_ROOT_PATH
from Tests.scripts.utils.log_util import install_logging
MANDATORY_PREMIUM_PACKS_PATH = "Tests/Marketplace/mandatory_premium_packs.json"
def options_handler():
    """Parse and return the command-line arguments for the index validation run."""
    parser = argparse.ArgumentParser(description='Run validation on the index.json file.')
    parser.add_argument('-e', '--extract_path', required=True,
                        help=f'Full path of folder to extract the {GCPConfig.INDEX_NAME}.zip to')
    parser.add_argument('-pb', '--production_bucket_name', required=True,
                        help='Production bucket name')
    parser.add_argument('-sb', '--storage_base_path', required=False,
                        help="Storage base path of the directory to upload to.")
    parser.add_argument('-sa', '--service_account', required=True,
                        help='Path to gcloud service account')
    parser.add_argument('-c', '--circle_branch', required=True,
                        help="CircleCi branch of current build")
    return parser.parse_args()
def log_message_if_statement(statement: bool, error_message: str, success_message: Optional[str] = None) -> bool:
    """Log an error message if the statement is false, or a success message if it is true.

    Args:
        statement: The boolean statement to check.
        error_message: The error message to log if the statement is false.
        success_message: The success message to log if the statement is true.

    Returns:
        The statement's boolean value.
    """
    if not statement:
        logging.error(error_message)
    elif success_message:
        # logging.success is a custom level injected by install_logging (not in stdlib logging).
        logging.success(success_message)  # type: ignore[attr-defined]
    return statement
def check_index_data(index_data: dict) -> bool:
    """Validate the index.json contents extracted from the cloud archive.

    Runs verify_pack on every pack and checks that all mandatory premium
    pack ids appear in the index.

    Args:
        index_data: Dictionary of the index.json contents.

    Returns: True if all packs are valid, False otherwise.
    """
    logging.info("Found index data in index file. Checking...")
    logging.debug(f"Index data is:\n {pformat(index_data)}")

    packs = index_data.get("packs", [])
    if not log_message_if_statement(statement=(len(packs) != 0),
                                    error_message="Found 0 packs in index file."
                                                  "\nAborting the rest of the check."):
        # An empty pack list means there is nothing further to validate.
        return False

    mandatory_pack_ids = load_json(MANDATORY_PREMIUM_PACKS_PATH).get("packs", [])

    packs_are_valid = True
    for pack in index_data["packs"]:
        if not verify_pack(pack):
            packs_are_valid = False
        # Tick off each mandatory pack as it is encountered.
        if pack["id"] in mandatory_pack_ids:
            mandatory_pack_ids.remove(pack["id"])

    all_mandatory_packs_are_found = log_message_if_statement(
        statement=(mandatory_pack_ids == []),
        error_message=f"index json is missing some mandatory pack ids: {pformat(mandatory_pack_ids)}",
        success_message="All premium mandatory pack ids were found in the index.json file.")

    return packs_are_valid and all_mandatory_packs_are_found
def verify_pack(pack: dict) -> bool:
    """Verify that the pack id is not empty and its price is non-negative.

    Args:
        pack: The pack to verify.

    Returns: True if pack is valid, False otherwise.
    """
    pack_id = pack.get('id', '')
    id_exists = log_message_if_statement(statement=(pack.get("id", "") not in ("", None)),
                                         error_message="There is a missing pack id.")
    # A missing price defaults to -1 so absent prices are reported as invalid.
    price_is_valid = log_message_if_statement(
        statement=(pack.get("price", -1) >= 0),
        error_message=f"The price on the pack {pack_id} is invalid.",
        success_message=f"The price on the pack {pack_id} is valid.")
    return id_exists and price_is_valid
def check_commit_in_branch_history(index_commit_hash: str, circle_branch: str) -> bool:
    """Assert that the given commit hash is in the branch's commit history.

    Args:
        index_commit_hash: The commit hash recorded in the index file.
        circle_branch: The circle branch of the current run.

    Returns: True if the commit hash is in the branch's history, False otherwise.
    """
    content_repo = get_content_git_client(CONTENT_ROOT_PATH)
    # Collect the hex sha of every commit reachable from the remote branch tip.
    branch_commits = [commit.hexsha for commit in content_repo.iter_commits(f"origin/{circle_branch}")]
    return log_message_if_statement(statement=(index_commit_hash in branch_commits),
                                    error_message=f"Commit hash {index_commit_hash} is not in {circle_branch} history",
                                    success_message="Commit hash in index file is valid.")
def get_index_json_data(service_account: str, production_bucket_name: str, extract_path: str,
                        storage_base_path: str) -> Tuple[dict, str]:
    """Retrieve the index.json file from the production bucket.

    Args:
        service_account: Path to gcloud service account.
        production_bucket_name: Production bucket name.
        extract_path: Full path of folder to extract the index.zip to.
        storage_base_path: The base path in the bucket; overrides GCPConfig.STORAGE_BASE_PATH when set.

    Returns:
        Tuple[dict, str]: The content of the index.json and the path to it.
    """
    logging.info('Downloading and extracting index.zip from the cloud')
    if storage_base_path:
        # Allow the caller to point at a non-default bucket layout.
        GCPConfig.STORAGE_BASE_PATH = storage_base_path

    storage_client = init_storage_client(service_account)
    production_bucket = storage_client.bucket(production_bucket_name)
    index_folder_path, _, _ = download_and_extract_index(production_bucket, extract_path)

    logging.info("Retrieving the index file")
    index_file_path = os.path.join(index_folder_path, f"{GCPConfig.INDEX_NAME}.json")
    index_data = load_json(index_file_path)

    return index_data, index_file_path
def main():
    """Entry point: fetch index.json, validate its packs and commit hash, and exit accordingly."""
    install_logging("Validate index.log")
    options = options_handler()
    exit_code = 0

    index_data, index_file_path = get_index_json_data(
        service_account=options.service_account,
        production_bucket_name=options.production_bucket_name,
        extract_path=options.extract_path,
        storage_base_path=options.storage_base_path,
    )

    # Validate the packs listed in the index.json file.
    index_is_valid = check_index_data(index_data)
    log_message_if_statement(statement=index_is_valid,
                             error_message=f"The packs in the {index_file_path} file were found invalid.",
                             success_message=f"{index_file_path} file was found valid")

    # Validate that the commit recorded in the index exists in the branch history.
    commit_hash_is_valid = log_message_if_statement(statement=("commit" in index_data),
                                                    error_message="No commit field was found in the index.json")
    if commit_hash_is_valid:
        commit_hash_is_valid = check_commit_in_branch_history(index_data.get("commit", ""), options.circle_branch)

    if not (index_is_valid and commit_hash_is_valid):
        logging.critical("Index content is invalid. Aborting.")
        exit_code = 1

    # Remove the service account file before exiting on failure.
    if exit_code == 1 and os.path.exists(options.service_account):
        os.remove(options.service_account)

    sys.exit(exit_code)
if __name__ == '__main__':
main()