みなさん、こんにちは。
今回は前回、投稿したJWTの考察を踏まえて。
ユーザ認証(ログイン)後にJWTを発行するところまで実装をやってみます。
最近使っているfastapiを使用して実装します。
環境情報
環境情報は以下の通りです。
# cat /etc/redhat-release
CentOS Linux release 7.8.2003 (Core)
# python --version
Python 3.8.5
fastapiを動作させるためのpackageのインストール
いつも通り仮想環境を作成して、開発します。
# python -m venv my_fastapi_env
# ls
my_fastapi_env
# cd ~/<dev_dir>/my_fastapi
# pip install --upgrade pip
Collecting pip
Downloading pip-20.2.4-py2.py3-none-any.whl (1.5 MB)
|████████████████████████████████| 1.5 MB 306 kB/s
Installing collected packages: pip
Attempting uninstall: pip
Found existing installation: pip 20.1.1
Uninstalling pip-20.1.1:
Successfully uninstalled pip-20.1.1
Successfully installed pip-20.2.4
# pip install fastapi
Collecting fastapi
Downloading fastapi-0.61.1-py3-none-any.whl (48 kB)
|████████████████████████████████| 48 kB 317 kB/s
Collecting starlette==0.13.6
Using cached starlette-0.13.6-py3-none-any.whl (59 kB)
Collecting pydantic<2.0.0,>=1.0.0
Downloading pydantic-1.7.1-cp38-cp38-manylinux2014_x86_64.whl (12.2 MB)
|████████████████████████████████| 12.2 MB 2.0 MB/s
Installing collected packages: starlette, pydantic, fastapi
Successfully installed fastapi-0.61.1 pydantic-1.7.1 starlette-0.13.6
# pip install uvicorn
Collecting uvicorn
Downloading uvicorn-0.12.2-py3-none-any.whl (45 kB)
|████████████████████████████████| 45 kB 1.3 MB/s
Collecting click==7.*
Using cached click-7.1.2-py2.py3-none-any.whl (82 kB)
Collecting h11>=0.8
Downloading h11-0.11.0-py2.py3-none-any.whl (54 kB)
|████████████████████████████████| 54 kB 1.9 MB/s
Installing collected packages: click, h11, uvicorn
Successfully installed click-7.1.2 h11-0.11.0 uvicorn-0.12.2
こんな感じのディレクトリ構成を用意してみました。
今回使用するのはbackends以下だけです。
# tree
.
├── backends
│ └── core-api
│ └── main.py
└── frontends
を参考にfastapiの動作テスト用として、以下のファイルを用意する。
## ./backends/core-api/main.py
from typing import Optional
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.get("/items/{item_id}")
def read_item(item_id: int, q: Optional[str] = None):
return {"item_id": item_id, "q": q}
以下コマンドでWebサーバを起動する。
# uvicorn main:app --host=0.0.0.0 --port=80 --reload
INFO: Uvicorn running on http://0.0.0.0:80 (Press CTRL+C to quit)
INFO: Started reloader process [6052] using statreload
INFO: Started server process [6054]
INFO: Waiting for application startup.
INFO: Application startup complete.
ブラウザ等なんでもいいですが、以下のようにレスポンスがかえってくれば fastapiのTESTは完了!
http://192.168.XX.XX/ => {"Hello": "World"}
ログイン機能を実装するために考える点
機能を実装する前に考える点をまとめました。
- ユーザ登録 (これは次回取り上げます)
- Emailの重複確認
- パスワードのセキュリティチェック(もし必要なら)
- パスワードをhashとして保存
- ユーザログイン
- パスワード認証:hashとして保存してあるパスワードと送られてきたパスワードが同じかどうかを確認する
- パスワードが同じならJWTを返す
- パスワードが間違っている or Emailアドレスが間違っているならエラーを返す
ログイン機能を実装してみる
それでは、ついに実装部分です。
まずは主なディレクトリ構成から。
backend/
└── core-api
├── alembic
│ ├── env.py
│ └── versions
│ └── 4da86599fece_.py --- (1)
├── api
│ └── api_v1
│ ├── api.py
│ └── endpoints
│ ├── __init__.py
│ └── login.py --- (2)
├── crud
│ ├── base.py
│ ├── crud_user.py --- (3)
│ └── __init__.py
├── db
│ ├── base_class.py
│ ├── base.py
│ ├── init_db.py
│ ├── __init__.py
│ └── session.py
├── keys --- (4)
│ ├── id_rsa
│ └── id_rsa.pub
├── main.py
├── models --- (5)
│ ├── __init__.py
│ └── user.py
└── schemas --- (6)
│ ├── base.py
│ ├── __init__.py
│ ├── token.py
│ └── user.py
└── core
└── security
└── __init__.py --- (7)
それでは1つずつ書いていきます。
(1) マイグレーションファイル
マイグレーションファイル生成には、alembicを使用しています。
中身はこんな感じです。
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = '4da86599fece'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('users',
sa.Column('id', sa.BigInteger(), nullable=False),
sa.Column('name', sa.String(length=45), nullable=False),
sa.Column('email', sa.String(length=255), nullable=False),
sa.Column('hashed_password', sa.String(length=255), nullable=False),
sa.Column('created_at', mysql.DATETIME(timezone=True, fsp=6), server_default=sa.text('CURRENT_TIMESTAMP(6)'), nullable=True),
sa.Column('updated_at', mysql.DATETIME(timezone=True, fsp=6), server_default=sa.text('CURRENT_TIMESTAMP(6)'), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
op.create_index(op.f('ix_users_name'), 'users', ['name'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_users_name'), table_name='users')
op.drop_index(op.f('ix_users_email'), table_name='users')
op.drop_table('users')
# ### end Alembic commands ###
(2) ログインエンドポイント
ログインエンドポイントは上で考察したように、email & passwordのチェックを行っています。
CRUDで処理内容をブラックボックス化しているのがポイントです。
※参考にしているテンプレートが最初から、そのような使用方法になっていました。
from typing import Any
from fastapi import APIRouter, Depends, status, HTTPException
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from starlette.requests import Request
import crud, models, schemas
from api import deps
from core import security
from core.config import settings
router = APIRouter()
@router.post("/login", response_model=schemas.Token)
def login_token(
request: Request, *, db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
user = crud.user.authenticate(db, email=form_data.username, password=form_data.password)
if not user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Your email or password is wrong!!")
return JSONResponse(
content={
"token": security.create_token(sub=user.id, key=settings.SECRET_KEY)
}
)
(3) CRUD
from typing import Any, Dict, Optional, Union, cast
from sqlalchemy.orm import Session
import crud
from core.security import get_hashed_password, check_password
from crud.base import CRUDBase
from models import User
from schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return cast(Optional[User], db.query(User).filter(User.email == email).first())
...
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
instance = self.get_by_email(db, email=email)
if not instance:
return None
if not check_password(password, instance.hashed_password):
return None
return instance
user = CRUDUser(User)
(4) JWT用に使用する秘密鍵・公開鍵
単にJWT生成に使用する鍵です。
ssh-keygen -t rsa
で作成しました!
(5) モデル
モデルは、sqlalchemyを使用して以下のように定義しました。
from sqlalchemy import Column, String, BigInteger
from sqlalchemy.dialects.mysql import DATETIME
from sqlalchemy.sql.functions import current_timestamp
from db.base_class import Base
class User(Base):
__tablename__ = "users"
id = Column(BigInteger, primary_key=True)
name = Column(String(45), index=True, nullable=False)
email = Column(String(255), index=True, unique=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
created_at = Column(DATETIME(fsp=6), server_default=current_timestamp(6))
updated_at = Column(
DATETIME(fsp=6), server_default=current_timestamp(6), onupdate=current_timestamp(6)
)
(6) スキーマ
以下は、ログイン成功時にTokenを返すための型とJWTからsub(id)を取り出すための型。
※これは、ほぼそのままです。
# token.py
from typing import Optional
from pydantic import BaseModel, Field
class Token(BaseModel):
token: str
class TokenPayload(BaseModel):
sub: Optional[int] = None
(7) トークン関連の処理
以下は、いろいろ書いてありますが。
ここがログイン処理をする上で、一番重要な箇所だと思います。
※パスワードのHash化については、またあとで書きます。
ログイン後にトークンを作成しなければいけないのですが、それをcreate_token
関数で行っています。
今回のJWT作成で使用したのは、authlibです。
※このauthlibでトークンを作成するために、うえで書いてある秘密鍵を使用しています。
from typing import Any, Union, cast
from authlib.jose import jwt
import bcrypt
from core.config import settings
ALGORITHM = "RS256"
def create_token(sub: Union[str, Any]) -> str:
header = {'alg': ALGORITHM}
payload = {'sub': str(sub)}
key = settings.PRIVATE_KEY
s = jwt.encode(header, payload, key)
return cast(str, s.decode())
def get_salt() -> str:
salt = bcrypt.gensalt(rounds=10, prefix=b'2a')
return salt
def get_hashed_password(password: str) -> str:
return bcrypt.hashpw(password.encode(), get_salt())
def check_password(password: str, hashed: str) -> bool:
return cast(bool, bcrypt.checkpw(password.encode(), hashed.encode()))
まとめ
えーと、今回Fastapiを使用してログイン処理を実装してみましたが、シンプルにわかりやすく書ける印象を持ちました。
これからエンジニアを志す方にとっても、fastapiは全体的なコードの構成や動作を学ぶためによいと思います。
次回以降に書きたい内容としては、
- パスワードのHash化について
- トークンの認証について
といったところでしょうか。
最後まで目を通してくださり、ありがとうございました(*- -)(*_ _)ペコリ