Skip to content

Commit

Permalink
all loop to ensure kernel is alive before connecting working unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Zsailer committed Nov 17, 2022
1 parent bfc4153 commit 2650341
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
18 changes: 15 additions & 3 deletions jupyter_server/services/kernels/connection/channels.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json
import time
import weakref
from concurrent.futures import Future
from textwrap import dedent
Expand All @@ -16,6 +17,8 @@
except ImportError:
from jupyter_client.jsonutil import date_default as json_default

from jupyter_client.utils import ensure_async

from jupyter_server.transutils import _i18n

from .abc import KernelWebsocketConnectionABC
Expand Down Expand Up @@ -282,7 +285,7 @@ async def _register_session(self):
if (
self.kernel_id in self.multi_kernel_manager
): # only update open sessions if kernel is actively managed
self._open_sessions[self.session_key] = self
self._open_sessions[self.session_key] = self.websocket_handler

async def prepare(self):
# check session collision:
Expand All @@ -302,6 +305,12 @@ async def prepare(self):
self.kernel_manager.reason = str(e)
raise web.HTTPError(500, str(e)) from e

t0 = time.time()
while not await ensure_async(self.kernel_manager.is_alive()):
await asyncio.sleep(0.1)
if time.time() - t0 > self.multi_kernel_manager.kernel_info_timeout:
raise TimeoutError("Kernel never reached an 'alive' state.")

self.session.key = self.kernel_manager.session.key
future = self.request_kernel_info()

Expand Down Expand Up @@ -360,7 +369,7 @@ def replay(value):
for _, stream in self.channels.items():
if not stream.closed():
stream.close()
self.close()
self.disconnect()
return

self.multi_kernel_manager.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
Expand All @@ -376,6 +385,9 @@ def subscribe(value):
ZMQChannelsWebsocketConnection._open_sockets.add(self)
return connected

def close(self):
return self.disconnect()

def disconnect(self):
self.log.debug("Websocket closed %s", self.session_key)
# unregister myself as an open session (only if it's really me)
Expand Down Expand Up @@ -536,7 +548,7 @@ def _on_zmq_reply(self, stream, msg_list):
# eventloop but hasn't been called.
if stream.closed():
self.log.warning("zmq message arrived on closed channel")
self.close()
self.disconnect()
return
channel = getattr(stream, "channel", None)
if self.subprotocol == "v1.kernel.websocket.jupyter.org":
Expand Down
3 changes: 1 addition & 2 deletions jupyter_server/services/kernels/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ async def pre_get(self):

kernel = self.kernel_manager.get_kernel(self.kernel_id)
self.connection = self.kernel_websocket_connection_class(
parent=kernel,
websocket_handler=self,
parent=kernel, websocket_handler=self, config=self.config
)

if self.get_argument("session_id", None):
Expand Down
10 changes: 7 additions & 3 deletions tests/services/sessions/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from ...utils import expected_http_error

TEST_TIMEOUT = 60
TEST_TIMEOUT = 10


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -556,9 +556,13 @@ async def test_restart_kernel(session_client, jp_base_url, jp_fetch, jp_ws_fetch
model = json.loads(r.body.decode())
assert model["connections"] == 0

# Open a websocket connection.
await jp_ws_fetch("api", "kernels", kid, "channels")
# Add a delay to give the kernel enough time to restart.
# time.sleep(2)

# Open a websocket connection.
ws2 = await jp_ws_fetch("api", "kernels", kid, "channels")
# Close/open websocket
ws2.close()
r = await jp_fetch("api", "kernels", kid, method="GET")
model = json.loads(r.body.decode())
assert model["connections"] == 1

0 comments on commit 2650341

Please sign in to comment.