Google DriveからGoogle Cloud Storageにファイルをコピーする(Python編)

こんにちは。 ピリカ開発チームの九鬼です。

以前、Google DriveからGoogle Cloud Storageにファイルをコピーする方法1を本ブログで紹介しました。当該記事はNode.js向けの内容だったのですが、Python向けに対応できたのでその方法を紹介いたします。

この手法のメリット・デメリット

メリット

  • データ送受信のチャンクサイズにもよるものの、メモリ使用量を抑えることができる
  • コード上でのファイル生成が不要

デメリット

  • ダウンロードとアップロードを交互に行うので、エラーハンドリング・デバッグが若干煩雑になる
    • 途中で失敗してもGoogle Cloud Storage上にファイルができるので、後から削除が必要になる

前提

  • コードの実況環境は Google Cloud Function(第一世代)/Pythonとする

事前準備

  • GCPプロジェクトの「APIとライブラリ」でGoogle Drive APIとCloud Storageが使えるように設定しておく
  • ローカル開発時、GCPの認証を通せるように設定しておく(gcloud auth application-default loginやサービスアカウントキーなど)
  • 依存ライブラリのインストール。以下ライブラリが必要。

    • google-api-python-client: Drive APIを呼び出すために使う。検証済みバージョンは2.52.0
    • google-cloud-storage: Google Cloud Storageアクセスに使用。検証済みバージョンは2.4.0

コード

以下、実装したコードです。本コードのポイントはいくつかあり、

  • Drive APIによるファイルダウンロードを、io.BytesIOクラスを継承したクラスのreadメソッド上で実装する。
    • 同メソッド内でダウンロードしつつ、tell()およびread()を使ってダウンロードしたバイト列を取得・returnする
    • 内部でio.BytesIOクラスのインスタンスを持っておき、データの授受は当インスタンスで行う
  • ChunkedDriveFileStreamないしupload_by_streamにおいて、chunk_sizeは同じバイトサイズを指定する
    • 読み書きのサイズが整合性とれず、正しくデータをチャンクコピーできないため

となります。

import io

from googleapiclient.http import MediaIoBaseDownload


class ChunkedDriveFileStream(io.BytesIO):
    def __init__(
            self, drive_service,   # googleapiclient.discovery.build('drive', 'v3', credentials=...)で得られたDrive APIアクセス用サービス
            file_id: str,
            chunk_size: int
    ):
        super().__init__()

        request = drive_service.files().get_media(fileId=file_id)
        self._fd = io.BytesIO()  
        self._downloader = MediaIoBaseDownload(
            self._fd, request, chunksize=chunk_size)
        self._done = False
        self.chunk_size = chunk_size

    def tell(self) -> int:
        return self._fd.tell()

    def flush(self) -> None:
        self._fd.flush()

    def close(self) -> None:
        self._fd.close()

    def read(self, *args, **kwargs) -> bytes:
        """Google Driveからchunk_size[Byte]だけデータをダウンロードの上、得られたデータをGoogle Cloud Storageアップロード用に返却する
        """
        if self._done:
            return b""  # 処理完了時は空のバイト列を渡して完了通知する
        pos = self._fd.tell()
        progress, done = self._downloader.next_chunk()
        if done:
            self._done = done
        after_pos = self._fd.tell()
        read_bytes = after_pos - pos  # next_chunkで読みだしたバイト数を取得
        self._fh.seek(pos)
        return self._fd.read(read_bytes)

Google Cloud Storageにアップロードする処理

以下、一例です。ポイントは2点で、

  • bucket.blob呼び出し時にchunk_sizeを指定する
  • blob.upload_from_fileにChunkedDriveFileStreamクラスのインスタンスを渡す

となります。

import io

from google.cloud import storage

class CloudStorage:
    def __init__(self, project_id: str):
        self.project_id = project_id

    def upload_by_stream(chunk_stream: ChunkedDriveFileStream, file_name: str):
        client = storage.Client(self.project_id)
        bucket = storage.Bucket(
            client, name="your-bucket-name")  # バケット名は適宜指定する
        blob = bucket.blob(
            file_name,
            chunk_size=chunk_stream.chunk_size)

        # 注: GoogleCloudErrorなどについて、必要な例外処理を行うようにしてください
        blob.upload_from_file(byte_stream)

以上により、chunk_sizeごとにデータをダウンロード・アップロードすることができます!