File Uploads with FastAPI

Internal library for Backblaze B2

Want more?

This lesson is for enrolled students only. Join the course to unlock it!

You can see the code changes implemented in this lecture below.

If you have purchased the course on a different platform, you still have access to the code changes per lecture here on Teclado. The lecture video and lecture notes remain locked.
Join course for $30

New files

storeapi/libs/b2/__init__.py
import logging
from functools import lru_cache

import b2sdk.v2 as b2
from storeapi.config import config

logger = logging.getLogger(__name__)


@lru_cache()
def b2_api():
    """Return an authorized B2 API client.

    Builds a ``B2Api`` backed by an ``InMemoryAccountInfo`` and authorizes it
    with ``config.B2_KEY_ID`` and ``config.B2_APPLICATION_KEY``. Because the
    function takes no arguments and is wrapped in ``lru_cache``, the client is
    created and authorized on the first call only; later calls get the same
    object back.
    """
    logger.debug("Creating and authorizing B2 API")
    # Named `client` rather than `b2_api` to avoid shadowing this function.
    client = b2.B2Api(b2.InMemoryAccountInfo())
    client.authorize_account(
        "production",
        config.B2_KEY_ID,
        config.B2_APPLICATION_KEY,
    )
    return client


@lru_cache()
def b2_get_bucket(api: b2.B2Api):
    """Return the bucket named by ``config.B2_BUCKET_NAME``.

    The lookup goes through ``api.get_bucket_by_name``. Results are memoized
    by ``lru_cache`` keyed on the ``api`` argument, so the same client does
    not trigger a second lookup.
    """
    bucket = api.get_bucket_by_name(config.B2_BUCKET_NAME)
    return bucket


def b2_upload_file(local_file: str, file_name: str):
    """Upload a local file to the configured B2 bucket and return its URL.

    Args:
        local_file: Path of the file on disk to upload.
        file_name: Name the file is stored under in the bucket.

    Returns:
        The download URL obtained from the API for the uploaded file's id.
    """
    client = b2_api()
    logger.debug(f"Uploading {local_file} to B2 as {file_name}")

    bucket = b2_get_bucket(client)
    stored = bucket.upload_local_file(local_file=local_file, file_name=file_name)

    # The URL is derived from the id of the file version we just created.
    download_url = client.get_download_url_for_fileid(stored.id_)
    logger.debug(
        f"Uploaded {local_file} to B2 successfully and got download URL {download_url}"
    )
    return download_url

Modified files

storeapi/main.py
--- 
+++ 
@@ -12,12 +12,14 @@

 logger = logging.getLogger(__name__)

+
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     configure_logging()
     await database.connect()
     yield
     await database.disconnect()
+

 app = FastAPI(lifespan=lifespan)
 app.add_middleware(CorrelationIdMiddleware)