Skip to content

Commit

Permalink
Added limit to file size and request timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
brianhks committed Mar 6, 2019
1 parent 23eb500 commit 890486a
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 157 deletions.
20 changes: 7 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,16 @@ The remote plugin is for forwarding metric data to a remote Kairos instance.
Metric data is gathered locally on the filesystem where it is compressed and uploaded to the
remote Kairos on specified intervals. (see kairos-remote.properties for options)

## Remote Datastore
The remote plugin can be loaded in one of two ways. The first is as the Kairos datastore:

```properties
kairosdb.service.datastore=org.kairosdb.plugin.remote.RemoteModule
```

This effectively makes the Kairos node write only. The node will not try to connect to
Cassandra or load the H2 database.

## Remote Listener
The second way to load the remote plugin is as a data point listener:
The remote plugin comes with a data point listener class; to load
it, add the `ListenerModule` to your Kairos configuration:

```properties
kairosdb.datastore.remote.service.remote=org.kairosdb.plugin.remote.ListenerModule
```

The `ListenerModule` adds a listener to the data point events going into the datastore and
forwards the events on to a remote Kairos instance. Effectively letting you fork the data.
The `ListenerModule` adds a listener to the data point events coming into Kairos and
forwards the events on to a remote Kairos instance, effectively letting you fork the data.

