-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathformatters.py
More file actions
137 lines (109 loc) · 4.75 KB
/
formatters.py
File metadata and controls
137 lines (109 loc) · 4.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import json
from datetime import datetime
from logging import Formatter
from code42cli.maps import CEF_CUSTOM_FIELD_NAME_MAP
from code42cli.maps import FILE_EVENT_TO_SIGNATURE_ID_MAP
from code42cli.maps import JSON_TO_CEF_MAP
# Pipe-delimited skeleton of a CEF message. `FileEventDictToCEFFormatter.format`
# fills in the named placeholders.
CEF_TEMPLATE = (
    "CEF:0|Code42|{productName}|1|{signatureID}|{eventName}|{severity}|{extension}"
)

# CEF extension keys whose values `_format_cef_kvp` passes through
# `convert_file_event_timestamp_to_cef_timestamp`.
CEF_TIMESTAMP_FIELDS = [
    "end",
    "fileCreateTime",
    "fileModificationTime",
    "rt",
]
class FileEventDictToCEFFormatter(Formatter):
    """Renders file event dicts as CEF messages. Attach via `setFormatter` to use.

    Args:
        default_product_name: Value written to the product name segment of
            every CEF message.
        default_severity_level: Value written to the severity segment of every
            CEF message; expected to represent an integer between 1 and 10
            (the default is the string "5").
    """

    def __init__(
        self,
        default_product_name="Advanced Exfiltration Detection",
        default_severity_level="5",
    ):
        super().__init__()
        self._default_severity_level = default_severity_level
        self._default_product_name = default_product_name

    def format(self, record):
        """Build the CEF message for a single log record.

        Args:
            record (LogRecord): `record.msg` must be a `dict`.

        Returns:
            `CEF_TEMPLATE` filled in with the configured defaults and the
            values produced by `map_event_to_cef`.
        """
        # security events must convert to file event dict format before calling this.
        extension, event_name, signature_id = map_event_to_cef(record.msg)
        segments = {
            "productName": self._default_product_name,
            "signatureID": signature_id,
            "eventName": event_name,
            "severity": self._default_severity_level,
            "extension": extension,
        }
        return CEF_TEMPLATE.format(**segments)
class FileEventDictToJSONFormatter(Formatter):
    """Serializes file event dicts to JSON. Attach via `setFormatter` to use.

    Entries whose values are `None`, an empty string, or an empty list (or any
    other falsy value that does not compare equal to 0) are left out of the
    JSON output.
    """

    def format(self, record):
        """Return the JSON text for the populated entries of `record.msg`.

        Args:
            record (LogRecord): `record.msg` must be a `dict`.
        """
        populated = {}
        for key, value in record.msg.items():
            # Zero is falsy but still meaningful, so it is kept explicitly
            # (False compares equal to 0 and is therefore kept as well).
            if value or value == 0:
                populated[key] = value
        return json.dumps(populated)
class FileEventDictToRawJSONFormatter(Formatter):
    """Serializes file event dicts to JSON unchanged. Attach via `setFormatter` to use."""

    def format(self, record):
        """Return `record.msg` passed through `json.dumps` with no entries removed."""
        raw_event = record.msg
        return json.dumps(raw_event)
def _format_cef_kvp(cef_field_key, cef_field_value):
    """Render one CEF extension entry as ``key=value`` text.

    Keys that have a ``<key>Label`` entry in `CEF_CUSTOM_FIELD_NAME_MAP` are
    delegated to `_format_custom_cef_kvp`. Otherwise nested values are
    flattened, lists become comma-separated strings, and keys listed in
    `CEF_TIMESTAMP_FIELDS` have their value converted to a CEF timestamp.
    """
    label_key = cef_field_key + "Label"
    if label_key in CEF_CUSTOM_FIELD_NAME_MAP:
        return _format_custom_cef_kvp(cef_field_key, cef_field_value)

    rendered = _handle_nested_json_fields(cef_field_key, cef_field_value)
    if isinstance(rendered, list):
        rendered = _convert_list_to_csv(rendered)
    elif cef_field_key in CEF_TIMESTAMP_FIELDS:
        rendered = convert_file_event_timestamp_to_cef_timestamp(rendered)
    return f"{cef_field_key}={rendered}"
def _handle_nested_json_fields(cef_field_key, cef_field_value):
result = []
if cef_field_key == "duser":
result = [
item["cloudUsername"] for item in cef_field_value if type(item) is dict
]
return result or cef_field_value
def _format_custom_cef_kvp(custom_cef_field_key, custom_cef_field_value):
    """Render a custom CEF field together with its label entry.

    The label key is the field key with ``Label`` appended, and its value is
    looked up in `CEF_CUSTOM_FIELD_NAME_MAP`. The result has the form
    ``<key>=<value> <key>Label=<label>``.
    """
    label_key = f"{custom_cef_field_key}Label"
    label_value = CEF_CUSTOM_FIELD_NAME_MAP[label_key]
    value_part = f"{custom_cef_field_key}={custom_cef_field_value}"
    label_part = f"{label_key}={label_value}"
    return f"{value_part} {label_part}"
def _convert_list_to_csv(_list):
value = ",".join([val for val in _list])
return value
def convert_file_event_timestamp_to_cef_timestamp(timestamp_value):
    """Convert a file event timestamp string to a millisecond epoch string.

    Accepts ``%Y-%m-%dT%H:%M:%S.%fZ`` and, failing that, the same layout
    without the fractional seconds.

    Returns:
        The milliseconds since the epoch, formatted with no decimal places.

    Raises:
        ValueError: If `timestamp_value` matches neither layout.
    """
    try:
        parsed = datetime.strptime(timestamp_value, "%Y-%m-%dT%H:%M:%S.%fZ")
    except ValueError:
        # Fall back to the layout without fractional seconds.
        parsed = datetime.strptime(timestamp_value, "%Y-%m-%dT%H:%M:%SZ")
    milliseconds = _datetime_to_ms_since_epoch(parsed)
    return format(milliseconds, ".0f")
def _datetime_to_ms_since_epoch(_datetime):
epoch = datetime.utcfromtimestamp(0)
total_seconds = (_datetime - epoch).total_seconds()
# total_seconds will be in decimals (millisecond precision)
return total_seconds * 1000
def map_event_to_cef(event):
    """Translate a file event dict into the variable parts of a CEF message.

    Event keys present in `JSON_TO_CEF_MAP` are renamed to their CEF keys;
    entries whose value is `None` or an empty list are skipped.

    Returns:
        A tuple ``(extension, event_name, signature_id)``: the space-separated
        CEF extension text, the event's ``eventType`` (``"UNKNOWN"`` when
        absent), and the signature id from `FILE_EVENT_TO_SIGNATURE_ID_MAP`
        (``"C42000"`` when the event type has no entry).
    """
    cef_fields = {}
    for json_key in event:
        if json_key not in JSON_TO_CEF_MAP:
            continue
        json_value = event[json_key]
        if json_value is not None and json_value != []:
            cef_fields[JSON_TO_CEF_MAP[json_key]] = json_value

    extension = " ".join(
        _format_cef_kvp(cef_key, cef_value) for cef_key, cef_value in cef_fields.items()
    )
    event_name = event.get("eventType", "UNKNOWN")
    signature_id = FILE_EVENT_TO_SIGNATURE_ID_MAP.get(event_name, "C42000")
    return extension, event_name, signature_id