-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathfind_duplicate_pointers.py
More file actions
122 lines (95 loc) · 3.76 KB
/
find_duplicate_pointers.py
File metadata and controls
122 lines (95 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import json
from datetime import datetime, timedelta, timezone
from typing import Any
import boto3
import fire
from nrlf.core.logger import logger
# Module-level AWS client and paginator — created at import time, so importing
# this module requires valid AWS credentials/region in the environment.
dynamodb = boto3.client("dynamodb")
# Paginator for the DynamoDB Scan operation; used to walk the whole table page by page.
paginator = dynamodb.get_paginator("scan")
# Suppress the project logger's info/debug chatter so the progress dots below stay readable.
logger.setLevel("ERROR")
# Attributes projected out of each scanned item — the minimum needed to build
# the duplicate-detection key and the per-pointer output record.
REQUIRED_ATTRIBUTES = [
    "nhs_number",
    "custodian",
    "id",
    "master_identifier",
    "type_id",
    "created_on",
]
def _get_duplicates(
    table_name: str, custodians: str | tuple[str], filename: str = "duplicates"
) -> Any:
    """
    Get masterids for duplicate pointers in the given table for a list of custodians.

    Scans the whole pointers table (filtered server-side to the given custodians),
    groups pointers by (custodian, nhs_number, type_id), and writes every group
    containing more than one pointer to ``<filename>.txt`` as JSON.

    Parameters:
    - table_name: The name of the pointers table to use.
    - custodians: The ODS codes of the custodian(s) to check, either as a
      comma-separated string or a tuple of strings.
    - filename: A name (without extension) for the output text file containing
      the list of affected pointers.

    Returns a summary dict: output file path, duplicate count, scanned count,
    and elapsed seconds.
    """
    custodian_list = (
        custodians.split(",") if isinstance(custodians, str) else list(custodians)
    )
    print(  # noqa
        f"Finding duplicate pointers for custodians {custodian_list} in table {table_name}...."
    )
    # One DynamoDB expression placeholder per custodian ODS code, e.g. ":paramX26".
    expression_values: dict[str, Any] = {
        f":param{custodian}": {"S": custodian} for custodian in custodian_list
    }
    params: dict[str, Any] = {
        "TableName": table_name,
        "PaginationConfig": {"PageSize": 50},
        "FilterExpression": f"custodian IN ({','.join(expression_values)})",
        "ExpressionAttributeValues": expression_values,
        "ProjectionExpression": ",".join(REQUIRED_ATTRIBUTES),
    }
    # Maps "custodian-nhs_number-type_id" -> list of pointer records seen for that key.
    pointers_by_key: dict[str, list[dict[str, str]]] = {}
    total_scanned_count = 0
    duplicate_count = 0
    duplicates_set: set[str] = set()
    start_time = datetime.now(tz=timezone.utc)
    for page in paginator.paginate(**params):
        for item in page["Items"]:
            # Projected attributes may be absent on malformed items; fall back
            # to sentinel strings rather than crashing mid-scan.
            pointer_id = item.get("id", {}).get("S", "no-id")
            pointer_type = item.get("type_id", {}).get("S", "no-type")
            master_id = item.get("master_identifier", {}).get("S", "no-master-id")
            custodian = item.get("custodian", {}).get("S", "no-custodian")
            patient_id = item.get("nhs_number", {}).get("S", "no-patient-id")
            created_on = item.get("created_on", {}).get("S", "no-creation-datetime")
            pointer_data = {
                "id": pointer_id,
                "master_id": master_id,
                "datetime": created_on,
            }
            px_type_ods_key = f"{custodian}-{patient_id}-{pointer_type}"
            existing = pointers_by_key.setdefault(px_type_ods_key, [])
            if existing:
                # Second (or later) pointer for the same key => potential duplicate.
                duplicate_count += 1
                duplicates_set.add(px_type_ods_key)
            existing.append(pointer_data)
        total_scanned_count += page["ScannedCount"]
        # Lightweight progress indicator: a dot per ~1000 items, a summary per ~100k.
        if total_scanned_count % 1000 == 0:
            print(".", end="", flush=True)  # noqa
        if total_scanned_count % 100000 == 0:
            print(  # noqa
                f"scanned={total_scanned_count} found={duplicate_count} potential duplicates "
            )
    end_time = datetime.now(tz=timezone.utc)
    print(" Table scan completed")  # noqa
    # Only keys with more than one pointer are reported, in sorted order for
    # deterministic output.
    output_pointers = {key: pointers_by_key[key] for key in sorted(duplicates_set)}
    # Fix: honour the `filename` parameter instead of a hard-coded output path.
    output_file = f"{filename}.txt"
    print(f"Writing pointers to file ./{output_file} ...")  # noqa
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(json.dumps(output_pointers, indent=2))
    return {
        "output_file": output_file,
        "duplicates-found": duplicate_count,
        "scanned-count": total_scanned_count,
        "took-secs": (end_time - start_time).total_seconds(),
    }
# CLI entry point: python-fire turns _get_duplicates' signature into command-line
# flags (e.g. --table_name, --custodians, --filename).
if __name__ == "__main__":
    fire.Fire(_get_duplicates)