fastapiでログイン後にJWTを返してみるところまでの実装

Web開発

みなさん、こんにちは。

今回は前回、投稿したJWTの考察を踏まえて。

ユーザ認証(ログイン)後にJWTを発行するところまで実装をやってみます。

最近使っているfastapiを使用して実装します。

環境情報

環境情報は以下の通りです。

# cat /etc/redhat-release
CentOS Linux release 7.8.2003 (Core)

# python --version
Python 3.8.5

fastapiを動作させるためのpackageのインストール

いつも通り仮想環境を作成して、開発します。

# python -m venv my_fastapi_env

# ls
my_fastapi_env

# cd ~/<dev_dir>/my_fastapi

# pip install --upgrade pip
Collecting pip
  Downloading pip-20.2.4-py2.py3-none-any.whl (1.5 MB)
     |████████████████████████████████| 1.5 MB 306 kB/s 
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 20.1.1
    Uninstalling pip-20.1.1:
      Successfully uninstalled pip-20.1.1
Successfully installed pip-20.2.4

# pip install fastapi
Collecting fastapi
  Downloading fastapi-0.61.1-py3-none-any.whl (48 kB)
     |████████████████████████████████| 48 kB 317 kB/s 
Collecting starlette==0.13.6
  Using cached starlette-0.13.6-py3-none-any.whl (59 kB)
Collecting pydantic<2.0.0,>=1.0.0
  Downloading pydantic-1.7.1-cp38-cp38-manylinux2014_x86_64.whl (12.2 MB)
     |████████████████████████████████| 12.2 MB 2.0 MB/s 
Installing collected packages: starlette, pydantic, fastapi
Successfully installed fastapi-0.61.1 pydantic-1.7.1 starlette-0.13.6

# pip install uvicorn
Collecting uvicorn
  Downloading uvicorn-0.12.2-py3-none-any.whl (45 kB)
     |████████████████████████████████| 45 kB 1.3 MB/s 
Collecting click==7.*
  Using cached click-7.1.2-py2.py3-none-any.whl (82 kB)
Collecting h11>=0.8
  Downloading h11-0.11.0-py2.py3-none-any.whl (54 kB)
     |████████████████████████████████| 54 kB 1.9 MB/s 
Installing collected packages: click, h11, uvicorn
Successfully installed click-7.1.2 h11-0.11.0 uvicorn-0.12.2

こんな感じのディレクトリ構成を用意してみました。
今回使用するのはbackends以下だけです。

# tree
.
├── backends
│   └── core-api
│       └── main.py
└── frontends
tiangolo/fastapi
FastAPI framework, high performance, easy to learn, fast to code, ready for production - tiangolo/fastapi

を参考にfastapiの動作テスト用として、以下のファイルを用意する。

## ./backends/core-api/main.py
from typing import Optional

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
def read_root():
    return {"Hello": "World"}


@app.get("/items/{item_id}")
def read_item(item_id: int, q: Optional[str] = None):
    return {"item_id": item_id, "q": q}

以下コマンドでWebサーバを起動する。

# uvicorn main:app --host=0.0.0.0 --port=80 --reload
INFO:     Uvicorn running on http://0.0.0.0:80 (Press CTRL+C to quit)
INFO:     Started reloader process [6052] using statreload
INFO:     Started server process [6054]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

ブラウザ等なんでもいいですが、以下のようにレスポンスがかえってくれば fastapiのTESTは完了!

http://192.168.XX.XX/ => {"Hello": "World"}

ログイン機能を実装するために考える点

機能を実装する前に考える点をまとめました。

  • ユーザ登録 (これは次回取り上げます)
    • Emailの重複確認
    • パスワードのセキュリティチェック(もし必要なら)
    • パスワードをhashとして保存
  • ユーザログイン
    • パスワード認証:hashとして保存してあるパスワードと送られてきたパスワードが同じかどうかを確認する
    • パスワードが同じならJWTを返す
    • パスワードが間違っている or Emailアドレスが間違っているならエラーを返す

ログイン機能を実装してみる

それでは、ついに実装部分です。

まずは主なディレクトリ構成から。

backend/
└── core-api
    ├── alembic
    │   ├── env.py
    │   └── versions
    │       └── 4da86599fece_.py --- (1)
    ├── api
    │   └── api_v1
    │       ├── api.py
    │       └── endpoints
    │           ├── __init__.py
    │           └── login.py --- (2)
    ├── crud
    │   ├── base.py
    │   ├── crud_user.py --- (3)
    │   └── __init__.py
    ├── db
    │   ├── base_class.py
    │   ├── base.py
    │   ├── init_db.py
    │   ├── __init__.py
    │   └── session.py
    ├── keys --- (4)
    │   ├── id_rsa
    │   └── id_rsa.pub
    ├── main.py
    ├── models --- (5)
    │   ├── __init__.py
    │   └── user.py
    └── schemas --- (6)
    │   ├── base.py
    │   ├── __init__.py
    │   ├── token.py
    │   └── user.py
    └── core
        └── security
            └── __init__.py --- (7)

それでは1つずつ書いていきます。

(1) マイグレーションファイル

マイグレーションファイル生成には、alembicを使用しています。

中身はこんな感じです。

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql

