from datetime import datetime, timedelta
import json
import uuid
from django.contrib.auth import logout, login, update_session_auth_hash
from django.contrib.auth.decorators import login_required
from django.contrib.auth.forms import PasswordChangeForm
from django.contrib.auth.models import User
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from django.core.paginator import Paginator
from django.db.models.aggregates import Sum
from django.http.response import HttpResponse
from django.shortcuts import render, get_object_or_404, redirect
from django.urls.base import reverse
from martor.utils import LazyEncoder
from notifications.models import Notification
from notifications.signals import notify
from path import Path
from backlog import settings
from main.forms import StoryForm, EpicForm, RegisterForm, ProfileForm, \
CommentForm, SprintForm, NewSprintForm
from main.models import Story, Epic, Sprint, Comment, Project
def register(request):
    """Sign up a new user.

    On POST the submitted ``RegisterForm`` is validated; when valid the user
    is saved, logged in and redirected to ``index``. For any other method an
    unbound form is shown. An invalid POST re-renders the page with the
    bound form.
    """
    if request.method != 'POST':
        form = RegisterForm()
    else:
        form = RegisterForm(request.POST)
        if form.is_valid():
            new_user = form.save()
            login(request, new_user)
            return redirect("index")
    context = {'form': form}
    return render(request, 'registration/register.html', context)
@login_required
def index(request):
    """Render the home page with the current sprint and the open epics."""
    open_epics = Epic.objects.filter(closed=False)
    context = {
        'current_sprint': Sprint.current(),
        'epics': open_epics,
    }
    return render(request, 'index.html', context)
@login_required
def profile_update(request):
    """Let the logged-in user edit their own profile.

    On a valid POST the profile is saved, the user is logged in again and
    redirected to ``index``. Otherwise the (bound or pre-filled)
    ``ProfileForm`` is rendered with the registration template.
    """
    # Both the display path and the submit path work on the same user row.
    account = get_object_or_404(User, username=request.user)
    if request.method != 'POST':
        form = ProfileForm(instance=account)
    else:
        form = ProfileForm(request.POST, instance=account)
        if form.is_valid():
            saved_user = form.save()
            login(request, saved_user)
            return redirect("index")
    return render(request, 'registration/register.html', {'form': form})
@login_required
def change_password(request):
    """Let the logged-in user change their password.

    A valid POST saves the new password and redirects to ``index``; any
    other request (or an invalid POST) renders the password form.
    """
    if request.method != 'POST':
        form = PasswordChangeForm(request.user)
    else:
        form = PasswordChangeForm(request.user, request.POST)
        if form.is_valid():
            updated_user = form.save()
            # Re-sync the session with the new password hash right after
            # saving (flagged as important by the original author).
            update_session_auth_hash(request, updated_user)
            return redirect('index')
    context = {'form': form}
    return render(request, 'registration/change_password.html', context)
@login_required
def logout(request):
    """Log the current user out and redirect to ``index``."""
    # This view's own name shadows the module-level ``logout`` import, so
    # calling ``logout(request)`` here would re-enter this view recursively
    # instead of ending the session. Import Django's function under an alias.
    from django.contrib.auth import logout as auth_logout

    auth_logout(request)
    return redirect("index")
@login_required
def backlog_editor(request):
    """Render the backlog editor with open and closed epics listed separately."""
    context = {
        'epics': Epic.objects.filter(closed=False),
        'closed': Epic.objects.filter(closed=True),
    }
    return render(request, 'backlog_editor.html', context)
