import traceback |
import multiprocessing |
import time |
import sys |
from core.interact import interact as io |
class Subprocessor(object): |
class SilenceException(Exception): |
pass |
class Cli(object): |
def __init__ ( self, client_dict ): |
s2c = multiprocessing.Queue() |
c2s = multiprocessing.Queue() |
self.p = multiprocessing.Process(target=self._subprocess_run, args=(client_dict,s2c,c2s) ) |
self.s2c = s2c |
self.c2s = c2s |
self.p.daemon = True |
self.p.start() |
self.state = None |
self.sent_time = None |
self.sent_data = None |
self.name = None |
self.host_dict = None |
def kill(self): |
self.p.terminate() |
self.p.join() |
def on_initialize(self, client_dict): |
pass |
def on_finalize(self): |
pass |
def process_data(self, data): |
raise NotImplementedError |
def get_data_name (self, data): |
return "undefined" |
def log_info(self, msg): self.c2s.put ( {'op': 'log_info', 'msg':msg } ) |
def log_err(self, msg): self.c2s.put ( {'op': 'log_err' , 'msg':msg } ) |
def progress_bar_inc(self, c): self.c2s.put ( {'op': 'progress_bar_inc' , 'c':c } ) |
def _subprocess_run(self, client_dict, s2c, c2s): |
self.c2s = c2s |
data = None |
is_error = False |
try: |
self.on_initialize(client_dict) |
c2s.put ( {'op': 'init_ok'} ) |
while True: |
msg = s2c.get() |
op = msg.get('op','') |
if op == 'data': |
data = msg['data'] |
result = self.process_data (data) |
c2s.put ( {'op': 'success', 'data' : data, 'result' : result} ) |
data = None |
elif op == 'close': |
break |
time.sleep(0.001) |
self.on_finalize() |
c2s.put ( {'op': 'finalized'} ) |
except Subprocessor.SilenceException as e: |
c2s.put ( {'op': 'error', 'data' : data} ) |
except Exception as e: |
err_msg = traceback.format_exc() |
c2s.put ( {'op': 'error', 'data' : data, 'err_msg' : err_msg} ) |
c2s.close() |
s2c.close() |
self.c2s = None |
def __getstate__(self): |
return dict() |
def __setstate__(self, d): |
self.__dict__.update(d) |
def __init__(self, name, SubprocessorCli_class, no_response_time_sec = 0, io_loop_sleep_time=0.005, initialize_subprocesses_in_serial=False): |
if not issubclass(SubprocessorCli_class, Subprocessor.Cli): |
raise ValueError("SubprocessorCli_class must be subclass of Subprocessor.Cli") |
self.name = name |
self.SubprocessorCli_class = SubprocessorCli_class |
self.no_response_time_sec = no_response_time_sec |
self.io_loop_sleep_time = io_loop_sleep_time |
self.initialize_subprocesses_in_serial = initialize_subprocesses_in_serial |
def process_info_generator(self): |
raise NotImplementedError |
def on_clients_initialized(self): |
pass |
def on_clients_finalized(self): |
pass |
def get_data(self, host_dict): |
raise NotImplementedError |
def on_data_return (self, host_dict, data): |
raise NotImplementedError |
def on_result (self, host_dict, data, result): |
raise NotImplementedError |
def get_result(self): |
return None |
def on_tick(self): |
return True |
def on_check_run(self): |
return True |
def run(self): |
if not self.on_check_run(): |
return self.get_result() |
self.clis = [] |
def cli_init_dispatcher(cli): |
while not cli.c2s.empty(): |
obj = cli.c2s.get() |
op = obj.get('op','') |
if op == 'init_ok': |
cli.state = 0 |
elif op == 'log_info': |
io.log_info(obj['msg']) |
elif op == 'log_err': |
io.log_err(obj['msg']) |
elif op == 'error': |
err_msg = obj.get('err_msg', None) |
if err_msg is not None: |
io.log_info(f'Error while subprocess initialization: {err_msg}') |
cli.kill() |
self.clis.remove(cli) |
break |
for name, host_dict, client_dict in self.process_info_generator(): |
try: |
cli = self.SubprocessorCli_class(client_dict) |
cli.state = 1 |
cli.sent_time = 0 |
cli.sent_data = None |
cli.name = name |
cli.host_dict = host_dict |
self.clis.append (cli) |
if self.initialize_subprocesses_in_serial: |
while True: |
cli_init_dispatcher(cli) |
if cli.state == 0: |
break |
io.process_messages(0.005) |
except: |
raise Exception (f"Unable to start subprocess {name}. Error: {traceback.format_exc()}") |
if len(self.clis) == 0: |
raise Exception ("Unable to start Subprocessor '%s' " % (self.name)) |
while True: |
for cli in self.clis[:]: |
cli_init_dispatcher(cli) |
if all ([cli.state == 0 for cli in self.clis]): |
break |
io.process_messages(0.005) |
if len(self.clis) == 0: |
raise Exception ( "Unable to start subprocesses." ) |
self.on_clients_initialized() |
while True: |
for cli in self.clis[:]: |
while not cli.c2s.empty(): |
obj = cli.c2s.get() |
op = obj.get('op','') |
if op == 'success': |
self.on_result (cli.host_dict, obj['data'], obj['result']) |
self.sent_data = None |
cli.state = 0 |
elif op == 'error': |
err_msg = obj.get('err_msg', None) |
if err_msg is not None: |
io.log_info(f'Error while processing data: {err_msg}') |
if 'data' in obj.keys(): |
self.on_data_return (cli.host_dict, obj['data'] ) |
cli.kill() |
self.clis.remove(cli) |
elif op == 'log_info': |
io.log_info(obj['msg']) |
elif op == 'log_err': |
io.log_err(obj['msg']) |
elif op == 'progress_bar_inc': |
io.progress_bar_inc(obj['c']) |
for cli in self.clis[:]: |
if cli.state == 1: |
if cli.sent_time != 0 and self.no_response_time_sec != 0 and (time.time() - cli.sent_time) > self.no_response_time_sec: |
print ( '%s doesnt response, terminating it.' % (cli.name) ) |
self.on_data_return (cli.host_dict, cli.sent_data ) |
cli.kill() |
self.clis.remove(cli) |
for cli in self.clis[:]: |
if cli.state == 0: |
data = self.get_data(cli.host_dict) |
if data is not None: |
cli.s2c.put ( {'op': 'data', 'data' : data} ) |
cli.sent_time = time.time() |
cli.sent_data = data |
cli.state = 1 |
if self.io_loop_sleep_time != 0: |
io.process_messages(self.io_loop_sleep_time) |
if self.on_tick() and all ([cli.state == 0 for cli in self.clis]): |
break |
for cli in self.clis[:]: |
cli.s2c.put ( {'op': 'close'} ) |
cli.sent_time = time.time() |
while True: |
for cli in self.clis[:]: |
terminate_it = False |
while not cli.c2s.empty(): |
obj = cli.c2s.get() |
obj_op = obj['op'] |
if obj_op == 'finalized': |
terminate_it = True |
break |
if (time.time() - cli.sent_time) > 30: |
terminate_it = True |
if terminate_it: |
cli.state = 2 |
cli.kill() |
if all ([cli.state == 2 for cli in self.clis]): |
break |
self.on_clients_finalized() |
return self.get_result() |