Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions admin/base/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
re_path(r'^draft_registrations/', include('admin.draft_registrations.urls', namespace='draft_registrations')),
re_path(r'^files/', include('admin.files.urls', namespace='files')),
re_path(r'^share_reindex/', include('admin.share_reindex.urls', namespace='share_reindex')),
re_path(r'^notifications/', include('admin.notifications.urls', namespace='notifications')),
]),
),
]
Expand Down
8 changes: 8 additions & 0 deletions admin/notifications/forms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django import forms
from osf.models import NotificationType


class NotificationTypeForm(forms.ModelForm):
class Meta:
model = NotificationType
fields = '__all__'
14 changes: 14 additions & 0 deletions admin/notifications/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from django.urls import re_path
from . import views

app_name = 'admin'

urlpatterns = [
re_path(r'$', views.NotificationsList.as_view(), name='list'),
re_path(r'types/$', views.NotificationTypeList.as_view(), name='types_list'),
re_path(r'type_display/(?P<pk>\d+)/$', views.NotificationTypeDisplay.as_view(), name='type_display'),
re_path(r'type_detail/(?P<pk>\d+)/$', views.NotificationTypeDetail.as_view(), name='type_detail'),
re_path(r'types_preview/(?P<pk>\d+)/$', views.NotificationTypePreview.as_view(), name='types_preview'),
re_path(r'subscriptions/$', views.NotificationSubscriptionsList.as_view(), name='subscriptions_list'),
re_path(r'email_tasks/$', views.EmailTasksList.as_view(), name='email_tasks_list'),
]
327 changes: 326 additions & 1 deletion admin/notifications/views.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,329 @@
from osf.models.notification_subscription import NotificationSubscription
from django.urls import reverse_lazy
from django.db.models import Q
from osf.models import NotificationSubscription, NotificationType, Notification, EmailTask
from django.views.generic import ListView, DetailView, UpdateView
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.forms.models import model_to_dict
from .forms import NotificationTypeForm
from osf.email import _render_email_html
import json
from collections import defaultdict
from mako.lexer import Lexer
from mako.parsetree import ControlLine
import re

def delete_selected_notifications(selected_ids):
NotificationSubscription.objects.filter(id__in=selected_ids).delete()

TEMPLATE_IDENTIFIER_BLACKLIST = {
'if', 'else', 'and', 'or', 'not', 'in',
'True', 'False', 'len', 'str', 'int', 'float', 'list', 'dict', 'set', 'tuple',
}

def resolve_identifiers(identifier_structure):
structure = defaultdict(dict)
if hasattr(identifier_structure, 'nodes') and identifier_structure.nodes:
for node in identifier_structure.nodes:
if isinstance(node, ControlLine) and node.keyword == 'for':
match = re.match(r'for (\w+) in (.+):', node.text)
if match:
iterator, source = match.groups()
structure[node.text] = {
'type': 'loop',
'iterator': iterator,
'source': source,
'children': resolve_identifiers(node)
}
elif hasattr(node, 'text'):
field_match = re.match(r"(\w+)\['(.+)'\]", node.text)
if field_match:
source, field = field_match.groups()
structure[node.text] = {
'type': 'field',
'source': source,
'field': field
}
return structure

def generate_mock_json(structure, list_name=None):
item = {}
result = {}
for key, value in structure.items():
# simple field
if isinstance(value, dict) and value.get('type') == 'field':
field_name = value['field']
item[field_name] = f"mock_{field_name}"

# nested loop
elif isinstance(value, dict) and value.get('type') == 'loop':
nested_source = value['source']
nested_match = re.match(r"\w+\['(.+)'\]", nested_source)
if nested_match:
nested_field = nested_match.group(1)
item[nested_field] = [1, 2, 3, 4]

# top-level loop wrapper
elif key.startswith('for '):
match = re.match(r'for (\w+) in (.+):', key)
if match:
_, source = match.groups()
# Extract final field name
field_match = re.search(r"(\w+)\['(.+?)'\]$", source)
if field_match:
field_name = field_match.group(1)
list_name = field_match.group(2)
return {field_name: generate_mock_json(value, list_name)}
else:
list_name = source
return generate_mock_json(value, list_name)
if list_name:
result[list_name] = [item, item, item]

return result


def build_safe_context(template: str) -> dict:
templatenode = Lexer(text=template).parse()
identifiers_location = []
for node in templatenode.get_children():
if hasattr(node, 'nodes'):
identifiers_location.extend(node.nodes)