@login_required
def sprint_new(request):
    """Create the sprint that follows the current one.

    Requires an authenticated user, like the other views of this module
    (the view writes to the database).

    On POST the submitted ``NewSprintForm`` is validated and saved, then the
    user is redirected to ``sprint_end``. An invalid POST re-renders the
    bound form. For any other method the form is pre-filled with a proposal:

    * number: current sprint number + 1;
    * start: the day after the current sprint ends, pushed forward past a
      Saturday/Sunday;
    * end: 13 days after the start, pulled back off a Saturday/Sunday.

    Both dates are handed to the form as ``dd/mm/YYYY`` strings.
    """
    if request.method == 'POST':
        form = NewSprintForm(request.POST)
        if form.is_valid():
            form.save()
            return redirect("sprint_end")
    else:
        current_sprint = Sprint.current()
        one_day = timedelta(days=1)

        # date.weekday() >= 5 means Saturday (5) or Sunday (6).
        new_start = current_sprint.date_end + one_day
        while new_start.weekday() >= 5:
            new_start += one_day

        new_end = new_start + timedelta(days=13)
        while new_end.weekday() >= 5:
            new_end -= one_day

        sprint = Sprint()
        sprint.number = current_sprint.number + 1
        sprint.date_start = new_start.strftime('%d/%m/%Y')
        sprint.date_end = new_end.strftime('%d/%m/%Y')
        form = NewSprintForm(instance=sprint)
    return render(request, 'sprint_new.html', {'form': form})
@login_required
def sprint_end(request):
    """Close the current sprint and announce the next one.

    Requires an authenticated user: the view modifies the sprint and uses
    ``request.user`` as the actor of the notifications it sends.

    If no next sprint exists yet, the user is redirected to ``sprint_new``
    first. On POST the retrospective text from the ``retro`` field is stored,
    the sprint is marked closed and saved, everyone is notified, and the
    user is redirected to ``index``. Otherwise the closing form is rendered.
    """
    current_sprint = Sprint.current()
    next_sprint = Sprint.next()
    if not next_sprint:
        # A sprint can only be ended once its successor has been created.
        return redirect("sprint_new")

    if request.method == 'POST':
        current_sprint.retro = request.POST["retro"]
        current_sprint.closed = True
        current_sprint.save()
        notify_sprint_end(current_sprint, next_sprint, request.user)
        return redirect("index")

    context = {
        'sprint': current_sprint,
        'next_sprint': next_sprint,
        'form': SprintForm(instance=current_sprint),
    }
    return render(request, 'sprint_end.html', context)
@login_required
def story_index(request):
    """List stories, 20 per page, narrowed by optional query-string filters.

    Recognised GET parameters (missing or empty values are ignored):

    * ``state``: ``'closed'`` selects closed stories, any other value the
      non-closed ones;
    * ``sprint``: a sprint id, or the literal ``"None"`` for stories that
      belong to no sprint;
    * ``author`` / ``assignee``: user ids;
    * ``page``: page number handed to the paginator.

    The template also receives the number of matching stories, the sum of
    their weights, all sprints and all users, and the range of page numbers.
    """
    params = request.GET
    matching = Story.objects

    state = params.get('state')
    if state:
        matching = matching.filter(closed=(state == 'closed'))

    sprint = params.get('sprint')
    if sprint:
        if sprint != "None":
            matching = matching.filter(sprints__id=sprint)
        else:
            matching = matching.filter(sprints=None)

    author = params.get('author')
    if author:
        matching = matching.filter(author_id=author)

    assignee = params.get('assignee')
    if assignee:
        matching = matching.filter(assignees__id=assignee)

    # Totals are computed on the whole filtered set, before pagination.
    count = matching.count()
    total_weight = matching.aggregate(Sum('weight'))['weight__sum']

    paginator = Paginator(matching.all(), 20)
    current_page = paginator.get_page(params.get('page'))

    context = {
        'stories': current_page,
        'count': count,
        'total_weight': total_weight,
        'sprints': Sprint.objects.all(),
        'users': User.objects.all(),
        'pages': range(1, paginator.num_pages + 1),
    }
    return render(request, 'story_index.html', context)
@login_required
def story_details(request, story_id):
    """Show one story (404 if it does not exist).

    The optional ``from`` GET parameter is forwarded to the template; when
    it is missing or empty, ``'epic_details'`` is used instead.
    """
    story = get_object_or_404(Story, id=story_id)
    origin = request.GET.get('from') or 'epic_details'
    context = {'story': story, 'from': origin}
    return render(request, 'story_details.html', context)
