Skip to content

Commit

Permalink
support python 3 only; note BUG: shell commands with multiple words d…
Browse files Browse the repository at this point in the history
…ont work
  • Loading branch information
sohang3112 committed Jun 5, 2024
1 parent fe6a7be commit 3e4c396
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 55 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Forth kernel for Jupyter notebook / lab. This is a fork of [github.com/jdfreder/
**Note:** Check the [changelog](CHANGELOG.md) to see the latest changes in development as well as in releases.

## Installation
- Check `python --version` to ensure you're running Python 3 only (Python 2 is NOT supported).
- Install [Gforth](https://www.gnu.org/software/gforth/). Make sure it is accessible via the commandline/terminal (`gforth --version`).
- Run `pip install forth_kernel`

Expand Down
144 changes: 89 additions & 55 deletions forth_kernel/forth_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,132 +9,166 @@

from ipykernel.kernelbase import Kernel

try:
from Queue import Empty, Queue
except ImportError:
from queue import Empty, Queue # python 3.x
from queue import Empty, Queue

__version__ = '0.2'
__path__ = os.environ.get('GFORTHPATH')
__version__ = "0.2"
__path__ = os.environ.get("GFORTHPATH")


# TODO: override abstract methods do_apply(), do_clear(), do_debug_request() of Kernel class
class ForthKernel(Kernel):
"""Jupyter kernel for Forth language"""

implementation = 'forth_kernel'
implementation = "forth_kernel"
implementation_version = __version__
language = 'forth'
language = "forth"
first_command = True

@property
def language_version(self) -> str:
"""Version no. of GForth."""
return self.banner.partition(',')[0]
return self.banner.partition(",")[0]

language_info = {
'name': 'forth',
'version': '0.3',
'mimetype': 'text',
'file_extension': '.4th'
}
"name": "forth",
"version": "0.3",
"mimetype": "text",
"file_extension": ".4th",
}

def __init__(self, **kwargs):
Kernel.__init__(self, **kwargs)
ON_POSIX = 'posix' in sys.builtin_module_names
ON_POSIX = "posix" in sys.builtin_module_names

def enqueue_output(out, queue):
for line in iter(out.readline, b''):
for line in iter(out.readline, b""):
queue.put(line)
out.close()

self._gforth = Popen('gforth', stdin=PIPE, stdout=PIPE, stderr=PIPE, bufsize=2, close_fds=ON_POSIX)
self._gforth = Popen(
"gforth",
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
bufsize=2,
close_fds=ON_POSIX,
)
self._gforth_stdout_queue = Queue()
self._gforth_stderr_queue = Queue()

t_stdout = Thread(target=enqueue_output, args=(self._gforth.stdout, self._gforth_stdout_queue))
t_stdout = Thread(
target=enqueue_output, args=(self._gforth.stdout, self._gforth_stdout_queue)
)
t_stdout.daemon = True
t_stdout.start()

t_stderr = Thread(target=enqueue_output, args=(self._gforth.stderr, self._gforth_stderr_queue))
t_stderr = Thread(
target=enqueue_output, args=(self._gforth.stderr, self._gforth_stderr_queue)
)
t_stderr.daemon = True
t_stderr.start()
t_stderr.start()

self.banner = self.get_queue(self._gforth_stdout_queue)

def get_queue(self, queue: Queue) -> str:
"""Get all lines from queue and return them as a string."""
output = ''
line = b'.'
timeout = 5.
while len(line) or timeout > 0.:
output = ""
line = b"."
timeout = 5.0
while len(line) or timeout > 0.0:
try:
line = queue.get_nowait()
except Empty:
line = b''
if timeout > 0.:
line = b""
if timeout > 0.0:
time.sleep(0.01)
timeout -= 0.01
else:
try:
output += line.decode()
except UnicodeDecodeError:
output += line.decode('latin-1')
timeout = 0.
output += line.decode("latin-1")
timeout = 0.0
return output

def answer_text(self, text: str, stream: Literal['stdout', 'stderr']):
def answer_text(self, text: str, stream: Literal["stdout", "stderr"]):
"""Send text response to Jupyter cell."""
self.send_response(self.iopub_socket, 'stream', {
'name': stream,
'text': text + '\n'
})
self.send_response(
self.iopub_socket, "stream", {"name": stream, "text": text + "\n"}
)

def answer_html(self, html_text: str):
"""Send HTML response to Jupyter cell."""
self.send_response(self.iopub_socket, 'display_data', {
'metadata': {},
'data': {
'text/html': html_text
}
})
self.send_response(
self.iopub_socket,
"display_data",
{"metadata": {}, "data": {"text/html": html_text}},
)

def success_response(self) -> Dict[str, Any]:
"""Tell Jupyter that cell ran successfully."""
return {'status': 'ok', 'execution_count': self.execution_count,
'payload': [], 'user_expressions': {}}

return {
"status": "ok",
"execution_count": self.execution_count,
"payload": [],
"user_expressions": {},
}

# TODO MAYBE: make this function async. Eg. ipykernel using "async def do_execute()"
# TODO: Arguments (store_history, user_expressions, allow_stdin) are unused <- use them!
# BUG: (maybe!) a long-running continous shell output may not show in Jupyter cell output
# Eg. an infinite loop printing same line continously
# FIX: show code output line by line instead of all at once
def do_execute(self, code: str, silent: bool, store_history=True, user_expressions=None, allow_stdin=False) -> Dict[str, Any]:
if code.startswith('!'): # Shell Command
self.answer_text(check_output(code[1:], encoding='utf-8'), 'stdout')
def do_execute(
self,
code: str,
silent: bool,
store_history=True,
user_expressions=None,
allow_stdin=False,
) -> Dict[str, Any]:
# TODO: add magic function '%html' (& '%%html'), in which Forth output is directly shown as HTML
# NOTE: this can only be done AFTER we remove input code from stdout output

if code.startswith("!"): # Shell Command
# TODO: handle case where first line (starting with !) is shell command, and rest is Forth code
# TODO: answer with error if invalid shell command
# BUG: commands with multiple words not working - eg. "echo hello world"
self.answer_text(check_output(code[1:], encoding="utf-8"), "stdout")
return self.success_response()

self._gforth.stdin.write((code + '\n').encode('utf-8'))
self._gforth.stdin.write((code + "\n").encode("utf-8"))

output = self.get_queue(self._gforth_stdout_queue)
error = self.get_queue(self._gforth_stderr_queue)

# Return Jupyter cell output.
if not silent:
code, output = html.escape(code), html.escape(output)
s = SequenceMatcher(lambda x: x == '', code, output)
# TODO: exclude output lines that are just input followed by "ok"
# TODO: if GForth raises error on a line, DON'T feed the remaining code to GForth.
s = SequenceMatcher(lambda x: x == "", code, output)
# TODO: refactor
html_output = '<pre>' + ''.join(
'<b>' + output[j1:j2] + '</b>' if tag in ('insert', 'replace') else output[j1:j2]
for tag, i1, i2, j1, j2 in s.get_opcodes()
) + '</pre>'
html_output = (
"<pre>"
+ "".join(
(
"<b>" + output[j1:j2] + "</b>"
if tag in ("insert", "replace")
else output[j1:j2]
)
for tag, i1, i2, j1, j2 in s.get_opcodes()
)
+ "</pre>"
)
self.answer_html(html_output)

if error:
self.answer_text(error, 'stderr')
self.answer_text(error, "stderr")

exit_code = self._gforth.poll()
if exit_code is not None:
self.answer_text('Killing kernel because GForth process has died', 'stderr')
sys.exit(exit_code)
self.answer_text("Killing kernel because GForth process has died", "stderr")
sys.exit(exit_code)

return self.success_response()
return self.success_response()

0 comments on commit 3e4c396

Please sign in to comment.