Dashboard: use Popen() on Windows (#5110)

This commit is contained in:
Kuba Szczodrzyński 2023-07-19 22:39:35 +02:00 committed by Jesse Hills
parent ab32dd7420
commit 73db164fb1
No known key found for this signature in database
GPG key ID: BEAAE804EFD8E83A

View file

@ -25,6 +25,7 @@ import tornado.ioloop
import tornado.iostream import tornado.iostream
import tornado.netutil import tornado.netutil
import tornado.process import tornado.process
import tornado.queues
import tornado.web import tornado.web
import tornado.websocket import tornado.websocket
import yaml import yaml
@ -202,7 +203,11 @@ class EsphomeCommandWebSocket(tornado.websocket.WebSocketHandler):
def __init__(self, application, request, **kwargs): def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs) super().__init__(application, request, **kwargs)
self._proc = None self._proc = None
self._queue = None
self._is_closed = False self._is_closed = False
# Windows doesn't support non-blocking pipes,
# use Popen() with a reading thread instead
self._use_popen = os.name == "nt"
@authenticated @authenticated
def on_message(self, message): def on_message(self, message):
@ -224,13 +229,28 @@ class EsphomeCommandWebSocket(tornado.websocket.WebSocketHandler):
return return
command = self.build_command(json_message) command = self.build_command(json_message)
_LOGGER.info("Running command '%s'", " ".join(shlex_quote(x) for x in command)) _LOGGER.info("Running command '%s'", " ".join(shlex_quote(x) for x in command))
self._proc = tornado.process.Subprocess(
command, if self._use_popen:
stdout=tornado.process.Subprocess.STREAM, self._queue = tornado.queues.Queue()
stderr=subprocess.STDOUT, # pylint: disable=consider-using-with
stdin=tornado.process.Subprocess.STREAM, self._proc = subprocess.Popen(
) command,
self._proc.set_exit_callback(self._proc_on_exit) stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
stdout_thread = threading.Thread(target=self._stdout_thread)
stdout_thread.daemon = True
stdout_thread.start()
else:
self._proc = tornado.process.Subprocess(
command,
stdout=tornado.process.Subprocess.STREAM,
stderr=subprocess.STDOUT,
stdin=tornado.process.Subprocess.STREAM,
)
self._proc.set_exit_callback(self._proc_on_exit)
tornado.ioloop.IOLoop.current().spawn_callback(self._redirect_stdout) tornado.ioloop.IOLoop.current().spawn_callback(self._redirect_stdout)
@property @property
@ -252,7 +272,13 @@ class EsphomeCommandWebSocket(tornado.websocket.WebSocketHandler):
while True: while True:
try: try:
data = yield self._proc.stdout.read_until_regex(reg) if self._use_popen:
data = yield self._queue.get()
if data is None:
self._proc_on_exit(self._proc.poll())
break
else:
data = yield self._proc.stdout.read_until_regex(reg)
except tornado.iostream.StreamClosedError: except tornado.iostream.StreamClosedError:
break break
data = codecs.decode(data, "utf8", "replace") data = codecs.decode(data, "utf8", "replace")
@ -260,6 +286,19 @@ class EsphomeCommandWebSocket(tornado.websocket.WebSocketHandler):
_LOGGER.debug("> stdout: %s", data) _LOGGER.debug("> stdout: %s", data)
self.write_message({"event": "line", "data": data}) self.write_message({"event": "line", "data": data})
def _stdout_thread(self):
if not self._use_popen:
return
while True:
data = self._proc.stdout.readline()
if data:
data = data.replace(b"\r", b"")
self._queue.put_nowait(data)
if self._proc.poll() is not None:
break
self._proc.wait(1.0)
self._queue.put_nowait(None)
def _proc_on_exit(self, returncode): def _proc_on_exit(self, returncode):
if not self._is_closed: if not self._is_closed:
# Check if the proc was not forcibly closed # Check if the proc was not forcibly closed
@ -270,7 +309,10 @@ class EsphomeCommandWebSocket(tornado.websocket.WebSocketHandler):
# Check if proc exists (if 'start' has been run) # Check if proc exists (if 'start' has been run)
if self.is_process_active: if self.is_process_active:
_LOGGER.debug("Terminating process") _LOGGER.debug("Terminating process")
self._proc.proc.terminate() if self._use_popen:
self._proc.terminate()
else:
self._proc.proc.terminate()
# Shutdown proc on WS close # Shutdown proc on WS close
self._is_closed = True self._is_closed = True