@login_required
def story_create(request, epic_id=None):
    """Create a story, optionally pre-attached to the epic ``epic_id``.

    On a valid POST the story is saved, every assignee other than the
    requesting user is notified, and the user is redirected to the story's
    detail page. For non-POST requests the form is pre-filled with the epic
    (404 if ``epic_id`` is given but unknown) and the requesting user as
    author. An invalid POST re-renders the bound form.
    """
    if request.method != 'POST':
        draft = Story()
        if epic_id is not None:
            draft.epic = get_object_or_404(Epic, id=epic_id)
        draft.author = User.objects.get(username=request.user)
        form = StoryForm(instance=draft)
    else:
        form = StoryForm(request.POST)
        if form.is_valid():
            story = form.save()
            # The creator does not need to be told about their own assignment.
            others = [user for user in story.assignees.all() if user != request.user]
            for assignee in others:
                notify_story_assigned(story, request.user, assignee)
            return redirect("story_details", story.id)

    context = {'form': form, 'current_sprint_id': Sprint.current().id}
    return render(request, 'story_form.html', context)
@login_required
def story_edit(request, story_id):
    """Edit an existing story (404 if it does not exist).

    On a valid POST the story is saved and users who were newly assigned by
    this edit (excluding the requesting user) are notified; the user is then
    redirected to the story's detail page. Otherwise the form is rendered.
    """
    story = get_object_or_404(Story, id=story_id)

    if request.method != 'POST':
        form = StoryForm(instance=story)
    else:
        # Snapshot the assignees before saving so additions can be detected.
        previous_assignees = list(story.assignees.all())
        form = StoryForm(request.POST, instance=story)
        if form.is_valid():
            form.save()
            for assignee in form.instance.assignees.all():
                if assignee in previous_assignees:
                    continue
                if assignee.id == request.user.id:
                    continue
                notify_story_assigned(story, request.user, assignee)
            return redirect("story_details", story.id)

    context = {'form': form, 'current_sprint_id': Sprint.current().id}
    return render(request, 'story_form.html', context)
@login_required
def story_delete(request, story_id):
    """Delete a story after confirmation (404 if it does not exist).

    A POST deletes the story and redirects to ``index``; any other method
    renders the confirmation page.
    """
    story = get_object_or_404(Story, id=story_id)
    if request.method != 'POST':
        return render(request, 'deletion.html', {'object': story})
    story.delete()
    return redirect("index")
@login_required
def story_close(request, story_id):
    """Close a story and notify its contributors (404 if it does not exist).

    AJAX callers receive the story as JSON. Other callers are redirected
    back to the referring page, or to the story's detail page when the
    request carries no ``Referer`` header (previously a ``KeyError``).
    """
    story = get_object_or_404(Story, id=story_id)
    story.close()
    notify_story_closed(story, request.user)

    # Same header test HttpRequest.is_ajax() performed; that method was
    # deprecated in Django 3.1 and removed in 4.0.
    if request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest':
        return HttpResponse(story.to_json())

    referer = request.META.get('HTTP_REFERER')
    if referer:
        return redirect(referer)
    return redirect("story_details", story.id)
@login_required
def story_reaffect(request, story_id):
    """Add a story to the next sprint (404 if the story does not exist).

    AJAX callers receive the story as JSON; other callers are redirected to
    the story's detail page.
    """
    story = get_object_or_404(Story, id=story_id)

    # Sprint.next() can be falsy when no upcoming sprint has been created
    # (sprint_end checks for that case); skip the reassignment then instead
    # of handing an empty value to the relation.
    next_sprint = Sprint.next()
    if next_sprint:
        story.sprints.add(next_sprint)
        story.save()

    # Same header test HttpRequest.is_ajax() performed; that method was
    # deprecated in Django 3.1 and removed in 4.0.
    if request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest':
        return HttpResponse(story.to_json())
    return redirect('story_details', story.id)
@login_required
def story_reopen(request, story_id):
    """Reopen a story and notify its contributors (404 if it does not exist).

    The caller is redirected back to the referring page, or to the story's
    detail page when the request carries no ``Referer`` header (previously
    a ``KeyError``).
    """
    story = get_object_or_404(Story, id=story_id)
    story.reopen()
    notify_story_reopened(story, request.user)

    referer = request.META.get('HTTP_REFERER')
    if referer:
        return redirect(referer)
    return redirect("story_details", story.id)
