Skip to content

Commit

Permalink
pull: Ensure we always process queue only from main thread
Browse files Browse the repository at this point in the history
I was easily reproducing a hang on pulls with thousands of requests on
current git master.  The initial symptom seemed to be that there are
multiple code paths where we don't invoke
`session_thread_process_pending_queue()`.  We really need to do
that any time we remove something from the outstanding queue,
to ensure it gets filled again.

A further issue is that we were tying the lifecycle of the pending
object to the `GTask`, but the task could be unref'd from the main
thread (via a `GSource` on the main thread), and that introduced
threadsafety issues, because the hash table and other data suddenly
could be concurrently modified.

Both of these need to be fixed together.  First, we introduce
`Arc<Pending>`, and ensure that both the main and worker threads hold
references.

Second, we ensure that we re-process the queue *immediately* whenever
a task is done, inside the worker thread, rather than doing it
incidentally via an unref.  This architecture is quite similar to what
the outside pull code is doing.

Closes: #350
Approved by: jlebon
  • Loading branch information
cgwalters authored and rh-atomic-bot committed Jun 17, 2016
1 parent d262fc2 commit 535033a
Showing 1 changed file with 47 additions and 8 deletions.
55 changes: 47 additions & 8 deletions src/libostree/ostree-fetcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ typedef struct {
guint64 total_downloaded;
} ThreadClosure;

static void
session_thread_process_pending_queue (ThreadClosure *thread_closure);

typedef struct {
volatile int ref_count;

ThreadClosure *thread_closure;
SoupURI *uri;

Expand Down Expand Up @@ -186,10 +191,22 @@ pending_task_compare (gconstpointer a,
(priority_a < priority_b) ? -1 : 1;
}

static OstreeFetcherPendingURI *
pending_uri_ref (OstreeFetcherPendingURI *pending)
{
g_return_val_if_fail (pending != NULL, NULL);
g_return_val_if_fail (pending->ref_count > 0, NULL);

g_atomic_int_inc (&pending->ref_count);

return pending;
}

static void
pending_uri_free (OstreeFetcherPendingURI *pending)
pending_uri_unref (OstreeFetcherPendingURI *pending)
{
g_hash_table_remove (pending->thread_closure->outstanding, pending);
if (!g_atomic_int_dec_and_test (&pending->ref_count))
return;

g_clear_pointer (&pending->thread_closure, thread_closure_unref);

Expand Down Expand Up @@ -331,8 +348,7 @@ session_thread_process_pending_queue (ThreadClosure *thread_closure)
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);

/* pending_uri_free() removes this. */
g_hash_table_add (thread_closure->outstanding, pending);
g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));

soup_request_send_async (pending->request,
cancellable,
Expand Down Expand Up @@ -540,7 +556,7 @@ _ostree_fetcher_constructed (GObject *object)
self->thread_closure->tmpdir_dfd = -1;
self->thread_closure->tmpdir_lock = empty_lockfile;

self->thread_closure->outstanding = g_hash_table_new (NULL, NULL);
self->thread_closure->outstanding = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)pending_uri_unref);
self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL,
(GDestroyNotify) NULL,
(GDestroyNotify) g_object_unref);
Expand Down Expand Up @@ -742,6 +758,18 @@ on_stream_read (GObject *object,
GAsyncResult *result,
gpointer user_data);

static void
remove_pending_rerun_queue (OstreeFetcherPendingURI *pending)
{
/* Hold a temporary ref to ensure the reference to
* pending->thread_closure is valid.
*/
pending_uri_ref (pending);
g_hash_table_remove (pending->thread_closure->outstanding, pending);
session_thread_process_pending_queue (pending->thread_closure);
pending_uri_unref (pending);
}

static void
on_out_splice_complete (GObject *object,
GAsyncResult *result,
Expand Down Expand Up @@ -770,7 +798,10 @@ on_out_splice_complete (GObject *object,

out:
if (local_error)
g_task_return_error (task, local_error);
{
g_task_return_error (task, local_error);
remove_pending_rerun_queue (pending);
}

g_object_unref (task);
}
Expand Down Expand Up @@ -802,6 +833,7 @@ on_stream_read (GObject *object,
g_task_return_pointer (task,
g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free);
remove_pending_rerun_queue (pending);
}
else
{
Expand Down Expand Up @@ -837,7 +869,10 @@ on_stream_read (GObject *object,

out:
if (local_error)
g_task_return_error (task, local_error);
{
g_task_return_error (task, local_error);
remove_pending_rerun_queue (pending);
}

g_object_unref (task);
}
Expand Down Expand Up @@ -883,6 +918,7 @@ on_request_sent (GObject *object,
g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free);
}
remove_pending_rerun_queue (pending);
goto out;
}
else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
Expand Down Expand Up @@ -947,6 +983,7 @@ on_request_sent (GObject *object,
g_task_return_pointer (task,
g_object_ref (pending->request_body),
(GDestroyNotify) g_object_unref);
remove_pending_rerun_queue (pending);
}

out:
Expand All @@ -955,6 +992,7 @@ on_request_sent (GObject *object,
if (pending->request_body)
(void) g_input_stream_close (pending->request_body, NULL, NULL);
g_task_return_error (task, local_error);
remove_pending_rerun_queue (pending);
}

g_object_unref (task);
Expand All @@ -979,14 +1017,15 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self,

/* SoupRequest is created in session thread. */
pending = g_new0 (OstreeFetcherPendingURI, 1);
pending->ref_count = 1;
pending->thread_closure = thread_closure_ref (self->thread_closure);
pending->uri = soup_uri_copy (uri);
pending->max_size = max_size;
pending->is_stream = is_stream;

task = g_task_new (self, cancellable, callback, user_data);
g_task_set_source_tag (task, source_tag);
g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_free);
g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_unref);

/* We'll use the GTask priority for our own priority queue. */
g_task_set_priority (task, priority);
Expand Down

0 comments on commit 535033a

Please sign in to comment.