For a pure remote Kairos instance you can comment out the datastore modules and just
use the `ListenerModule`, effectively making the Kairos instance a write only node.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class ListenerModule extends AbstractModule
@Override
protected void configure()
{
bind(RemoteDatastore.class).in(Scopes.SINGLETON);
bind(RemoteListener.class).in(Scopes.SINGLETON);
bind(RemoteSendJob.class).in(Scopes.SINGLETON);
bind(RemoteDatastoreHealthCheck.class).in(Scopes.SINGLETON);
bind(DiskUtils.class).to(DiskUtilsImpl.class);
Expand Down
49 changes: 0 additions & 49 deletions src/main/java/org/kairosdb/plugin/remote/NullServiceKeyStore.java

This file was deleted.

17 changes: 14 additions & 3 deletions src/main/java/org/kairosdb/plugin/remote/RemoteHostImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,27 @@
public class RemoteHostImpl implements RemoteHost
{
private static final Logger logger = LoggerFactory.getLogger(RemoteHostImpl.class);
private static final String REMOTE_URL_PROP = "kairosdb.datastore.remote.remote_url";
private static final String REMOTE_URL_PROP = "kairosdb.remote.remote_url";
private static final String CONNECTION_REQUEST_TIMEOUT = "kairosdb.remote.connection_request_timeout";
private static final String CONNECTION_TIMEOUT = "kairosdb.remote.connection_timeout";
private static final String SOCKET_TIMEOUT = "kairosdb.remote.socket_timeout";

private final String url;
private CloseableHttpClient client;

@Inject
public RemoteHostImpl(@Named(REMOTE_URL_PROP) String remoteUrl,
		@Named(CONNECTION_REQUEST_TIMEOUT) int requestTimeout,
		@Named(CONNECTION_TIMEOUT) int connectionTimeout,
		@Named(SOCKET_TIMEOUT) int socketTimeout)
{
	this.url = checkNotNullOrEmpty(remoteUrl, "url must not be null or empty");

	//Timeouts (milliseconds, per Apache HttpClient RequestConfig) applied
	//to every request issued through this client.
	RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(socketTimeout)
			.setConnectionRequestTimeout(requestTimeout)
			.setConnectTimeout(connectionTimeout)
			.build();

	//Bug fix: the configured builder was previously discarded
	//(HttpClients.custom()... was never built or assigned), leaving
	//'client' null and the timeouts unused. Build it and keep it.
	client = HttpClients.custom().setDefaultRequestConfig(requestConfig).build();
}

@Override
Expand Down Expand Up @@ -67,7 +78,7 @@ else if (response.getStatusLine().getStatusCode() == 400)
{
//This means it was a bad file, more than likely the json is not well formed
//renaming it will make sure we don't try it again as it will likely fail again
//Most of the data likely was loaded into kairos
//All of the data likely was loaded into kairos especially if it was missing the last ]
ByteArrayOutputStream body = new ByteArrayOutputStream();
response.getEntity().writeTo(body);
logger.error("Unable to send file " + zipFile + ": " + response.getStatusLine() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.kairosdb.plugin.remote;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Multimap;
Expand All @@ -39,49 +40,53 @@
import java.nio.file.Files;
import java.util.Arrays;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;


public class RemoteDatastore
public class RemoteListener
{
private static final Logger logger = LoggerFactory.getLogger(RemoteDatastore.class);
private static final String DATA_DIR_PROP = "kairosdb.datastore.remote.data_dir";
private static final String DROP_PERCENT_PROP = "kairosdb.datastore.remote.drop_on_used_disk_space_threshold_percent";
private static final String METRIC_PREFIX_FILTER = "kairosdb.datastore.remote.prefix_filter";

private static final String FILE_SIZE_METRIC = "kairosdb.datastore.remote.file_size";
private static final String ZIP_FILE_SIZE_METRIC = "kairosdb.datastore.remote.zip_file_size";
private static final String WRITE_SIZE_METRIC = "kairosdb.datastore.remote.write_size";
private static final String TIME_TO_SEND_METRIC = "kairosdb.datastore.remote.time_to_send";
private static final String DELETE_ZIP_METRIC = "kairosdb.datastore.remote.deleted_zipFile_size";
private static final Logger logger = LoggerFactory.getLogger(RemoteListener.class);
private static final String DATA_DIR_PROP = "kairosdb.remote.data_dir";
private static final String DROP_PERCENT_PROP = "kairosdb.remote.drop_on_used_disk_space_threshold_percent";
private static final String METRIC_PREFIX_FILTER = "kairosdb.remote.prefix_filter";

private static final String FILE_SIZE_METRIC = "kairosdb.remote.file_size";
private static final String ZIP_FILE_SIZE_METRIC = "kairosdb.remote.zip_file_size";
private static final String WRITE_SIZE_METRIC = "kairosdb.remote.write_size";
private static final String TIME_TO_SEND_METRIC = "kairosdb.remote.time_to_send";
private static final String DELETE_ZIP_METRIC = "kairosdb.remote.deleted_zipFile_size";
private static final String FLUSH_INTERVAL = "kairosdb.remote.flush_interval_ms";
private static final String MAX_FILE_SIZE_MB = "kairosdb.remote.max_file_size_mb";

private final Object m_dataFileLock = new Object();
private final Object m_sendLock = new Object();
private final int m_dropPercent;
private final File m_dataDirectory;
private final RemoteHost m_remoteHost;
private final DiskUtils m_diskUtils;
private final int m_flushInterval;
private final ImmutableSortedMap<String, String> m_tags;
private BufferedWriter m_dataWriter;
private final Publisher<DataPointEvent> m_publisher;
private String m_dataFileName;
private volatile boolean m_firstDataPoint = true;
private int m_dataPointCounter;
private Stopwatch m_sendTimer = Stopwatch.createUnstarted();

private volatile Multimap<DataPointKey, DataPoint> m_dataPointMultimap;
private final Object m_mapLock = new Object(); //Lock for the above map

private volatile boolean m_running;

@Inject
@Named("HOSTNAME")
private String m_hostName = "localhost";

private String[] m_prefixFilterArray = new String[0];

private long m_maxFileSize = 1024*1024*10;

@Inject
private LongDataPointFactory m_longDataPointFactory = new LongDataPointFactoryImpl();

Expand All @@ -96,9 +101,13 @@ public void setPrefixFilter(@Named(METRIC_PREFIX_FILTER) String prefixFilter)
}

@Inject
public RemoteDatastore(@Named(DATA_DIR_PROP) String dataDir,
@Named(DROP_PERCENT_PROP) String dropPercent, RemoteHost remoteHost,
FilterEventBus eventBus, DiskUtils diskUtils) throws IOException, DatastoreException
public RemoteListener(@Named(DATA_DIR_PROP) String dataDir,
@Named(DROP_PERCENT_PROP) String dropPercent,
@Named(FLUSH_INTERVAL) int flushInterval,
@Named("HOSTNAME") String hostName,
RemoteHost remoteHost,
FilterEventBus eventBus,
DiskUtils diskUtils) throws IOException, DatastoreException
{
m_dataDirectory = new File(dataDir);
m_dropPercent = Integer.parseInt(dropPercent);
Expand All @@ -107,6 +116,11 @@ public RemoteDatastore(@Named(DATA_DIR_PROP) String dataDir,
m_remoteHost = checkNotNull(remoteHost, "remote host must not be null");
m_publisher = eventBus.createPublisher(DataPointEvent.class);
m_diskUtils = checkNotNull(diskUtils, "diskUtils must not be null");
m_flushInterval = flushInterval;

m_tags = ImmutableSortedMap.<String, String>naturalOrder()
.put("host", hostName)
.build();

createNewMap();

Expand All @@ -123,7 +137,7 @@ public RemoteDatastore(@Named(DATA_DIR_PROP) String dataDir,
{
flushMap();

Thread.sleep(2000);
Thread.sleep(m_flushInterval);
}
catch (Exception e)
{
Expand All @@ -138,6 +152,12 @@ public RemoteDatastore(@Named(DATA_DIR_PROP) String dataDir,

}

/**
 * Sets the threshold at which the current data file is rolled and zipped.
 *
 * @param maxFileSize maximum data file size in megabytes (from the
 *        kairosdb.remote.max_file_size_mb property); converted to bytes here.
 */
@Inject
public void setMaxFileSize(@Named(MAX_FILE_SIZE_MB)long maxFileSize)
{
//Config value is in MB; m_maxFileSize is compared against File.length() (bytes).
m_maxFileSize = maxFileSize * 1024 * 1024;
}

private Multimap<DataPointKey, DataPoint> createNewMap()
{
Multimap<DataPointKey, DataPoint> ret;
Expand All @@ -161,6 +181,9 @@ private void flushMap()
{
try
{
//Check if we need to roll to a new file because of size
rollAndZipFile(System.currentTimeMillis(), true);

for (DataPointKey dataPointKey : flushMap.keySet())
{
//We have to clear the writer every time or it gets confused
Expand Down Expand Up @@ -359,7 +382,7 @@ private void cleanDiskSpace()
Files.delete(zipFiles[0].toPath());
logger.warn("Disk is too full to create zip file. Deleted older zip file " + zipFiles[0].getName() + " size: " + size);
// For forwarding this metric will be reported both on the local kairos node and the remote
m_publisher.post(new DataPointEvent(DELETE_ZIP_METRIC, ImmutableSortedMap.of("host", m_hostName),
m_publisher.post(new DataPointEvent(DELETE_ZIP_METRIC, m_tags,
m_longDataPointFactory.createDataPoint(System.currentTimeMillis(), size)));
cleanDiskSpace(); // continue cleaning until space is freed up or all zip files are deleted.
}
Expand All @@ -376,39 +399,65 @@ private boolean hasSpace()
return m_dropPercent >= 100 || m_diskUtils.percentAvailable(m_dataDirectory) < m_dropPercent;
}

void sendData() throws IOException
//Rolls to a new file and zips up the current one
private void rollAndZipFile(long now, boolean conditionalRoll) throws IOException
{
synchronized (m_sendLock)
{
String oldDataFile = m_dataFileName;
long now = System.currentTimeMillis();

long fileSize = (new File(m_dataFileName)).length();
int dataPointCounter;
String oldDataFile;
long fileSize;

ImmutableSortedMap<String, String> tags = ImmutableSortedMap.<String, String>naturalOrder()
.put("host", m_hostName)
.build();
synchronized (m_dataFileLock)
{
fileSize = (new File(m_dataFileName)).length();

int dataPointCounter;
synchronized (m_dataFileLock)
if (conditionalRoll)
{
closeDataFile();
dataPointCounter = m_dataPointCounter;
openDataFile();
//Check file size
if (fileSize < m_maxFileSize)
return;
}

long zipSize = zipFile(oldDataFile);
oldDataFile = m_dataFileName;

closeDataFile();
//m_dataPointCounter gets reset in openDataFile()
dataPointCounter = m_dataPointCounter;
openDataFile();
}

long zipSize = zipFile(oldDataFile);

try
{
putDataPoint(new DataPointEvent(FILE_SIZE_METRIC, m_tags, m_longDataPointFactory.createDataPoint(now, fileSize), 0));
putDataPoint(new DataPointEvent(WRITE_SIZE_METRIC, m_tags, m_longDataPointFactory.createDataPoint(now, dataPointCounter), 0));
putDataPoint(new DataPointEvent(ZIP_FILE_SIZE_METRIC, m_tags, m_longDataPointFactory.createDataPoint(now, zipSize), 0));
}
catch (DatastoreException e)
{
logger.error("Error writing remote metrics", e);
}
}

//Called by RemoteSendJob that is on a timer set in config
void sendData() throws IOException
{
synchronized (m_sendLock)
{

long now = System.currentTimeMillis();
m_sendTimer.start();

rollAndZipFile(now, false);

sendAllZipfiles();

long timeToSend = System.currentTimeMillis() - now;
long timeToSend = m_sendTimer.elapsed(TimeUnit.MILLISECONDS);
m_sendTimer.reset();

try
{
putDataPoint(new DataPointEvent(FILE_SIZE_METRIC, tags, m_longDataPointFactory.createDataPoint(now, fileSize), 0));
putDataPoint(new DataPointEvent(WRITE_SIZE_METRIC, tags, m_longDataPointFactory.createDataPoint(now, dataPointCounter), 0));
putDataPoint(new DataPointEvent(ZIP_FILE_SIZE_METRIC, tags, m_longDataPointFactory.createDataPoint(now, zipSize), 0));
putDataPoint(new DataPointEvent(TIME_TO_SEND_METRIC, tags, m_longDataPointFactory.createDataPoint(now, timeToSend), 0));
putDataPoint(new DataPointEvent(TIME_TO_SEND_METRIC, m_tags, m_longDataPointFactory.createDataPoint(now, timeToSend), 0));
}
catch (DatastoreException e)
{
Expand Down
28 changes: 0 additions & 28 deletions src/main/java/org/kairosdb/plugin/remote/RemoteModule.java

This file was deleted.

Loading

0 comments on commit 890486a

Please sign in to comment.