From f91295f16d18cc8a1d0f99e26f6b2935bee428d2 Mon Sep 17 00:00:00 2001 From: ambud Date: Sat, 1 Jul 2017 01:01:30 -0700 Subject: [PATCH] #39 minor fixes and removing thread configuration from docker file due to crash issues. --- .../sidewinder/core/SidewinderServer.java | 45 +++++++++++++------ .../byzantine/ByzantineWriter.java | 4 +- .../core/storage/disk/BucketEntry.java | 3 ++ .../core/storage/disk/DiskStorageEngine.java | 34 +++++++------- .../core/storage/mem/MemStorageEngine.java | 7 ++- core/src/main/resources/configs/config.yaml | 4 -- 6 files changed, 57 insertions(+), 40 deletions(-) diff --git a/core/src/main/java/com/srotya/sidewinder/core/SidewinderServer.java b/core/src/main/java/com/srotya/sidewinder/core/SidewinderServer.java index d6d0bef..d03f7ab 100644 --- a/core/src/main/java/com/srotya/sidewinder/core/SidewinderServer.java +++ b/core/src/main/java/com/srotya/sidewinder/core/SidewinderServer.java @@ -16,12 +16,14 @@ package com.srotya.sidewinder.core; import java.io.FileInputStream; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; import java.util.logging.Logger; import com.codahale.metrics.Metric; @@ -86,22 +88,39 @@ public void run(SidewinderConfig config, Environment env) throws Exception { env.healthChecks().register("restapi", new RestAPIHealthCheck()); @SuppressWarnings("resource") - SidewinderDropwizardReporter reporter = new SidewinderDropwizardReporter(registry, "request", new MetricFilter() { - - @Override - public boolean matches(String name, Metric metric) { - return true; - } - }, TimeUnit.SECONDS, TimeUnit.SECONDS, storageEngine); + SidewinderDropwizardReporter reporter = new SidewinderDropwizardReporter(registry, "request", + new MetricFilter() { + + @Override + public boolean matches(String name, Metric metric) { + return true; + } + }, TimeUnit.SECONDS, TimeUnit.SECONDS, storageEngine); reporter.start(1, TimeUnit.SECONDS); - NettyHTTPIngestionServer server = new NettyHTTPIngestionServer(); - server.init(storageEngine, conf, registry); - server.start(); + if (Boolean.parseBoolean(conf.getOrDefault("server.netty.http.enabled", "false"))) { + NettyHTTPIngestionServer server = new NettyHTTPIngestionServer(); + server.init(storageEngine, conf, registry); + server.start(); + } + + if (Boolean.parseBoolean(conf.getOrDefault("server.netty.binary.enabled", "false"))) { + NettyBinaryIngestionServer binServer = new NettyBinaryIngestionServer(); + binServer.init(storageEngine, conf); + binServer.start(); + } - NettyBinaryIngestionServer binServer = new NettyBinaryIngestionServer(); - binServer.init(storageEngine, conf); - binServer.start(); + Runtime.getRuntime().addShutdownHook(new Thread("shutdown-thread") { + @Override + public void run() { + try { + logger.info("Storage engine shutdown started"); + storageEngine.disconnect(); + } catch (IOException e) { + logger.log(Level.SEVERE, "Storage engine shutdown failure", e); + } + } + }); } /** diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/compression/byzantine/ByzantineWriter.java b/core/src/main/java/com/srotya/sidewinder/core/storage/compression/byzantine/ByzantineWriter.java index 3e100a0..89118e5 100644 --- a/core/src/main/java/com/srotya/sidewinder/core/storage/compression/byzantine/ByzantineWriter.java +++ b/core/src/main/java/com/srotya/sidewinder/core/storage/compression/byzantine/ByzantineWriter.java @@ -136,9 +136,7 @@ private void writeDataPoint(long timestamp, long value) throws IOException { compressAndWriteTimestamp(buf, timestamp); compressAndWriteValue(buf, value); count++; - // if (onDisk) { updateCount(); - // } } private void compressAndWriteValue(ByteBuffer tBuf, long value) { @@ -219,8 +217,8 @@ public ByzantineReader getReader() throws IOException { configure(conf); } ByteBuffer rbuf = buf.duplicate(); - rbuf.rewind(); read.unlock(); + rbuf.rewind(); reader = new ByzantineReader(rbuf); return reader; } diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/disk/BucketEntry.java b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/BucketEntry.java index 2c71bb6..dbc17d8 100644 --- a/core/src/main/java/com/srotya/sidewinder/core/storage/disk/BucketEntry.java +++ b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/BucketEntry.java @@ -20,6 +20,9 @@ import com.srotya.sidewinder.core.storage.TimeSeriesBucket; import com.srotya.sidewinder.core.storage.disk.BucketEntry; +/** + * @author ambud + */ public class BucketEntry { private BucketEntry next, prev; diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/disk/DiskStorageEngine.java b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/DiskStorageEngine.java index 525c2cd..ff85ed2 100644 --- a/core/src/main/java/com/srotya/sidewinder/core/storage/disk/DiskStorageEngine.java +++ b/core/src/main/java/com/srotya/sidewinder/core/storage/disk/DiskStorageEngine.java @@ -31,7 +31,6 @@ import java.util.SortedMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -50,7 +49,6 @@ import com.srotya.sidewinder.core.storage.mem.Archiver; import com.srotya.sidewinder.core.storage.mem.TimeSeries; import com.srotya.sidewinder.core.storage.mem.archival.NoneArchiver; -import com.srotya.sidewinder.core.utils.BackgrounThreadFactory; import com.srotya.sidewinder.core.utils.MiscUtils; /** @@ -112,24 +110,22 @@ public void configure(Map conf, ScheduledExecutorService bgTaskP compressionFQCN = conf.getOrDefault(MEM_COMPRESSION_CLASS, "com.srotya.sidewinder.core.storage.compression.byzantine.ByzantineWriter"); - Executors.newSingleThreadScheduledExecutor(new BackgrounThreadFactory("sidewinder-gc")) - .scheduleAtFixedRate(() -> { - for (Entry>> measurementMap : databaseMap - .entrySet()) { - // String db = measurementMap.getKey(); - for (Entry> measurementEntry : measurementMap.getValue() - .entrySet()) { - // String measurement = measurementEntry.getKey(); - for (Entry entry : measurementEntry.getValue().entrySet()) { - try { - entry.getValue().collectGarbage(); - } catch (IOException e) { - logger.log(Level.SEVERE, "Error collecing garbage", e); - } - } + bgTaskPool.scheduleAtFixedRate(() -> { + for (Entry>> measurementMap : databaseMap.entrySet()) { + // String db = measurementMap.getKey(); + for (Entry> measurementEntry : measurementMap.getValue() + .entrySet()) { + // String measurement = measurementEntry.getKey(); + for (Entry entry : measurementEntry.getValue().entrySet()) { + try { + entry.getValue().collectGarbage(); + } catch (IOException e) { + logger.log(Level.SEVERE, "Error collecing garbage", e); } } - }, 500, 60, TimeUnit.SECONDS); + } + } + }, 500, 60, TimeUnit.SECONDS); loadDatabases(); } @@ -744,7 +740,7 @@ public void disconnect() throws IOException { } } } - System.gc(); + logger.info("All series buffers have been closed"); } @Override diff --git a/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemStorageEngine.java b/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemStorageEngine.java index a6ec4f9..71e35e3 100644 --- a/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemStorageEngine.java +++ b/core/src/main/java/com/srotya/sidewinder/core/storage/mem/MemStorageEngine.java @@ -122,7 +122,12 @@ public void configure(Map conf, ScheduledExecutorService bgTaskP .entrySet()) { String measurement = measurementEntry.getKey(); for (Entry entry : measurementEntry.getValue().entrySet()) { - List buckets = entry.getValue().collectGarbage(); + List buckets; + try { + buckets = entry.getValue().collectGarbage(); + } catch (IOException e1) { + continue; + } for (TimeSeriesBucket bucket : buckets) { try { archiver.archive( diff --git a/core/src/main/resources/configs/config.yaml b/core/src/main/resources/configs/config.yaml index 28d0d57..cd5bbcd 100644 --- a/core/src/main/resources/configs/config.yaml +++ b/core/src/main/resources/configs/config.yaml @@ -2,13 +2,9 @@ server: gzip: enabled: true bufferSize: 8KiB - adminMinThreads: 1 - adminMaxThreads: 2 applicationConnectors: - type: http port: 8080 - acceptorThreads: 1 - selectorThreads: 2 adminConnectors: - type: http port: 9001