반응형
Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- opensearch
- 카오스 엔지니어링
- 오퍼레이터
- serving
- Kubernetes 인증
- Pulumi
- CANARY
- Model Serving
- CI/CD
- gitea
- Kubeflow
- gitops
- Continuous Deployment
- nginx ingress
- MLflow
- Kopf
- seldon core
- keda
- Litmus
- mlops
- operator
- kubernetes operator
- blue/green
- Argo
- argo rollout
- tekton
- opentelemetry
- Kubernetes
- argocd
- knative
Archives
- Today
- Total
Kubernetes 이야기
Fastapi 모범 사례 본문
반응형
Pydantic을 사용하여 데이터 유효성 검사
from enum import Enum
from pydantic import AnyUrl, BaseModel, EmailStr, Field, constr
class MusicBand(str, Enum):
AEROSMITH = "AEROSMITH"
QUEEN = "QUEEN"
ACDC = "AC/DC"
class UserBase(BaseModel):
first_name: str = Field(min_length=1, max_length=128)
username: constr(regex="^[A-Za-z0-9-_]+$", to_lower=True, strip_whitespace=True)
email: EmailStr
age: int = Field(ge=18, default=None) # must be greater or equal to 18
favorite_band: MusicBand = None # only "AEROSMITH", "QUEEN", "AC/DC" values are allowed to be inputted
website: AnyUrl = None
Depends 를 사용하여 데이터 검증
# dependencies.py
async def valid_post_id(post_id: UUID4) -> Mapping:
post = await service.get_by_id(post_id)
if not post:
raise PostNotFound()
return post
# router.py
@router.get("/posts/{post_id}", response_model=PostResponse)
async def get_post_by_id(post: Mapping = Depends(valid_post_id)):
return post
@router.put("/posts/{post_id}", response_model=PostResponse)
async def update_post(
update_data: PostUpdate,
post: Mapping = Depends(valid_post_id),
):
updated_post: Mapping = await service.update(id=post["id"], data=update_data)
return updated_post
@router.get("/posts/{post_id}/reviews", response_model=list[ReviewsResponse])
async def get_post_reviews(post: Mapping = Depends(valid_post_id)):
post_reviews: list[Mapping] = await reviews_service.get_by_post_id(post["id"])
return post_reviews
체인 종속성
종속성은 다른 종속성을 사용하고 유사한 논리에 대한 코드 반복을 피할 수 있다.
# dependencies.py
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
async def valid_post_id(post_id: UUID4) -> Mapping:
post = await service.get_by_id(post_id)
if not post:
raise PostNotFound()
return post
async def parse_jwt_data(
token: str = Depends(OAuth2PasswordBearer(tokenUrl="/auth/token"))
) -> dict:
try:
payload = jwt.decode(token, "JWT_SECRET", algorithms=["HS256"])
except JWTError:
raise InvalidCredentials()
return {"user_id": payload["id"]}
async def valid_owned_post(
post: Mapping = Depends(valid_post_id),
token_data: dict = Depends(parse_jwt_data),
) -> Mapping:
if post["creator_id"] != token_data["user_id"]:
raise UserNotOwner()
return post
# router.py
@router.get("/users/{user_id}/posts/{post_id}", response_model=PostResponse)
async def get_user_post(post: Mapping = Depends(valid_owned_post)):
return post
종속성을 분리하고 재사용합니다. 종속성 호출은 캐시된다.
종속성은 여러 번 재사용할 수 있으며 다시 계산되지 않는다. FastAPI는 기본적으로 요청 범위 내에서 종속성 결과를 캐시한다.
# dependencies.py
from fastapi import BackgroundTasks
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
async def valid_post_id(post_id: UUID4) -> Mapping:
post = await service.get_by_id(post_id)
if not post:
raise PostNotFound()
return post
async def parse_jwt_data(
token: str = Depends(OAuth2PasswordBearer(tokenUrl="/auth/token"))
) -> dict:
try:
payload = jwt.decode(token, "JWT_SECRET", algorithms=["HS256"])
except JWTError:
raise InvalidCredentials()
return {"user_id": payload["id"]}
async def valid_owned_post(
post: Mapping = Depends(valid_post_id),
token_data: dict = Depends(parse_jwt_data),
) -> Mapping:
if post["creator_id"] != token_data["user_id"]:
raise UserNotOwner()
return post
async def valid_active_creator(
token_data: dict = Depends(parse_jwt_data),
):
user = await users_service.get_by_id(token_data["user_id"])
if not user["is_active"]:
raise UserIsBanned()
if not user["is_creator"]:
raise UserNotCreator()
return user
# router.py
@router.get("/users/{user_id}/posts/{post_id}", response_model=PostResponse)
async def get_user_post(
worker: BackgroundTasks,
post: Mapping = Depends(valid_owned_post),
user: Mapping = Depends(valid_active_creator),
):
"""Get post that belong the active user."""
worker.add_task(notifications_service.send_email, user["id"])
return post
위에서 parse_jwt_data는 첫 번째 호출에서 한 번만 호출된다.
동기 I/O 작업만 있는 경우 경로를 비동기로 만들지 않는다.
import asyncio
import time
@router.get("/terrible-ping")
async def terrible_catastrophic_ping():
time.sleep(10) # I/O blocking operation for 10 seconds
pong = service.get_pong() # I/O blocking operation to get pong from DB
return {"pong": pong}
@router.get("/good-ping")
def good_ping():
time.sleep(10) # I/O blocking operation for 10 seconds, but in another thread
pong = service.get_pong() # I/O blocking operation to get pong from DB, but in another thread
return {"pong": pong}
@router.get("/perfect-ping")
async def perfect_ping():
await asyncio.sleep(10) # non-blocking I/O operation
pong = await service.async_get_pong() # non-blocking I/O db call
return {"pong": pong}
사용자 정의 기본 모델
from datetime import datetime
from zoneinfo import ZoneInfo
import orjson
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, root_validator
def orjson_dumps(v, *, default):
# orjson.dumps returns bytes, to match standard json.dumps we need to decode
return orjson.dumps(v, default=default).decode()
def convert_datetime_to_gmt(dt: datetime) -> str:
if not dt.tzinfo:
dt = dt.replace(tzinfo=ZoneInfo("UTC"))
return dt.strftime("%Y-%m-%dT%H:%M:%S%z")
class ORJSONModel(BaseModel):
class Config:
json_loads = orjson.loads
json_dumps = orjson_dumps
json_encoders = {datetime: convert_datetime_to_gmt} # method for customer JSON encoding of datetime fields
@root_validator()
def set_null_microseconds(cls, data: dict) -> dict:
"""Drops microseconds in all the datetime field values."""
datetime_fields = {
k: v.replace(microsecond=0)
for k, v in data.items()
if isinstance(k, datetime)
}
return {**data, **datetime_fields}
def serializable_dict(self, **kwargs):
"""Return a dict which contains only serializable fields."""
default_dict = super().dict(**kwargs)
return jsonable_encoder(default_dict)
- 명시적 시간대를 사용하여 모든 날짜/시간 필드를 표준 형식으로 직렬화
- 표준 날짜/시간 형식을 사용
API가 공개되지 않은 경우 기본적으로 문서를 숨긴다.
from fastapi import FastAPI
from starlette.config import Config
config = Config(".env") # parse .env file for env variables
ENVIRONMENT = config("ENVIRONMENT") # get current env name
SHOW_DOCS_ENVIRONMENT = ("local", "staging") # explicit list of allowed envs
app_configs = {"title": "My Cool API"}
if ENVIRONMENT not in SHOW_DOCS_ENVIRONMENT:
app_configs["openapi_url"] = None # set url for docs as null
app = FastAPI(**app_configs)
환경설정에 Pydantic의 BaseSettings 사용
https://docs.pydantic.dev/usage/settings/
pydantic의 가장 유용한 응용 프로그램 중 하나는 설정 관리이다.
- 명확하게 정의되고 유형 힌트가 있는 애플리케이션 구성 클래스 만들기
- 환경 변수에서 구성에 대한 수정 사항을 자동으로 읽습니다.
- 원하는 경우 이니셜라이저의 특정 설정을 수동으로 재정의합니다(예: 단위 테스트).
from pydantic import AnyUrl, BaseSettings, PostgresDsn
class AppSettings(BaseSettings):
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
env_prefix = "app_"
DATABASE_URL: PostgresDsn
IS_GOOD_ENV: bool = True
ALLOWED_CORS_ORIGINS: set[AnyUrl]
SQLAlchemy: DB 키 명명 규칙 설정
from sqlalchemy import MetaData
POSTGRES_INDEXES_NAMING_CONVENTION = {
"ix": "%(column_0_label)s_idx",
"uq": "%(table_name)s_%(column_0_name)s_key",
"ck": "%(table_name)s_%(constraint_name)s_check",
"fk": "%(table_name)s_%(column_0_name)s_fkey",
"pk": "%(table_name)s_pkey",
}
metadata = MetaData(naming_convention=POSTGRES_INDEXES_NAMING_CONVENTION)
비동기 Test Client
import pytest
from async_asgi_testclient import TestClient
from src.main import app # inited FastAPI app
@pytest.fixture
async def client():
host, port = "127.0.0.1", "5555"
scope = {"client": (host, port)}
async with TestClient(
app, scope=scope, headers={"X-User-Fingerprint": "Test"}
) as client:
yield client
@pytest.mark.asyncio
async def test_create_post(client: TestClient):
resp = await client.post("/posts")
assert resp.status_code == 201
Chunked로 파일을 저장한다.
import aiofiles
from fastapi import UploadFile
DEFAULT_CHUNK_SIZE = 1024 * 1024 * 50 # 50 megabytes
async def save_video(video_file: UploadFile):
async with aiofiles.open("/file/path/name.mp4", "wb") as f:
while chunk := await video_file.read(DEFAULT_CHUNK_SIZE):
await f.write(chunk)
동기화 SDK를 사용해야 하는 경우 스레드 풀에서 실행
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from my_sync_library import SyncAPIClient
app = FastAPI()
@app.get("/")
async def call_my_sync_library():
my_data = await service.get_my_data()
client = SyncAPIClient()
await run_in_threadpool(client.make_request, data=my_data)
참고
반응형
'개발 > python' 카테고리의 다른 글
Ubuntu에서 pipenv 실행 시 FileNotFoundError 오류 (0) | 2023.01.28 |
---|---|
Loguru (0) | 2023.01.24 |
fastapi 개발 환경 구성 (0) | 2023.01.22 |
Pipenv로 Python 가상환경 구성 및 패키지 관리하기 (0) | 2023.01.22 |
centos7 python 3.8 설치 (0) | 2022.05.03 |
Comments