Add optional gzip compression #1389

Status: Open. Wants to merge 2 commits into base: master.
2 changes: 1 addition & 1 deletion .github/workflows/static-analysis.yml
@@ -37,7 +37,7 @@ jobs:
git fetch --no-tags --no-recurse-submodules upstream "${{ github.event.pull_request.base.ref }}"
- name: Install Dependencies
run: |
-          sudo apt-get install -y bear clang-tidy libcurl4-openssl-dev
+          sudo apt-get install -y bear clang-tidy libcurl4-openssl-dev zlib1g-dev
- name: Configure Project
run: |
libtoolize
1 change: 1 addition & 0 deletions .github/workflows/unit-tests.yml
@@ -39,6 +39,7 @@ jobs:
run: |
sudo apt-get install -y \
libcppunit-dev \
+            zlib1g-dev \
libcurl4-openssl-dev
- name: Configure Project
run: |
7 changes: 4 additions & 3 deletions configure.ac
@@ -48,6 +48,7 @@ if test "x$ax_cv_ncursesw" != xyes && test "x$ax_cv_ncurses" != xyes; then
AC_MSG_ERROR([requires either NcursesW or Ncurses library])
fi

+PKG_CHECK_MODULES([ZLIB], [zlib])
PKG_CHECK_MODULES([LIBCURL], [libcurl],, [LIBCURL_CHECK_CONFIG])
PKG_CHECK_MODULES([CPPUNIT], [cppunit],, [no_cppunit="yes"])
PKG_CHECK_MODULES([DEPENDENCIES], [libtorrent >= 0.15.1])
@@ -77,9 +78,9 @@ CC_ATTRIBUTE_UNUSED(

dnl Only update global build variables immediately before generating the output,
dnl to avoid affecting the global build environment for other autoconf checks.
-LIBS="$PTHREAD_LIBS $CURSES_LIB $CURSES_LIBS $LIBCURL $LIBCURL_LIBS $DEPENDENCIES_LIBS $LIBS"
-CFLAGS="$CFLAGS $PTHREAD_CFLAGS $LIBCURL_CPPFLAGS $LIBCURL_CFLAGS $DEPENDENCIES_CFLAGS $CURSES_CFLAGS"
-CXXFLAGS="$CXXFLAGS $PTHREAD_CFLAGS $LIBCURL_CPPFLAGS $LIBCURL_CFLAGS $DEPENDENCIES_CFLAGS $CURSES_CFLAGS"
+LIBS="$PTHREAD_LIBS $ZLIB_LIBS $CURSES_LIB $CURSES_LIBS $LIBCURL $LIBCURL_LIBS $DEPENDENCIES_LIBS $LIBS"
+CFLAGS="$CFLAGS $PTHREAD_CFLAGS $ZLIB_CFLAGS $LIBCURL_CPPFLAGS $LIBCURL_CFLAGS $DEPENDENCIES_CFLAGS $CURSES_CFLAGS"
+CXXFLAGS="$CXXFLAGS $PTHREAD_CFLAGS $ZLIB_CFLAGS $LIBCURL_CPPFLAGS $LIBCURL_CFLAGS $DEPENDENCIES_CFLAGS $CURSES_CFLAGS"

AC_CONFIG_FILES([
Makefile
14 changes: 11 additions & 3 deletions src/command_network.cc
@@ -54,6 +54,7 @@
#include "core/download.h"
#include "core/manager.h"
#include "rpc/scgi.h"
+#include "rpc/scgi_task.h"
#include "ui/root.h"
#include "rpc/parse.h"
#include "rpc/parse_commands.h"
@@ -276,6 +277,8 @@ initialize_command_network() {
CMD2_ANY_VALUE_V ("network.send_buffer.size.set", std::bind(&torrent::ConnectionManager::set_send_buffer_size, cm, std::placeholders::_2));
CMD2_ANY ("network.receive_buffer.size", std::bind(&torrent::ConnectionManager::receive_buffer_size, cm));
CMD2_ANY_VALUE_V ("network.receive_buffer.size.set", std::bind(&torrent::ConnectionManager::set_receive_buffer_size, cm, std::placeholders::_2));


CMD2_ANY_STRING ("network.tos.set", std::bind(&apply_tos, std::placeholders::_2));

CMD2_ANY ("network.bind_address", std::bind(&core::Manager::bind_address, control->core()));
@@ -293,9 +296,14 @@ initialize_command_network() {
CMD2_ANY ("network.max_open_sockets", std::bind(&torrent::ConnectionManager::max_size, cm));
CMD2_ANY_VALUE_V ("network.max_open_sockets.set", std::bind(&torrent::ConnectionManager::set_max_size, cm, std::placeholders::_2));

-  CMD2_ANY_STRING ("network.scgi.open_port", std::bind(&apply_scgi, std::placeholders::_2, 1));
-  CMD2_ANY_STRING ("network.scgi.open_local", std::bind(&apply_scgi, std::placeholders::_2, 2));
-  CMD2_VAR_BOOL ("network.scgi.dont_route", false);
+  CMD2_ANY_STRING ("network.scgi.open_port", std::bind(&apply_scgi, std::placeholders::_2, 1));
+  CMD2_ANY_STRING ("network.scgi.open_local", std::bind(&apply_scgi, std::placeholders::_2, 2));
+  CMD2_VAR_BOOL ("network.scgi.dont_route", false);
+
+  CMD2_ANY ("network.scgi.gzip.min_size", [](const auto&, const auto&) { return rpc::SCgiTask::gzip_min_size(); });
+  CMD2_ANY_VALUE_V ("network.scgi.gzip.min_size.set", [](const auto&, const auto& arg) { return rpc::SCgiTask::set_gzip_min_size(arg); });
+  CMD2_ANY ("network.scgi.use_gzip", [](const auto&, const auto&) { return rpc::SCgiTask::gzip_enabled(); });
+  CMD2_ANY_VALUE_V ("network.scgi.use_gzip.set", [](const auto&, const auto& arg) { return rpc::SCgiTask::set_gzip_enabled(arg); });

CMD2_ANY_STRING ("network.xmlrpc.dialect.set", [](const auto&, const auto& arg) { return apply_xmlrpc_dialect(arg); })
CMD2_ANY ("network.xmlrpc.size_limit", [](const auto&, const auto&){ return rpc::rpc.size_limit(); });
157 changes: 117 additions & 40 deletions src/rpc/scgi_task.cc
@@ -1,17 +1,18 @@
#include "config.h"
rakshasa marked this conversation as resolved.

#include <cstdio>
#include <rak/allocators.h>
#include <rak/error_number.h>
#include <cstdio>
#include <vector>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <torrent/exceptions.h>
#include <torrent/poll.h>
#include <torrent/utils/log.h>
+#include <zlib.h>

#include "utils/socket_fd.h"

#include "scgi_task.h"
#include "control.h"
#include "globals.h"
#include "scgi.h"
@@ -24,12 +25,16 @@

namespace rpc {

+// Disable gzipping by default, but once enabled gzip everything
+bool SCgiTask::m_allow_compressed_response = false;
+int SCgiTask::m_min_compress_response_size = 0;

// If bufferSize is zero then memcpy won't do anything.
inline void
-SCgiTask::realloc_buffer(uint32_t size, const char* buffer, uint32_t bufferSize) {
+SCgiTask::realloc_buffer(uint32_t size, const char* buffer, uint32_t buffer_size) {
char* tmp = rak::cacheline_allocator<char>::alloc_size(size);

-  std::memcpy(tmp, buffer, bufferSize);
+  std::memcpy(tmp, buffer, buffer_size);
::free(m_buffer);
m_buffer = tmp;
}
@@ -38,15 +43,15 @@ void
SCgiTask::open(SCgi* parent, int fd) {
m_parent = parent;
m_fileDesc = fd;
-  m_buffer = rak::cacheline_allocator<char>::alloc_size((m_bufferSize = default_buffer_size) + 1);
+  m_buffer = rak::cacheline_allocator<char>::alloc_size((m_buffer_size = default_buffer_size) + 1);
m_position = m_buffer;
m_body = NULL;

worker_thread->poll()->open(this);
worker_thread->poll()->insert_read(this);
worker_thread->poll()->insert_error(this);

// scgiTimer = rak::timer::current();
// scgiTimer = rak::timer::current();
}

void
@@ -66,14 +71,14 @@ SCgiTask::close() {
m_buffer = NULL;

// Test
// char buffer[512];
// sprintf(buffer, "SCgi system call processed: %i", (int)(rak::timer::current() - scgiTimer).usec());
// control->core()->push_log(std::string(buffer));
// char buffer[512];
// sprintf(buffer, "SCgi system call processed: %i", (int)(rak::timer::current() - scgiTimer).usec());
// control->core()->push_log(std::string(buffer));
}

void
SCgiTask::event_read() {
-  int bytes = ::recv(m_fileDesc, m_position, m_bufferSize - (m_position - m_buffer), 0);
+  int bytes = ::recv(m_fileDesc, m_position, m_buffer_size - (m_position - m_buffer), 0);

if (bytes <= 0) {
if (bytes == 0 || !rak::error_number::current().is_blocked_momentary())
@@ -139,6 +144,11 @@ SCgiTask::event_read() {
goto event_read_failed;
} else if (strcmp(key, "CONTENT_TYPE") == 0) {
content_type = value;
+    } else if (strcmp(key, "ACCEPT_ENCODING") == 0) {
+      if (strstr(value, "gzip") != nullptr)
+        // This only marks the response as compressible; whether it is
+        // actually compressed depends on the size of the response.
+        m_client_accepts_compressed_response = true;
}
}

@@ -164,25 +174,25 @@ SCgiTask::event_read() {
goto event_read_failed;
}

-  if ((unsigned int)(content_length + header_size) < m_bufferSize) {
-    m_bufferSize = content_length + header_size;
+  if ((unsigned int)(content_length + header_size) < m_buffer_size) {
+    m_buffer_size = content_length + header_size;

} else if ((unsigned int)content_length <= default_buffer_size) {
-    m_bufferSize = content_length;
+    m_buffer_size = content_length;

std::memmove(m_buffer, m_body, std::distance(m_body, m_position));
m_position = m_buffer + std::distance(m_body, m_position);
m_body = m_buffer;

} else {
-    realloc_buffer((m_bufferSize = content_length) + 1, m_body, std::distance(m_body, m_position));
+    realloc_buffer((m_buffer_size = content_length) + 1, m_body, std::distance(m_body, m_position));

m_position = m_buffer + std::distance(m_body, m_position);
m_body = m_buffer;
}
}

-  if ((unsigned int)std::distance(m_buffer, m_position) != m_bufferSize)
+  if ((unsigned int)std::distance(m_buffer, m_position) != m_buffer_size)
return;

worker_thread->poll()->remove_read(this);
@@ -193,14 +203,14 @@

// Clean up logging, this is just plain ugly...
// write(m_logFd, "\n---\n", sizeof("\n---\n"));
-    result = write(m_parent->log_fd(), m_buffer, m_bufferSize);
+    result = write(m_parent->log_fd(), m_buffer, m_buffer_size);
result = write(m_parent->log_fd(), "\n---\n", sizeof("\n---\n"));
}

-  lt_log_print_dump(torrent::LOG_RPC_DUMP, m_body, m_bufferSize - std::distance(m_buffer, m_body), "scgi", "RPC read.", 0);
+  lt_log_print_dump(torrent::LOG_RPC_DUMP, m_body, m_buffer_size - std::distance(m_buffer, m_body), "scgi", "RPC read.", 0);

// Close if the call failed, else stay open to write back data.
-  if (!m_parent->receive_call(this, m_body, m_bufferSize - std::distance(m_buffer, m_body)))
+  if (!m_parent->receive_call(this, m_body, m_buffer_size - std::distance(m_buffer, m_body)))
close();

return;
@@ -212,12 +222,13 @@ SCgiTask::event_read() {

void
SCgiTask::event_write() {
+  int bytes;
// Apple and Solaris do not support MSG_NOSIGNAL,
// so disable this fix until we find a better solution
#if defined(__APPLE__) || defined(__sun__)
-  int bytes = ::send(m_fileDesc, m_position, m_bufferSize, 0);
+  bytes = ::send(m_fileDesc, m_position, m_buffer_size, 0);
 #else
-  int bytes = ::send(m_fileDesc, m_position, m_bufferSize, MSG_NOSIGNAL);
+  bytes = ::send(m_fileDesc, m_position, m_buffer_size, MSG_NOSIGNAL);
#endif

if (bytes == -1) {
@@ -228,9 +239,9 @@
}

m_position += bytes;
-  m_bufferSize -= bytes;
+  m_buffer_size -= bytes;

-  if (bytes == 0 || m_bufferSize == 0)
+  if (bytes == 0 || m_buffer_size == 0)
return close();
}

@@ -239,36 +250,102 @@ SCgiTask::event_error() {
close();
}

+// On failure this returns false and leaves the buffer in an
+// indeterminate state, but m_position and m_buffer_size remain
+// unchanged.
+bool
+SCgiTask::gzip_compress_response(const char* buffer, uint32_t length, std::string_view header_template) {
Review comment from @rakshasa (Owner), Feb 6, 2025:
This is a bit too ugly a solution, not what I had in mind.

Add to utils a function like this:

gzip_compress(const char* buffer, uint32_t length, std::function<bool(char*,uint32_t,uint32_t)>)

The lambda function will write from m_buffer+max_header_size (where max_header_size is calculated from the current header template plus max length of the printed response size).

It will receive deflateBound value so it can resize on the first call.

Once done writing, copy the bytes from the header and response size to m_buffer so they end at the start of the written gzip'ed data. Then event_write starts sending from m_buffer+m_header_offset.

Don't use snprintf for a whole template, instead the header should be two static const strings, and you copy them and the response size.

z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;

constexpr int window_bits = 15;
constexpr int gzip_encoding = 16;
constexpr int gzip_level = 6;
constexpr int chunk_size = 16384;

if (deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits | gzip_encoding, gzip_level, Z_DEFAULT_STRATEGY) != Z_OK)
return false;

  // Calculate the maximum size the buffer could reach; note that the
  // max response size will usually be larger than the original length.
const auto max_response_size = deflateBound(&zs, length);
if (max_response_size + max_header_size > std::max(m_buffer_size, (unsigned int)default_buffer_size))
realloc_buffer(max_response_size + max_header_size, NULL, 0);

auto output = m_buffer + max_header_size;
zs.next_in = (Bytef*)buffer;
zs.avail_in = length;
do {
zs.avail_out = chunk_size;
zs.next_out = (Bytef*)output;
if (deflate(&zs, Z_FINISH) == Z_STREAM_ERROR)
return false;
output += chunk_size - zs.avail_out;
} while (zs.avail_out == 0);

// Write the header directly to the buffer. If at any point
// max_header_size would be exceeded, fail gracefully.
const std::string_view header_end("Content-Encoding: gzip\r\n\r\n");
const int response_size = output - (m_buffer + max_header_size);
int header_size = snprintf(m_buffer, max_header_size, header_template.data(), response_size);
if (header_size < 0 || header_end.size() > max_header_size - header_size)
return false;
std::memcpy(m_buffer + header_size, header_end.data(), header_end.size());
header_size += header_end.size();

// Move the response back into position right after the headers
std::memmove(m_buffer + header_size, m_buffer + max_header_size, response_size);

m_position = m_buffer;
m_buffer_size = header_size + response_size;
return true;
}

bool
SCgiTask::receive_write(const char* buffer, uint32_t length) {
if (buffer == NULL || length > (100 << 20))
throw torrent::internal_error("SCgiTask::receive_write(...) received bad input.");

-  // Need to cast due to a bug in MacOSX gcc-4.0.1.
-  if (length + 256 > std::max(m_bufferSize, (unsigned int)default_buffer_size))
-    realloc_buffer(length + 256, NULL, 0);
-
-  const auto header = m_content_type == ContentType::JSON
-    ? "Status: 200 OK\r\nContent-Type: application/json\r\nContent-Length: %i\r\n\r\n"
-    : "Status: 200 OK\r\nContent-Type: text/xml\r\nContent-Length: %i\r\n\r\n";
-
-  // Who ever bothers to check the return value?
-  int headerSize = sprintf(m_buffer, header, length);
-
-  m_position = m_buffer;
-  m_bufferSize = length + headerSize;
-
-  std::memcpy(m_buffer + headerSize, buffer, length);
+  std::string header = m_content_type == ContentType::JSON
+    ? "Status: 200 OK\r\nContent-Type: application/json\r\nContent-Length: %i\r\n"
+    : "Status: 200 OK\r\nContent-Type: text/xml\r\nContent-Length: %i\r\n";

+  // Write to log prior to possible compression
if (m_parent->log_fd() >= 0) {
int __UNUSED result;
// Clean up logging, this is just plain ugly...
// write(m_logFd, "\n---\n", sizeof("\n---\n"));
-    result = write(m_parent->log_fd(), m_buffer, m_bufferSize);
+    result = write(m_parent->log_fd(), buffer, length);
result = write(m_parent->log_fd(), "\n---\n", sizeof("\n---\n"));
}

-  lt_log_print_dump(torrent::LOG_RPC_DUMP, m_buffer, m_bufferSize, "scgi", "RPC write.", 0);
+  lt_log_print_dump(torrent::LOG_RPC_DUMP, buffer, length, "scgi", "RPC write.", 0);

// Compress the response if possible
if (m_client_accepts_compressed_response &&
gzip_enabled() &&
length > gzip_min_size() &&
gzip_compress_response(buffer, length, header)) {
Review comment from rakshasa (Owner):

Don't like how a failure to compress falls back to plaintext; this should only happen if there's a serious bug, so fail it completely.

event_write();
return true;
}

// Otherwise (or if the compression fails), just copy the bytes
header += "\r\n";
Review comment from rakshasa (Owner):
This use of std::strings causes unnecessary copying of buffers.

Rewrite both the gzip compressor and this to write directly to m_buffer, pass e.g. a lambda function to gzip_compressor that does the writing.

Reply from the Contributor Author:
This causes a chicken-and-egg problem where we need to know the Content-Length to figure out where the write cursor in m_buffer should start from (given that there's a more than decent chance we might drop a digit in the size string in the course of compression), but we don't know the length until the compression itself is complete.

The only way I can think to work around that is to first write the compressed output directly a little ways into the m_buffer and std::memmove it back to the correct position after the headers have been written. I'll go in that direction unless you tell me otherwise.

Reply from rakshasa (Owner):
Don't see why you need to do it in such a convoluted way; just pass a lambda function that does the writing and have it resize m_buffer if more is needed.

You can change m_buffer to std::vector to make it cleaner.


int header_size = snprintf(NULL, 0, header.c_str(), length);

// Need to cast due to a bug in MacOSX gcc-4.0.1.
if (length + header_size > std::max(m_buffer_size, (unsigned int)default_buffer_size))
realloc_buffer(length + header_size, NULL, 0);

m_position = m_buffer;
m_buffer_size = length + header_size;

snprintf(m_buffer, m_buffer_size, header.c_str(), length);
std::memcpy(m_buffer + header_size, buffer, length);
Review comment from rakshasa (Owner):
If we have a compressed path in a separate function, also put the plaintext in one.


event_write();
return true;