こんにちは。 ピリカ開発チームの九鬼です。
以前、Google DriveからGoogle Cloud Storageにファイルをコピーする方法1を本ブログで紹介しました。当該記事はNode.js向けの内容だったのですが、Python向けに対応できたのでその方法を紹介いたします。
この手法のメリット・デメリット
メリット
- データ送受信のチャンクサイズにもよるものの、メモリ使用量を抑えることができる
- コード上でのファイル生成が不要
デメリット
前提
事前準備
- GCPプロジェクトの「APIとライブラリ」でGoogle Drive APIとCloud Storageが使えるように設定しておく
- ローカル開発時、GCPの認証を通せるように設定しておく(
gcloud auth application-default login
やサービスアカウントキーなど) 依存ライブラリのインストール。以下ライブラリが必要。
コード
以下、実装したコードです。本コードのポイントはいくつかあり、
- Drive APIによるファイルダウンロードを、io.BytesIOクラスを継承したクラスのreadメソッド上で実装する。
- ChunkedDriveFileStreamないしupload_by_streamにおいて、chunk_sizeは同じバイトサイズを指定する
- 読み書きのサイズが整合性とれず、正しくデータをチャンクコピーできないため
となります。
import io from googleapiclient.http import MediaIoBaseDownload class ChunkedDriveFileStream(io.BytesIO): def __init__( self, drive_service, # googleapiclient.discovery.build('drive', 'v3', credentials=...)で得られたDrive APIアクセス用サービス file_id: str, chunk_size: int ): super().__init__() request = drive_service.files().get_media(fileId=file_id) self._fd = io.BytesIO() self._downloader = MediaIoBaseDownload( self._fd, request, chunksize=chunk_size) self._done = False self.chunk_size = chunk_size def tell(self) -> int: return self._fd.tell() def flush(self) -> None: self._fd.flush() def close(self) -> None: self._fd.close() def read(self, *args, **kwargs) -> bytes: """Google Driveからchunk_size[Byte]だけデータをダウンロードの上、得られたデータをGoogle Cloud Storageアップロード用に返却する """ if self._done: return b"" # 処理完了時は空のバイト列を渡して完了通知する pos = self._fd.tell() progress, done = self._downloader.next_chunk() if done: self._done = done after_pos = self._fd.tell() read_bytes = after_pos - pos # next_chunkで読みだしたバイト数を取得 self._fh.seek(pos) return self._fd.read(read_bytes)
Google Cloud Storageにアップロードする処理
以下、一例です。ポイントは2点で、
となります。
import io from google.cloud import storage class CloudStorage: def __init__(self, project_id: str): self.project_id = project_id def upload_by_stream(chunk_stream: ChunkedDriveFileStream, file_name: str): client = storage.Client(self.project_id) bucket = storage.Bucket( client, name="your-bucket-name") # バケット名は適宜指定する blob = bucket.blob( file_name, chunk_size=chunk_stream.chunk_size) # 注: GoogleCloudErrorなどについて、必要な例外処理を行うようにしてください blob.upload_from_file(byte_stream)
以上により、chunk_sizeごとにデータをダウンロード・アップロードすることができます!