Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Access logs #198

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
53 changes: 53 additions & 0 deletions backend/src/zelthy/api/platform/access_logs/v1/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import importlib

from rest_framework import serializers

from zelthy.api.platform.tenancy.v1.serializers import AppUserModelSerializerModel
from zelthy.apps.access_logs.models import AppAccessLog
from zelthy.core.utils import get_datetime_str_in_tenant_timezone, get_current_request


class AccessLogSerializerModel(serializers.ModelSerializer):
user = serializers.SerializerMethodField()
role = serializers.SerializerMethodField()
attempt_time = serializers.SerializerMethodField()
session_expired_at = serializers.SerializerMethodField()
is_login_successful = serializers.SerializerMethodField()

def get_attempt_time(self, obj):
if obj.attempt_time:
return get_datetime_str_in_tenant_timezone(
obj.attempt_time, self.context["tenant"]
)
return "NA"

def get_session_expired_at(self, obj):
if obj.session_expired_at:
return get_datetime_str_in_tenant_timezone(
obj.session_expired_at, self.context["tenant"]
)

return "NA"

def get_user(self, obj):
return obj.user.name if obj.user else "NA"

def get_role(self, obj):
return obj.role.name if obj.role else "NA"

def get_is_login_successful(self, obj):
return "Successful" if obj.is_login_successful else "Failed"

class Meta:
model = AppAccessLog
fields = [
"id",
"ip_address",
"user",
"attempt_type",
"attempt_time",
"role",
"user_agent",
"is_login_successful",
"session_expired_at",
]
11 changes: 11 additions & 0 deletions backend/src/zelthy/api/platform/access_logs/v1/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from django.urls import path

from .views import AccessLogViewAPIV1

urlpatterns = [
path(
"",
AccessLogViewAPIV1.as_view(),
name="accesslog-apiv1-accessloglistview",
),
]
161 changes: 161 additions & 0 deletions backend/src/zelthy/api/platform/access_logs/v1/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import csv
import json
import pytz
import traceback
from datetime import datetime

from django.db.models import Q
from django.utils.decorators import method_decorator

from zelthy.core.utils import get_search_columns
from zelthy.core.api.utils import ZelthyAPIPagination
from zelthy.apps.access_logs.models import AppAccessLog
from zelthy.core.common_utils import set_app_schema_path
from zelthy.apps.shared.tenancy.models import TenantModel
from zelthy.core.api import get_api_response, ZelthyGenericPlatformAPIView

from .serializers import AccessLogSerializerModel


@method_decorator(set_app_schema_path, name="dispatch")
class AccessLogViewAPIV1(ZelthyGenericPlatformAPIView, ZelthyAPIPagination):
pagination_class = ZelthyAPIPagination

def process_timestamp(self, timestamp, timezone):
try:
ts = json.loads(timestamp)
tz = pytz.timezone(timezone)
ts["start"] = tz.localize(
datetime.strptime(ts["start"] + "-" + "00:00", "%Y-%m-%d-%H:%M"),
is_dst=None,
)
ts["end"] = tz.localize(
datetime.strptime(ts["end"] + "-" + "23:59", "%Y-%m-%d-%H:%M"),
is_dst=None,
)
return ts
except Exception:
return None

def process_id(self, id):
try:
return int(id)
except ValueError:
return None

def get_queryset(self, search, tenant, columns={}):

field_name_query_mapping = {
"id": "id",
"user": "user__name__icontains",
"user_agent": "user_agent__icontains",
}
search_filters = {
"id": self.process_id,
"attempt_time": self.process_timestamp,
}

records = AppAccessLog.objects.all().order_by("-id")

if search == "" and columns == {}:
return records

filters = Q()
for field_name, query in field_name_query_mapping.items():
if search:
if search_filters.get(field_name, None):
filters |= Q(**{query: search_filters[field_name](search)})
else:
filters |= Q(**{query: search})
records = records.filter(filters).distinct()

if columns.get("attempt_time"):
processed = self.process_timestamp(
columns.get("attempt_time"), tenant.timezone
)
if processed is not None:
records = records.filter(
attempt_time__gte=processed["start"],
attempt_time__lte=processed["end"],
)
if columns.get("attempt_type"):
records = records.filter(attempt_type=columns.get("attempt_type"))

if columns.get("is_login_successful") != None:
records = records.filter(
is_login_successful=columns.get("is_login_successful")
)

if columns.get("role"):
records = records.filter(role=columns.get("role"))

return records