@login_required
def epic_details(request, epic_id):
    """Show one epic (404 if it does not exist)."""
    context = {'epic': get_object_or_404(Epic, id=epic_id)}
    return render(request, 'epic_details.html', context)
@login_required
def epic_create(request):
    """Create an epic authored by the requesting user.

    A valid POST saves the epic and redirects to ``backlog_editor``; any
    other request (or an invalid POST) renders the epic form.
    """
    if request.method != 'POST':
        form = EpicForm()
    else:
        form = EpicForm(request.POST)
        if form.is_valid():
            # The author is set by the view rather than taken from the form.
            new_epic = form.save(commit=False)
            new_epic.author = User.objects.get(username=request.user)
            new_epic.save()
            return redirect("backlog_editor")
    return render(request, 'epic_form.html', {'form': form})
@login_required
def epic_edit(request, epic_id, from_=""):
    """Edit an existing epic (404 if it does not exist).

    After a valid POST the user is redirected to ``from_`` when it is
    non-empty, otherwise to the epic's detail page. Any other request (or
    an invalid POST) renders the epic form.
    """
    epic = get_object_or_404(Epic, id=epic_id)

    if request.method != 'POST':
        form = EpicForm(instance=epic)
    else:
        form = EpicForm(request.POST, instance=epic)
        if form.is_valid():
            form.save()
            if not from_:
                return redirect("epic_details", epic.id)
            return redirect(from_)

    return render(request, 'epic_form.html', {'form': form})
@login_required
def epic_delete(request, epic_id):
    """Delete an epic after confirmation (404 if it does not exist).

    A POST deletes the epic and redirects to ``index``; any other method
    renders the confirmation page.
    """
    epic = get_object_or_404(Epic, id=epic_id)
    if request.method != 'POST':
        return render(request, 'deletion.html', {'object': epic})
    epic.delete()
    return redirect("index")
@login_required
def epic_value_update(request, epic_id):
    """Set the ``value`` of an epic from the POSTed ``value`` field.

    The epic is looked up for every request (404 if it does not exist), so
    that a non-POST request no longer fails on an unbound ``epic``; only a
    POST modifies and saves it. AJAX callers receive the epic as JSON,
    other callers are redirected to ``backlog_editor``.
    """
    epic = get_object_or_404(Epic, id=epic_id)
    if request.method == 'POST':
        epic.value = request.POST.get('value')
        epic.save()

    # Same header test HttpRequest.is_ajax() performed; that method was
    # deprecated in Django 3.1 and removed in 4.0.
    if request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest':
        return HttpResponse(epic.to_json())
    return redirect("backlog_editor")
@login_required
def epic_close(request, epic_id):
    """Close an epic and notify all users (404 if it does not exist).

    AJAX callers receive the epic as JSON; other callers are redirected to
    ``backlog_editor``.
    """
    epic = get_object_or_404(Epic, id=epic_id)
    epic.close()
    epic.save()
    notify_epic_closed(epic, request.user)

    # Same header test HttpRequest.is_ajax() performed; that method was
    # deprecated in Django 3.1 and removed in 4.0.
    if request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest':
        return HttpResponse(epic.to_json())
    return redirect("backlog_editor")
@login_required
def epic_reopen(request, epic_id):
    """Reopen an epic and notify all users (404 if it does not exist).

    AJAX callers receive the epic as JSON; other callers are redirected to
    ``backlog_editor``.
    """
    epic = get_object_or_404(Epic, id=epic_id)
    epic.reopen()
    notify_epic_reopened(epic, request.user)

    # Same header test HttpRequest.is_ajax() performed; that method was
    # deprecated in Django 3.1 and removed in 4.0.
    if request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest':
        return HttpResponse(epic.to_json())
    return redirect("backlog_editor")
@login_required
def reports(request):
    """Render the landing page of the reports section."""
    template_name = 'reports/report_index.html'
    return render(request, template_name)
