-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmaster.py
More file actions
123 lines (101 loc) · 4.67 KB
/
master.py
File metadata and controls
123 lines (101 loc) · 4.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import subprocess
import argparse
import time
import grpc
from concurrent import futures
import logging
import os
import json
import shutil
from key_val_pb2 import *
from key_val_pb2_grpc import *
import grpc
# Root logging for the master process: INFO and above, each message prefixed
# with "Master: " so it can be told apart from the child processes' output.
logging.basicConfig(format="Master" + ": %(message)s", level=logging.INFO)
def get_keys_from_key_store(ip, port):
    """Ask the key-store server how many keys it holds.

    Opens an insecure gRPC channel to ``ip:port``, issues a single
    ``get_number_of_keys`` RPC and closes the channel again.

    Args:
        ip: Host of the key-store server (a string).
        port: Port of the key-store server; converted with ``str``.

    Returns:
        The ``key_length`` field of the reply, converted to ``int``.
    """
    target = ":".join((ip, str(port)))
    with grpc.insecure_channel(target) as channel:
        stub = master_kvStub(channel)
        request = key_length_request(get_key="dict len")
        key_count = int(stub.get_number_of_keys(request).key_length)
    return key_count
def _python_cmd(script, *args):
    """Build an argv list that runs *script* under the current interpreter.

    Every argument is converted with ``str`` so integer counts, indices and
    ports can be passed directly. A list (rather than one space-joined
    string) is required for ``subprocess.Popen`` on POSIX, where a plain
    string without ``shell=True`` is treated as the program name. A list
    also keeps values that contain spaces intact.
    """
    import sys  # local import: only needed to locate the running interpreter

    # sys.executable is the interpreter running the master, so the children
    # get the same environment instead of whatever "python" is on PATH.
    return [sys.executable, script] + [str(arg) for arg in args]


def _spawn(commands, registry):
    """Start every argv in *commands* and return the new processes.

    Each child is appended to *registry* as soon as it starts, so that if a
    later ``Popen`` fails the already-running ones can still be cleaned up.
    """
    started = []
    for argv in commands:
        proc = subprocess.Popen(argv)
        registry.append(proc)
        started.append(proc)
    return started


def _wait_for_all(procs, role):
    """Block until every process in *procs* has exited.

    Logs a warning for each child whose exit status is non-zero.
    """
    for proc in procs:
        status = proc.wait()
        if status != 0:
            logging.warning("%s process %s exited with status %s",
                            role, proc.args, status)


def _shutdown(procs, timeout=10):
    """Terminate every still-running process in *procs* and reap all of them.

    A child that has not exited *timeout* seconds after ``terminate()`` is
    killed.
    """
    for proc in procs:
        if proc.poll() is None:
            proc.terminate()
    for proc in procs:
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()


def _reducer_ranges(num_keys, num_reducers):
    """Split key indices starting at 0 into *num_reducers* contiguous ranges.

    Returns a list of ``(start, end)`` string pairs for the reducers' -s/-e
    flags. Every reducer except the last covers ``num_keys // num_reducers``
    indices. The last one gets the end marker ``"last"``, which presumably
    tells reducer.py to read through the final key, including the division
    remainder (TODO confirm in reducer.py).
    """
    chunk = num_keys // num_reducers
    ranges = []
    for i in range(num_reducers):
        start = i * chunk
        end = "last" if i == num_reducers - 1 else str(start + chunk)
        ranges.append((str(start), end))
    return ranges


def main(input_location, num_mappers, num_reducers, map_func, red_func, config_dict):
    """Run one MapReduce job by orchestrating worker processes.

    Starts the data server and the key-store server, then runs
    ``num_mappers`` mapper processes to completion (the barrier). It then
    asks the key store how many keys exist, splits that range across
    ``num_reducers`` reducer processes, and waits for them. On the way out,
    even after an error, it terminates the servers and any worker still
    running.

    Args:
        input_location: Input data location, passed to data_server.py (-i).
        num_mappers: Number of mapper processes to spawn.
        num_reducers: Number of reducer processes to spawn; must be >= 1.
        map_func: Map function identifier, passed to each mapper (-f).
        red_func: Reduce function identifier, passed to each reducer (-f).
        config_dict: Mapping with 'data_store_ip', 'data_store_port',
            'key_store_ip' and 'key_store_port' entries.

    Raises:
        ValueError: If ``num_reducers < 1`` or a map/reduce function is
            missing.
    """
    if num_reducers < 1:
        raise ValueError("num_reducers must be at least 1, got %r" % (num_reducers,))
    if map_func is None or red_func is None:
        raise ValueError("both a map function and a reduce function are required")

    data_ip = config_dict['data_store_ip']
    data_port = config_dict['data_store_port']
    key_ip = config_dict['key_store_ip']
    key_port = config_dict['key_store_port']

    # Build every server/mapper command before starting anything, so a bad
    # argument cannot leave half of the processes running.
    server_cmds = [
        _python_cmd("data_server.py", "-i", input_location, "-m", num_mappers,
                    "-ip", data_ip, "-p", data_port),
        _python_cmd("key_store_server.py", "-ip", key_ip, "-p", key_port),
    ]
    mapper_cmds = [
        _python_cmd("mapper.py", "-i", i, "-f", map_func,
                    "--dip", data_ip, "--dport", data_port,
                    "--kip", key_ip, "--kport", key_port)
        for i in range(num_mappers)
    ]

    children = []  # every process started here, so `finally` can clean up
    try:
        logging.info("Spawning key-store, data service & mappers")
        _spawn(server_cmds, children)
        mappers = _spawn(mapper_cmds, children)
        # Wait on the mappers only; the two servers keep running for the
        # reducers.
        _wait_for_all(mappers, "Mapper")

        # At barrier
        logging.info("Mappers have exited")
        logging.info("At barrier")

        keys = get_keys_from_key_store(key_ip, key_port)
        reducer_cmds = [
            _python_cmd("reducer.py", "-s", start, "-e", end, "-f", red_func,
                        "-i", i, "-dip", data_ip, "-dp", data_port,
                        "-kip", key_ip, "-kp", key_port)
            for i, (start, end) in enumerate(_reducer_ranges(keys, num_reducers))
        ]
        logging.info("Spawning reducers")
        reducers = _spawn(reducer_cmds, children)
        _wait_for_all(reducers, "Reducer")
        logging.info("Reducers have exited")
    finally:
        # On success this stops the two servers; after an error it also
        # stops any mapper/reducer that is still running.
        _shutdown(children)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Start the master")
    # NOTE(review): a non-string default is not run through `type`, so this
    # default is the int 1, not a real location; main() passes it via str().
    parser.add_argument('-i', '--inputlocation', type=str, default=1,
                        help='Enter input data location')
    parser.add_argument('-m', '--mappers', type=int, default=1,
                        help='Enter number of mappers')
    parser.add_argument('-r', '--reducers', type=int, default=1,
                        help='Enter number of reducers')
    # Both functions are forwarded to the workers and a missing one cannot
    # work, so let argparse reject it with a usage message up front.
    parser.add_argument('-mf', '--map', type=str, required=True,
                        help='Enter map function')
    parser.add_argument('-rf', '--red', type=str, required=True,
                        help='Enter reduce function')
    args = parser.parse_args()
    # Server addresses/ports, read relative to the current working directory.
    with open('./config.json', encoding='utf-8') as config_file:
        config_dict = json.load(config_file)
    main(args.inputlocation, args.mappers, args.reducers, args.map, args.red, config_dict)