Abaixo desenvolvi um processo automatizado para inserir dados em uma tabela do BigQuery a partir de um arquivo .csv. Especificamente usando uma cloud function que será executada automaticamente sempre que um novo arquivo csv for carregado no bucket do Google Cloud Storage.
Para fazer essa POC eu gerei alguns dados fictícios.
Gerei um arquivo CSV com 1000 linhas de dados fictícios pelo site https://www.mockaroo.com/ tente usar o esquema padrão para que se pareça com:
$ head -5 csv_dados_mock.csv
id,first_name,last_name,email,gender,ip_address
1,Austine,Cormode,acormode0@paypal.com,Female,35.80.41.166
2,Gipsy,Bloor,gbloor1@google.ca,Female,217.37.240.238
3,Nichol,Ellissen,nellissen2@purevolume.com,Female,201.214.92.177
4,Amargo,Griffitts,agriffitts3@goo.gl,Female,37.175.107.194
5,Zack,Schneidau,zschneidau4@youtube.com,Male,126.44.217.80
Crie um bucket no Google Cloud Storage
$ gsutil mb gs://bucket-csv-test
Se necessário instale:
$ pip3 install google-cloud-bigquery --upgrade
Crie um dataset no BigQuery
$ bq mk --dataset diogo-dev-projeto:dataset_csv_test
Crie um tabela dentro do dataset correspondente ao esquema definido.
$ bq mk -t dataset_csv_test.tb_csv_test \
id:INTEGER,
first_name:STRING,
last_name:STRING,
email:STRING,
gender:STRING,
ip_address:STRING
a tabela criada no dataset do Bigquery deve estar limpa:
agora chegou a hora de escrever o código python:
import os
from google.cloud import bigquery
def csv_loader(data, context):
client = bigquery.Client()
dataset_id = os.environ['DATASET']
dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.schema = [
bigquery.SchemaField('id', 'INTEGER'),
bigquery.SchemaField('first_name', 'STRING'),
bigquery.SchemaField('last_name', 'STRING'),
bigquery.SchemaField('email', 'STRING'),
bigquery.SchemaField('gender', 'STRING'),
bigquery.SchemaField('ip_address', 'STRING')
]
job_config.skip_leading_rows = 1
job_config.source_format = bigquery.SourceFormat.CSV
# para carregar o CSV no Storage
uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']
# Vamos carregar
load_job = client.load_table_from_uri(
uri,
dataset_ref.table(os.environ['TABLE']),
job_config=job_config)
print('Starting job {}'.format(load_job.job_id))
print('Function=cf_csv_test, Version=' + os.environ['VERSION'])
print('File: {}'.format(data['name']))
# Aguarde a conclusão de carga na tabela.
load_job.result()
print('Job finished.')
destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
print('Loaded {} rows.'.format(destination_table.num_rows))
Agora vamos criar um arquivo env.yaml que armazenará a configuração das variáveis de ambiente, será armazenado o nome bucket | dataset | tabelas.
BUCKET: bucket-csv-test
DATASET: dataset_csv_test
TABLE: tb_csv_test
VERSION: v14
Crie um arquivo de requisitos.txt para as importações necessárias.
google-cloud
google-cloud-bigquery
Crie um arquivo .gcloudignore para que seus arquivos yaml ou CSV não entrem no deploy no GCP
*csv
*yaml
Até aqui a sua folder deve estar assim:
env.yaml main.py requisitos.txt csv_dados_mock.csv
Agora estamos prontos para fazer o deploy da Cloud Function, vamos adicionar uma trigger no bucket GCS que será acionado sempre que um novo arquivo for adicionado ao bucket. Vou criar a cloud function e chama la de cf_csv_test.
$ gcloud beta functions deploy cf_csv_test \
--runtime=python37 \
--trigger-resource=gs://bucket-csv-test \
--trigger-event=google.storage.object.finalize \
--entry-point=cf_csv_test \
--env-vars-file=env.yaml
Agora que fizemos o deploy da function, vamos copiar o arquivo .csv para o bucket.
$ gsutil cp csv_dados_mock.csv gs://bucket-csv-test/
Copying file://csv_dados_mock.csv [Content-Type=text/csv]...
- [1 files][ 72.4 KiB/ 72.4 KiB]
Operation completed over 1 objects/72.4 KiB.
Agora que copiamos o arquivo csv para o bucket, a function deve ser acionada automaticamente, vejo o log.
$ gcloud functions logs read
[ ... snipped for brevity ... ]
cf_csv_test 274732139359754 2020-10-07 20:12:27.852 Function execution started
cf_csv_test 274732139359754 2020-10-07 20:12:28.492 Starting job 9ca2f39c-539f-454d-aa8e-3299bc9f7287
cf_csv_test 274732139359754 2020-10-07 20:12:28.492 Function=cf_csv_test, Version=v14
cf_csv_test 274732139359754 2020-10-07 20:12:28.492 File: csv_dados_mock.csv
cf_csv_test 274732139359754 2020-10-07 20:12:31.022 Job finished.
cf_csv_test 274732139359754 2020-10-07 20:12:31.136 Loaded 1000 rows.
cf_csv_test 274732139359754 2020-10-07 20:12:31.139 Function execution took 3288 ms, finished with status: 'ok'
Boa, function executada com sucesso!
Vamos dar uma olhada na tabela do BigQuery e checar a contagem de linhas.
$ bq show dataset_csv_test.tb_csv_test
Table diogo-dev-projeto:dataset_csv_test.tb_csv_test
Last modified Schema Total Rows Total Bytes Expiration Time Partitioning Labels
----------------- ----------------------- ------------ ------------- ------------ ------------------- --------
07 Dec 20:19:29 |- id: integer 1000 70950
|- first_name: string
|- last_name: string
|- email: string
|- gender: string
|- ip_address: string
Ótimo, 1000 linhas inseridas.
Conclusão final: cloud functions funcionam muito bem rsrs
Espero ter ajudado!!
Comentarios