Skip to content

Commit

Permalink
#39 minor fixes and removing thread configuration from docker file due
Browse files Browse the repository at this point in the history
to crash issues.
  • Loading branch information
ambud committed Jul 1, 2017
1 parent 56077b2 commit f91295f
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 40 deletions.
45 changes: 32 additions & 13 deletions core/src/main/java/com/srotya/sidewinder/core/SidewinderServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
package com.srotya.sidewinder.core;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.codahale.metrics.Metric;
Expand Down Expand Up @@ -86,22 +88,39 @@ public void run(SidewinderConfig config, Environment env) throws Exception {
env.healthChecks().register("restapi", new RestAPIHealthCheck());

@SuppressWarnings("resource")
SidewinderDropwizardReporter reporter = new SidewinderDropwizardReporter(registry, "request", new MetricFilter() {

@Override
public boolean matches(String name, Metric metric) {
return true;
}
}, TimeUnit.SECONDS, TimeUnit.SECONDS, storageEngine);
SidewinderDropwizardReporter reporter = new SidewinderDropwizardReporter(registry, "request",
new MetricFilter() {

@Override
public boolean matches(String name, Metric metric) {
return true;
}
}, TimeUnit.SECONDS, TimeUnit.SECONDS, storageEngine);
reporter.start(1, TimeUnit.SECONDS);

NettyHTTPIngestionServer server = new NettyHTTPIngestionServer();
server.init(storageEngine, conf, registry);
server.start();
if (Boolean.parseBoolean(conf.getOrDefault("server.netty.http.enabled", "false"))) {
NettyHTTPIngestionServer server = new NettyHTTPIngestionServer();
server.init(storageEngine, conf, registry);
server.start();
}

if (Boolean.parseBoolean(conf.getOrDefault("server.netty.binary.enabled", "false"))) {
NettyBinaryIngestionServer binServer = new NettyBinaryIngestionServer();
binServer.init(storageEngine, conf);
binServer.start();
}

NettyBinaryIngestionServer binServer = new NettyBinaryIngestionServer();
binServer.init(storageEngine, conf);
binServer.start();
Runtime.getRuntime().addShutdownHook(new Thread("shutdown-thread") {
@Override
public void run() {
try {
logger.info("Storage engine shutdown started");
storageEngine.disconnect();
} catch (IOException e) {
logger.log(Level.SEVERE, "Storage engine shutdown failure", e);
}
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,7 @@ private void writeDataPoint(long timestamp, long value) throws IOException {
compressAndWriteTimestamp(buf, timestamp);
compressAndWriteValue(buf, value);
count++;
// if (onDisk) {
updateCount();
// }
}

private void compressAndWriteValue(ByteBuffer tBuf, long value) {
Expand Down Expand Up @@ -219,8 +217,8 @@ public ByzantineReader getReader() throws IOException {
configure(conf);
}
ByteBuffer rbuf = buf.duplicate();
rbuf.rewind();
read.unlock();
rbuf.rewind();
reader = new ByzantineReader(rbuf);
return reader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.srotya.sidewinder.core.storage.TimeSeriesBucket;
import com.srotya.sidewinder.core.storage.disk.BucketEntry;

/**
* @author ambud
*/
public class BucketEntry {

private BucketEntry next, prev;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -50,7 +49,6 @@
import com.srotya.sidewinder.core.storage.mem.Archiver;
import com.srotya.sidewinder.core.storage.mem.TimeSeries;
import com.srotya.sidewinder.core.storage.mem.archival.NoneArchiver;
import com.srotya.sidewinder.core.utils.BackgrounThreadFactory;
import com.srotya.sidewinder.core.utils.MiscUtils;

/**
Expand Down Expand Up @@ -112,24 +110,22 @@ public void configure(Map<String, String> conf, ScheduledExecutorService bgTaskP
compressionFQCN = conf.getOrDefault(MEM_COMPRESSION_CLASS,
"com.srotya.sidewinder.core.storage.compression.byzantine.ByzantineWriter");

Executors.newSingleThreadScheduledExecutor(new BackgrounThreadFactory("sidewinder-gc"))
.scheduleAtFixedRate(() -> {
for (Entry<String, Map<String, SortedMap<String, TimeSeries>>> measurementMap : databaseMap
.entrySet()) {
// String db = measurementMap.getKey();
for (Entry<String, SortedMap<String, TimeSeries>> measurementEntry : measurementMap.getValue()
.entrySet()) {
// String measurement = measurementEntry.getKey();
for (Entry<String, TimeSeries> entry : measurementEntry.getValue().entrySet()) {
try {
entry.getValue().collectGarbage();
} catch (IOException e) {
logger.log(Level.SEVERE, "Error collecing garbage", e);
}
}
bgTaskPool.scheduleAtFixedRate(() -> {
for (Entry<String, Map<String, SortedMap<String, TimeSeries>>> measurementMap : databaseMap.entrySet()) {
// String db = measurementMap.getKey();
for (Entry<String, SortedMap<String, TimeSeries>> measurementEntry : measurementMap.getValue()
.entrySet()) {
// String measurement = measurementEntry.getKey();
for (Entry<String, TimeSeries> entry : measurementEntry.getValue().entrySet()) {
try {
entry.getValue().collectGarbage();
} catch (IOException e) {
logger.log(Level.SEVERE, "Error collecing garbage", e);
}
}
}, 500, 60, TimeUnit.SECONDS);
}
}
}, 500, 60, TimeUnit.SECONDS);

loadDatabases();
}
Expand Down Expand Up @@ -744,7 +740,7 @@ public void disconnect() throws IOException {
}
}
}
System.gc();
logger.info("All series buffers have been closed");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,12 @@ public void configure(Map<String, String> conf, ScheduledExecutorService bgTaskP
.entrySet()) {
String measurement = measurementEntry.getKey();
for (Entry<String, TimeSeries> entry : measurementEntry.getValue().entrySet()) {
List<TimeSeriesBucket> buckets = entry.getValue().collectGarbage();
List<TimeSeriesBucket> buckets;
try {
buckets = entry.getValue().collectGarbage();
} catch (IOException e1) {
continue;
}
for (TimeSeriesBucket bucket : buckets) {
try {
archiver.archive(
Expand Down
4 changes: 0 additions & 4 deletions core/src/main/resources/configs/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ server:
gzip:
enabled: true
bufferSize: 8KiB
adminMinThreads: 1
adminMaxThreads: 2
applicationConnectors:
- type: http
port: 8080
acceptorThreads: 1
selectorThreads: 2
adminConnectors:
- type: http
port: 9001
Expand Down

0 comments on commit f91295f

Please sign in to comment.