To support OLAP reporting without putting extra load on the operational databases, HMS needs to clone and continuously sync its operational databases (6 MySQL servers) to BigQuery.
Over nearly 3 years of development we moved fast, and there was no dedicated DB architect to set rules or standardize the schema.
That is the main reason we have so many inconsistencies, which force the DAs to apply a lot of workarounds. So while implementing the ETL, we also want to transform the data into a standard form.
To name a few:
6 separate MySQL servers sound more secure and more robust for scaling in operations. However, they create a lot of headaches when you need to join data across databases on multiple servers. By consolidating everything into one big, scalable server, those problems go away.
At the beginning, we decided to use a big PostgreSQL server for the data warehouse. However, the more we used it, the more problems and maintenance work we faced operating it. The PostgreSQL server kept hitting CPU bottlenecks whenever parallel data loading from MySQL collided with analytics queries.
With our limited human resources, we really needed an OLAP service that could free us from this mess.
At the beginning, we dreamt of a cloning system that could guarantee latency of under 10 minutes. We later managed to reach that latency, but found the cost too high … Since we can't control how MySQL is used, the devs rely heavily on upserts and deletes, which forces the ETL to run two modes at the same time: batch extract and real-time extract…
With all of the above requirements, Google BigQuery is the most suitable solution for our data lake and data warehouse needs. Among its pros:
==> With Google BigQuery chosen as the analytics database, we needed a solution for syncing the operational data from the 6 MySQL servers to BigQuery.
After evaluating many solutions such as Debezium, Airbyte, etc., we settled on the newest Google service, Datastream (https://cloud.google.com/datastream). It does a great job of supporting batch cloning (a.k.a. backfill) and CDC (Change Data Capture) at the same time. For CDC, it monitors the MySQL server's binlog.
Note that Datastream only does the extract step, i.e. it pushes the original MySQL data and the subsequent changes to Google Cloud Storage in Avro format.
Moving the data from the Avro files into BigQuery, and handling all the merging, inserting, etc., is done by another Google service, Dataflow (https://cloud.google.com/dataflow).
You need to create connection profiles (host/user/password) and then create a stream. In the stream you choose the tables you want to clone (and decide whether each one needs a backfill or not).
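We did this in the Datastream console, but for reference, a rough equivalent with the google-cloud-datastream Python client looks something like the sketch below. All project, region, profile and bucket names are placeholders, and the exact field names should be double-checked against the current client library.

# Sketch only: create a MySQL connection profile and a stream that lands Avro
# files in GCS. Names, project, region and passwords are placeholders, and a
# GCS connection profile ("hms-gcs") is assumed to exist already.
from google.cloud import datastream_v1

client = datastream_v1.DatastreamClient()
parent = "projects/my-project/locations/us-central1"  # placeholder

# 1) Connection profile for one of the MySQL servers
mysql_profile = datastream_v1.ConnectionProfile(
    display_name="hms-mysql-01",
    mysql_profile=datastream_v1.MysqlProfile(
        hostname="10.0.0.11", port=3306, username="datastream", password="***"
    ),
    static_service_ip_connectivity=datastream_v1.StaticServiceIpConnectivity(),
)
client.create_connection_profile(
    parent=parent,
    connection_profile_id="hms-mysql-01",
    connection_profile=mysql_profile,
).result()

# 2) Stream: MySQL source -> GCS destination (Avro), with backfill enabled
stream = datastream_v1.Stream(
    display_name="hms-mysql-01-to-gcs",
    source_config=datastream_v1.SourceConfig(
        source_connection_profile=f"{parent}/connectionProfiles/hms-mysql-01",
        mysql_source_config=datastream_v1.MysqlSourceConfig(),  # default: all tables
    ),
    destination_config=datastream_v1.DestinationConfig(
        destination_connection_profile=f"{parent}/connectionProfiles/hms-gcs",
        gcs_destination_config=datastream_v1.GcsDestinationConfig(
            path="/hms", avro_file_format=datastream_v1.AvroFileFormat()
        ),
    ),
    backfill_all=datastream_v1.Stream.BackfillAllStrategy(),  # or backfill_none
)
client.create_stream(
    parent=parent, stream_id="hms-mysql-01-to-gcs", stream=stream
).result()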
If you decide to do a backfill, it will run two jobs at the same time:
The folder structure is based on (dbname) / (tablename) / (year) / (month) / (day) / (hour) / (min).
Both backfill and CDC files are organized by creation time; the filename tells you whether a file comes from the backfill or from CDC.
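Because downstream code often needs to know which table and which minute a file belongs to, here is a small hypothetical helper (the parse_object_path name, the example path and the exact layout are my assumptions based on the structure described above) that recovers the database, table and creation time from an object name.

from datetime import datetime, timezone
from typing import NamedTuple

class AvroFileInfo(NamedTuple):
    database: str
    table: str
    created_at: datetime
    filename: str

def parse_object_path(object_name: str) -> AvroFileInfo:
    """Parse '<db>/<table>/<yyyy>/<mm>/<dd>/<hh>/<mi>/<filename>.avro'.

    The layout mirrors the folder structure described above; adjust the
    split if your stream writes an extra prefix.
    """
    db, table, yyyy, mm, dd, hh, mi, filename = object_name.strip("/").split("/")
    created_at = datetime(int(yyyy), int(mm), int(dd), int(hh), int(mi), tzinfo=timezone.utc)
    return AvroFileInfo(db, table, created_at, filename)

# Example (made-up object name): backfill and CDC files live in the same tree,
# only the filename differs.
info = parse_object_path("prod/dummy/2022/05/03/10/15/part-0001.avro")
print(info.database, info.table, info.created_at)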
Using avro-tools (http://www.us.apache.org/dist/avro/avro-1.7.4/java/avro-tools-1.7.4.jar), we can read the schema.
Backfill and CDC files share exactly the same schema:
{
"type" : "record",
"name" : "prod_dummy",
"fields" : [ {
"name" : "uuid",
"type" : "string"
}, {
"name" : "read_timestamp",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
}
}, {
"name" : "source_timestamp",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
}
}, {
"name" : "object",
"type" : "string"
}, {
"name" : "read_method",
"type" : "string"
}, {
"name" : "stream_name",
"type" : "string"
}, {
"name" : "schema_key",
"type" : "string"
}, {
"name" : "sort_keys",
"type" : {
"type" : "array",
"items" : [ "string", "long" ]
}
}, {
"name" : "source_metadata",
"type" : {
"type" : "record",
"name" : "source_metadata",
"fields" : [ {
"name" : "table",
"type" : "string"
}, {
"name" : "database",
"type" : "string"
}, {
"name" : "primary_keys",
"type" : {
"type" : "array",
"items" : "string"
}
}, {
"name" : "log_file",
"type" : [ "null", "string" ]
}, {
"name" : "log_position",
"type" : [ "null", "long" ]
}, {
"name" : "change_type",
"type" : [ "null", "string" ]
}, {
"name" : "is_deleted",
"type" : [ "null", "boolean" ]
} ]
}
}, {
"name" : "payload",
"type" : {
"type" : "record",
"name" : "payload",
"fields" : [ {
"name" : "id",
"type" : [ "null", "string" ]
}, {
"name" : "code",
"type" : [ "null", "int" ]
}, {
"name" : "name",
"type" : [ "null", "string" ]
}, {
"name" : "guestId",
"type" : [ "null", "string" ]
}, {
"name" : "guestName",
"type" : [ "null", "string" ]
}, {
"name" : "note",
"type" : [ "null", "string" ]
}, {
"name" : "totalPrice",
"type" : [ "null", "double" ]
}, {
"name" : "totalSLA",
"type" : [ "null", "int" ]
}, {
"name" : "allowCharge",
"type" : [ "null", "int" ]
}, {
"name" : "startTime",
"type" : [ "null", {
"type" : "long",
"logicalType" : "timestamp-micros"
} ]
}, {
"name" : "endTime",
"type" : [ "null", {
"type" : "long",
"logicalType" : "timestamp-micros"
} ]
}, {
"name" : "dueDate",
"type" : [ "null", {
"type" : "long",
"logicalType" : "timestamp-micros"
} ]
}, {
"name" : "order",
"type" : [ "null", "float" ]
}, {
"name" : "state",
"type" : [ "null", "string" ]
}, {
"name" : "group",
"type" : [ "null", "string" ]
}, {
"name" : "priority",
"type" : [ "null", "string" ]
}, {
"name" : "serviceId",
"type" : [ "null", "string" ]
}, {
"name" : "assigneedId",
"type" : [ "null", "string" ]
}, {
"name" : "assigneedName",
"type" : [ "null", "string" ]
}, {
"name" : "reporterId",
"type" : [ "null", "string" ]
}, {
"name" : "roomId",
"type" : [ "null", "string" ]
}, {
"name" : "roomNumber",
"type" : [ "null", "string" ]
}, {
"name" : "areaId",
"type" : [ "null", "string" ]
}, {
"name" : "areaName",
"type" : [ "null", "string" ]
}, {
"name" : "areaType",
"type" : [ "null", "string" ]
}, {
"name" : "hotelId",
"type" : [ "null", "string" ]
}, {
"name" : "classType",
"type" : [ "null", "string" ]
}, {
"name" : "organizationId",
"type" : [ "null", "string" ]
}, {
"name" : "cancelReasonId",
"type" : [ "null", "string" ]
}, {
"name" : "reservationId",
"type" : [ "null", "string" ]
}, {
"name" : "confirmationNumber",
"type" : [ "null", "string" ]
}, {
"name" : "itineraryNumber",
"type" : [ "null", "string" ]
}, {
"name" : "serialNumber",
"type" : [ "null", "string" ]
}, {
"name" : "taskLevel",
"type" : [ "null", "string" ]
}, {
"name" : "status",
"type" : [ "null", "int" ]
}, {
"name" : "createdAt",
"type" : [ "null", {
"type" : "long",
"logicalType" : "timestamp-micros"
} ]
}, {
"name" : "updatedAt",
"type" : [ "null", {
"type" : "long",
"logicalType" : "timestamp-micros"
} ]
}, {
"name" : "createdBy",
"type" : [ "null", "string" ]
}, {
"name" : "updatedBy",
"type" : [ "null", "string" ]
}, {
"name" : "isDeleted",
"type" : [ "null", "int" ]
}, {
"name" : "tmType",
"type" : [ "null", "string" ]
}, {
"name" : "tmCode",
"type" : [ "null", "int" ]
}, {
"name" : "parentTaskId",
"type" : [ "null", "string" ]
}, {
"name" : "specialRequest",
"type" : [ "null", "string" ]
}, {
"name" : "comment",
"type" : [ "null", "string" ]
}, {
"name" : "operationType",
"type" : [ "null", "string" ]
}, {
"name" : "source",
"type" : [ "null", "string" ]
}, {
"name" : "reminder",
"type" : [ "null", "int" ]
} ]
}
} ]
}
The real data is stored in the 'payload' field.
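To poke at one of these files locally, a short Python sketch using the fastavro library (the file name below is a placeholder) prints the embedded writer schema, the same thing avro-tools' getschema shows, and pulls the actual row out of the payload field.

# pip install fastavro
from fastavro import reader

# Placeholder path: any backfill or CDC file downloaded from the bucket.
with open("prod_dummy_cdc_part-0001.avro", "rb") as fo:
    avro_reader = reader(fo)

    # The embedded writer schema, e.g. record name "prod_dummy".
    print(avro_reader.writer_schema["name"])

    for record in avro_reader:
        meta = record["source_metadata"]
        row = record["payload"]  # the real table row
        print(meta["change_type"], meta["is_deleted"], row["id"], row["updatedAt"])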
Dataflow is the service that required the most effort from me, even though all I did to create the job was run a very simple command line.
Dataflow is based on Apache Beam, and Google already provides a Beam template that does the loading task (read the data from Google Cloud Storage and load it into BigQuery).
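I won't reproduce our exact command, but launching that template can look roughly like the sketch below (wrapped in Python here for consistency). The template path and parameter names follow the public Datastream-to-BigQuery flex template as far as I know, so treat them as assumptions and verify them against the template documentation; all project, bucket and dataset names are placeholders.

# Sketch: launch the Datastream-to-BigQuery Dataflow flex template via gcloud.
# Template path and parameter names are assumptions based on the public
# template; verify them before running. All names are placeholders.
import subprocess

params = ",".join([
    "inputFilePattern=gs://hms-datastream-bucket/hms/",
    "gcsPubSubSubscription=projects/my-project/subscriptions/hms-avro-files",
    "outputStagingDatasetTemplate=hms_staging",
    "outputDatasetTemplate=hms",
    "deadLetterQueueDirectory=gs://hms-datastream-bucket/dlq/",
])

subprocess.run([
    "gcloud", "dataflow", "flex-template", "run", "hms-datastream-to-bq",
    "--region=us-central1",
    "--template-file-gcs-location="
    "gs://dataflow-templates-us-central1/latest/flex/Cloud_Datastream_to_BigQuery",
    f"--parameters={params}",
], check=True)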
Whenever the bucket that stores the Avro files receives a new file, a Pub/Sub new-file notification is triggered; Dataflow watches this Pub/Sub subscription and adds the file to its backlog for processing.
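If that bucket notification does not exist yet, it can be created with the google-cloud-storage client, roughly like this (bucket and topic names are placeholders, and the Dataflow job still needs a subscription on that topic):

# Sketch: wire the Avro bucket to a Pub/Sub topic so new files produce
# OBJECT_FINALIZE notifications. Bucket and topic names are placeholders.
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("hms-datastream-bucket")

notification = bucket.notification(
    topic_name="hms-avro-files",
    payload_format="JSON_API_V1",
    event_types=["OBJECT_FINALIZE"],  # only newly written objects
)
notification.create()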
Dataflow is a very CPU-hungry process. It supports auto-scaling based on the data remaining in the backlog, but it turns out the auto-scaling feature can cost us heavily if we don't monitor it carefully.
If an error happens, Dataflow marks the record as failed, puts it into the DLQ (dead-letter queue) and retries it later. If the retries keep failing, the window success timestamp does not advance –> the backlog keeps growing, the data freshness keeps increasing, and Dataflow auto-scales up the CPUs to work through that backlog.
But the workers can't process it because the error keeps happening, so the data freshness keeps increasing and increasing –> the actual CPU load is very small, yet the system does not scale back down because the data freshness has not returned to an acceptable number. We end up paying a lot of money for nothing. Moreover, the delayed window success timestamp means other, perfectly healthy tables can't be cloned on time either.
This is a failure case:
The number of errors increases significantly, but the log content is lousy.
The easiest metric to monitor is data freshness.
Most of the time, the freshness is only 1 to 2 minutes. When a problem happens, we see a significant jump in data freshness (the longer the problem lasts, the bigger the jump, up to a few hours or even a few days).
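The same signal can be pulled programmatically from Cloud Monitoring for alerting. The sketch below assumes the Dataflow watermark-age metric (dataflow.googleapis.com/job/per_stage_data_watermark_age) is the one behind the data-freshness graph, which is my assumption, and the project id is a placeholder.

# Sketch: read the last hour of Dataflow watermark age (~ data freshness)
# from Cloud Monitoring. Metric name and project id are assumptions/placeholders.
import time
from google.cloud import monitoring_v3

client = monitoring_v3.MetricServiceClient()
now = int(time.time())
interval = monitoring_v3.TimeInterval(
    {"end_time": {"seconds": now}, "start_time": {"seconds": now - 3600}}
)

series = client.list_time_series(
    request={
        "name": "projects/my-project",
        "filter": 'metric.type = "dataflow.googleapis.com/job/per_stage_data_watermark_age"',
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)
for ts in series:
    value = ts.points[0].value
    seconds = value.int64_value or value.double_value  # metric may be INT64 or DOUBLE
    print(ts.resource.labels.get("job_name", "?"), f"{seconds / 60:.1f} min behind")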
When a problem happens, the first thing to check is the dlq folder in the GCS bucket; any 'json' file in that dlq folder will tell us the real reason.
It could be a data type conversion error, a dataset we forgot to create manually, or a permission error.
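A quick way to pull those reasons out is a small script like the one below, assuming the DLQ lives under gs://<bucket>/dlq/ with newline-delimited JSON documents; the bucket name and the exact keys inside each document are assumptions.

# Sketch: list the JSON files under dlq/ and print their error details.
# Bucket name and the keys inside each JSON document are assumptions.
import json
from google.cloud import storage

client = storage.Client()
blobs = client.list_blobs("hms-datastream-bucket", prefix="dlq/")  # placeholder bucket

for blob in blobs:
    if not blob.name.endswith(".json"):
        continue
    for line in blob.download_as_text().splitlines():
        doc = json.loads(line)
        # Typical causes we have seen: type-conversion errors, missing datasets,
        # missing permissions. Print whatever error-related fields are present.
        print(blob.name, {k: v for k, v in doc.items() if "error" in k.lower()})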
All of this troubleshooting can be done without pausing or stopping the Dataflow job.
#hms #bigdata #bigquery #gcp/datastream #dataflow