bigquery-operator documentation¶
Wrapper for usual operations on a fixed BigQuery dataset.
Acknowledgements¶
I am grateful to my employer Easyence for providing me the resources to develop this library and for allowing me to publish it.
Installation¶
$ pip install bigquery-operator
Quickstart¶
Set up an operator.
In the following code, the credentials are inferred from the environment. For further information about how to authenticate to Google Cloud Platform with the Google Cloud Client Library for Python, have a look here.
project_id = 'tmp_project'
dataset_name = 'tmp_dataset'
from bigquery_operator import OperatorQuickSetup
o = OperatorQuickSetup(
project_id,
dataset_name,
credentials=None)
Execute various operations on the dataset tmp_dataset.
>>> o.dataset_exists()
False
>>> o.create_dataset(location='EU')
>>> o.dataset_exists()
True
>>> o.list_tables()
[]
>>> o.table_exists('table_1')
False
>>> o.run_query('select 3.14 as x', 'table_1')
{'duration': 3, 'GB': 0.0}
>>> o.table_exists('table_1')
True
>>> o.get_table_rows('table_1')
[Row((3.14,), {'x': 0})]
>>> o.get_query_rows('select 3.14 as x')
[Row((3.14,), {'x': 0})]
>>> o.create_view('select 2.718 as x', 'table_2')
>>> o.copy_table('table_1', 'copy_table_1')
# Let suppose the table tmp1_project.tmp1_dataset.table_3 exists.
# Then one can copy it into tmp_dataset.
>>> source_dataset_id = 'tmp1_project.tmp1_dataset'
>>> o.copy_table(
>>> 'table_3', 'copy_table_3',
>>> source_dataset_id=source_dataset_id)
# Let suppose the bucket tmp_bucket exists.
# Then one can extract a table from tmp_dataset into tmp_bucket.
>>> bucket_name = 'tmp_bucket'
>>> blob_name = 'table_1-*.csv'
>>> uri = f'gs://{bucket_name}/{blob_name}'
>>> o.extract_table('table_1', uri)
# Conversely, one can load data from tmp_bucket into tmp_dataset.
>>> o.load_table(uri, 'table_10')
# The methods run_queries, copy_tables, extract_tables and
# load_tables are run in parallel. For instance:
>>> queries = [f'select {i}' for i in range(4)]
>>> destination_table_names = [f'table_4{i}' for i in range(4)]
>>> o.run_queries(queries, destination_table_names)
{'duration': 4, 'GB': 0.0}
>>> o.list_tables()
['copy_table_1', 'copy_table_3', 'table_1',
'table_10', 'table_2', 'table_40', 'table_41',
'table_42', 'table_43']
>>> o.delete_table('table_43')
>>> o.list_tables()
['copy_table_1', 'copy_table_3', 'table_1',
'table_10', 'table_2', 'table_40', 'table_41',
'table_42']
>>> from google.cloud import bigquery
>>> schema = [
>>> bigquery.SchemaField('y', 'INTEGER'),
>>> bigquery.SchemaField('z', 'TIMESTAMP')]
>>> time_partitioning = bigquery.TimePartitioning(
>>> field='z', type_='DAY')
>>> o.create_empty_table(
>>> table_name='table_5',
>>> schema=schema,
>>> time_partitioning=time_partitioning)
>>> o.table_is_empty('table_5')
True
>>> query = """
>>> select 5 as y,
>>> cast('2012-11-14 14:32:30' as TIMESTAMP) as z
>>> """
>>> o.run_query(query, 'table_5')
{'duration': 3, 'GB': 0.0}
>>> o.table_is_empty('table_5')
False
>>> o.get_table('table_5').num_rows
1
>>> o.run_query(query, 'table_5',
>>> write_disposition='WRITE_APPEND')
{'duration': 2, 'cost': 0.0}
>>> o.get_table('table_5').num_rows
2
>>> o.list_tables()
['copy_table_1', 'copy_table_3', 'table_1', 'table_10',
'table_2', 'table_40', 'table_41', 'table_42', 'table_5']
>>> o.get_columns('table_1')
['x']
>>> o.get_columns('table_5')
['y', 'z']
>>> o.get_format_attributes('table_1')
{'schema': [
SchemaField('x', 'FLOAT', 'NULLABLE', None, (), None)],
'time_partitioning': None,
'range_partitioning': None,
'require_partition_filter': None,
'clustering_fields': None}
>>> o.get_format_attributes('table_5')
{'schema': [
SchemaField('y', 'INTEGER', 'NULLABLE', None, (), None),
SchemaField('z', 'TIMESTAMP', 'NULLABLE', None, (), None)],
'time_partitioning': TimePartitioning(field='z',type_='DAY'),
'range_partitioning': None,
'require_partition_filter': None,
'clustering_fields': None}
>>> for n in o.list_tables():
>>> o.set_time_to_live(table_name=n, nb_days=5)
>>> o.get_table('table_1').expires
datetime.datetime(
2023, 6, 18, 0, 0,
tzinfo=datetime.timezone.utc)
>>> o.clean_dataset()
>>> o.list_tables()
[]
>>> o.delete_dataset()
>>> o.dataset_exists()
False
Required packages¶
google-cloud-bigquery