def get_dropdown_options(self):
options = {}
options["attempt_type"] = [
{
"id": "login",
"label": "Login",
},
{
"id": "switch_role",
"label": "Switch Role",
},
]
options["is_login_successful"] = [
{
"id": True,
"label": "Successful",
},
{
"id": False,
"label": "Failed",
},
]
role_list = list(
AppAccessLog.objects.all()
.values_list("role__id", "role__name")
.order_by("role__name")
.distinct()
)
for user_role in role_list:
options["role"].append(
{
"id": user_role[0],
"label": user_role[1],
}
)
return options

def get(self, request, *args, **kwargs):
try:
app_uuid = kwargs.get("app_uuid")
tenant = TenantModel.objects.get(uuid=app_uuid)
include_dropdown_options = request.GET.get("include_dropdown_options")
search = request.GET.get("search", None)
columns = get_search_columns(request)
access_logs = self.get_queryset(search, tenant, columns)
paginated_access_logs = self.paginate_queryset(
access_logs, request, view=self
)
serializer = AccessLogSerializerModel(
paginated_access_logs, many=True, context={"tenant": tenant}
)
accesslogs = self.get_paginated_response_data(serializer.data)
success = True
response = {
"audit_logs": accesslogs,
"message": "Access logs fetched successfully",
}
if include_dropdown_options:
response["dropdown_options"] = self.get_dropdown_options()

status = 200

except Exception as e:
traceback.print_exc()
success = False
response = {"message": str(e)}
status = 500
return get_api_response(success, response, status)
2 changes: 2 additions & 0 deletions backend/src/zelthy/api/platform/tenancy/v1/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from zelthy.api.platform.packages.v1 import urls as packages_v1_urls
from zelthy.api.platform.tasks.v1 import urls as tasks_v1_urls
from zelthy.api.platform.codeassist.v1 import urls as codeassist_v1_urls
from zelthy.api.platform.access_logs.v1 import urls as access_logs_v1_urls


urlpatterns = [
Expand Down Expand Up @@ -57,5 +58,6 @@
re_path(r"^(?P<app_uuid>[\w-]+)/packages/$", include(packages_v1_urls)),
re_path(r"^(?P<app_uuid>[\w-]+)/tasks/", include(tasks_v1_urls)),
re_path(r"^(?P<app_uuid>[\w-]+)/code-assist/", include(codeassist_v1_urls)),
re_path(r"^(?P<app_uuid>[\w-]+)/access-logs/", include(access_logs_v1_urls)),
path("", include(permissions_v1_urls)),
]
Empty file.
9 changes: 9 additions & 0 deletions backend/src/zelthy/apps/access_logs/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from django.apps import AppConfig


class AccesslogsAppConfig(AppConfig):

name = "zelthy.apps.access_logs"

def ready(self):
import zelthy.apps.access_logs.signals
86 changes: 86 additions & 0 deletions backend/src/zelthy/apps/access_logs/migrations/0001_accesslogs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Generated by Django 4.2.11 on 2024-04-12 09:20

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

initial = True

dependencies = [
("appauth", "0006_appusermodel_app_objects"),
]

operations = [
migrations.CreateModel(
name="AppAccessLog",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"user_agent",
models.CharField(
db_index=True, max_length=255, verbose_name="User Agent"
),
),
(
"ip_address",
models.GenericIPAddressField(
db_index=True, null=True, verbose_name="IP Address"
),
),
(
"username",
models.CharField(
db_index=True,
max_length=255,
null=True,
verbose_name="Username",
),
),
(
"http_accept",
models.CharField(max_length=1025, verbose_name="HTTP Accept"),
),
("path_info", models.CharField(max_length=255, verbose_name="Path")),
(
"attempt_time",
models.DateTimeField(
auto_now_add=True, verbose_name="Attempt Time"
),
),
("attempt_type", models.CharField(max_length=20, null=True)),
("is_login_successful", models.BooleanField(default=False)),
("session_expired_at", models.DateTimeField(blank=True, null=True)),
(
"role",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
to="appauth.userrolemodel",
),
),
(
"user",
models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.CASCADE,
to="appauth.appusermodel",
),
),
],
options={
"ordering": ["-attempt_time"],
"abstract": False,
},
),
]
Empty file.
18 changes: 18 additions & 0 deletions backend/src/zelthy/apps/access_logs/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from django.db import models
from axes.models import AccessBase

from zelthy.apps.appauth.models import AppUserModel, UserRoleModel


class AppAccessLog(AccessBase):

user = models.ForeignKey(AppUserModel, null=True, on_delete=models.CASCADE)
role = models.ForeignKey(
UserRoleModel, null=True, blank=True, on_delete=models.CASCADE
)
attempt_type = models.CharField(max_length=20, null=True)
is_login_successful = models.BooleanField(default=False)
session_expired_at = models.DateTimeField(null=True, blank=True)

class Meta(AccessBase.Meta):
app_label = "access_logs"
Loading