@login_required
def report_sprints(request):
    """Render the sprint report over all sprints."""
    context = {'sprints': Sprint.objects.all()}
    return render(request, 'reports/report_sprints.html', context)
@login_required
def report_projects(request):
    """Render the project report over the open epics."""
    context = {'epics': Epic.objects.filter(closed=False)}
    return render(request, 'reports/report_projects.html', context)
@login_required
def report_activity(request):
    """Render story weight per project and per epic over recent sprints.

    Requires an authenticated user, like the other report views.

    The 12 most recently started sprints whose end date is in the past are
    scanned. For every story that has an epic, its weight is added to the
    ``"sixmonths"`` total of the epic and of the epic's project; stories of
    the most recent of those sprints are also added to ``"current"``.
    Epics with no weight over the whole window are left out of the result;
    projects are always listed.
    """
    projects_activity = {
        project: {"current": 0, "sixmonths": 0}
        for project in Project.objects.all()
    }
    epics_activity = {
        epic: {"current": 0, "sixmonths": 0}
        for epic in Epic.objects.all()
    }

    # ``datetime`` is imported as the class (``from datetime import
    # datetime``), so the call is ``datetime.today()``; the former
    # ``datetime.datetime.today()`` raised AttributeError.
    finished_sprints = (
        Sprint.objects
        .filter(date_end__lt=datetime.today())
        .order_by('-date_start')[:12]
    )

    for position, sprint in enumerate(finished_sprints):
        # Only the first (most recent) sprint feeds the "current" totals.
        is_latest = position == 0
        for story in sprint.stories.all():
            epic = story.epic
            if not epic:
                continue
            project_totals = projects_activity[epic.project]
            epic_totals = epics_activity[epic]
            if is_latest:
                project_totals["current"] += story.weight
                epic_totals["current"] += story.weight
            project_totals["sixmonths"] += story.weight
            epic_totals["sixmonths"] += story.weight

    epics_activity_cleaned = {
        epic: totals
        for epic, totals in epics_activity.items()
        if totals["sixmonths"] > 0
    }
    context = {
        'projects_activity': projects_activity,
        'epics_activity': epics_activity_cleaned,
    }
    return render(request, 'reports/report_activity.html', context)
# comments
@login_required
def comment_post(request):
    """Create a comment authored by the requesting user.

    On a valid form the comment is saved, contributors are notified, and
    the user is sent back to the referring page, anchored on the new
    comment. On an invalid form the user is sent back to the referring page
    unchanged. When the request carries no ``Referer`` header the home page
    is used instead (previously a ``KeyError``).
    """
    back_url = request.META.get('HTTP_REFERER') or reverse("index")

    form = CommentForm(request.POST)
    if not form.is_valid():
        return redirect(back_url)

    comment = form.save(commit=False)
    comment.author = request.user
    comment.save()
    notify_comment(comment)

    # Drop any existing fragment before pointing at the new comment.
    page_url = back_url.split("#")[0]
    return redirect(page_url + "#a-comment-{}".format(comment.id))
@login_required
def comment_edit(request, comment_id):
    """Update an existing comment (404 if it does not exist).

    On a valid form the comment is saved and the user is sent back to the
    referring page, anchored on that comment; on an invalid form the user
    is sent back to the referring page unchanged. When the request carries
    no ``Referer`` header the home page is used instead (previously a
    ``KeyError``).
    """
    # NOTE(review): there is no check that request.user is the comment's
    # author — confirm whether any logged-in user may edit any comment.
    comment = get_object_or_404(Comment, id=comment_id)
    back_url = request.META.get('HTTP_REFERER') or reverse("index")

    form = CommentForm(request.POST, instance=comment)
    if not form.is_valid():
        return redirect(back_url)

    form.save()
    # Drop any existing fragment before pointing at the edited comment.
    page_url = back_url.split("#")[0]
    return redirect(page_url + "#a-comment-{}".format(comment_id))
