Welcome to the Treehouse Community

Want to collaborate on code errors? Have bugs you need feedback on? Looking for an extra set of eyes on your latest project? Get support with fellow developers, designers, and programmers of all backgrounds and skill levels here with the Treehouse Community! While you're at it, check out some resources Treehouse students have shared here.

Looking to learn something new?

Treehouse offers a seven day free trial for new students. Get access to thousands of hours of content and join thousands of Treehouse students and alumni in the community today.

Start your free trial

Python

Sami Mäntysaari
Sami Mäntysaari
55 Points

Help wanted with flask_restplus: how can I use the api.doc decorator inside a resource?

Hello everyone, I'm new here so bear with me.

I'm trying to use flask_restplus to be able to document the API while coding it.

I have no idea how to use api.doc() in my resource. Any pointers? Reference: https://flask-restplus.readthedocs.io/en/stable/swagger.html#documenting-with-the-api-doc-decorator

My run.py file, which is run when I start my application:

from flask import Flask


def create_app(config_filename):
    """Build and return the Flask application.

    Configuration is loaded from the object named by ``config_filename``,
    the ``api_bp`` blueprint is mounted under ``/`` and the ``db``
    extension is bound to the new application.
    """
    flask_app = Flask(__name__)
    flask_app.config.from_object(config_filename)

    # Deferred imports: these modules are only loaded once an app is
    # actually being built (presumably to avoid import cycles - confirm).
    from app import api_bp
    from models import db

    flask_app.register_blueprint(api_bp, url_prefix='/')
    db.init_app(flask_app)

    return flask_app


# Built at import time, so ``app`` exists both when this module is executed
# directly and when it is imported by something else.
app = create_app("config")

if __name__ == "__main__":
    # NOTE(review): debug mode combined with a bind on every interface
    # exposes the interactive debugger to the network - development only.
    app.run(debug=True, host='0.0.0.0')

app.py:

from flask import Blueprint
from flask_restplus import Api
from resources.Hello import Hello
from resources.Messages import MessageResource
from resources.Messages import CountMessages
from resources.Messages import CountMessagesByChannel
from resources.Profiles import ProfilesResource
from resources.Users import UsersResource

# Blueprint carrying the whole REST API, wrapped by the Flask-RESTPlus Api.
api_bp = Blueprint('api', __name__)
api = Api(api_bp, version='1.0', title="Messis API")

# Routes: (resource class, URL rule) pairs, registered in the listed order.
for _resource, _rule in (
    (Hello, 'Hello'),
    (MessageResource, 'Messages'),
    (ProfilesResource, 'Profiles'),
    (CountMessages, 'UserMessageCount'),
    (CountMessagesByChannel, 'UserMessageCountByChannels'),
    (UsersResource, 'Users'),
):
    api.add_resource(_resource, _rule)

Models:

from marshmallow import fields
from flask_marshmallow import Marshmallow
from flask_sqlalchemy import SQLAlchemy

# Extension singletons, created without an application; the models and
# schemas below are declared against them.
db = SQLAlchemy()
ma = Marshmallow()


class Messages(db.Model):
    """ORM mapping for one stored message row in ``discord_messages``."""
    __tablename__ = 'discord_messages'
    # Surrogate key generated by the database.
    discord_message_id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    server_id = db.Column(db.BigInteger, nullable=False)
    # BigInteger so the foreign key has the same type as the column it
    # references (discord_channels.channel_id is declared BigInteger).
    # NOTE(review): unique=True allows at most one message row per channel -
    # confirm that this constraint is really intended.
    channel_id = db.Column(db.BigInteger, db.ForeignKey('discord_channels.channel_id'), unique=True, nullable=False)
    message_id = db.Column(db.BigInteger, nullable=False)
    # NOTE(review): the generic TIMESTAMP type takes ``timezone`` as its only
    # positional argument, so the 3 is read as a truthy timezone flag, not as
    # a fractional-seconds precision - confirm which one was meant.
    message_date = db.Column(db.TIMESTAMP(3), server_default=db.func.current_timestamp(), nullable=False)
    person_name = db.Column(db.VARCHAR, nullable=False)
    message_text = db.Column(db.VARCHAR(2000), nullable=True)
    user_id = db.Column(db.BigInteger, nullable=False)

    def __init__(self, server_id, channel_id, message_id, message_date, person_name, message_text, user_id):
        """Populate every column except the auto-generated primary key."""
        self.server_id = server_id
        self.channel_id = channel_id
        self.message_id = message_id
        self.message_date = message_date
        self.person_name = person_name
        self.message_text = message_text
        self.user_id = user_id


class Channels(db.Model):
    """ORM mapping for the ``discord_channels`` table."""
    __tablename__ = 'discord_channels'
    channel_id = db.Column(db.BigInteger, nullable=False, primary_key=True)
    channel_name = db.Column(db.VARCHAR, nullable=False)

    def __init__(self, channel_id, channel_name):
        """Store the channel id and its name on the new row object."""
        self.channel_id, self.channel_name = channel_id, channel_name


