|
import multiprocessing |
|
import sys |
|
import time |
|
import traceback |
|
|
|
from PyQt5.QtCore import * |
|
from PyQt5.QtGui import * |
|
from PyQt5.QtWidgets import * |
|
|
|
from core.interact import interact as io |
|
|
|
from .qtex import * |
|
|
|
class QSubprocessor(object):
    """Host that distributes work items to a pool of subprocess clients.

    The host creates one ``Cli`` per tuple yielded by
    ``process_info_generator()``, blocks until every surviving client has
    reported ``'init_ok'``, then starts a ``QTimer`` (5 ms interval) whose
    timeout is connected to ``tick()``. Each tick collects client messages,
    drops unresponsive clients, and hands new data to idle clients. Once all
    remaining clients are idle after a dispatch pass, the clients are asked
    to close, the timer is stopped and ``on_clients_finalized()`` is called.

    Subclasses must implement ``get_data``, ``on_data_return`` and
    ``on_result``; the other ``on_*`` hooks and ``process_info_generator``
    are optional overrides.

    Client ``state`` values used by the host:
        0 -- idle, ready to receive data
        1 -- busy (initializing or processing data)
        2 -- finalized / terminated during shutdown
    """

    class Cli(object):
        """One worker subprocess plus the pair of queues used to talk to it.

        Messages are plain dicts with an ``'op'`` key. Host -> client ops
        are ``'data'`` and ``'close'``; client -> host ops are ``'init_ok'``,
        ``'success'``, ``'error'``, ``'finalized'``, ``'log_info'``,
        ``'log_err'`` and ``'progress_bar_inc'``.
        """

        def __init__(self, client_dict):
            """Create the queues and immediately start the daemon subprocess.

            Args:
                client_dict: forwarded unchanged to ``on_initialize`` inside
                    the subprocess.
            """
            s2c = multiprocessing.Queue()  # host (server) -> client
            c2s = multiprocessing.Queue()  # client -> host (server)
            self.p = multiprocessing.Process(
                target=self._subprocess_run,
                args=(client_dict, s2c, c2s),
            )
            self.s2c = s2c
            self.c2s = c2s
            self.p.daemon = True
            self.p.start()

            # Bookkeeping fields; the host overwrites them right after
            # constructing the client.
            self.state = None
            self.sent_time = None
            self.sent_data = None
            self.name = None
            self.host_dict = None

        def kill(self):
            """Terminate the subprocess and wait for it to exit."""
            self.p.terminate()
            self.p.join()

        def on_initialize(self, client_dict):
            """Optional hook, run in the subprocess before any data arrives.

            Args:
                client_dict: the dict passed to the constructor.
            """
            pass

        def on_finalize(self):
            """Optional hook, run in the subprocess after a 'close' request."""
            pass

        def process_data(self, data):
            """Process one work item in the subprocess and return its result.

            Must be overridden. The returned value is sent back to the host
            together with ``data`` in a ``'success'`` message.

            Raises:
                NotImplementedError: always, in this base implementation.
            """
            raise NotImplementedError

        def get_data_name(self, data):
            """Return a printable name for ``data``, used in error output."""
            return "undefined"

        def log_info(self, msg):
            """Queue an informational log message for the host."""
            self.c2s.put({'op': 'log_info', 'msg': msg})

        def log_err(self, msg):
            """Queue an error log message for the host."""
            self.c2s.put({'op': 'log_err', 'msg': msg})

        def progress_bar_inc(self, c):
            """Ask the host to advance its progress bar by ``c``."""
            self.c2s.put({'op': 'progress_bar_inc', 'c': c})

        def _subprocess_run(self, client_dict, s2c, c2s):
            """Subprocess entry point: initialize, serve requests, finalize.

            Any exception raised by the hooks or by ``process_data`` is
            reported to the host as an ``'error'`` message carrying the item
            that was being processed (``None`` outside of processing), and
            its traceback is printed.
            """
            # __getstate__ drops all attributes, so the queue used by the
            # log_* helpers is attached again here.
            self.c2s = c2s
            data = None
            try:
                self.on_initialize(client_dict)
                c2s.put({'op': 'init_ok'})
                while True:
                    msg = s2c.get()
                    op = msg.get('op', '')
                    if op == 'data':
                        data = msg['data']
                        result = self.process_data(data)
                        c2s.put({'op': 'success', 'data': data, 'result': result})
                        # Cleared so a later failure is not blamed on this item.
                        data = None
                    elif op == 'close':
                        break
                    time.sleep(0.001)
                self.on_finalize()
                c2s.put({'op': 'finalized'})
            except Exception:
                c2s.put({'op': 'error', 'data': data})
                if data is not None:
                    print('Exception while process data [%s]: %s' % (self.get_data_name(data), traceback.format_exc()))
                else:
                    print('Exception: %s' % (traceback.format_exc()))
            c2s.close()
            s2c.close()
            self.c2s = None

        def __getstate__(self):
            """Return an empty state so no instance attribute is pickled.

            NOTE(review): presumably needed because the bound-method process
            target pickles ``self`` (which holds the Process and queues) under
            the 'spawn' start method -- confirm.
            """
            return dict()

        def __setstate__(self, d):
            """Restore attributes from the (empty) pickled state."""
            self.__dict__.update(d)

    def __init__(self, name, SubprocessorCli_class, no_response_time_sec=0, io_loop_sleep_time=0.005):
        """Start all clients, wait for their initialization, start ticking.

        Args:
            name: host name, used in error messages.
            SubprocessorCli_class: subclass of ``QSubprocessor.Cli`` to
                instantiate for every client.
            no_response_time_sec: seconds a busy client may stay silent
                before it is terminated; 0 disables the check.
            io_loop_sleep_time: value passed to ``io.process_messages`` while
                waiting for clients to initialize, and sleep time between
                polls during shutdown.

        Raises:
            ValueError: if ``SubprocessorCli_class`` is not a ``Cli`` subclass.
            Exception: if a client cannot be constructed, or no client
                survives initialization.
        """
        if not issubclass(SubprocessorCli_class, QSubprocessor.Cli):
            raise ValueError("SubprocessorCli_class must be subclass of QSubprocessor.Cli")

        self.name = name
        self.SubprocessorCli_class = SubprocessorCli_class
        self.no_response_time_sec = no_response_time_sec
        self.io_loop_sleep_time = io_loop_sleep_time

        self.clis = []

        # Separate loop variable so the `name` parameter is not shadowed.
        for cli_name, host_dict, client_dict in self.process_info_generator():
            try:
                cli = self.SubprocessorCli_class(client_dict)
                cli.state = 1  # busy until 'init_ok' arrives
                cli.sent_time = 0
                cli.sent_data = None
                cli.name = cli_name
                cli.host_dict = host_dict

                self.clis.append(cli)
            except Exception as e:
                raise Exception(f"Unable to start subprocess {cli_name}. Error: {traceback.format_exc()}") from e

        if not self.clis:
            raise Exception("Unable to start QSubprocessor '%s' " % (self.name))

        # Wait until every remaining client has reported 'init_ok'.
        while True:
            for cli in self.clis[:]:
                while not cli.c2s.empty():
                    obj = cli.c2s.get()
                    op = obj.get('op', '')
                    if op == 'init_ok':
                        cli.state = 0
                    elif op == 'log_info':
                        io.log_info(obj['msg'])
                    elif op == 'log_err':
                        io.log_err(obj['msg'])
                    elif op == 'error':
                        cli.kill()
                        self.clis.remove(cli)
                        # The client is gone; stop reading its queue.
                        break
            if all(cli.state == 0 for cli in self.clis):
                break
            io.process_messages(self.io_loop_sleep_time)

        if not self.clis:
            raise Exception("Unable to start subprocesses.")

        self.on_clients_initialized()

        self.q_timer = QTimer()
        self.q_timer.timeout.connect(self.tick)
        self.q_timer.start(5)

    def process_info_generator(self):
        """Yield ``(name, host_dict, client_dict)`` for each client to start.

        The default yields one entry per CPU, capped at 8, with empty dicts.
        """
        for i in range(min(multiprocessing.cpu_count(), 8)):
            yield 'CPU%d' % (i), {}, {}

    def on_clients_initialized(self):
        """Optional hook, called once all clients reported 'init_ok'."""
        pass

    def on_clients_finalized(self):
        """Optional hook, called after the clients were shut down."""
        pass

    def get_data(self, host_dict):
        """Return the next work item for an idle client, or None for no item.

        Must be overridden.

        Args:
            host_dict: the host-side dict of the client asking for data.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError

    def on_data_return(self, host_dict, data):
        """Take back ``data`` from a client that failed or timed out.

        Must be overridden.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError

    def on_result(self, host_dict, data, result):
        """Receive the ``result`` a client produced for ``data``.

        Must be overridden.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError

    def tick(self):
        """Timer callback: collect messages, reap stalled clients, dispatch.

        When every remaining client is idle after the dispatch pass, the
        clients are asked to close (each gets up to 30 seconds), the timer
        is stopped and ``on_clients_finalized()`` is called.
        """
        # 1. Drain the messages each client has sent.
        for cli in self.clis[:]:
            while not cli.c2s.empty():
                obj = cli.c2s.get()
                op = obj.get('op', '')
                if op == 'success':
                    self.on_result(cli.host_dict, obj['data'], obj['result'])
                    # Clear the in-flight item on the client, not on the host.
                    cli.sent_data = None
                    cli.state = 0
                elif op == 'error':
                    if 'data' in obj:
                        self.on_data_return(cli.host_dict, obj['data'])
                    cli.kill()
                    self.clis.remove(cli)
                    # The client is gone; reading on could remove it twice.
                    break
                elif op == 'log_info':
                    io.log_info(obj['msg'])
                elif op == 'log_err':
                    io.log_err(obj['msg'])
                elif op == 'progress_bar_inc':
                    io.progress_bar_inc(obj['c'])

        # 2. Terminate busy clients that exceeded the response timeout.
        for cli in self.clis[:]:
            if cli.state != 1:
                continue
            timed_out = (
                cli.sent_time != 0
                and self.no_response_time_sec != 0
                and (time.time() - cli.sent_time) > self.no_response_time_sec
            )
            if timed_out:
                io.log_info('%s doesnt response, terminating it.' % (cli.name))
                self.on_data_return(cli.host_dict, cli.sent_data)
                cli.kill()
                self.clis.remove(cli)

        # 3. Give every idle client a new work item, if one is available.
        for cli in self.clis[:]:
            if cli.state != 0:
                continue
            data = self.get_data(cli.host_dict)
            if data is not None:
                cli.s2c.put({'op': 'data', 'data': data})
                cli.sent_time = time.time()
                cli.sent_data = data
                cli.state = 1

        # 4. Nobody is busy and nothing was dispatched: shut everything down.
        if all(cli.state == 0 for cli in self.clis):
            for cli in self.clis[:]:
                cli.s2c.put({'op': 'close'})
                cli.sent_time = time.time()

            while True:
                for cli in self.clis[:]:
                    if cli.state == 2:
                        # Already terminated on an earlier pass.
                        continue

                    terminate_it = False
                    while not cli.c2s.empty():
                        obj = cli.c2s.get()
                        # 'error' means on_finalize raised and the client is
                        # exiting; do not wait for the timeout in that case.
                        if obj.get('op', '') in ('finalized', 'error'):
                            terminate_it = True
                            break

                    if (time.time() - cli.sent_time) > 30:
                        terminate_it = True

                    if terminate_it:
                        cli.state = 2
                        cli.kill()

                if all(cli.state == 2 for cli in self.clis):
                    break

                # Avoid a busy spin while the clients finish finalizing.
                time.sleep(self.io_loop_sleep_time)

            self.q_timer.stop()
            self.q_timer = None
            self.on_clients_finalized()
|
|
|
|