Run Black over the whole codebase

This commit is contained in:
Danielle McLean 2023-08-10 16:52:37 +10:00
parent cd990e4e2f
commit 2e7d12b3e6
Signed by untrusted user: 00dani
GPG key ID: 52C059C3B22A753E
109 changed files with 1539 additions and 1209 deletions

View file

@ -2,4 +2,4 @@ from django.apps import AppConfig
class MicropubConfig(AppConfig):
name = 'micropub'
name = "micropub"

View file

@ -4,33 +4,35 @@ from typing import Optional
def forbidden() -> ResponseException:
return res('forbidden', 403)
return res("forbidden", 403)
def unauthorized() -> ResponseException:
return res('unauthorized', 401)
return res("unauthorized", 401)
def bad_req(msg: str) -> ResponseException:
return res('invalid_request', msg=msg)
return res("invalid_request", msg=msg)
def bad_type(type: str) -> ResponseException:
msg = 'unsupported request type {0}'.format(type)
return res('invalid_request', 415, msg)
msg = "unsupported request type {0}".format(type)
return res("invalid_request", 415, msg)
def bad_scope(scope: str) -> ResponseException:
return res('insufficient_scope', 401, scope=scope)
return res("insufficient_scope", 401, scope=scope)
def res(error: str,
status: Optional[int]=400,
msg: Optional[str]=None,
scope: Optional[str]=None):
content = {'error': error}
def res(
error: str,
status: Optional[int] = 400,
msg: Optional[str] = None,
scope: Optional[str] = None,
):
content = {"error": error}
if msg is not None:
content['error_description'] = msg
content["error_description"] = msg
if scope:
content['scope'] = scope
content["scope"] = scope
return ResponseException(JsonResponse(content, status=status))

View file

@ -2,8 +2,8 @@ from django.urls import path
from .views import micropub
from .views.media import media
app_name = 'micropub'
app_name = "micropub"
urlpatterns = (
path('', micropub, name='micropub'),
path('/media', media, name='media'),
path("", micropub, name="micropub"),
path("/media", media, name="media"),
)

View file

@ -10,22 +10,22 @@ from .delete import delete
from .query import query
actions = {
'create': create,
'delete': delete,
"create": create,
"delete": delete,
}
@csrf_exempt
@require_http_methods(['GET', 'HEAD', 'POST'])
@require_http_methods(["GET", "HEAD", "POST"])
def micropub(request):
request.token = tokens.auth(request)
if request.method in ('GET', 'HEAD'):
if request.method in ("GET", "HEAD"):
return query(request)
action = request.POST.get('action', 'create')
if request.content_type == 'application/json':
action = request.POST.get("action", "create")
if request.content_type == "application/json":
request.json = json.load(request)
action = request.json.get('action', 'create')
action = request.json.get("action", "create")
if action not in actions:
raise error.bad_req('unknown action: {}'.format(action))
raise error.bad_req("unknown action: {}".format(action))
return actions[action](request)

View file