@login_required
def comment_del(request, comment_id):
    """Delete a comment (404 if it does not exist).

    The user is sent back to the comment section of the referring page, or
    of the home page when the request carries no ``Referer`` header
    (previously a ``KeyError``).
    """
    # NOTE(review): there is no check that request.user is the comment's
    # author — confirm whether any logged-in user may delete any comment.
    comment = get_object_or_404(Comment, id=comment_id)
    comment.delete()

    back_url = request.META.get('HTTP_REFERER') or reverse("index")
    return redirect(back_url.split("#")[0] + "#a-comment-section")
@login_required
def search(request):
    """Search epics and stories by name and description.

    The ``q`` GET parameter is matched case-insensitively as a substring; a
    missing ``q`` is treated as an empty string instead of raising. Results
    keep the order epic-name, story-name, epic-description,
    story-description matches, and an object matching on both name and
    description is listed once only.

    When exactly one object matches, the user is redirected to its detail
    page; otherwise the results are rendered 10 per page (``page`` GET
    parameter).
    """
    qstr = request.GET.get("q", "")
    querysets = (
        Epic.objects.filter(name__icontains=qstr),
        Story.objects.filter(name__icontains=qstr),
        Epic.objects.filter(description__icontains=qstr),
        Story.objects.filter(description__icontains=qstr),
    )

    # De-duplicate while preserving order; without this a single object
    # matching twice would defeat the "exactly one result" shortcut below.
    results = []
    seen = set()
    for queryset in querysets:
        for obj in queryset:
            key = (type(obj), obj.id)
            if key not in seen:
                seen.add(key)
                results.append(obj)

    if len(results) == 1:
        hit = results[0]
        target = "epic_details" if isinstance(hit, Epic) else "story_details"
        return redirect(target, hit.id)

    paginator = Paginator(results, 10)
    page_obj = paginator.get_page(request.GET.get('page'))
    context = {
        'results': page_obj,
        'pages': range(1, paginator.num_pages + 1),
    }
    return render(request, 'search_results.html', context)
# notifications
@login_required
def notif_seen(request, notif_id):
    """Mark one of the requesting user's notifications as read.

    The lookup is restricted to ``request.user.notifications`` so that a
    user cannot mark somebody else's notification as read; an id that is
    unknown or belongs to another user yields a 404. The first parameter
    was previously named ``_``; it is always passed positionally by the
    URL dispatcher.

    Returns an empty JSON object as the response body.
    """
    notif = get_object_or_404(request.user.notifications, id=notif_id)
    notif.mark_as_read()
    return HttpResponse('{}')
@login_required
def notif_all_seen(request):
    """Mark every notification of the requesting user as read.

    Returns an empty JSON object as the response body.
    """
    own_notifications = request.user.notifications.all()
    for notification in own_notifications:
        notification.mark_as_read()
    return HttpResponse('{}')
def notify_comment(comment):
    """Notify the contributors of the commented object about a new comment.

    Every user returned by ``contributors()`` of the commented object
    receives a notification, except the comment's author.
    """
    # The former ``target_url`` local was computed but never used; the dead
    # ``reverse()`` call has been removed.
    obj = comment.content_object
    author = comment.author
    notif_content = f"{author.username} a commenté {obj.model_name()} #{obj.id}"
    for user in obj.contributors():
        if user.id != author.id:
            notify.send(author, recipient=user, action_object=obj, verb=notif_content)
def notify_story_closed(story, closed_by):
    """Notify a story's contributors that ``closed_by`` closed it."""
    # The former ``target_url`` local was computed but never used; the dead
    # ``reverse()`` call has been removed.
    notif_content = f"La story #{story.id} a été clôturée par {closed_by}"
    notify.send(
        closed_by,
        recipient=story.contributors(),
        action_object=story,
        verb=notif_content,
    )
def notify_story_reopened(story, opened_by):
    """Notify a story's contributors that ``opened_by`` reopened it."""
    # The former ``target_url`` local was computed but never used; the dead
    # ``reverse()`` call has been removed.
    notif_content = f"La story #{story.id} a été réouverte par {opened_by}"
    notify.send(
        opened_by,
        recipient=story.contributors(),
        action_object=story,
        verb=notif_content,
    )
