forked from carbonblack/cbapi-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_cber.py
More file actions
157 lines (126 loc) · 4.98 KB
/
test_cber.py
File metadata and controls
157 lines (126 loc) · 4.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from cbapi.response.models import Binary, Process, Sensor, Watchlist, Feed
from cbapi.response.rest_api import CbEnterpriseResponseAPI
from nose.tools import assert_equal
from testconfig import config
import unittest
import os
import sys
import sqlite3
sys.path.append(os.path.dirname(__file__))
import requests_cache
#import requests.packages.urllib3
#requests.packages.urllib3.disable_warnings()
cache_file_name = os.path.join(os.path.dirname(__file__), "caches/cber")
#
# Config file parsing
#
if config['cache']['use_golden'].lower() == "false":
use_golden = False
else:
use_golden = True
if not use_golden:
if 'cache_file_name' in config['cache']:
cache_file_name = config['cache']['cache_filename']
if config['cache']['cache_overwrite'].lower() == 'true':
overwrite = True
#
# Delete old cache
#
try:
os.remove(cache_file_name + ".sqlite")
except:
pass
#
# Create our assertion table
#
con = sqlite3.connect(cache_file_name + ".sqlite")
cur = con.cursor()
cur.execute('drop table if exists AssertionTest')
cur.execute('create table AssertionTest (testname string primary key, testresult string)')
con.commit()
con.close()
#
# Install the cache
# NOTE: deny_outbound is set so we don't attempt comms and force using the cache
#
requests_cache.install_cache(cache_file_name, allowable_methods=('GET', 'POST'), deny_outbound=use_golden)
def insertResult(testname, testresult):
con = sqlite3.connect(cache_file_name + ".sqlite")
cur = con.cursor()
cur.execute('insert into AssertionTest VALUES ("{0}", "{1}")'.format(testname, testresult))
con.commit()
con.close()
def getTestResult(testname):
con = sqlite3.connect(cache_file_name + ".sqlite")
cur = con.cursor()
cur.execute('select testresult from AssertionTest where testname == "{0}"'.format(testname))
result = cur.fetchone()
con.close()
return result
class TestCbResponse(unittest.TestCase):
def setUp(self):
if use_golden:
#
# We don't want to connect to a cbserver so using bogus values
#
self.c = CbEnterpriseResponseAPI(url="http://localhost", token="N/A", ssl_verify=False)
else:
self.c = CbEnterpriseResponseAPI()
self.sensor = self.c.select(Sensor, 1)
self.lr_session = self.sensor.lr_session()
def test_all_binary(self):
binary_query = self.c.select(Binary).where('')
if use_golden:
test_result = getTestResult('test_all_binary')[0]
assert_equal(len(binary_query),
test_result,
"Number of Binaries returned should be {0}, but received {1}".format(
test_result, len(binary_query)))
else:
insertResult('test_all_binary', str(len(binary_query)))
def test_read_binary(self):
data = self.c.select(Binary).where('').first().file.read(2)
if use_golden:
assert_equal(data, b'MZ')
def test_all_process(self):
process_query = self.c.select(Process).where('')
if use_golden:
test_result = getTestResult('test_all_process')[0]
assert_equal(len(process_query),
test_result,
"Number of Processes returned should be {0}, but received {1}".format(
test_result, len(process_query)))
else:
insertResult('test_all_process', str(len(process_query)))
def test_cblr_ps(self):
processes = self.lr_session.list_processes()
if use_golden:
test_result = len(processes)
assert_equal(len(processes),
test_result,
"Number of Binaries returned should be {0}, but received {1}".format(
test_result, len(processes)))
else:
insertResult('test_cblr_ps', str(len(processes)))
def test_cblr_get(self):
self.lr_session.get_raw_file(r"C:\test.txt")
def test_cber_watchlists(self):
watchlists = self.c.select(Watchlist)
if use_golden:
test_result = getTestResult('test_cber_watchlists')[0]
assert_equal(len(watchlists),
test_result,
"Number of Watchlists returned should be {0}, but received {1}".format(
test_result, len(watchlists)))
else:
insertResult('test_cber_watchlists', str(len(watchlists)))
def test_cber_feeds(self):
feeds = self.c.select(Feed)
if use_golden:
test_result = getTestResult('test_cber_feeds')[0]
assert_equal(len(feeds),
test_result,
"Number of Feeds returned should be {0}, but received {1}".format(
test_result, len(feeds)))
else:
insertResult('test_cber_feeds', str(len(feeds)))