if not identifiers_location:
identifiers_location = templatenode.get_children()
identifier_structure = defaultdict()
for control_structure in identifiers_location:
if isinstance(control_structure, ControlLine):
identifier_structure[control_structure.text] = resolve_identifiers(control_structure)

identifiers = [x.undeclared_identifiers() for x in identifiers_location if hasattr(x, 'undeclared_identifiers')]
flatten_identifiers = set()
for indentifier_set in identifiers:
flatten_identifiers.update(indentifier_set)
mock_json = generate_mock_json(identifier_structure)
context = {identifier: f'mock_{identifier}' for identifier in flatten_identifiers if identifier not in TEMPLATE_IDENTIFIER_BLACKLIST}
context.update(mock_json)
return context

class NotificationsList(PermissionRequiredMixin, ListView):
paginate_by = 25
template_name = 'notifications/notifications_list.html'
ordering = 'id'
permission_required = 'osf.view_notification'
raise_exception = True
model = Notification

def get_queryset(self):
qs = Notification.objects.all().order_by(self.ordering)
q = self.request.GET.get('q')
if q:
qs = qs.filter(
Q(subscription__notification_type__name__icontains=q) |
Q(subscription__user__username__icontains=q) |
Q(subscription__message_frequency__icontains=q)
)
return qs

def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
q = self.request.GET.get('q', '')
context['q'] = q
# append search param to pagination links
if q:
context['extra_query_params'] = f"&q={q}"
else:
context['extra_query_params'] = ''

context['notifications'] = context['object_list']
context['page'] = context['page_obj']
return context

class NotificationSubscriptionsList(PermissionRequiredMixin, ListView):
paginate_by = 25
template_name = 'notifications/notification_subscriptions_list.html'
ordering = 'id'
permission_required = 'osf.view_notificationsubscription'
raise_exception = True
model = NotificationSubscription

def get_queryset(self):
qs = NotificationSubscription.objects.all().order_by(self.ordering)
q = self.request.GET.get('q')
if q:
qs = qs.filter(
Q(notification_type__name__icontains=q) |
Q(user__username__icontains=q) |
Q(message_frequency__icontains=q)
)
return qs

def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
q = self.request.GET.get('q', '')
context['q'] = q
# append search param to pagination links
if q:
context['extra_query_params'] = f"&q={q}"
else:
context['extra_query_params'] = ''
context['subscriptions'] = context['object_list']
context['page'] = context['page_obj']
return context

class EmailTasksList(PermissionRequiredMixin, ListView):
paginate_by = 25
template_name = 'notifications/email_tasks_list.html'
ordering = 'task_id'
permission_required = 'osf.view_emailtask'
raise_exception = True
model = EmailTask

def get_queryset(self):
qs = EmailTask.objects.all().order_by(self.ordering)
q = self.request.GET.get('q')
if q:
qs = qs.filter(
Q(task_id=q) |
Q(user__username__icontains=q) |
Q(status=q)
)
return qs

def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
q = self.request.GET.get('q', '')
context['q'] = q
# append search param to pagination links
if q:
context['extra_query_params'] = f"&q={q}"
else:
context['extra_query_params'] = ''
context['email_tasks'] = context['object_list']
context['page'] = context['page_obj']
return context

class NotificationTypeList(PermissionRequiredMixin, ListView):
paginate_by = 25
template_name = 'notifications/notification_types_list.html'
ordering = 'name'
permission_required = 'osf.view_notificationtype'
raise_exception = True
model = NotificationType

def get_queryset(self):
qs = NotificationType.objects.all().order_by(self.ordering)
q = self.request.GET.get('q')
if q:
qs = qs.filter(
Q(name__icontains=q) |
Q(subject__icontains=q) |
Q(notification_interval_choices__icontains=q)
)
return qs

def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
q = self.request.GET.get('q', '')
context['q'] = q
# append search param to pagination links
if q:
context['extra_query_params'] = f"&q={q}"
else:
context['extra_query_params'] = ''

context['notification_types'] = context['object_list']
context['page'] = context['page_obj']
return context

class NotificationTypeDisplay(PermissionRequiredMixin, DetailView):
Comment thread
Ostap-Zherebetskyi marked this conversation as resolved.
model = NotificationType
template_name = 'notifications/notification_type_detail.html'
permission_required = 'osf.view_notificationtype'
raise_exception = True

def get_object(self, queryset=None):
return NotificationType.objects.get(id=self.kwargs.get('pk'))

