Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,30 @@ import (
"google.golang.org/api/bigquery/v2"
)

func bigqueryTableToMap(table *bigquery.Table) (map[string]interface{}, error) {
b, err := json.Marshal(table)
if err != nil {
return nil, err
}
var m map[string]interface{}
if err := json.Unmarshal(b, &m); err != nil {
return nil, err
}
return m, nil
}

func bigqueryTableFromMap(rawRes map[string]interface{}) (*bigquery.Table, error) {
b, err := json.Marshal(rawRes)
if err != nil {
return nil, err
}
var res bigquery.Table
if err := json.Unmarshal(b, &res); err != nil {
return nil, err
}
return &res, nil
}

func bigQueryTableSortArrayByName(array []interface{}) {
sort.Slice(array, func(i, k int) bool {
return array[i].(map[string]interface{})["name"].(string) < array[k].(map[string]interface{})["name"].(string)
Expand Down Expand Up @@ -2057,16 +2081,24 @@ func resourceBigQueryTableCreate(d *schema.ResourceData, meta interface{}) error
}

replicationDDL = fmt.Sprintf("%s AS REPLICA OF %s.%s.%s", replicationDDL, tableReplicationInfo["source_project_id"], tableReplicationInfo["source_dataset_id"], tableReplicationInfo["source_table_id"])
useLegacySQL := false

req := &bigquery.QueryRequest{
Query: replicationDDL,
UseLegacySql: &useLegacySQL,
}

log.Printf("[INFO] Creating a replica materialized view with DDL: '%s'", replicationDDL)

_, err := NewClient(config, userAgent).Jobs.Query(project, req).Do()
queriesURL, err := tpgresource.ReplaceVars(d, config, "{{"{{"}}BigQueryBasePath{{"}}"}}projects/{{"{{"}}project{{"}}"}}/queries")
if err != nil {
return err
}
_, err = transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
Config: config,
Method: "POST",
Project: project,
RawURL: queriesURL,
UserAgent: userAgent,
Body: map[string]any{
"query": replicationDDL,
"useLegacySql": false,
},
})

id := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, datasetID, d.Get("table_id").(string))
if err != nil {
Expand All @@ -2093,31 +2125,84 @@ func resourceBigQueryTableCreate(d *schema.ResourceData, meta interface{}) error

log.Printf("[INFO] Creating BigQuery table: %s without schema", table.TableReference.TableId)

res, err := NewClient(config, userAgent).Tables.Insert(project, datasetID, table).Do()
tableInsertURL, err := tpgresource.ReplaceVars(d, config, "{{"{{"}}BigQueryBasePath{{"}}"}}projects/{{"{{"}}project{{"}}"}}/datasets/{{"{{"}}dataset_id{{"}}"}}/tables")
if err != nil {
return err
}
tableBody, err := bigqueryTableToMap(table)
if err != nil {
return err
}
insertRawRes, err := transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
Config: config,
Method: "POST",
Project: project,
RawURL: tableInsertURL,
UserAgent: userAgent,
Body: tableBody,
})
if err != nil {
return err
}
insertedRes, err := bigqueryTableFromMap(insertRawRes)
if err != nil {
return err
}