# revision identifiers, used by Alembic.
revision = '4da86599fece'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('users',
    sa.Column('id', sa.BigInteger(), nullable=False),
    sa.Column('name', sa.String(length=45), nullable=False),
    sa.Column('email', sa.String(length=255), nullable=False),
    sa.Column('hashed_password', sa.String(length=255), nullable=False),
    sa.Column('created_at', mysql.DATETIME(timezone=True, fsp=6), server_default=sa.text('CURRENT_TIMESTAMP(6)'), nullable=True),
    sa.Column('updated_at', mysql.DATETIME(timezone=True, fsp=6), server_default=sa.text('CURRENT_TIMESTAMP(6)'), nullable=True),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
    op.create_index(op.f('ix_users_name'), 'users', ['name'], unique=False)
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index(op.f('ix_users_name'), table_name='users')
    op.drop_index(op.f('ix_users_email'), table_name='users')
    op.drop_table('users')
    # ### end Alembic commands ###

(2) ログインエンドポイント

ログインエンドポイントは上で考察したように、email & passwordのチェックを行っています。

CRUDで処理内容をブラックボックス化しているのがポイントです。

※参考にしているテンプレートが最初から、そのような使用方法になっていました。

from typing import Any

from fastapi import APIRouter, Depends, status, HTTPException
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from starlette.requests import Request

import crud, models, schemas
from api import deps
from core import security
from core.config import settings

router = APIRouter()


@router.post("/login", response_model=schemas.Token)
def login_token(
    request: Request, *, db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
    user = crud.user.authenticate(db, email=form_data.username, password=form_data.password)
    if not user:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Your email or password is wrong!!")

    return JSONResponse(
        content={
            "token": security.create_token(sub=user.id, key=settings.SECRET_KEY)
        }
    )

(3) CRUD

from typing import Any, Dict, Optional, Union, cast

from sqlalchemy.orm import Session

import crud
from core.security import get_hashed_password, check_password
from crud.base import CRUDBase
from models import User
from schemas.user import UserCreate, UserUpdate


class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
    def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
        return cast(Optional[User], db.query(User).filter(User.email == email).first())

    ...

    def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
        instance = self.get_by_email(db, email=email)
        if not instance:
            return None
        if not check_password(password, instance.hashed_password):
            return None
        return instance


user = CRUDUser(User)

(4) JWT用に使用する秘密鍵・公開鍵

単にJWT生成に使用する鍵です。

ssh-keygen -t rsa で作成しました!

(5) モデル

モデルは、sqlalchemyを使用して以下のように定義しました。

from sqlalchemy import Column, String, BigInteger
from sqlalchemy.dialects.mysql import DATETIME
from sqlalchemy.sql.functions import current_timestamp

from db.base_class import Base


class User(Base):
    __tablename__ = "users"

    id = Column(BigInteger, primary_key=True)
    name = Column(String(45), index=True, nullable=False)
    email = Column(String(255), index=True, unique=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    created_at = Column(DATETIME(fsp=6), server_default=current_timestamp(6))
    updated_at = Column(
        DATETIME(fsp=6), server_default=current_timestamp(6), onupdate=current_timestamp(6)
    )

(6) スキーマ

以下は、ログイン成功時にTokenを返すための型とJWTからsub(id)を取り出すための型。

※これは、ほぼそのままです。

# token.py
from typing import Optional
from pydantic import BaseModel, Field


class Token(BaseModel):
    token: str


class TokenPayload(BaseModel):
    sub: Optional[int] = None

(7) トークン関連の処理

以下は、いろいろ書いてありますが。

ここがログイン処理をする上で、一番重要な箇所だと思います。

※パスワードのHash化については、またあとで書きます。

ログイン後にトークンを作成しなければいけないのですが、それをcreate_token関数で行っています。

今回のJWT作成で使用したのは、authlibです。

※このauthlibでトークンを作成するために、うえで書いてある秘密鍵を使用しています。

from typing import Any, Union, cast

from authlib.jose import jwt
import bcrypt

from core.config import settings

ALGORITHM = "RS256"


def create_token(sub: Union[str, Any]) -> str:
    header = {'alg': ALGORITHM}
    payload = {'sub': str(sub)}
    key = settings.PRIVATE_KEY
    s = jwt.encode(header, payload, key)
    return cast(str, s.decode())


def get_salt() -> str:
  salt = bcrypt.gensalt(rounds=10, prefix=b'2a')
  return salt


def get_hashed_password(password: str) -> str: 
  return bcrypt.hashpw(password.encode(), get_salt())


def check_password(password: str, hashed: str) -> bool:
    return cast(bool, bcrypt.checkpw(password.encode(), hashed.encode()))

まとめ

えーと、今回Fastapiを使用してログイン処理を実装してみましたが、シンプルにわかりやすく書ける印象を持ちました。

これからエンジニアを志す方にとっても、fastapiは全体的なコードの構成や動作を学ぶためによいと思います。

次回以降に書きたい内容としては、

  • パスワードのHash化について
  • トークンの認証について

といったところでしょうか。

最後まで目を通してくださり、ありがとうございました(*- -)(*_ _)ペコリ

参考

tiangolo/full-stack-fastapi-postgresql
Full stack, modern web application generator. Using FastAPI, PostgreSQL as database, Docker, automatic HTTPS and more. - tiangolo/full-stack-fastapi-postgresql
JSON Web Token (JWT)
The ultimate Python library in building OAuth and OpenID Connect servers. JWS, JWE, JWK, JWA, JWT are included.
タイトルとURLをコピーしました