Skip to content

The backend Python package for downloading real-time data from the WIS2 network.

License

Notifications You must be signed in to change notification settings

wmo-im/wis2downloader

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

78 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

The WIS2 Downloader

The backend tool for subscribing to the latest data on the WIS2 network

License Badge Super-Linter Unit-Tests

The WIS2 Downloader is a Flask-based Python application that allows you to connect to a WIS2 Global Broker, manage subscriptions to topic hierarchies, and configure their associated download directories.

Features

  • Dynamic Subscription Management: Quickly add or remove subscriptions ad hoc without needing to restart the service or change configuration files.
  • Monitor Download Statistics: Access the Prometheus metrics through the /metrics endpoint, ideal for Grafana visualization.
  • Multi-Threading Support: Configure the number of download workers for more efficient data downloading.

Getting Started

1. Installation

python -m pip install wis2downloader

2. Configuration

Create a file config.json in your local directory that conforms with the following schema:

schema:
  type: object
  properties:
    base_url:
      type: string
      description:
        Base URL for the wis2downloader service.
      example: http://localhost:5050
    broker_hostname:
      type: string
      description: The hostname of the global broker to subscribe to.
      example: globalbroker.meteo.fr
    broker_password:
      type: string
      description: The password to use when connecting to the specified global broker.
      example: everyone
    broker_port:
      type: number
      description: The port the global broker is using for the specified protocol.
      example: 443
    broker_protocol:
      type: string
      description: The protocol (websockets or tcp) to use when connecting to the global broker.
      example: websockets
    broker_username:
      type: string
      description: The username to use when connecting to the global broker.
      example: everyone
    download_workers:
      type: number
      description: The number of download worker threads to spawn.
      example: 1
    download_dir:
      type: string
      description: The path to download data to on the server/computer running the wis2downloader.
      example: ./downloads
    flask_host:
      type: string
      description: Network interface on which flask should listen when run in dev mode.
      example: 0.0.0.0
    flask_port:
      type: number
      description: The port on which flask should listen when run in dev mode.
      example: 5050
    log_level:
      type: string
      description: Log level to use
      example: DEBUG
    log_path:
      type: string
      description: Path to write log files to.
      example: ./logs
    min_free_space:
      type: number
      description:
        Minimum free space (GB) to leave on download volume / disk after download.
        Files exceeding limit will not be saved.
      example: 10
    save_logs:
      type: boolean
      description: Write log files to disk (true) or stdout (false)
      example: false
    mqtt_session_info:
      type: string
      description:
        File to save session information (active subscriptions and MQTT client id) to.
        Used to persist subscriptions on restart.
      example: mqtt_session.json
    validate_topics:
      type: boolean
      description: Whether to validate the specified topic against the published WIS2 topic hierarchy.
      example: true

An example is given below:

{
    "base_url": "http://localhost:5050",
    "broker_hostname": "globalbroker.meteo.fr",
    "broker_password": "everyone",
    "broker_port": 443,
    "broker_protocol": "websockets",
    "broker_username": "everyone",
    "download_workers": 1,
    "download_dir": "downloads",
    "flask_host": "0.0.0.0",
    "flask_port": 5050,
    "log_level": "DEBUG",
    "log_path": "logs",
    "min_free_space": 10,
    "mqtt_session_info" : "mqtt_session.json",
    "save_logs": false,
    "validate_topics": true
}

3. Running

  1. Set an environment variable specifying the path to the config.json file.

Linux (bash)

export WIS2DOWNLOADER_CONFIG=<path_to_your_config_file>

Windows (Command Prompt)

set WIS2DOWNLOADER_CONFIG=<path_to_your_config_file>

Windows (PowersShell)

$env:WIS2DOWNLOADER_CONFIG = <path_to_your_config_file>
  1. Start the downloader

Dev mode (Windows and Linux)

wis2downloader

Using gunicorn (Linux only)

gunicorn --bind 0.0.0.0:5050 -w 1 wis2downloader.app:app

Note: Only one worker is supported due to the downloader spawning additional threads and persistence of MQTT connections.

The Flask application should now be running. If you need to stop the application, you can do so in the terminal with Ctrl+C.

Maintaining and Monitoring Subscriptions

The API defintion of the downloader can be found at the /swagger endpoint, when run locally see [http://localhost:5050/swagger]. this includes the ability to try out the different endpoints.

Adding subscriptions

Subscriptions can be added via a POST request to the /subscriptions endpoint. The request body should be JSON-encoded and adhere to the following schema:

schema:
  type: object
  properties:
    topic:
      type: string
      description: The WIS2 topic to subscribe to
      example: cache/a/wis2/+/data/core/weather/surface-based-observations/#
    target:
      type: string
      description: Sub directory to save data to
      example: surface-obs
  required:
    - topic

In this example all notifications published to the surface-based-observations topic from any WIS2 centre will be subscribed to, with the downloaded data written to the surface-obs subdirectory of the download_dir.

Notes:

  1. If the target is not specified it will default to the topic the data are published on.
  2. The + wildcard is used to specify any match at a single level, matching as WIS2 centre in the above example.
  3. The # wildcard matches any topic at or below the level it occurs. In the above example any topic published below cache/a/wis2/+/data/core/weather/surface-based-observations will be matched.

Example CURL command to add subscription

curl -X 'POST' \
  'http://127.0.0.1:5050/subscriptions' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
      "topic": "cache/a/wis2/+/data/core/weather/surface-based-observations/#",
      "target": "surface-obs"
  }'

Deleting subscriptions

Subscriptions are deleted via a DELETE request to the /subscriptions/{topic} endpoint where {topic} is the topic to unsubscribe from.

Example CURL command to delete subscription

curl -X DELETE http://localhost:5050/subscriptions/cache/a/wis2/%2B/data/core/weather/%23

This cancels the cache/a/wis2/+/data/core/weather/# subscription. Note the need to URL encode the + (%2B) and # (%23) symbols.

Listing subscriptions

Current subscriptions can listed via a GET request to /subscriptions endpoint.

Example CURL command to list subscriptions

curl http://localhost:5050/subscriptions

The list of active subscriptions should be returned as a JSON object.

Viewing download metrics

Prometheus metrics for the downloader are found via a GET request to the /metrics endpoint.

Example CURL command to view download metrics

curl http://localhost:5050/metrics

Bugs and Issues

All bugs, enhancements and issues are managed on GitHub.

Contact