Skip to content

Commit

Permalink
Add Plex lifecycle management (#40)
Browse files Browse the repository at this point in the history
* Add Plex lifecycle management

* Add codecov configuration file
  • Loading branch information
RemiRigal authored May 13, 2022
1 parent 025def0 commit 31f0926
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 26 deletions.
5 changes: 5 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
coverage:
status:
patch:
default:
target: 80%
52 changes: 39 additions & 13 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import signal
import argparse
from time import sleep
from websocket import WebSocketConnectionClosedException

from plex_auto_languages.plex_server import PlexServer
from plex_auto_languages.utils.notifier import Notifier
Expand All @@ -14,6 +15,8 @@ class PlexAutoLanguages():

def __init__(self, user_config_path: str):
self.alive = False
self.must_stop = False
self.stop_signal = False
self.plex_alert_listener = None

# Health-check server
Expand All @@ -28,16 +31,19 @@ def __init__(self, user_config_path: str):
if self.config.get("notifications.enable"):
self.notifier = Notifier(self.config.get("notifications.apprise_configs"))

# Plex
self.plex = PlexServer(self.config.get("plex.url"), self.config.get("plex.token"), self.notifier, self.config)

# Scheduler
self.scheduler = None
if self.config.get("scheduler.enable"):
self.scheduler = Scheduler(self.config.get("scheduler.schedule_time"), self.scheduler_callback)

# Plex
self.plex = None

self.set_signal_handlers()

def init(self):
self.plex = PlexServer(self.config.get("plex.url"), self.config.get("plex.token"), self.notifier, self.config)

def is_ready(self):
return self.alive

Expand All @@ -50,25 +56,45 @@ def set_signal_handlers(self):

def stop(self, *_):
logger.info("Received SIGINT or SIGTERM, stopping gracefully")
self.alive = False
self.must_stop = True
self.stop_signal = True

def start(self):
logger.info("Starting alert listener")
self.plex.start_alert_listener()
if self.scheduler:
logger.info("Starting scheduler")
self.scheduler.start()
self.alive = True
while self.is_healthy():
sleep(1)
self.plex.save_cache()

while not self.stop_signal:
self.must_stop = False
self.init()
self.plex.start_alert_listener(self.alert_listener_error_callback)
self.alive = True
count = 0
while not self.must_stop:
sleep(1)
count += 1
if count % 60 == 0 and not self.plex.is_alive:
logger.warning("Lost connection to the Plex server")
self.must_stop = True
self.alive = False
self.plex.save_cache()
if not self.stop_signal:
logger.info("Trying to restore the connection to the Plex server...")

if self.scheduler:
logger.info("Stopping scheduler")
self.scheduler.shutdown()
logger.info("Stopping alert listener")
self.scheduler.join()
self.healthcheck_server.shutdown()

def alert_listener_error_callback(self, error: Exception):
if isinstance(error, WebSocketConnectionClosedException):
logger.warning("The Plex server closed the websocket connection")
else:
logger.warning("Alert listener had an unexpected error")
self.must_stop = True

def scheduler_callback(self):
if self.plex or not self.plex.is_alive:
return
logger.info("Starting scheduler task")
self.plex.start_deep_analysis()

Expand Down
55 changes: 46 additions & 9 deletions plex_auto_languages/plex_server.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import time
import itertools
from typing import Union
from typing import Union, Callable
from datetime import datetime, timedelta
from requests import ConnectionError as RequestsConnectionError
from plexapi.media import MediaPart
from plexapi.library import ShowSection
from plexapi.video import Episode, Show
from plexapi.exceptions import NotFound
from plexapi.alert import AlertListener
from plexapi.exceptions import NotFound, Unauthorized, BadRequest
from plexapi.server import PlexServer as BasePlexServer

from plex_auto_languages.utils.logger import get_logger
Expand All @@ -24,12 +27,29 @@ class UnprivilegedPlexServer():

def __init__(self, url: str, token: str):
self._plex_url = url
self._plex = BasePlexServer(url, token)
self._plex = self._get_server(url, token)

@property
def connected(self):
if self._plex is None:
return False
try:
_ = self._plex.library.sections()
return True
except (BadRequest, RequestsConnectionError):
return False

@property
def unique_id(self):
return self._plex.machineIdentifier

@staticmethod
def _get_server(url, token):
try:
return BasePlexServer(url, token)
except (RequestsConnectionError, Unauthorized):
return None

def fetch_item(self, item_id: Union[str, int]):
try:
return self._plex.fetchItem(item_id)
Expand Down Expand Up @@ -82,8 +102,7 @@ def __init__(self, url: str, token: str, notifier: Notifier, config: Configurati
if self._user is None:
logger.error("Unable to find the user associated with the provided Plex Token")
raise UserNotFound
else:
logger.info(f"Successfully connected as user '{self.username}' (id: {self.user_id})")
logger.info(f"Successfully connected as user '{self.username}' (id: {self.user_id})")
self._alert_handler = None
self._alert_listener = None
self.cache = PlexServerCache(self)
Expand All @@ -98,7 +117,19 @@ def username(self):

@property
def is_alive(self):
return self._alert_listener is not None and self._alert_listener.is_alive()
return self.connected and self._alert_listener is not None and self._alert_listener.is_alive()

@staticmethod
def _get_server(url: str, token: str, max_tries: int = 5000):
for _ in range(max_tries):
try:
return BasePlexServer(url, token)
except RequestsConnectionError:
logger.warning("ConnectionError: Unable to connect to Plex server, retrying...")
except Unauthorized:
logger.warning("Unauthorized: make sure your credentials are correct. Retrying to connect to Plex server...")
time.sleep(5)
return None

def _get_logged_user(self):
plex_username = self._plex.myPlexAccount().username
Expand All @@ -110,12 +141,14 @@ def _get_logged_user(self):
def save_cache(self):
self.cache.save()

def start_alert_listener(self):
def start_alert_listener(self, error_callback: Callable):
trigger_on_play = self.config.get("trigger_on_play")
trigger_on_scan = self.config.get("trigger_on_scan")
trigger_on_activity = self.config.get("trigger_on_activity")
self._alert_handler = PlexAlertHandler(self, trigger_on_play, trigger_on_scan, trigger_on_activity)
self._alert_listener = self._plex.startAlertListener(self._alert_handler)
self._alert_listener = AlertListener(self._plex, self._alert_handler, error_callback)
logger.info("Starting alert listener")
self._alert_listener.start()

def get_instance_users(self):
users = []
Expand All @@ -137,7 +170,11 @@ def get_plex_instance_of_user(self, user_id: Union[int, str]):
logger.error(f"Unable to find user with id '{user_id}'")
return None
user_token = matching_users[0].get_token(self._plex.machineIdentifier)
return UnprivilegedPlexServer(self._plex_url, user_token)
user_plex = UnprivilegedPlexServer(self._plex_url, user_token)
if not user_plex.connected:
logger.error(f"Connection to the Plex server failed for user '{matching_users[0].name}'")
return None
return user_plex

def get_user_from_client_identifier(self, client_identifier: str):
plex_sessions = self._plex.sessions()
Expand Down
8 changes: 4 additions & 4 deletions plex_auto_languages/utils/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

class DateTimeEncoder(json.JSONEncoder):

def default(self, obj):
if isinstance(obj, (datetime, date, time)):
return obj.isoformat()
return super(DateTimeEncoder, self).default(obj)
def default(self, o):
if isinstance(o, (datetime, date, time)):
return o.isoformat()
return super().default(o)
7 changes: 7 additions & 0 deletions plex_auto_languages/utils/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
from threading import Thread, Event
import schedule

from plex_auto_languages.utils.logger import get_logger


logger = get_logger()


class Scheduler(Thread):

Expand All @@ -12,9 +17,11 @@ def __init__(self, time_of_day: str, callback: Callable):
self._stop_event = Event()

def run(self):
logger.info("Starting scheduler")
while not self._stop_event.is_set():
schedule.run_pending()
time.sleep(5)

def shutdown(self):
logger.info("Stopping scheduler")
self._stop_event.set()
34 changes: 34 additions & 0 deletions tests/test_plex_server.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import math
import pytest
from datetime import datetime
Expand Down Expand Up @@ -145,6 +146,33 @@ def test_save_cache(plex):
mocked_save.assert_called_once()


def test_get_server(plex, config, caplog):
url = config.get("plex.url")
token = config.get("plex.token")
server = plex._get_server(url, token, max_tries=1)
assert server is not None
assert server.account()

server = plex._get_server(url, "invalid_token", max_tries=1)
assert server is None
assert "Unauthorized" in caplog.text
caplog.clear()

server = plex._get_server("http://invalid_url:8888", "invalid_token", max_tries=1)
assert server is None
assert "ConnectionError" in caplog.text


def test_start_alert_listener(plex):
plex.start_alert_listener(None)
time.sleep(1)
assert plex.is_alive is True
plex._alert_listener.stop()
plex._alert_listener.join()
time.sleep(1)
assert plex.is_alive is False


def test_init(config):
with patch.object(PlexServer, "_get_logged_user", return_value=None):
with pytest.raises(UserNotFound):
Expand Down Expand Up @@ -215,3 +243,9 @@ def test_change_tracks(plex, episode):
selected_audio, selected_sub = plex.get_selected_streams(second_episode)
assert selected_audio.languageCode == "fra"
assert selected_sub.languageCode == "fra"


def test_deep_analysis(plex):
with patch.object(PlexServer, "change_tracks"):
with patch.object(PlexServer, "process_new_or_updated_episode"):
plex.start_deep_analysis()

0 comments on commit 31f0926

Please sign in to comment.