@ -14,63 +14,62 @@ def form_to_mf2(request):
properties = {}
post = request.POST
for key in post.keys():
if key.endswith('[]'):
if key.endswith("[]"):
key = key[:-2]
if key == 'access_token':
if key == "access_token":
continue
properties[key] = post.getlist(key) + post.getlist(key + '[]')
properties[key] = post.getlist(key) + post.getlist(key + "[]")
type = []
if 'h' in properties:
type = ['h-' + p for p in properties['h']]
del properties['h']
return {'type': type, 'properties': properties}
if "h" in properties:
type = ["h-" + p for p in properties["h"]]
del properties["h"]
return {"type": type, "properties": properties}
def create(request):
normalise = {
'application/json': lambda r: r.json,
'application/x-www-form-urlencoded': form_to_mf2,
"application/json": lambda r: r.json,
"application/x-www-form-urlencoded": form_to_mf2,
}
if 'create' not in request.token:
raise error.bad_scope('create')
if "create" not in request.token:
raise error.bad_scope("create")
if request.content_type not in normalise:
raise error.unsupported_type(request.content_type)
body = normalise[request.content_type](request)
if 'type' not in body:
raise error.bad_req('mf2 object type required')
if body['type'] != ['h-entry']:
raise error.bad_req('only h-entry supported')
if "type" not in body:
raise error.bad_req("mf2 object type required")
if body["type"] != ["h-entry"]:
raise error.bad_req("only h-entry supported")
entry = Entry(author=request.token.user)
props = body.get('properties', {})
props = body.get("properties", {})
kind = Note
if 'name' in props:
entry.name = '\n'.join(props['name'])
if "name" in props:
entry.name = "\n".join(props["name"])
kind = Article
if 'content' in props:
entry.content = '\n'.join(
c if isinstance(c, str) else c['html']
for c in props['content']
if "content" in props:
entry.content = "\n".join(
c if isinstance(c, str) else c["html"] for c in props["content"]
)
if 'in-reply-to' in props:
entry.in_reply_to = props['in-reply-to']
if "in-reply-to" in props:
entry.in_reply_to = props["in-reply-to"]
kind = Reply
if 'like-of' in props:
entry.like_of = props['like-of']
if "like-of" in props:
entry.like_of = props["like-of"]
kind = Like
if 'repost-of' in props:
entry.repost_of = props['repost-of']
if "repost-of" in props:
entry.repost_of = props["repost-of"]
kind = Repost
cats = [Cat.objects.from_name(c) for c in props.get('category', [])]
cats = [Cat.objects.from_name(c) for c in props.get("category", [])]
entry.kind = kind.id
entry.save()
entry.cats.set(cats)
entry.save()
for url in props.get('syndication', []):
for url in props.get("syndication", []):
entry.syndications.create(url=url)
base = utils.origin(request)
@ -80,6 +79,6 @@ def create(request):
send_mentions.delay(perma)
res = HttpResponse(status=201)
res['Location'] = perma
res['Link'] = '<{}>; rel="shortlink"'.format(short)
res["Location"] = perma
res["Link"] = '<{}>; rel="shortlink"'.format(short)
return res

View file

@ -6,24 +6,25 @@ from entries.jobs import ping_hub, send_mentions
from .. import error
def delete(request):
normalise = {
'application/json': lambda r: r.json.get('url'),
'application/x-www-form-urlencoded': lambda r: r.POST.get('url'),
"application/json": lambda r: r.json.get("url"),
"application/x-www-form-urlencoded": lambda r: r.POST.get("url"),
}
if 'delete' not in request.token:
raise error.bad_scope('delete')
if "delete" not in request.token:
raise error.bad_scope("delete")
if request.content_type not in normalise:
raise error.unsupported_type(request.content_type)
url = normalise[request.content_type](request)
entry = from_url(url)
if entry.author != request.token.user:
raise error.forbid('entry belongs to another user')
raise error.forbid("entry belongs to another user")
perma = entry.absolute_url
pings = entry.affected_urls
mentions = webmention.findMentions(perma)['refs']
mentions = webmention.findMentions(perma)["refs"]
entry.delete()

View file

@ -11,9 +11,9 @@ from lemoncurry.utils import absolute_url
from .. import error
ACCEPTED_MEDIA_TYPES = (
'image/gif',
'image/jpeg',
'image/png',
"image/gif",
"image/jpeg",
"image/png",
)
@ -21,15 +21,13 @@ ACCEPTED_MEDIA_TYPES = (
@require_POST
def media(request):
token = tokens.auth(request)
if 'file' not in request.FILES:
if "file" not in request.FILES:
raise error.bad_req(
"a file named 'file' must be provided to the media endpoint"
)
file = request.FILES['file']
file = request.FILES["file"]
if file.content_type not in ACCEPTED_MEDIA_TYPES:
raise error.bad_req(
'unacceptable file type {0}'.format(file.content_type)
)
raise error.bad_req("unacceptable file type {0}".format(file.content_type))
mime = None
sha = hashlib.sha256()
@ -40,14 +38,15 @@ def media(request):
if mime != file.content_type:
raise error.bad_req(
'detected file type {0} did not match specified file type {1}'
.format(mime, file.content_type)
"detected file type {0} did not match specified file type {1}".format(
mime, file.content_type
)
)
path = 'mp/{0[0]}/{2}.{1}'.format(*mime.split('/'), sha.hexdigest())
path = "mp/{0[0]}/{2}.{1}".format(*mime.split("/"), sha.hexdigest())
path = store.save(path, file)
url = absolute_url(request, store.url(path))
res = HttpResponse(status=201)
res['Location'] = url
res["Location"] = url
return res

View file

@ -7,48 +7,47 @@ from lemoncurry.utils import absolute_url
from .. import error
def config(request):
config = syndicate_to(request)
config['media-endpoint'] = absolute_url(request, reverse('micropub:media'))
config["media-endpoint"] = absolute_url(request, reverse("micropub:media"))
return config
def source(request):
if 'url' not in request.GET:
raise error.bad_req('must specify url parameter for source query')
entry = from_url(request.GET['url'])
if "url" not in request.GET:
raise error.bad_req("must specify url parameter for source query")
entry = from_url(request.GET["url"])
props = {}
keys = set(request.GET.getlist('properties') + request.GET.getlist('properties[]'))
if not keys or 'content' in keys:
props['content'] = [entry.content]
if (not keys or 'category' in keys) and entry.cats.exists():
props['category'] = [cat.name for cat in entry.cats.all()]
if (not keys or 'name' in keys) and entry.name:
props['name'] = [entry.name]
if (not keys or 'syndication' in keys) and entry.syndications.exists():
props['syndication'] = [synd.url for synd in entry.syndications.all()]
keys = set(request.GET.getlist("properties") + request.GET.getlist("properties[]"))
if not keys or "content" in keys:
props["content"] = [entry.content]
if (not keys or "category" in keys) and entry.cats.exists():
props["category"] = [cat.name for cat in entry.cats.all()]
if (not keys or "name" in keys) and entry.name:
props["name"] = [entry.name]
if (not keys or "syndication" in keys) and entry.syndications.exists():
props["syndication"] = [synd.url for synd in entry.syndications.all()]
return {'type': ['h-entry'], 'properties': props}
return {"type": ["h-entry"], "properties": props}
def syndicate_to(request):
return {'syndicate-to': []}
return {"syndicate-to": []}
queries = {
'config': config,
'source': source,
'syndicate-to': syndicate_to,
"config": config,
"source": source,
"syndicate-to": syndicate_to,
}
def query(request):
if 'q' not in request.GET:
raise error.bad_req('must specify q parameter')
q = request.GET['q']
if "q" not in request.GET:
raise error.bad_req("must specify q parameter")
q = request.GET["q"]
if q not in queries:
raise error.bad_req('unsupported query {0}'.format(q))
raise error.bad_req("unsupported query {0}".format(q))
res = queries[q](request)
return JsonResponse(res)