Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Daq 5039 #398

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions helm/blueapi/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ worker:
host: 0.0.0.0 # Allow non-loopback traffic
port: 8000
env:
data_writing:
beamline: example
root: /tmp
sources:
- kind: deviceFunctions
module: blueapi.startup.example_devices
Expand Down
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ classifiers = [
]
description = "Lightweight Bluesky-as-a-service wrapper application. Also usable as a library."
dependencies = [
"bluesky",
"bluesky @ file://dls_sw/p38/software/blueapi/devices_only/bluesky",
"ophyd",
"nslsii",
"pyepics",
Expand All @@ -26,8 +26,7 @@ dependencies = [
"fastapi[all]<0.99",
"uvicorn",
"requests",
"dls-bluesky-core", #requires ophyd-async
"dls-dodal",
"dls_dodal @ file://dls_sw/p38/software/blueapi/devices_only/dodal",
"typing_extensions<4.6",
]
dynamic = ["version"]
Expand Down
17 changes: 15 additions & 2 deletions src/blueapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,22 @@ class StompConfig(BaseModel):


class DataWritingConfig(BlueapiBaseModel):
"""Configuration for where to write output files for ophyd-async devices.

Args:
visit_service_url (Optional[str]): the /api endpoint of a visit_service
beamline (str): beamline identifier to use in constructing the name of files
root (Optional[str]): root to use when constructing StreamResources.
When visits are part of the response from the visit_service,
this will be deprecated and replaced with a method that constructs
it (e.g. return f"/{year}/{visit_id})
"""

beamline: str = "example"
root: Optional[
Path
] = None # When mounting with a static root, e.g. /2024/cm12345-1/
visit_service_url: Optional[str] = None # e.g. "http://localhost:8088/api"
visit_directory: Path = Path("/tmp/0-0")
group_name: str = "example"


class WorkerEventConfig(BlueapiBaseModel):
Expand Down
2 changes: 1 addition & 1 deletion src/blueapi/core/bluesky_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
Triggerable,
WritesExternalAssets,
)
from dls_bluesky_core.core import MsgGenerator, PlanGenerator
from dodal.common.types import MsgGenerator, PlanGenerator
from ophyd_async.core import Device as AsyncDevice
from pydantic import BaseModel, Field

Expand Down
36 changes: 20 additions & 16 deletions src/blueapi/data_management/visit_directory_provider.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional
Expand All @@ -9,6 +8,12 @@


class DataCollectionIdentifier(BaseModel):
"""
Equivalent to a `Scan Number` or `scan_id`, non-globally unique scan identifier.
Should be always incrementing, unique per-visit, co-ordinated with any other
scan engines.
"""

collectionNumber: int


Expand Down Expand Up @@ -71,23 +76,22 @@ class VisitDirectoryProvider(DirectoryProvider):
should write to, and determine how their files should be named.
"""

_data_group_name: str
_data_directory: Path

_beamline: str
_root: Path
_resource_dir = Path(".")
_client: VisitServiceClientBase
_current_collection: Optional[DirectoryInfo]
_session: Optional[ClientSession]

def __init__(
self,
data_group_name: str,
data_directory: Path,
beamline: str,
root: Optional[Path],
client: VisitServiceClientBase,
):
self._data_group_name = data_group_name
self._data_directory = data_directory
self._client = client

self._root = root or Path("/")
self._beamline = beamline
self._current_collection = None
self._session = None

Expand All @@ -104,20 +108,20 @@ async def update(self) -> None:
try:
collection_id_info = await self._client.create_new_collection()
self._current_collection = self._generate_directory_info(collection_id_info)
except Exception as ex:
# TODO: The catch all is needed because the RunEngine will not
# currently handle it, see
# https://github.com/bluesky/bluesky/pull/1623
except Exception as e:
self._current_collection = None
logging.exception(ex)
raise e

def _generate_directory_info(
self,
collection_id_info: DataCollectionIdentifier,
) -> DirectoryInfo:
collection_id = collection_id_info.collectionNumber
file_prefix = f"{self._data_group_name}-{collection_id}"
return DirectoryInfo(str(self._data_directory), file_prefix)
return DirectoryInfo(
root=self._root,
resource_dir=self._resource_dir,
prefix=f"{self._beamline}-{collection_id}",
)

def __call__(self) -> DirectoryInfo:
if self._current_collection is not None:
Expand Down
7 changes: 3 additions & 4 deletions src/blueapi/preprocessors/attach_metadata.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from bluesky.utils import make_decorator
from ophyd_async.core import DirectoryInfo

from blueapi.core import MsgGenerator
from blueapi.data_management.visit_directory_provider import VisitDirectoryProvider
Expand Down Expand Up @@ -32,10 +33,8 @@ def attach_metadata(
Iterator[Msg]: Plan messages
"""
yield from bps.wait_for([provider.update])
directory_info = provider()
yield from bpp.inject_md_wrapper(
plan, md={DATA_SESSION: directory_info.filename_prefix}
)
directory_info: DirectoryInfo = provider()
yield from bpp.inject_md_wrapper(plan, md={DATA_SESSION: directory_info.prefix})


