Skip to content

Commit

Permalink
Merge pull request #469 from ksooo/fix-issue465-2
Browse files Browse the repository at this point in the history
7.1.4: HTSP connection related fixes and cleanup
  • Loading branch information
ksooo authored Sep 3, 2020
2 parents 7d8efa8 + 41830a4 commit a944fa8
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 89 deletions.
2 changes: 1 addition & 1 deletion pvr.hts/addon.xml.in
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<addon
id="pvr.hts"
version="7.1.3"
version="7.1.4"
name="Tvheadend HTSP Client"
provider-name="Adam Sutton, Sam Stenvall, Lars Op den Kamp, Kai Sommerfeld">
<requires>@ADDON_DEPENDS@</requires>
Expand Down
6 changes: 6 additions & 0 deletions pvr.hts/changelog.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
7.1.4
- Fix deadlock while reconnecting to backend after a backend response timeout (Fixes #465)
- Refactor subscription seek code
- Refactor subscription weight code
- Fix HTSP connection thread not ended while in supended state

7.1.3
- Withdraw v7.1.2 due to new deadlocks it introduces, for example during initial connect to backend.

Expand Down
76 changes: 46 additions & 30 deletions src/Tvheadend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1493,31 +1493,8 @@ void CTvheadend::Disconnected()

bool CTvheadend::Connected()
{
/* Rebuild state */
for (auto* dmx : m_dmx)
dmx->Connected();

m_vfs->Connected();
m_timeRecordings.Connected();
m_autoRecordings.Connected();

/* Flag all async fields in case they've been deleted */
for (auto& entry : m_channels)
entry.second.SetDirty(true);
for (auto& entry : m_tags)
entry.second.SetDirty(true);
for (auto& entry : m_schedules)
entry.second.SetDirty(true);

{
CLockObject lock(m_mutex);

for (auto& entry : m_recordings)
entry.second.SetDirty(true);
}

/* Request Async data, first is channels */
m_asyncState.SetState(ASYNC_CHN);
/* Request Async data, first is init (which rebuilds state) */
m_asyncState.SetState(ASYNC_INIT);

htsmsg_t* msg = htsmsg_create_map();
if (Settings::GetInstance().GetAsyncEpg())
Expand Down Expand Up @@ -1902,6 +1879,7 @@ void CTvheadend::SyncCompleted()
Logger::Log(LogLevel::LEVEL_INFO, "async updates initialised");

/* The complete calls are probably redundant, but its a safety feature */
SyncInitCompleted();
SyncChannelsCompleted();
SyncDvrCompleted();
SyncEpgCompleted();
Expand All @@ -1926,6 +1904,34 @@ void CTvheadend::SyncCompleted()
}
}

void CTvheadend::SyncInitCompleted()
{
/* check state engine */
if (m_asyncState.GetState() != ASYNC_INIT)
return;

/* Rebuild state */
for (auto* dmx : m_dmx)
dmx->RebuildState();

m_vfs->RebuildState();
m_timeRecordings.RebuildState();
m_autoRecordings.RebuildState();

/* Flag all async fields in case they've been deleted */
for (auto& entry : m_channels)
entry.second.SetDirty(true);
for (auto& entry : m_tags)
entry.second.SetDirty(true);
for (auto& entry : m_schedules)
entry.second.SetDirty(true);
for (auto& entry : m_recordings)
entry.second.SetDirty(true);

/* Next */
m_asyncState.SetState(ASYNC_CHN);
}