class Discord_Users(db.Model):
    """ORM mapping for the ``discord_users`` table, keyed by ``user_id``."""
    __tablename__ = 'discord_users'
    # NOTE(review): autoincrement is requested on a column that is not part
    # of the primary key - confirm the target database honours it.
    uid = db.Column(db.Integer, autoincrement=True, nullable=False)
    user_id = db.Column(db.BigInteger, primary_key=True, nullable=False)
    # NOTE(review): the 3 passed to TIMESTAMP lands in its ``timezone``
    # parameter rather than setting a precision - confirm intent.
    joined_at = db.Column(db.TIMESTAMP(3), nullable=False)
    permission_granted = db.Column(db.TIMESTAMP(3))
    permission_revoked = db.Column(db.TIMESTAMP(3), nullable=True)
    permission_granted_update = db.Column(db.TIMESTAMP(3), nullable=True)

    def __init__(self, uid, user_id, joined_at, permission_granted, permission_revoked, permission_granted_update):
        """Copy each constructor argument onto the column of the same name."""
        column_values = (
            ('uid', uid),
            ('user_id', user_id),
            ('joined_at', joined_at),
            ('permission_granted', permission_granted),
            ('permission_revoked', permission_revoked),
            ('permission_granted_update', permission_granted_update),
        )
        for column, value in column_values:
            setattr(self, column, value)


class Profile(db.Model):
    """One setting value of one user, stored in ``Users_Profile``.

    The primary key is the pair ``(userid, settingsid)``.
    """
    __tablename__ = 'Users_Profile'
    userid = db.Column(db.BigInteger, primary_key=True)
    settingsid = db.Column(db.Integer, primary_key=True)
    settingsvalue = db.Column(db.VARCHAR(255), nullable=True)

    def __init__(self, userid, settingsid, settingsvalue):
        """Set both key parts and the stored value."""
        self.userid, self.settingsid = userid, settingsid
        self.settingsvalue = settingsvalue


class Users(db.Model):
    """ORM mapping for the ``Users`` table: a user id plus a JSON roles column."""
    __tablename__ = 'Users'
    uid = db.Column(db.Integer, primary_key=True)
    UserID = db.Column(db.BigInteger, nullable=False)
    Roles = db.Column(db.JSON, nullable=False)

    def __init__(self, uid, UserID, Roles):
        """Copy the three constructor arguments onto their columns."""
        for column, value in (('uid', uid), ('UserID', UserID), ('Roles', Roles)):
            setattr(self, column, value)


class MessagesSchema(ma.Schema):
    """Marshmallow schema declaring the same field names as the ``Messages`` model."""
    discord_message_id = fields.Integer()
    server_id = fields.Integer()
    channel_id = fields.Integer()
    message_id = fields.Integer()
    message_date = fields.DateTime()
    # The two text columns use the untyped base Field rather than String.
    person_name = fields.Field()
    message_text = fields.Field()
    user_id = fields.Integer()


class ChannelsSchema(ma.Schema):
    """Marshmallow schema declaring the two fields of the ``Channels`` model."""
    channel_id = fields.Integer()
    channel_name = fields.String()


class ProfileSchema(ma.Schema):
    """Marshmallow schema with the fields ``uid``, ``UserID`` and ``Roles``.

    NOTE(review): these names match the columns of the ``Users`` model, not
    those of ``Profile`` (userid, settingsid, settingsvalue) - confirm which
    model this schema is meant to serialize.
    """
    uid = fields.Integer()
    # Declared as String although Users.UserID is a BigInteger column.
    UserID = fields.String()
    Roles = fields.Field()

Example resource, aka the profiles:

from flask import request
from flask_restplus import Resource
from models import db, Profile
from sqlalchemy import func
from sqlalchemy.orm.exc import NoResultFound

import requests as http_client


