Skip to content

Commit

Permalink
perf(storage): use zmq for storage write
Browse files Browse the repository at this point in the history
Replace RPC calls with ZeroMQ messages for a more efficient transfer
of image chunks.

Signed-off-by: Cedric Hombourger <[email protected]>
  • Loading branch information
chombourger committed Jan 17, 2025
1 parent 6fb486f commit b116642
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 102 deletions.
4 changes: 4 additions & 0 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ General settings
Remote port to connect to in order to get console messages (defaults to
``5557``).

* ``data``: integer [optional]
Remote port for data transfers between the client and agent (defaults to
``0`` for a dynamic port assignment).

* ``host``: string [optional]
Remote host name or ip to connect to as a client to interact with the
MTDA agent (defaults to ``localhost``).
Expand Down
38 changes: 18 additions & 20 deletions mtda/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import subprocess
import tempfile
import time
import zmq
import zstandard as zstd

from mtda.main import MultiTenantDeviceAccess
Expand Down Expand Up @@ -44,6 +45,7 @@ def __init__(self, host=None, session=None, config_files=None,
else:
self._impl = agent
self._agent = agent
self._data = None

if session is None:
HOST = socket.gethostname()
Expand Down Expand Up @@ -115,7 +117,12 @@ def storage_open(self):
while tries > 0:
tries = tries - 1
try:
self._impl.storage_open(self._session)
host = self.remote()
port = self._impl.storage_open(self._session)
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect(f'tcp://{host}:{port}')
self._data = socket
return
except Exception:
if tries > 0:
Expand Down Expand Up @@ -201,7 +208,7 @@ def storage_write_image(self, path, callback=None):

try:
# Prepare for download/copy
file.prepare(image_size)
file.prepare(self._data, image_size)

# Copy image to shared storage
file.copy()
Expand Down Expand Up @@ -298,6 +305,7 @@ def __init__(self, path, agent, session, blksz, callback=None):
self._path = path
self._session = session
self._totalread = 0
self._totalsent = 0

def bmap(self, path):
return None
Expand All @@ -324,21 +332,25 @@ def flush(self):
inputsize = self._inputsize
totalread = self._totalread
outputsize = self._outputsize

agent.storage_flush(self._totalsent)
while True:
status, writing, written = agent.storage_status(self._session)
status, writing, written = agent.storage_status()
if callback is not None:
callback(imgname, totalread, inputsize, written, outputsize)
if writing is False:
break
time.sleep(0.5)
self._socket.close()

def path(self):
return self._path

def prepare(self, output_size=None, compression=None):
def prepare(self, socket, output_size=None, compression=None):
compr = self.compression() if compression is None else compression
self._inputsize = self.size()
self._outputsize = output_size
self._socket = socket
# if image is uncompressed, we compress on the fly
if compr == CONSTS.IMAGE.RAW.value:
compr = CONSTS.IMAGE.ZST.value
Expand All @@ -362,22 +374,8 @@ def size(self):
return None

def _write_to_storage(self, data):
max_tries = int(CONSTS.STORAGE.TIMEOUT / CONSTS.STORAGE.RETRY_INTERVAL)

for _ in range(max_tries):
result = self._agent.storage_write(data, self._session)
if result != 0:
break
time.sleep(CONSTS.STORAGE.RETRY_INTERVAL)

if result > 0:
return result
elif result < 0:
exc = 'write or decompression error from shared storage'
raise IOError(exc)
else:
exc = 'timeout from shared storage'
raise IOError(exc)
self._socket.send(data)
self._totalsent += len(data)


class ImageLocal(ImageFile):
Expand Down
3 changes: 1 addition & 2 deletions mtda/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ class STORAGE:


class WRITER:
QUEUE_SLOTS = 16
QUEUE_TIMEOUT = 5
RECV_TIMEOUT = 5
READ_SIZE = 1*1024*1024
WRITE_SIZE = 1*1024*1024
62 changes: 22 additions & 40 deletions mtda/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def __init__(self):
self.usb_switches = []
self.ctrlport = 5556
self.conport = 5557
self.dataport = 0
self.prefix_key = self._prefix_key_code(DEFAULT_PREFIX_KEY)
self.is_remote = False
self.is_server = False
Expand Down Expand Up @@ -661,7 +662,20 @@ def storage_close(self, session=None):
if self.storage is not None:
self.storage_locked()

self.mtda.debug(3, f"main.storage_close(): {str(result)}")
self.mtda.debug(3, f"main.storage_close(): {result}")
return result

@Pyro4.expose
def storage_flush(self, size, session=None):
self.mtda.debug(3, "main.storage_flush()")

self.session_ping(session)
if self.storage is None:
result = False
else:
result = self._writer.flush(size)

self.mtda.debug(3, f"main.storage_flush(): {result}")
return result

@Pyro4.expose
Expand Down Expand Up @@ -790,6 +804,7 @@ def storage_open(self, session=None):

self.session_ping(session)
owner = self._storage_owner
result = None
status, _, _ = self.storage_status()

if self.storage is None:
Expand All @@ -802,12 +817,13 @@ def storage_open(self, session=None):
self.storage.open()
self._storage_opened = True
self._storage_owner = session
self._writer.start()
result = self._writer.start(session)
self._storage_event(CONSTS.STORAGE.OPENED, session)
if self.storage is not None:
self.storage_locked()

self.mtda.debug(3, 'main.storage_open(): success')
self.mtda.debug(3, f'main.storage_open(): {result}')
return result

@Pyro4.expose
def storage_status(self, session=None):
Expand Down Expand Up @@ -877,42 +893,6 @@ def storage_swap(self, session=None):
self.mtda.debug(3, f"main.storage_swap(): {str(result)}")
return result

@Pyro4.expose
def storage_write(self, data, session=None):
self.mtda.debug(3, "main.storage_write()")

self.session_ping(session)
if self.storage is None:
raise RuntimeError('no shared storage')
elif self._storage_opened is False:
raise RuntimeError('shared storage was not opened')
elif self._writer.failed is True:
raise RuntimeError('write or decompression error '
'from shared storage')
elif session != self._storage_owner:
raise RuntimeError('shared storage in use')

import queue
try:
if len(data) == 0:
self.mtda.debug(2, "main.storage_write(): "
"using queued data")
data = self._writer_data
self._writer_data = data
self._writer.put(data, timeout=10)
result = self.blksz
except queue.Full:
self.mtda.debug(2, "main.storage_write(): "
"queue is full")
result = 0

if self._writer.failed is True:
self.error('storage_write failed: write or decompression error')
result = -1

self.mtda.debug(3, f"main.storage_write(): {str(result)}")
return result

def systemd_configure(self):
from filecmp import dircmp

Expand Down Expand Up @@ -1467,7 +1447,7 @@ def post_configure_storage(self, storage, config, parser):
self.mtda.debug(3, "main.post_configure_storage()")

from mtda.storage.writer import AsyncImageWriter
self._writer = AsyncImageWriter(self, storage)
self._writer = AsyncImageWriter(self, storage, self.dataport)

import atexit
atexit.register(self.storage_close)
Expand All @@ -1479,6 +1459,8 @@ def load_remote_config(self, parser):
parser.get('remote', 'console', fallback=self.conport))
self.ctrlport = int(
parser.get('remote', 'control', fallback=self.ctrlport))
self.dataport = int(
parser.get('remote', 'data', fallback=self.dataport))
if self.is_server is False:
if self.remote is None:
# Load remote setting from the configuration
Expand Down
Loading

0 comments on commit b116642

Please sign in to comment.