attach_metadata_decorator = make_decorator(attach_metadata)
21 changes: 5 additions & 16 deletions src/blueapi/service/handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
from typing import List, Mapping, Optional

from dodal.beamlines.beamline_utils import set_directory_provider

from blueapi.config import ApplicationConfig
from blueapi.core import BlueskyContext
from blueapi.core.event import EventStream
Expand Down Expand Up @@ -150,7 +152,6 @@ def setup_handler(
) -> None:
global HANDLER

provider = None
plan_wrappers = []
if config:
visit_service_client: VisitServiceClientBase
Expand All @@ -162,24 +163,12 @@ def setup_handler(
visit_service_client = LocalVisitServiceClient()

provider = VisitDirectoryProvider(
data_group_name=config.env.data_writing.group_name,
data_directory=config.env.data_writing.visit_directory,
beamline=config.env.data_writing.beamline,
root=config.env.data_writing.root,
client=visit_service_client,
)

# Make all dodal devices created by the context use provider if they can
try:
from dodal.parameters.gda_directory_provider import (
set_directory_provider_singleton,
)

set_directory_provider_singleton(provider)
except ImportError:
logging.error(
"Unable to set directory provider for ophyd-async devices, "
"a newer version of dodal is required"
)

set_directory_provider(provider)
plan_wrappers.append(lambda plan: attach_metadata(plan, provider))

handler = Handler(
Expand Down
8 changes: 4 additions & 4 deletions src/blueapi/startup/example_plans.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import List

from bluesky.plan_stubs import mv
from bluesky.plans import count
from bluesky.protocols import Movable, Readable
from dls_bluesky_core.core import inject
from dls_bluesky_core.plans import count
from dls_bluesky_core.stubs import move
from dodal.common import inject

from blueapi.core import MsgGenerator

Expand All @@ -27,5 +27,5 @@ def stp_snapshot(
Yields:
Iterator[MsgGenerator]: Bluesky messages
"""
yield from move({temperature: 0, pressure: 10**5})
yield from mv({temperature: 0, pressure: 10**5})
yield from count(detectors, 1)
15 changes: 9 additions & 6 deletions tests/data_management/test_visit_directory_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ async def test_update_sets_collection_number(
) -> None:
await visit_directory_provider.update()
assert visit_directory_provider() == DirectoryInfo(
directory_path="/tmp",
filename_prefix="example-1",
root=Path("/tmp"),
resource_dir=Path("."),
prefix="example-1",
)


Expand All @@ -56,11 +57,13 @@ async def test_update_sets_collection_number_multi(
) -> None:
await visit_directory_provider.update()
assert visit_directory_provider() == DirectoryInfo(
directory_path="/tmp",
filename_prefix="example-1",
root=Path("/tmp"),
resource_dir=Path("."),
prefix="example-1",
)
await visit_directory_provider.update()
assert visit_directory_provider() == DirectoryInfo(
directory_path="/tmp",
filename_prefix="example-2",
root=Path("/tmp"),
resource_dir=Path("."),
prefix="example-2",
)
12 changes: 6 additions & 6 deletions tests/preprocessors/test_attach_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ def client() -> VisitServiceClient:
@pytest.fixture
def provider(client: VisitServiceClient) -> VisitDirectoryProvider:
return VisitDirectoryProvider(
data_directory=DATA_DIRECTORY,
data_group_name=DATA_GROUP_NAME,
root=DATA_DIRECTORY,
beamline=DATA_GROUP_NAME,
client=client,
)

Expand Down Expand Up @@ -101,13 +101,13 @@ async def read(self) -> Dict[str, Reading]:
}

async def describe(self) -> Dict[str, DataKey]:
directory_info = self._provider()
path = f"{directory_info.directory_path}/{directory_info.filename_prefix}"
info = self._provider()
path = info.root / info.resource_dir / info.prefix
return {
f"{self.name}_data": {
"dtype": "string",
"shape": [1],
"source": path,
"source": str(path),
}
}

Expand Down Expand Up @@ -339,7 +339,7 @@ def test_visit_directory_provider_fails(
)


def test_visit_directory_provider_fails_after_one_sucess(
def test_visit_directory_provider_fails_after_one_success(
run_engine: RunEngine,
detectors: List[Readable],
provider: DirectoryProvider,
Expand Down
Loading