106 lines
3.2 KiB
Python
106 lines
3.2 KiB
Python
|
|
from ninja import Router, Schema
|
||
|
|
from django.contrib.auth import authenticate, login, logout
|
||
|
|
from django.http import JsonResponse
|
||
|
|
from django.conf import settings
|
||
|
|
from ninja.errors import HttpError
|
||
|
|
from pydantic import BaseModel
|
||
|
|
from core.models import AppUser
|
||
|
|
|
||
|
|
# Router that the authentication endpoints below register on; tagged "auth".
router = Router(tags=["auth"])
|
||
|
|
|
||
|
|
# Request body accepted by the /login endpoint.
# (Documented with comments rather than a docstring so the generated
# schema metadata stays exactly as it was.)
class LoginSchema(Schema):
    username: str  # account name handed to authenticate()
    password: str  # plain-text password handed to authenticate()
|
||
|
|
|
||
|
|
# Response shape used by the /me endpoint (see `response=UserSchema`).
# (Documented with comments rather than a docstring so the generated
# schema metadata stays exactly as it was.)
class UserSchema(Schema):
    id: int
    username: str
    email: str
    is_staff: bool
    is_superuser: bool
|
||
|
|
|
||
|
|
# Request body accepted by the /setup endpoint; all three values are passed
# straight to AppUser.objects.create_superuser().
# (Documented with comments rather than a docstring so the generated
# schema metadata stays exactly as it was.)
class SetupSchema(Schema):
    username: str
    password: str
    email: str
|
||
|
|
|
||
|
|
@router.get("/has-users")
def check_has_users(request):
    """Report whether any user exists in the database.

    Returns a JSON object of the form ``{"has_users": <bool>}``; the flag is
    true as soon as at least one ``AppUser`` row exists.
    """
    return {"has_users": AppUser.objects.exists()}
|
||
|
|
|
||
|
|
@router.post("/setup")
def setup_admin(request, payload: SetupSchema):
    """Create the initial superuser, only while the database has no users, and sign them in.

    Responds with 403 if any user already exists, and with 500 if the account
    was created but could not be authenticated for the automatic sign-in.
    On success returns ``success``, the new user's basic fields under
    ``user``, and a ``csrf_token``.
    """
    # NOTE(review): the exists() check and the create below are separate
    # queries, so two concurrent first-run requests could both pass this guard.
    if AppUser.objects.exists():
        raise HttpError(403, "Database already has users. Cannot run initial setup.")

    AppUser.objects.create_superuser(
        username=payload.username,
        email=payload.email,
        password=payload.password,
    )

    # Bootstrap a default library now that the first user exists.
    from core.services.bootstrap import ensure_default_library
    ensure_default_library()

    # Log them in automatically. authenticate() returns None when no backend
    # accepts the credentials; passing that to login() would fail with an
    # opaque error, so report the situation explicitly instead.
    user = authenticate(request, username=payload.username, password=payload.password)
    if user is None:
        raise HttpError(
            500,
            "Admin account was created, but automatic sign-in failed. Please log in manually.",
        )
    login(request, user)
    request.session.set_expiry(settings.SESSION_COOKIE_AGE)

    # Hand the CSRF token to the frontend together with the user details.
    from django.middleware.csrf import get_token
    csrf_token = get_token(request)

    return {
        "success": True,
        "user": {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "is_staff": user.is_staff,
            "is_superuser": user.is_superuser,
        },
        "csrf_token": csrf_token,
    }
|
||
|
|
|
||
|
|
@router.post("/login")
def auth_login(request, payload: LoginSchema):
    # Credential checking (including password-hash verification) is delegated
    # to Django's `authenticate`; it yields None when the credentials are rejected.
    account = authenticate(request, username=payload.username, password=payload.password)
    if account is None:
        raise HttpError(401, "Invalid username or password")

    # Credentials accepted: bind the user to the session and apply the
    # configured session lifetime.
    login(request, account)
    request.session.set_expiry(settings.SESSION_COOKIE_AGE)

    # Fetch the CSRF token as well so the frontend can send it back on
    # subsequent requests.
    from django.middleware.csrf import get_token
    token = get_token(request)

    profile = {
        "id": account.id,
        "username": account.username,
        "email": account.email,
        "is_staff": account.is_staff,
        "is_superuser": account.is_superuser,
    }
    return {"success": True, "user": profile, "csrf_token": token}
|
||
|
|
|
||
|
|
@router.post("/logout")
def auth_logout(request):
    """Log the current session out and expire the session cookie.

    Always responds with ``{"success": true}``.
    """
    logout(request)
    resp = JsonResponse({"success": True})
    # A browser only discards a cookie when the deletion carries the same
    # path/domain the cookie was set with, so use the configured
    # session-cookie settings rather than a hard-coded "/".
    resp.delete_cookie(
        settings.SESSION_COOKIE_NAME,
        path=settings.SESSION_COOKIE_PATH,
        domain=settings.SESSION_COOKIE_DOMAIN,
        samesite=settings.SESSION_COOKIE_SAMESITE,
    )
    return resp
|
||
|
|
|
||
|
|
@router.get("/me", response=UserSchema)
def auth_me(request):
    # Anonymous callers are rejected up front; everyone else gets their own
    # user object back, serialized through UserSchema by the router.
    if not request.user.is_authenticated:
        raise HttpError(401, "Not authenticated")
    return request.user
|