Data governance helps ensure that high data quality exists throughout the lifecycle of the data. One big difference between traditional data governance and Hadoop big data governance is that many of the data sources are outside the platform team's control.
In conventional data warehouses, we had everything under control: how much data came in and where it came from. In big data governance, by contrast, we face challenges with both the volume and the diversity of the data, and we are dealing with unstructured, semi-structured, and many other types of data. Big data technologies are relatively new, and the systems around big data governance are still immature.
The need for Hadoop data governance
Currently, in the eBay Hadoop landscape, each organization has its own data sets, managed by local data architects working inside that organization, with governance handled mainly at the local level and restricted to the department or organization. We want to converge these local data governance efforts into a single platform and provide a holistic, consolidated view of the data across the entire platform. Big data governance is much like traditional data governance, except for the scale and the lack of definition associated with the data. Also, when we move data from traditional data warehouses into the Hadoop world, a lot of the metadata associated with the data sets gets dropped, making it hard for data stewards to manage all the data in the big data ecosystem. We also need some form of self-service capability, so that we can socialize part of the governance work to end users.
Why we do data governance
- To ensure the security and privacy of the data and to control access to it.
- To capture the metadata of datasets for security and end-user data consumption purposes.
- To help ensure data quality.
- To identify the owner of the data set.
Opportunity
As the eBay analytics data platform team, we want to provide these capabilities at the platform level for all data on our Hadoop and Teradata clusters. We started this project with the following primary objectives:
- Establish a data tagging approach for the enterprise where the metadata that will govern the use of information will be embedded with the data as it passes through various systems in the enterprise.
- Provide a centralized platform for all Hadoop and Teradata customers to generate and consume the technical metadata.
We started this project as an initial prototype to evaluate the technical feasibility of tagging metadata in HDFS (the Hadoop Distributed File System). We wanted to create a solution that is performant, scalable, and pluggable, and that doesn't interfere with the natural Hadoop workflow. Data governance is a vast topic, and in this prototype we concentrated only on how to set and view tags on the file system. We did a small prototype with HDFS extended attributes, and we found that this kind of solution is only practical for small clusters. Some of our clusters have more than 4,500 nodes and use more than 300 PB of storage, which means we need a more robust solution.
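For reference, here is a minimal sketch of what that extended-attribute prototype looked like conceptually, using the standard Hadoop FileSystem API; the path, attribute name, and tag value below are hypothetical placeholders rather than our actual conventions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.nio.charset.StandardCharsets;

public class XattrTagPrototype {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path dataset = new Path("/data/example/dataset"); // hypothetical dataset location

        // Attach a governance tag to the dataset as an HDFS extended attribute.
        fs.setXAttr(dataset, "user.governance.tag", "PII".getBytes(StandardCharsets.UTF_8));

        // Read the tag back when the dataset is consumed.
        byte[] tag = fs.getXAttr(dataset, "user.governance.tag");
        System.out.println(new String(tag, StandardCharsets.UTF_8));
    }
}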
We then evaluated Apache Atlas and found that we can leverage it for building the data tagging capabilities and as a metadata store. Also, we can integrate Apache Ranger with Apache Atlas to roll out role-based access control on the Hadoop platform.
We split the projects into four major phases:
- Phase 1: Technical feasibility and onboarding hive/sparkSQL/Teradata datasets to Atlas
- Phase 2: Model HDFS datasets on Atlas
- Phase 3: Build tools on top of Atlas for creating/consuming the metadata
- Phase 4: Enable Role-Based Access control on the platform
In this blog, we discuss the details of Phase 1, where we focus mainly on onboarding hive/sparkSQL/Teradata datasets to Atlas.
Challenges
We are heavy hive and Spark-SQL users and have more than 200,000 tables on some of our clusters. The two main problems were the initial and the incremental loads to Atlas.
The initial load was challenging because of the vast number of databases/tables that we needed to load into Atlas. We optimized the code heavily to make this process efficient.
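As a rough illustration only (not our production code), an initial load can be driven by walking the metastore with the standard HiveMetaStoreClient API and handing each table off to whatever component publishes entities to Atlas; the sendToAtlas method below is a hypothetical placeholder:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

public class InitialLoad {
    public static void main(String[] args) throws Exception {
        HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
        // Walk every database and table registered in the hive metastore.
        for (String db : client.getAllDatabases()) {
            for (String tableName : client.getAllTables(db)) {
                Table table = client.getTable(db, tableName);
                sendToAtlas(table); // hypothetical: build an Atlas entity and publish it
            }
        }
        client.close();
    }

    private static void sendToAtlas(Table table) {
        // placeholder for the entity creation/publishing logic
    }
}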
The next challenge was how to handle the incremental loads. Thousands of tables are created on a daily basis, and we want to ensure that the metadata repository always reflects the most accurate data for governance and security purposes.
Atlas already provides hive hooks for capturing data definition language (DDL) changes. You can deploy these hooks on the gateway nodes (a.k.a. CLI/edge nodes) or in the HiveServer2 server. These Atlas hooks can help us capture table metadata updates in real time on the Atlas side. Even with this approach, we faced two significant challenges:
- Currently, Atlas doesn't have any hooks for the hive metastore server. The majority of our customers still use hive/sparkSQL by connecting to hive metastore servers, and we have some dedicated clusters primarily running sparkSQL workloads that connect to hive metastore servers. One practical solution is to help customers migrate from the hive metastore service to the HiveServer2 service. HiveServer2 has metastore hooks, which we can leverage to capture table metadata changes. But this migration requires a lot of code changes on the customer side and would take a few months to complete, given the customers' priorities and other milestones.
- Deploying the client-side hive hook on hundreds of CLI/edge nodes is not a flexible solution for us. If a CLI or edge node misses the hook, the table metadata on the cluster and on the Atlas side become inconsistent, and we don't want these kinds of differences in our governance tool. Deploying these client-side hooks would also create a lot of operational headaches in the future.
Given these challenges, we decided to deploy a listener on the hive metastore server, so that we can capture any DDL changes on the server side.
Hive Metastore Event listener
Synchronize hive metadata and Atlas repo with hive metastore event listener:
Environment
Hive data and Atlas reside in separate clusters, with Atlas functioning as a repository for several Hive data clusters. Entities in Atlas are uniquely identified by including the cluster name in their qualified names (for example, default.my_table@myclustername).
Multiple data clusters (HDP 2.4.2, Hive 1.2, Spark 2.1.0) → Atlas cluster (HDP 2.6, Atlas 1.0.0 alpha)
Workflow
Methodology
The metastore listener listens for table create/change/drop events and sends these changes to Atlas via the message bus (Kafka).
To set it up on a hive metastore server box in the data cluster, register the metastore listener with hive by specifying the name of the customized listener class, in this case AtlasMhook, in the hive config file (hive-site.xml):
<property>
  <name>hive.metastore.event.listeners</name>
  <value>AtlasMhook</value>
</property>
Set up the metastore listener to be aware of the messaging bus (Kafka) by adding the Kafka info to the atlas-application.properties file in the same config directory where hive-site.xml resides:
atlas.kafka.zookeeper.connect=<server name>:2181
atlas.kafka.bootstrap.servers=<server name>:909
atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES
atlas.kafka.hook.group.id=atlas
..
..
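To illustrate how those settings can reach the producer, here is a hedged sketch that loads atlas-application.properties and maps the atlas.kafka.* keys onto a plain Kafka producer configuration; the file location handling and key mapping are assumptions for illustration, not the exact Atlas hook implementation:

import java.io.FileInputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ListenerKafkaConfig {
    static KafkaProducer<String, String> buildProducer(String confDir) throws Exception {
        // Load the same atlas-application.properties that sits next to hive-site.xml.
        Properties atlasProps = new Properties();
        try (FileInputStream in = new FileInputStream(confDir + "/atlas-application.properties")) {
            atlasProps.load(in);
        }

        // Copy the atlas.kafka.* entries into plain Kafka producer properties.
        Properties kafkaProps = new Properties();
        for (String key : atlasProps.stringPropertyNames()) {
            if (key.startsWith("atlas.kafka.")) {
                kafkaProps.put(key.substring("atlas.kafka.".length()), atlasProps.getProperty(key));
            }
        }
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        return new KafkaProducer<>(kafkaProps);
    }
}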
The metastore listener code consists of a class called AtlasMhook, which extends MetaStoreEventListener, plus classes for each event.
On each DDL event (create/alter/drop...), retrieve the current Table object and instantiate the corresponding event class accordingly:
public class AtlasMhook extends MetaStoreEventListener {

    public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
        // ...
        Table createdTab = tableEvent.getTable();
        GoAtlas send_atlas = new GoAtlas(createdTab);
        send_atlas.createAtlasTable();
        // ...
    }

    public void onDropTable(DropTableEvent tableEvent) throws MetaException {
        org.apache.hadoop.hive.metastore.api.Table deletedTab = tableEvent.getTable();
        GoAtlas delete_atlas = new GoAtlas(deletedTab);
        delete_atlas.deleteAtlasTable();
    }

    public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
        org.apache.hadoop.hive.metastore.api.Table oldTable = tableEvent.getOldTable();
        org.apache.hadoop.hive.metastore.api.Table newTable = tableEvent.getNewTable();

        // case: rename table
        if (!oldTable.getTableName().equalsIgnoreCase(newTable.getTableName())
                || !oldTable.getDbName().equalsIgnoreCase(newTable.getDbName())) {
            // create the newly named table in Atlas
            GoAtlas send_atlas = new GoAtlas(newTable);
            try {
                send_atlas.createAtlasTable();
                // ...

        // case: alter table
        } else {
            GoAtlas alter_atlas = new GoAtlas(newTable);
            try {
                alter_atlas.alterAtlasTable();
                // ...
Use a similar framework for the alter and drop table events.
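For illustration, here is a simplified sketch of what a helper like GoAtlas might do for the create-table case: derive the qualified name, build a payload describing the hive_table entity, and publish it to the ATLAS_HOOK topic with a plain Kafka producer. The payload below is deliberately abbreviated and hypothetical; the real messages follow the Atlas notification format shown further down:

import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class GoAtlasSketch {
    private final KafkaProducer<String, String> producer;
    private final String clusterName;

    public GoAtlasSketch(KafkaProducer<String, String> producer, String clusterName) {
        this.producer = producer;
        this.clusterName = clusterName;
    }

    public void createAtlasTable(Table table) {
        // Qualified name convention: <db>.<table>@<cluster>, matching how Atlas identifies entities.
        String qualifiedName = table.getDbName() + "." + table.getTableName() + "@" + clusterName;

        // Abbreviated, hypothetical payload; real messages carry the full Atlas entity structure.
        String payload = "{\"typeName\":\"hive_table\",\"qualifiedName\":\"" + qualifiedName + "\"}";

        // Publish to the notification topic that the Atlas server consumes.
        producer.send(new ProducerRecord<>("ATLAS_HOOK", payload));
    }
}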
Here is the Kafka producer thread as seen in the Hive metastore process:
"kafka-producer-network-thread | producer-1" #53 daemon prio=5 os_prio=0 tid=0x00007f0bf1c95000 nid=0x5d7d runnable [0x00007f0bc8323000]
The Kafka producer takes the metastore listener's message payload and sends it to the Kafka consumer process in the Atlas cluster.
A sample message, as received by the Kafka consumer process in the Atlas cluster, looks like this:
{"msgSourceIP":".....","msgCreatedBy":"....","msgCreationTime":1528237987736,"message":{"entities":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":
{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"-33870525960846557","version":0,"typeName":"hive_table","state":"ACTIVE"},"typeName":"hive_table","values":
{"tableType":"MANAGED_TABLE","name":"datatable_may022018_mhook175","temporary":false,"retention":0,"qualifiedName":"default.datatable_may022018_mhook175@myclustername","columns":
[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":
{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"-33870525960846555","version":0,"typeName":"hive_column","state":"ACTIVE"},"typeName":"hive_column" .... ....
The Kafka notification message is then sent to Atlas, and the entity is created/changed in Atlas accordingly. The result is stored in the JanusGraph database with HBase as the storage backend.
Conclusion
At eBay, the hive metastore listener is helping us in two ways:
- It helps us keep the metadata in sync with Atlas in near real time. We are a hive powerhouse, and each of our clusters has more than 200,000 tables, which means a lot of DDL changes are happening on these systems at any point in time. We wanted to make sure our data governance solution is always consistent with what is available on the cluster. After we deployed the hive metastore listener, we were able to keep the DDL changes in sync between the Hadoop clusters and Atlas. This way, customers can tag their data, and we can enforce role-based access controls on these tables without any delays. Another major benefit is that capturing these tables in Atlas in real time gives our customers real-time insight into the technical metadata through Atlas.
- In our environment, we have a requirement to keep some tables and databases in sync between certain clusters. Initially, we were scanning the tables and databases on the source clusters, identifying the missing tables/databases, and then recreating them on the destination cluster. With this implementation, we can quickly determine the DDL changes happening on the source clusters and recreate those tables on the destination clusters. This way, we moved from automated batch processing to near real-time streaming of the DDL changes. Near real-time metadata sync between the source and destination clusters through the metastore listener greatly improved our developers' productivity, since they no longer need to wait for the batch sync-up to happen between these clusters.