log.Printf("[INFO] BigQuery table %s has been created", res.Id)
d.SetId(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", res.TableReference.ProjectId, res.TableReference.DatasetId, res.TableReference.TableId))
log.Printf("[INFO] BigQuery table %s has been created", insertedRes.Id)
d.SetId(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", insertedRes.TableReference.ProjectId, insertedRes.TableReference.DatasetId, insertedRes.TableReference.TableId))

table.Schema = schemaBack
log.Printf("[INFO] Updating BigQuery table: %s with schema", table.TableReference.TableId)
if _, err = NewClient(config, userAgent).Tables.Update(project, datasetID, res.TableReference.TableId, table).Do(); err != nil {
tableUpdateURL, err := tpgresource.ReplaceVars(d, config, "{{"{{"}}BigQueryBasePath{{"}}"}}projects/{{"{{"}}project{{"}}"}}/datasets/{{"{{"}}dataset_id{{"}}"}}/tables/{{"{{"}}table_id{{"}}"}}")
if err != nil {
return err
}
tableBodyWithSchema, err := bigqueryTableToMap(table)
if err != nil {
return err
}
if _, err = transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
Config: config,
Method: "PUT",
Project: project,
RawURL: tableUpdateURL,
UserAgent: userAgent,
Body: tableBodyWithSchema,
}); err != nil {
return err
}

log.Printf("[INFO] BigQuery table %s has been updated with schema", res.Id)
log.Printf("[INFO] BigQuery table %s has been updated with schema", insertedRes.Id)
} else {
log.Printf("[INFO] Creating BigQuery table: %s", table.TableReference.TableId)

res, err := NewClient(config, userAgent).Tables.Insert(project, datasetID, table).Do()
tableInsertURL, err := tpgresource.ReplaceVars(d, config, "{{"{{"}}BigQueryBasePath{{"}}"}}projects/{{"{{"}}project{{"}}"}}/datasets/{{"{{"}}dataset_id{{"}}"}}/tables")
if err != nil {
return err
}
tableBody, err := bigqueryTableToMap(table)
if err != nil {
return err
}
insertRawRes, err := transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
Config: config,
Method: "POST",
Project: project,
RawURL: tableInsertURL,
UserAgent: userAgent,
Body: tableBody,
})
if err != nil {
return err
}
insertedRes, err := bigqueryTableFromMap(insertRawRes)
if err != nil {
return err
}

log.Printf("[INFO] BigQuery table %s has been created", res.Id)
d.SetId(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", res.TableReference.ProjectId, res.TableReference.DatasetId, res.TableReference.TableId))
log.Printf("[INFO] BigQuery table %s has been created", insertedRes.Id)
d.SetId(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", insertedRes.TableReference.ProjectId, insertedRes.TableReference.DatasetId, insertedRes.TableReference.TableId))
}

return resourceBigQueryTableRead(d, meta)
Expand All @@ -2137,18 +2222,29 @@ func resourceBigQueryTableRead(d *schema.ResourceData, meta interface{}) error {
return err
}

datasetID := d.Get("dataset_id").(string)
tableID := d.Get("table_id").(string)

client := NewClient(config, userAgent).Tables.Get(project, datasetID, tableID)
tableURL, err := tpgresource.ReplaceVars(d, config, "{{"{{"}}BigQueryBasePath{{"}}"}}projects/{{"{{"}}project{{"}}"}}/datasets/{{"{{"}}dataset_id{{"}}"}}/tables/{{"{{"}}table_id{{"}}"}}")
if err != nil {
return err
}
if tableMetadataViewRaw, ok := d.GetOk("table_metadata_view"); ok {
client = client.View(tableMetadataViewRaw.(string))
tableURL = tableURL + "?view=" + tableMetadataViewRaw.(string)
}
res, err := client.Do()

rawRes, err := transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
Config: config,
Method: "GET",
Project: project,
RawURL: tableURL,
UserAgent: userAgent,
})
if err != nil {
return transport_tpg.HandleNotFoundError(err, d, fmt.Sprintf("BigQuery table %q", tableID))
}
res, err := bigqueryTableFromMap(rawRes)
if err != nil {
return err
}

if err := d.Set("project", project); err != nil {
return fmt.Errorf("Error setting project: %s", err)
Expand Down Expand Up @@ -2353,25 +2449,7 @@ func resourceBigQueryTableRead(d *schema.ResourceData, meta interface{}) error {
return fmt.Errorf("Error setting resource tags: %s", err)
}

// TODO: Update when the Get API fields for TableReplicationInfo are available in the client library.
url, err := tpgresource.ReplaceVars(d, config, "{{"{{"}}BigQueryBasePath{{"}}"}}projects/{{"{{"}}project{{"}}"}}/datasets/{{"{{"}}dataset_id{{"}}"}}/tables/{{"{{"}}table_id{{"}}"}}")
if err != nil {
return err
}

log.Printf("[INFO] Reading BigQuery table through API: %s", url)

getRes, err := transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
Config: config,
Method: "GET",
RawURL: url,
UserAgent: userAgent,
})
if err != nil {
return err
}