def get_context_data(self, *args, **kwargs):
notification_type = self.get_object()
notification_type_dict = model_to_dict(notification_type)
fields = notification_type_dict.copy()
kwargs.setdefault('page_number', self.request.GET.get('page', '1'))
notification_type_dict['is_digest_type'] = notification_type.is_digest_type
kwargs['notification_type'] = notification_type_dict
kwargs['template'] = notification_type_dict.pop('template', None)
kwargs['change_form'] = NotificationTypeForm(initial=fields)

return kwargs

class NotificationTypePreview(PermissionRequiredMixin, DetailView):
model = NotificationType
template_name = 'notifications/notification_type_preview.html'
permission_required = 'osf.view_notificationtype'
raise_exception = True

def get_object(self, queryset=None):
return NotificationType.objects.get(id=self.kwargs.get('pk'))

def get_context_data(self, *args, **kwargs):
notification_type = self.get_object()
raw_context = self.request.GET.get('context')
if raw_context:
try:
if notification_type.is_digest_type:
safe_context = {'notifications': [json.loads(raw_context)]}
else:
safe_context = json.loads(raw_context)

return_context = json.loads(raw_context)
except json.JSONDecodeError as e:
kwargs['rendered_template'] = f"Error parsing JSON: {str(e)}"
kwargs['context'] = raw_context
return kwargs
else:
if notification_type.is_digest_type:
inner_context = build_safe_context(notification_type.template)
inner_template = _render_email_html(notification_type, ctx=inner_context, return_original_error=True)
safe_context = {'notifications': [inner_template]}
return_context = inner_context
else:
safe_context = build_safe_context(notification_type.template)
return_context = safe_context

if notification_type.is_digest_type:
# Use user_digest template as a wrapper for digest notification preview.
template_obj = NotificationType.objects.get(name='user_digest')
else:
template_obj = notification_type
try:
kwargs['rendered_template'] = _render_email_html(template_obj, ctx=safe_context, return_original_error=True)
except Exception as e:
kwargs['rendered_template'] = f"Error rendering template: {str(e)}"

kwargs['context'] = json.dumps(return_context, indent=4)

return kwargs

class NotificationTypeDetail(PermissionRequiredMixin, DetailView):
model = NotificationType
template_name = 'notifications/notification_type_detail.html'
permission_required = 'osf.view_notificationtype'
raise_exception = True

def get(self, request, *args, **kwargs):
view = NotificationTypeDetail.as_view()
return view(request, *args, **kwargs)

def post(self, request, *args, **kwargs):
view = NotificationTypeChangeForm.as_view()
return view(request, *args, **kwargs)

class NotificationTypeChangeForm(PermissionRequiredMixin, UpdateView):
template_name = 'institutions/detail.html'
permission_required = 'osf.change_notificationtype'
raise_exception = True
model = NotificationType
form_class = NotificationTypeForm

def get_success_url(self, *args, **kwargs):
return reverse_lazy('notifications:type_display', kwargs={'pk': self.kwargs.get('pk')})
22 changes: 22 additions & 0 deletions admin/templates/base.html
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,28 @@
</div>
{% endif %}
{% endif %}
{% if perms.osf.view_notification or perms.osf.view_notificationtype or perms.osf.view_notificationsubscription %}
<li><a role="button" data-toggle="collapse" href="#collapseNotifications">
<i class='fa fa-caret-down'></i> Notifications
</a></li>
<div class="collapse" id="collapseNotifications">
<ul class="sidebar-menu sidebar-menu-inner">
{% if perms.osf.view_notificationtype %}
<li><a href="{% url 'notifications:types_list' %}"><i class='fa fa-link'></i> <span>Notification Types</span></a></li>
{% endif %}
{% if perms.osf.view_notification %}
<li><a href="{% url 'notifications:list' %}"><i class='fa fa-link'></i> <span>Notifications</span></a></li>
{% endif %}
{% if perms.osf.view_notificationsubscription %}
<li><a href="{% url 'notifications:subscriptions_list' %}"><i class='fa fa-link'></i> <span>Notification Subscriptions</span></a></li>
{% endif %}
{% if perms.osf.view_emailtask %}
<li><a href="{% url 'notifications:email_tasks_list' %}"><i class='fa fa-link'></i> <span>Email Tasks</span></a></li>
{% endif %}

</ul>
</div>
{% endif %}
{% if perms.osf.view_metrics %}
<li><a href="{% url 'metrics:metrics' %}"><i class='fa fa-link'></i> <span>Metrics</span></a></li>
{% endif %}
Expand Down
Loading
Loading