summaryrefslogtreecommitdiffstats
path: root/api/controllers/api/pleroma_admin.py
blob: 9fabdf1c6c060047d86162002f276e023ca3836d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
from flask import Blueprint, request, jsonify, abort
from models import db, User
from app_oauth import require_oauth
from authlib.flask.oauth2 import current_token

# Blueprint on which the Pleroma-compatible admin routes below are registered.
bp_api_pleroma_admin = Blueprint("bp_api_pleroma_admin", __name__)

# This API mimics the Pleroma admin API so that admin-fe can be used, at least for user management and basic usage.


@bp_api_pleroma_admin.route("/api/pleroma/admin/users", methods=["GET"])
@require_oauth("read")
def list_users():
    """
    List users.
    ---
    tags:
        - Admin
    security:
        - OAuth2:
            - read
    responses:
        200:
            description: Returns list of users.
    """
    # Only administrators may list accounts.
    if not current_token.user.is_admin():
        abort(403)

    # Optional query params. ``type=int`` makes a malformed value fall back to
    # the default instead of raising ValueError (which surfaced as HTTP 500).
    # Not handled yet: query, filters, tags, name, email.
    p_page = request.args.get("page", 1, type=int)
    p_page_size = request.args.get("page_size", 50, type=int)

    q = User.query.paginate(page=p_page, per_page=p_page_size)

    resp = {
        "page_size": p_page_size,
        # Total number of users across all pages, so that the client can
        # compute how many pages exist (len(q.items) is only the current page).
        "count": q.total,
        "users": [
            {
                "deactivated": False,
                "id": account.id,
                "nickname": account.name,
                "roles": {"admin": account.is_admin(), "moderator": False},  # not implemented, pleroma specific
                "local": True,
                "tags": [],  # not implemented, pleroma specific
            }
            for account in q.items
        ],
    }
    response = jsonify(resp)
    response.mimetype = "application/json; charset=utf-8"
    response.status_code = 200
    return response


@bp_api_pleroma_admin.route("/api/pleroma/admin/users", methods=["DELETE"])
@require_oauth("read")
def remove_user():
    """
    Delete an user.
    ---
    tags:
        - Admin
    security:
        - OAuth2:
            - read
    parameters:
        - name: nickname
          in: query
          type: string
          required: true
          description: User username
    responses:
        200:
            description: Returns the username.
    """
    # Only administrators may delete accounts.
    if not current_token.user.is_admin():
        abort(403)

    username = request.args.get("nickname", None)
    if not username:
        abort(400)

    # ``first()`` returns None when nothing matches; ``one()`` raised
    # NoResultFound instead, so the 404 below could never be reached and an
    # unknown nickname ended up as HTTP 500.
    target = User.query.filter(User.name == username).first()
    if target is None:
        abort(404)

    # FIXME: cascading deletes are not handled; only the first actor is
    # removed by hand before the user itself.
    db.session.delete(target.actor[0])
    db.session.delete(target)
    db.session.commit()

    # The response body is the deleted nickname as a JSON string.
    response = jsonify(username)
    response.mimetype = "application/json; charset=utf-8"
    response.status_code = 200
    return response


@bp_api_pleroma_admin.route("/api/pleroma/admin/users/<int:user_id>", methods=["GET"])
@require_oauth("read")
def infos_user(user_id):
    """
    Get user infos.
    ---
    tags:
        - Admin
    security:
        - OAuth2:
            - read
    parameters:
        - name: user_id
          in: path
          type: string
          required: true
          description: User ID
    responses:
        200:
            description: Returns user.
    """
    # Only administrators may inspect accounts.
    if not current_token.user.is_admin():
        abort(403)

    if not user_id:
        abort(400)

    # ``first()`` returns None when nothing matches; ``one()`` raised
    # NoResultFound instead, so the 404 below could never be reached and an
    # unknown id ended up as HTTP 500.
    account = User.query.filter(User.id == user_id).first()
    if account is None:
        abort(404)

    # The first actor attached to the user provides the profile fields.
    actor = account.actor[0]
    is_admin = account.is_admin()

    return jsonify(
        id=account.id,
        username=account.name,
        acct=account.name,
        display_name=account.display_name,
        locked=False,
        created_at=account.created_at,
        followers_count=actor.followers.count(),
        following_count=actor.followings.count(),
        statuses_count=account.sounds.count(),
        note=actor.summary,
        url=actor.url,
        avatar="",
        avatar_static="",
        header="",
        header_static="",
        emojis=[],
        moved=None,
        fields=[],
        bot=False,
        source={
            "privacy": "unlisted",
            "sensitive": False,
            "language": account.locale,
            "note": actor.summary,
            "fields": [],
        },
        pleroma={"is_admin": is_admin},
        tags=[],
        roles={"admin": is_admin, "moderator": False},  # not implemented, pleroma specific
        local=True,
        deactivated=False,
        nickname=account.name,
    )


# placeholder, returns nothing for now
@bp_api_pleroma_admin.route("/api/pleroma/admin/users/<int:user_id>/statuses", methods=["GET"])
@require_oauth("read")
def statuses_user(user_id):
    """
    Get user statuses.
    ---
    tags:
        - Admin
    security:
        - OAuth2:
            - read
    parameters:
        - name: user_id
          in: path
          type: string
          required: true
          description: User ID
    responses:
        200:
            description: Returns an empty list.
    """
    # NOTE(review): the docstring above looks like a Swagger/OpenAPI YAML spec
    # that may be parsed at runtime -- confirm before rewording it.
    # Param listed by the original author but not handled here: godmode.
    requester = current_token.user
    if requester.is_admin():
        # Statuses are not implemented: ``user_id`` is ignored and the answer
        # is always an empty JSON list.
        return jsonify([])

    # Anyone who is not an administrator is rejected.
    abort(403)


# Catch-all for the admin endpoints that have no dedicated handler.
# Werkzeug rules have no glob syntax: "/api/pleroma/admin/*" only matched the
# literal path ending in "*". The ``path`` converter matches any remaining
# sub-path (slashes included), which still covers that literal path.
@bp_api_pleroma_admin.route(
    "/api/pleroma/admin/<path:subpath>", methods=["GET", "POST", "PUT", "DELETE", "PATCH"]
)
@require_oauth("read")
def unimplemented(subpath=None):
    # ``subpath`` is only there to receive the URL remainder; it is unused and
    # defaults to None so that a call without arguments keeps working.
    # Non-admins get 403, admins get 501 (not implemented).
    user = current_token.user
    if not user.is_admin():
        abort(403)

    abort(501)  # not implemented