void CTvheadend::SyncChannelsCompleted()
{
/* check state engine */
Expand Down Expand Up @@ -1987,16 +1993,15 @@ void CTvheadend::SyncDvrCompleted()
void CTvheadend::SyncEpgCompleted()
{
/* check state engine */
if (m_asyncState.GetState() != ASYNC_EPG)
return;

if (!Settings::GetInstance().GetAsyncEpg())
{
m_asyncState.SetState(ASYNC_DONE);
return;
}

/* check state engine */
if (m_asyncState.GetState() != ASYNC_EPG)
return;

/* Schedules */
std::vector<std::pair<uint32_t, uint32_t>> deletedEvents;
utilities::erase_if(m_schedules, [&](const ScheduleMapEntry& entry) {
Expand Down Expand Up @@ -2040,6 +2045,9 @@ void CTvheadend::SyncEpgCompleted()

void CTvheadend::ParseTagAddOrUpdate(htsmsg_t* msg, bool bAdd)
{
/* Rebuild state upon arrival of first async data */
SyncInitCompleted();

/* Validate */
uint32_t u32 = 0;
if (htsmsg_get_u32(msg, "tagId", &u32))
Expand Down Expand Up @@ -2120,6 +2128,8 @@ void CTvheadend::ParseTagDelete(htsmsg_t* msg)

void CTvheadend::ParseChannelAddOrUpdate(htsmsg_t* msg, bool bAdd)
{
/* Rebuild state upon arrival of first async data */
SyncInitCompleted();

/* Validate */
uint32_t u32 = 0;
Expand Down Expand Up @@ -2243,7 +2253,10 @@ void CTvheadend::ParseChannelDelete(htsmsg_t* msg)

void CTvheadend::ParseRecordingAddOrUpdate(htsmsg_t* msg, bool bAdd)
{
/* Channels must be complete */
/* Rebuild state upon arrival of first async data */
SyncInitCompleted();

/* Channels complete */
SyncChannelsCompleted();

/* Validate */
Expand Down Expand Up @@ -2637,6 +2650,9 @@ void CTvheadend::ParseRecordingDelete(htsmsg_t* msg)

bool CTvheadend::ParseEvent(htsmsg_t* msg, bool bAdd, Event& evt)
{
/* Rebuild state upon arrival of first async data */
SyncInitCompleted();

/* Recordings complete */
SyncDvrCompleted();

Expand Down
1 change: 1 addition & 0 deletions src/Tvheadend.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ class ATTRIBUTE_HIDDEN CTvheadend : public kodi::addon::CInstancePVRClient,
/*
* Channel/Tags/Recordings/Events
*/
void SyncInitCompleted();
void SyncChannelsCompleted();
void SyncDvrCompleted();
void SyncEpgCompleted();
Expand Down
2 changes: 1 addition & 1 deletion src/tvheadend/AutoRecordings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ AutoRecordings::~AutoRecordings()
{
}

void AutoRecordings::Connected()
void AutoRecordings::RebuildState()
{
/* Flag all async fields in case they've been deleted */
for (auto& rec : m_autoRecordings)
Expand Down
2 changes: 1 addition & 1 deletion src/tvheadend/AutoRecordings.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class AutoRecordings
~AutoRecordings();

/* state updates */
void Connected();
void RebuildState();
void SyncDvrCompleted();

/* data access */
Expand Down
18 changes: 11 additions & 7 deletions src/tvheadend/HTSPConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ bool HTSPConnection::ReadMessage()
htsmsg_destroy(msg);
return true;
}

Logger::Log(LogLevel::LEVEL_TRACE, "receive message [%s]", method);

/* Pass (if return is true, message is finished) */
Expand Down Expand Up @@ -365,7 +366,8 @@ bool HTSPConnection::SendMessage0(const char* method, htsmsg_t* msg)

if (c != static_cast<ssize_t>(len))
{
Logger::Log(LogLevel::LEVEL_ERROR, "failed to write (%s)", m_socket->GetError().c_str());
Logger::Log(LogLevel::LEVEL_ERROR, "Command %s failed: failed to write (%s)", method,
m_socket->GetError().c_str());
if (!m_suspended)
Disconnect();

Expand Down Expand Up @@ -394,7 +396,7 @@ htsmsg_t* HTSPConnection::SendAndWait0(const char* method, htsmsg_t* msg, int iR
if (!SendMessage0(method, msg))
{
m_messages.erase(seq);
Logger::Log(LogLevel::LEVEL_ERROR, "failed to transmit");
Logger::Log(LogLevel::LEVEL_ERROR, "Command %s failed: failed to transmit", method);
return nullptr;
}

Expand Down Expand Up @@ -581,7 +583,6 @@ void HTSPConnection::Register()

/* Send Auth */
Logger::Log(LogLevel::LEVEL_DEBUG, "sending auth");

if (!SendAuth(user, pass))
{
SetState(PVR_CONNECTION_STATE_ACCESS_DENIED);
Expand All @@ -595,6 +596,7 @@ void HTSPConnection::Register()

Logger::Log(LogLevel::LEVEL_DEBUG, "registered");
SetState(PVR_CONNECTION_STATE_CONNECTED);

m_ready = true;
m_regCond.Broadcast();
return;
Expand All @@ -603,7 +605,7 @@ void HTSPConnection::Register()
fail:
if (!m_suspended)
{
/* Don't immediately reconnect (spare server CPU cycles)*/
/* Don't immediately reconnect (spare server CPU cycles) */
Sleep(SLOW_RECONNECT_INTERVAL);
Disconnect();
}
Expand Down Expand Up @@ -643,14 +645,15 @@ void* HTSPConnection::Process()
}
}

while (m_suspended)
while (m_suspended && !IsStopped())
{
Logger::Log(LogLevel::LEVEL_DEBUG, "suspended. Waiting for wakeup...");

/* Wait for wakeup */
Sleep(1000);
}

if (IsStopped())
break;

if (!log)
{
Logger::Log(LogLevel::LEVEL_DEBUG, "connecting to %s:%d", host.c_str(), port);
Expand Down Expand Up @@ -689,6 +692,7 @@ void* HTSPConnection::Process()

continue;
}

Logger::Log(LogLevel::LEVEL_DEBUG, "connected");
log = false;
retryAttempt = 0;
Expand Down
Loading

0 comments on commit a944fa8

Please sign in to comment.