class ProfilesResource(Resource):
    """Read and write per-user profile settings (``Profile`` rows).

    Both verbs take their arguments from the URL query string and return a
    ``(body, status_code)`` tuple.

    GET
        * ``user_id`` alone: every ``[settingsid, settingsvalue]`` pair of
          that user.
        * ``action=countByCity``: ``[value, count]`` for each value stored
          under setting 4.
        * ``action=cities``: the distinct values stored under setting 4.
        * ``action=getEtunimi`` / ``getSukunimi`` / ``getEmail`` together
          with ``user_id``: that user's value for setting 2 / 3 / 1.
        * ``action=checkPaikkakunta`` together with ``user_id``: 200 when
          the user has a row for setting 4, otherwise 400.

    POST
        ``action=update`` or ``action=insert_new`` together with
        ``user_id``, ``settings_id`` and ``settings_value``. Only settings
        4 and 9 are handled, and the value is accepted only when it equals
        a ``KUNTANIMIFI`` entry returned by the avoindata.fi datastore
        search.
    """

    # Setting id used by the city-related GET actions.
    _CITY_SETTING_ID = 4
    # GET actions that return one setting of one user -> that setting's id.
    _USER_SETTING_ACTIONS = {'getEtunimi': 2, 'getSukunimi': 3, 'getEmail': 1}
    # Setting ids (as received in the query string) that POST can write.
    _WHITELISTED_SETTING_IDS = ('4', '9')
    # Open-data endpoint and dataset holding the accepted city/municipality names.
    _MUNICIPALITY_API_URL = 'https://www.avoindata.fi/data/fi/data/api/3/action/datastore_search'
    _MUNICIPALITY_RESOURCE_ID = 'b1cb9870-191f-4616-9c53-5388b7ca6beb'
    # Upper bound for the outbound call so a stalled API cannot hang the request.
    _HTTP_TIMEOUT_SECONDS = 10

    @staticmethod
    def _rows_to_json(rows):
        """Turn query result rows into plain lists so they can be JSON-encoded."""
        return [list(row) for row in rows]

    @classmethod
    def _is_known_municipality(cls, name):
        """Return True when ``name`` exactly matches a KUNTANIMIFI entry of the open-data API."""
        # The value is passed through ``params`` so it is URL-encoded and
        # cannot add or override query parameters of the outbound request.
        response = http_client.get(
            cls._MUNICIPALITY_API_URL,
            params={'q': name, 'resource_id': cls._MUNICIPALITY_RESOURCE_ID},
            timeout=cls._HTTP_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        records = response.json().get('result', {}).get('records', [])
        return any(entry.get('KUNTANIMIFI') == name for entry in records)

    def get(self):
        """Answer a read query; see the class docstring for the accepted arguments."""
        user_id = request.args.get('user_id')
        action = request.args.get('action')

        if action is None and user_id is not None:
            rows = db.session.query(Profile.settingsid, Profile.settingsvalue) \
                .filter(Profile.userid == user_id) \
                .all()
        elif action == 'countByCity':
            rows = db.session.query(Profile.settingsvalue, func.count(Profile.settingsvalue)) \
                .filter(Profile.settingsid == self._CITY_SETTING_ID) \
                .group_by(Profile.settingsvalue) \
                .all()
        elif action == 'cities':
            rows = db.session.query(Profile.settingsvalue).distinct() \
                .filter(Profile.settingsid == self._CITY_SETTING_ID) \
                .all()
        elif action in self._USER_SETTING_ACTIONS and user_id is not None:
            rows = db.session.query(Profile.settingsvalue) \
                .filter(Profile.userid == user_id) \
                .filter(Profile.settingsid == self._USER_SETTING_ACTIONS[action]) \
                .all()
        elif action == 'checkPaikkakunta' and user_id is not None:
            try:
                row = db.session.query(Profile.settingsid) \
                    .filter(Profile.userid == user_id) \
                    .filter(Profile.settingsid == self._CITY_SETTING_ID) \
                    .one()
            except NoResultFound:
                return {'status': 'failed'}, 400
            return {'status': 'success', 'data': list(row)}, 200
        else:
            return {'status': 'failed', 'message': 'This is not how it works'}, 400

        return {'status': 'success', 'data': self._rows_to_json(rows)}, 200

    def post(self):
        """Update or insert one whitelisted setting of a user.

        Returns 200 on success, 400 for missing/unsupported arguments or a
        value that is not on the whitelist, 404 when ``update`` targets a
        row that does not exist, and 502 when the whitelist API cannot be
        queried.
        """
        user_id = request.args.get('user_id')
        action = request.args.get('action')
        settings_id = request.args.get('settings_id')
        settings_value = request.args.get('settings_value')

        if user_id is None:
            return {'status': 'failed', 'message': 'This is not how it works, you need to '
                                                   'pass something else too for it to work.'}, 400
        if action not in ('update', 'insert_new') or settings_id is None:
            return {'status': 'failed'}, 400
        if settings_value is None:
            if action == 'update':
                return {'status': 'failed', 'message': 'This is not how it works, you need to '
                                                       'pass me settings_value too for it to work.'}, 400
            return {'status': 'failed'}, 400
        if settings_id not in self._WHITELISTED_SETTING_IDS:
            # Previously this case fell off the end of the method and
            # produced an empty 200 response without doing anything.
            return {'status': 'failed', 'message': 'Unsupported settings_id.'}, 400

        # Fetch the whitelist of cities/municipalities from the open data API.
        try:
            accepted = self._is_known_municipality(settings_value)
        except (http_client.RequestException, ValueError):
            # Network failure, HTTP error status, or a body that is not JSON.
            return {'status': 'failed', 'message': 'Could not validate settings_value.'}, 502
        if not accepted:
            return {'status': 'failed', 'message': "Not accepted."}, 400

        if action == 'update':
            profile = Profile.query.filter_by(userid=user_id, settingsid=settings_id).first()
            if profile is None:
                return {'status': 'failed', 'message': 'No such setting for this user.'}, 404
            profile.settingsvalue = settings_value
        else:
            db.session.add(Profile(user_id, settings_id, settings_value))
        db.session.commit()
        return {'status': 'success'}, 200