I'm attempting to generate snapshots for my BigQuery tables and transfer them to Google Cloud Storage. While I can successfully create the snapshots, I encounter an error when attempting to move them to GCS.
google.api_core.exceptions.BadRequest: 400 melodic-map-408207:kebron_dataset.GCS_SNAPSHOT_customer_2024-02-28 is not allowed for this operation because it is currently a SNAPSHOT.; reason: invalid, message: melodic-map-408207:kebron_dataset.GCS_SNAPSHOT_customer_2024-02-28 is not allowed for this operation because it is currently a SNAPSHOT
Below is my code:
import os
import base64
import datetime
import json
from dotenv import load_dotenv
from google.cloud import bigquery
from google.cloud import storage
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession
load_dotenv()
# Access environment variables
SERVICE_ACCOUNT_FILE = 'credentials.json'
PROJECT_ID = os.getenv("PROJECT_ID")
DATASET_ID = os.getenv("DATASET_ID")
FILE_EXTENSION = os.getenv("FILE_EXTENSION")
# Construct the URL for the API endpoint
BASE_URL = "https://bigquery.googleapis.com"
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
def move_snapshots_to_gcs(env_vars):
    # Access environment variables from the scheduler
    BUCKET_NAME = env_vars.get("BUCKET_NAME")
    LOCATION = env_vars.get("LOCATION")
    authed_session = AuthorizedSession(credentials)
    # Generate a timestamp string
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    # Generate the folder name with the current date
    folder_name = f'SNAPSHOT_{timestamp.split("_")[0]}'
    # Initialize a GCS client
    storage_client = storage.Client(credentials=credentials)
    # Get the bucket
    bucket = storage_client.bucket(BUCKET_NAME)
    # Check if the folder exists in the bucket
    folder_blob = bucket.blob(folder_name + "/")
    if not folder_blob.exists():
        # Create the folder
        folder_blob.upload_from_string("")
        print(f"Folder {folder_name} created.")
    # Check if the flag file exists in the folder of that bucket
    flag_blob = bucket.blob(f"{folder_name}/backup_flag")
    if flag_blob.exists():
        print("Backup has already been performed. Exiting...")
        return
    # Construct a BigQuery client object with the service account credentials
    client = bigquery.Client(credentials=credentials)
    # List tables in the specified dataset
    tables = client.list_tables(DATASET_ID)
    print("Tables contained in '{}':".format(DATASET_ID))
    for table in tables:
        TABLE_ID = table.table_id
        SNAPSHOT_TABLE_ID = f"GCS_SNAPSHOT_{TABLE_ID}_{timestamp.split('_')[0]}"
        # Create the snapshots and get the last response status code
        response_status_code = create_table_snapshot()
        print(response_status_code)
        if response_status_code == 200:
            print(response_status_code)
            # Define the filename with the timestamp and the configured file extension
            filename = f"{table.table_id.split('.')[-1]}_{timestamp}.{FILE_EXTENSION}"
            # Format the destination URI
            destination_uri = f"gs://{BUCKET_NAME}/{folder_name}/{filename}"
            dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID)
            table_ref = dataset_ref.table(SNAPSHOT_TABLE_ID)
            extract_job = client.extract_table(
                table_ref,
                destination_uri,
                # Location must match that of the source table.
                location=LOCATION,
            )  # API request
            extract_job.result()  # Waits for the job to complete.
            print(
                "Exported {}:{}.{} to {}".format(table.project, table.dataset_id, table.table_id, destination_uri)
            )
def build_json_body(TABLE_ID, SNAPSHOT_TABLE_ID):
    json_body = f'''
    {{
        "configuration": {{
            "copy": {{
                "sourceTables": [
                    {{
                        "projectId": "{PROJECT_ID}",
                        "datasetId": "{DATASET_ID}",
                        "tableId": "{TABLE_ID}"
                    }}
                ],
                "destinationTable": {{
                    "projectId": "{PROJECT_ID}",
                    "datasetId": "{DATASET_ID}",
                    "tableId": "{SNAPSHOT_TABLE_ID}"
                }},
                "operationType": "SNAPSHOT",
                "writeDisposition": "WRITE_EMPTY"
            }}
        }}
    }}
    '''
    return json.loads(json_body)
def create_table_snapshot():
    # Generate a timestamp string
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    authed_session = AuthorizedSession(credentials)
    # Construct a BigQuery client object with the service account credentials
    client = bigquery.Client(credentials=credentials)
    # List tables in the specified dataset
    tables = client.list_tables(DATASET_ID)
    last_response_status = None  # Variable to store the last response status code
    for table in tables:
        TABLE_ID = table.table_id
        SNAPSHOT_TABLE_ID = f"GCS_SNAPSHOT_{TABLE_ID}_{timestamp.split('_')[0]}"
        # Use a distinct name so the json module is not shadowed
        json_body = build_json_body(TABLE_ID, SNAPSHOT_TABLE_ID)
        # Make the POST request to create the snapshot
        response = authed_session.post(
            url=f'{BASE_URL}/bigquery/v2/projects/{PROJECT_ID}/jobs',
            json=json_body,
            headers={'Content-Type': 'application/json'}
        )
        print(response.json())
        last_response_status = response.status_code  # Update the status code
    return last_response_status
if __name__ == "__main__":
    # print(build_json_body('melodic-map-408207', 'kebron_dataset', 'student'))
    move_snapshots_to_gcs({'BUCKET_NAME': 'kebron_daily_backup_bucket', 'LOCATION': 'US'})
I attempted to do it manually, but encountered a similar error.

Shouldn't this be working correctly? Why isn't it functioning as expected?


Based on your use case, what you want is to "Export table data to Cloud Storage"; that documentation can guide you through it, either in code or via the console UI. As the error message indicates, your extract job is pointed at the snapshot table, and a table snapshot can't be used as the source of that operation, so export the data from the base table instead.
Make sure to review the limitations, required permissions, and location considerations, as mentioned by @Kolban.
Sample Code:
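Below is a minimal sketch, assuming the same credentials, PROJECT_ID, and DATASET_ID already defined at the top of your script; the bucket and folder names are placeholders taken from your scheduler payload. It loops over the base tables in the dataset (not the GCS_SNAPSHOT_* tables) and extracts each one to Parquet in GCS:

import datetime
from google.cloud import bigquery

# Placeholders copied from the question; adjust to your own bucket and folder layout.
BUCKET_NAME = "kebron_daily_backup_bucket"
folder_name = "SNAPSHOT_" + datetime.datetime.now().strftime("%Y-%m-%d")

# Reuses the credentials / PROJECT_ID / DATASET_ID defined earlier in your script.
client = bigquery.Client(credentials=credentials, project=PROJECT_ID)
job_config = bigquery.ExtractJobConfig(destination_format="PARQUET")

for table in client.list_tables(f"{PROJECT_ID}.{DATASET_ID}"):
    # Export the base table, not the snapshot table, to avoid the
    # "is not allowed ... because it is currently a SNAPSHOT" error.
    source_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID).table(table.table_id)
    destination_uri = f"gs://{BUCKET_NAME}/{folder_name}/{table.table_id}.parquet"

    extract_job = client.extract_table(
        source_ref,
        destination_uri,
        location="US",  # must match the dataset's location
        job_config=job_config,
    )
    extract_job.result()  # wait for the export job to finish
    print(f"Exported {PROJECT_ID}:{DATASET_ID}.{table.table_id} to {destination_uri}")

If you still want the point-in-time copy, you can keep creating the snapshot for retention inside BigQuery and run the export against the base table in the same run, since the snapshot itself can't be extracted.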