def notify_epic_closed(epic, closed_by):
    """Notify all users that ``closed_by`` closed an epic."""
    # The message used to say "La story" although the object is an epic; it
    # now matches the wording of the epic-reopened notification. The unused
    # ``target_url`` local has also been removed.
    notif_content = f"L'epic #{epic.id} a été clôturée par {closed_by}"
    notify.send(
        closed_by,
        recipient=User.objects.all(),
        action_object=epic,
        verb=notif_content,
    )
def notify_epic_reopened(epic, opened_by):
    """Notify all users that ``opened_by`` reopened an epic."""
    # The former ``target_url`` local was computed but never used; the dead
    # ``reverse()`` call has been removed.
    notif_content = f"L'epic #{epic.id} a été réouverte par {opened_by}"
    notify.send(
        opened_by,
        recipient=User.objects.all(),
        action_object=epic,
        verb=notif_content,
    )
def notify_story_assigned(story, assigned_by, assigned_to):
    """Notify ``assigned_to`` that ``assigned_by`` assigned them a story."""
    # The former ``target_url`` local was computed but never used; the dead
    # ``reverse()`` call has been removed.
    notif_content = f"La story #{story.id} vous a été assignée par {assigned_by}"
    notify.send(
        assigned_by,
        recipient=assigned_to,
        action_object=story,
        verb=notif_content,
    )
def notify_sprint_end(sprint, next_sprint, ended_by):
    """Notify all users that a sprint ended and that the next one started.

    Two notifications are sent to every user, both with ``sprint`` (the
    ended sprint) as action object and ``ended_by`` as actor.
    """
    # The former ``target_url`` local was computed but never used; the dead
    # ``reverse()`` call has been removed.
    for verb in (f"Fin du {sprint}", f"Démarrage du {next_sprint}"):
        notify.send(
            ended_by,
            recipient=User.objects.all(),
            action_object=sprint,
            verb=verb,
        )
@login_required
def md_upload_file(request):
    """Store an image uploaded from the markdown editor.

    Expects an AJAX POST with the file in the ``markdown-image-upload``
    field. Any other request gets a plain "Invalid request!" response.

    * A file larger than ``settings.MAX_IMAGE_UPLOAD_SIZE`` is rejected with
      a JSON error and HTTP status 405.
    * Otherwise the file is saved through the default storage under
      ``settings.MARTOR_UPLOAD_PATH``, its name prefixed with a random
      10-character hex token (spaces replaced by dashes), and a JSON
      payload with ``status``, ``link`` and ``name`` is returned.
    """
    # ``_`` was used without ever being imported in this module (NameError
    # on every path that built a message); bring the translation function
    # into scope locally.
    from django.utils.translation import gettext as _

    # Same header test HttpRequest.is_ajax() performed; that method was
    # deprecated in Django 3.1 and removed in 4.0.
    is_ajax = request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest'
    if request.method != 'POST' or not is_ajax:
        return HttpResponse(_('Invalid request!'))
    if 'markdown-image-upload' not in request.FILES:
        return HttpResponse(_('Invalid request!'))

    image = request.FILES['markdown-image-upload']
    if image.size > settings.MAX_IMAGE_UPLOAD_SIZE:
        max_size_mb = settings.MAX_IMAGE_UPLOAD_SIZE / (1024 * 1024)
        # ``%(size) MB`` lacked a conversion type and raised ValueError;
        # ``%(size)s`` is the valid placeholder.
        error = _('Maximum image file is %(size)s MB.') % {'size': max_size_mb}
        data = json.dumps({'status': 405, 'error': error}, cls=LazyEncoder)
        return HttpResponse(data, content_type='application/json', status=405)

    img_uuid = "{0}-{1}".format(uuid.uuid4().hex[:10], image.name.replace(' ', '-'))
    tmp_file = Path(settings.MARTOR_UPLOAD_PATH) / img_uuid
    def_path = default_storage.save(tmp_file, ContentFile(image.read()))
    img_url = Path(settings.MEDIA_URL) / def_path

    data = json.dumps({'status': 200, 'link': img_url, 'name': image.name})
    return HttpResponse(data, content_type='application/json')