if v, ok := getRes["tableReplicationInfo"]; ok {
if v, ok := rawRes["tableReplicationInfo"]; ok {
tableReplicationInfo := flattenTableReplicationInfo(v.(map[string]interface{}))

if err := d.Set("table_replication_info", tableReplicationInfo); err != nil {
Expand Down Expand Up @@ -2483,11 +2561,24 @@ func resourceBigQueryTableUpdate(d *schema.ResourceData, meta interface{}) error
var errOldTable error

if shouldDropColumns || shouldIgnoreDataPolicies {
client := NewClient(config, userAgent).Tables.Get(project, datasetID, tableID)
oldTableURL, err := tpgresource.ReplaceVars(d, config, "{{"{{"}}BigQueryBasePath{{"}}"}}projects/{{"{{"}}project{{"}}"}}/datasets/{{"{{"}}dataset_id{{"}}"}}/tables/{{"{{"}}table_id{{"}}"}}")
if err != nil {
return err
}
if len(tableMetadataView) > 0 {
client = client.View(tableMetadataView)
oldTableURL = oldTableURL + "?view=" + tableMetadataView
}
oldTableRaw, err := transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
Config: config,
Method: "GET",
Project: project,
RawURL: oldTableURL,
UserAgent: userAgent,
})
if err != nil {
return err
}
oldTable, errOldTable = client.Do()
oldTable, errOldTable = bigqueryTableFromMap(oldTableRaw)
if errOldTable != nil {
return errOldTable
}
Expand All @@ -2512,7 +2603,22 @@ func resourceBigQueryTableUpdate(d *schema.ResourceData, meta interface{}) error
}
}

if _, err = NewClient(config, userAgent).Tables.Update(project, datasetID, tableID, table).Do(); err != nil {
tableUpdateURL, err := tpgresource.ReplaceVars(d, config, "{{"{{"}}BigQueryBasePath{{"}}"}}projects/{{"{{"}}project{{"}}"}}/datasets/{{"{{"}}dataset_id{{"}}"}}/tables/{{"{{"}}table_id{{"}}"}}")
if err != nil {
return err
}
tableBody, err := bigqueryTableToMap(table)
if err != nil {
return err
}
if _, err = transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
Config: config,
Method: "PUT",
Project: project,
RawURL: tableUpdateURL,
UserAgent: userAgent,
Body: tableBody,
}); err != nil {
return err
}

Expand Down Expand Up @@ -2546,13 +2652,18 @@ func resourceBigQueryTableColumnDrop(config *transport_tpg.Config, userAgent str
dropColumnsDDL := fmt.Sprintf("ALTER TABLE `%s.%s.%s` DROP COLUMN %s", tableReference.project, tableReference.datasetID, tableReference.tableID, droppedColumnsString)
log.Printf("[INFO] Dropping columns in-place: %s", dropColumnsDDL)

useLegacySQL := false
req := &bigquery.QueryRequest{
Query: dropColumnsDDL,
UseLegacySql: &useLegacySQL,
}

_, err := NewClient(config, userAgent).Jobs.Query(tableReference.project, req).Do()
queriesURL := transport_tpg.BaseUrl(Product, config) + "projects/" + tableReference.project + "/queries"
_, err := transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
Config: config,
Method: "POST",
Project: tableReference.project,
RawURL: queriesURL,
UserAgent: userAgent,
Body: map[string]any{
"query": dropColumnsDDL,
"useLegacySql": false,
},
})
if err != nil {
return err
}
Expand Down Expand Up @@ -2586,10 +2697,17 @@ func resourceBigQueryTableDelete(d *schema.ResourceData, meta interface{}) error
return err
}

datasetID := d.Get("dataset_id").(string)
tableID := d.Get("table_id").(string)

if err := NewClient(config, userAgent).Tables.Delete(project, datasetID, tableID).Do(); err != nil {
tableDeleteURL, err := tpgresource.ReplaceVars(d, config, "{{"{{"}}BigQueryBasePath{{"}}"}}projects/{{"{{"}}project{{"}}"}}/datasets/{{"{{"}}dataset_id{{"}}"}}/tables/{{"{{"}}table_id{{"}}"}}")
if err != nil {
return err
}
if _, err = transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
Config: config,
Method: "DELETE",
Project: project,
RawURL: tableDeleteURL,
UserAgent: userAgent,
}); err != nil {
return err
}

Expand Down
Loading