Skip to content

Commit

Permalink
feat: plugin manager (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
phrp720 authored Feb 3, 2025
1 parent 3d87743 commit 2828260
Show file tree
Hide file tree
Showing 30 changed files with 367 additions and 964 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ jobs:
run: |
mkdir -p ../release/linux
GOOS=linux GOARCH=amd64 go build -o ../release/linux/aw-sync-agent
cp aw-sync-agent.yaml ../release/linux/
cp config ../release/linux/
# Build aw-sync-agent for Windows
- name: Build aw-sync-agent for Windows
working-directory: aw-sync-agent
run: |
mkdir -p ../release/windows
GOOS=windows GOARCH=amd64 go build -o ../release/windows/aw-sync-agent.exe
cp aw-sync-agent.yaml ../release/windows/
cp config ../release/windows/
# Package aw-sync-center
- name: Package aw-sync-center
Expand Down
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ pkg/
*.sublime-workspace

## aw-sync-agent files
checkpoint.json
checkpoint.json

config
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Open-Source Solution for Securely Syncing and Visualizing Multiple ActivityWatch
3. [Installation & Usage](#-installation--usage)
4. [Components](#-components)
- [aw-sync-agent](#-aw-sync-agent)
- [aw-sync-suite-plugins](#-aw-sync-suite-plugins)
- [aw-sync-center](#-aw-sync-center)
5. [Architecture](#-architecture)
6. [Requirements](#-requirements)
Expand All @@ -60,6 +61,7 @@ You can check also the [screenshots](https://github.com/phrp720/aw-sync-suite/tr
- 🌐 **Centralized Monitoring:** Aggregate data from multiple devices effortlessly.
- 🛡️ **Data Filtering:** Protect sensitive information by filtering or sanitizing it at the source.
- 📍 **Checkpointing Mechanism:** Smart synchronization with automatic tracking of synced data.
- 🔌 **Plugin Mechanism:** Use the pre-built plugins or create your own for custom data processing.
- 📈 **Pre-Built Dashboards:** Use intuitive Grafana dashboards for instant insights.
- ⚙️ **Effortless Deployment:** Simple setup for both agent and central components.

Expand All @@ -77,7 +79,7 @@ Runs on each device, retrieves and filters ActivityWatch data, and sends it secu

- **Purpose**: Syncs data from ActivityWatch to Prometheus.
- **Deployment**: Run on each computer you wish to track user activity from.
- **Configuration**: Configurable via the `aw-sync-agent.yaml` file.
- **Configuration**: Configurable via the `aw-sync-settings.yaml` file and its [plugins](https://github.com/phrp720/aw-sync-suite-plugins).

| Platform Support | Runs as a Service (`-service`) | General Workability |
|------------------|--------------------------------|---------------------|
Expand All @@ -89,6 +91,20 @@ Runs on each device, retrieves and filters ActivityWatch data, and sends it secu
> - The **aw-sync-agent** is fully operational on **macOS**, but the `-service` feature (which allows the agent to run as a background service) is not yet implemented for macOS.
> - On macOS, you can still manually start and run the agent to sync ActivityWatch data without issues.
### 🔌 [aw-sync-suite-plugins](https://github.com/phrp720/aw-sync-suite-plugins)

The `aw-sync-suite` supports a range of plugins that allow for custom data processing before pushing the data to Prometheus. You can utilize these plugins to enhance your data synchronization capabilities.

- **Purpose**: Plugins enable pre-processing of ActivityWatch data, allowing for filtering, transformation, and custom handling based on specific requirements(read more on how they work [here](https://github.com/phrp720/aw-sync-suite/wiki/Agent-Plugins)).
- **Integration**: Easily configurable within the `aw-sync-agent` through the `aw-sync-settings.yaml` file.
- **Repository**: For a list of available plugins and their usage, visit the [aw-sync-suite-plugins repository](https://github.com/phrp720/aw-sync-suite-plugins).

#### How to Use Plugins

1. **Add the Plugin**: Ensure the desired plugin is available in the `aw-sync-suite-plugins` repository and modify the `aw-sync-settings.yaml` file to include the plugin.
2. **Configure the Plugin**: If the plugin is configurable add the plugin configuration in `config` folder to configure the plugin settings.
3. **Run the Agent**: Start the `aw-sync-agent`, and it will utilize the configured plugins for data processing.

### 📦 [aw-sync-center](https://github.com/phrp720/aw-sync-suite/tree/master/aw-sync-center)

A centralized Prometheus and Grafana setup for aggregating and visualizing data.
Expand Down
324 changes: 43 additions & 281 deletions aw-sync-agent/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package aw
package activitywatch

import (
"encoding/json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package aw
package activitywatch

import "time"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package util
package activitywatch

import (
"aw-sync-agent/aw"
internalErrors "aw-sync-agent/errors"
"github.com/phrp720/aw-sync-agent-plugins/models"
"log"
"net/http"
"sort"
"time"
)

// RemoveExcludedWatchers removes the excluded watchers from the buckets
func RemoveExcludedWatchers(buckets aw.Watchers, excludedWatchers []string) aw.Watchers {
func RemoveExcludedWatchers(buckets Watchers, excludedWatchers []string) Watchers {
if len(excludedWatchers) > 0 {
for _, excludedWatcher := range excludedWatchers {
for id, bucket := range buckets {
Expand All @@ -23,7 +24,7 @@ func RemoveExcludedWatchers(buckets aw.Watchers, excludedWatchers []string) aw.W
return buckets
}

func ActivityWatchHealthCheck(activityWatchUrl string) bool {
func HealthCheck(activityWatchUrl string) bool {

resp, err := getRequest(activityWatchUrl)
if err != nil {
Expand Down Expand Up @@ -59,3 +60,34 @@ func getRequest(url string) (*http.Response, error) {

return resp, nil
}

func SortAndTrimEvents(events []Event) []Event {
// Sort events by timestamp. Older to newer.
sort.Slice(events, func(i, j int) bool {
return events[i].Timestamp.Before(events[j].Timestamp)
})

// Remove the newest event because it might be incomplete.
if len(events) > 0 {
events = events[:len(events)-1]
}

return events
}

func ToPluginEvent(events []Event) models.Events {
var convertedEvents models.Events
for _, event := range events {

convertedEvents = append(convertedEvents, models.Event{ID: event.ID, Timestamp: event.Timestamp, Duration: event.Duration, Data: event.Data})
}
return convertedEvents
}

func ToAwEvent(events models.Events) Events {
var convertedEvents Events
for _, event := range events {
convertedEvents = append(convertedEvents, Event{ID: event.ID, Timestamp: event.Timestamp, Duration: event.Duration, Data: event.Data})
}
return convertedEvents
}
12 changes: 0 additions & 12 deletions aw-sync-agent/aw-sync-agent.yaml

This file was deleted.

9 changes: 9 additions & 0 deletions aw-sync-agent/config/aw-sync-settings.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#Settings:
# awUrl: http://localhost:5600
# prometheusUrl: http://localhost:9090
# prometheusAuth: ""
# excludedWatchers: []
# userId: TestUser
# includeHostname: true
# plugins:
# - "filters" # This is the only plugin that is enabled by default
94 changes: 19 additions & 75 deletions aw-sync-agent/datamanager/datamanager.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,40 @@
package datamanager

import (
"aw-sync-agent/aw"
"aw-sync-agent/activitywatch"
"aw-sync-agent/checkpoint"
"aw-sync-agent/filter"
"aw-sync-agent/prometheus"
"aw-sync-agent/util"
"context"
"errors"
"fmt"
"github.com/phrp720/aw-sync-agent-plugins/models"
"log"
"sort"
"strconv"
)

// ScrapeData scrapes the data from the local ActivityWatch instance via the aw Client
func ScrapeData(awUrl string, excludedWatchers []string) (aw.WatcherNameToEventsMap, error) {
if !util.ActivityWatchHealthCheck(awUrl) {
func ScrapeData(awUrl string, excludedWatchers []string) (activitywatch.WatcherNameToEventsMap, error) {
if !activitywatch.HealthCheck(awUrl) {
return nil, errors.New("activityWatch is not reachable. Data will be pushed at the next synchronization")
}
log.Print("Fetching buckets ...\n")
buckets, err := aw.GetBuckets(awUrl)
buckets, err := activitywatch.GetBuckets(awUrl)
if err != nil {
return nil, fmt.Errorf("Error fetching buckets: %v", err)
}

log.Print("Buckets fetched successfully")
log.Print("Total buckets fetched: ", len(buckets))
util.RemoveExcludedWatchers(buckets, excludedWatchers)
eventsMap := make(aw.WatcherNameToEventsMap)
activitywatch.RemoveExcludedWatchers(buckets, excludedWatchers)
eventsMap := make(activitywatch.WatcherNameToEventsMap)
for name, bucket := range buckets {
if !util.ActivityWatchHealthCheck(awUrl) {
if !activitywatch.HealthCheck(awUrl) {
return nil, errors.New("activityWatch is not reachable. Data will be pushed at the next synchronization")
}
log.Print("Fetching events from ", bucket.Client, " ...")
startPoint := checkpoint.Read(bucket.Client)

//endPoint := time.Now().AddDate(0, 0, -1) // Set end date to one day before the current date
events, err := aw.GetEvents(awUrl, name, startPoint, nil, nil)
events, err := activitywatch.GetEvents(awUrl, name, startPoint, nil, nil)
if err != nil {
return nil, fmt.Errorf("error fetching events for bucket %s: %v", bucket.Client, err)
}
Expand All @@ -49,72 +46,19 @@ func ScrapeData(awUrl string, excludedWatchers []string) (aw.WatcherNameToEvents

// AggregateData aggregates the data
// This is going to be called with events for each watcher separately
func AggregateData(events []aw.Event, watcher string, userID string, includeHostName bool, filters []filter.Filter) []prometheus.TimeSeries {

// Sort events by timestamp. Older to newer.
sort.Slice(events, func(i, j int) bool {
return events[i].Timestamp.Before(events[j].Timestamp)
})

// Remove the newest event because it might be incomplete.
if len(events) > 0 {
events = events[:len(events)-1]
}
func AggregateData(Plugins []models.Plugin, events []activitywatch.Event, watcher string, userID string, includeHostName bool) []prometheus.TimeSeries {

events = activitywatch.SortAndTrimEvents(events)
var timeSeriesList []prometheus.TimeSeries

//Apply the filters
var watcherFilters []filter.Filter
if watcher != "aw-watcher-afk" {
watcherFilters = filter.GetMatchingFilters(filters, watcher)
// Sort watcherFilters so filters with a Category take priority
sort.Slice(watcherFilters, func(i, j int) bool {
return watcherFilters[i].Category != "" && watcherFilters[j].Category == ""
})
var unmarshaledEvents models.Events
for _, plugin := range Plugins {
unmarshaledEvents = plugin.Execute(activitywatch.ToPluginEvent(events), watcher, userID, includeHostName)
}
if len(Plugins) > 0 {
events = activitywatch.ToAwEvent(unmarshaledEvents)
}

var dropEvent bool
for _, event := range events {

//Apply the filters
if watcher != "aw-watcher-afk" {
event.Data["category"] = "Other" //Default category
event.Data, dropEvent = filter.Apply(event.Data, watcherFilters)
}

// Drop the event if it matches the filter
if dropEvent {
continue
}
var labels []prometheus.Label

util.AddMetricLabel(&labels, "__name__", util.SanitizeLabelName(watcher)) //Watcher name
util.AddMetricLabel(&labels, "unique_id", util.GetRandomUUID()) // Unique ID for each event to avoid duplicate errors of timestamp seconds
util.AddMetricLabel(&labels, "aw_id", strconv.Itoa(event.ID)) //Event ID created from activityWatch
util.AddMetricLabel(&labels, "user", userID)

hostValue := "Unknown"
if includeHostName {
hostValue = util.GetHostname()
}

util.AddMetricLabel(&labels, "host", hostValue)

// Add the data as labels
for key, value := range event.Data {
util.AddMetricLabel(&labels, key, fmt.Sprintf("%v", value))
}
sample := prometheus.Sample{
Value: event.Duration,
Time: event.Timestamp,
}

timeSeries := prometheus.TimeSeries{
Labels: labels,
Sample: sample,
}

timeSeriesList = append(timeSeriesList, timeSeries)
timeSeriesList = append(timeSeriesList, prometheus.AttachTimeSeriesPayload(event, includeHostName, watcher, userID))
}
return timeSeriesList
}
Expand All @@ -126,7 +70,7 @@ func PushData(client *prometheus.Client, prometheusUrl string, prometheusSecretK
log.Print("Pushing data for [", watcher, "] ...")

for i := 0; i < len(timeseries); i += chunkSize {
if !util.PromHealthCheck(prometheusUrl, prometheusSecretKey) {
if !prometheus.HealthCheck(prometheusUrl, prometheusSecretKey) {
return errors.New("prometheus is not reachable or Internet connection is lost. Data will be pushed at the next synchronization")
}
end := i + chunkSize
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Settings:
awUrl: http://localhost:5600
prometheusUrl: http://localhost:9090
excludedWatchers:
- aw-watcher-window
userId: DemoUser

Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ services:
container_name: aw-sync-agent
restart: unless-stopped
volumes:
- ./aw-sync-agent.yaml:/opt/aw-sync-agent.yaml
- ./config:/opt/config
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
Settings:
awUrl: http://localhost:5600
prometheusUrl: http://localhost:9090
excludedWatchers:
- aw-watcher-window
userId: DemoUser

Filters:

- Filter:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ services:
restart: unless-stopped
network_mode: "host"
volumes:
- ./aw-sync-agent.yaml:/opt/aw-sync-agent.yaml
- ./config:/opt/config
environment:
- ACTIVITY_WATCH_URL=http://localhost:5600
- PROMETHEUS_URL=http://localhost:9090
Expand Down
Loading

0 comments on commit 2828260

Please sign in to comment.