This is a follow-up post describing the implementation details of the Hadoop Applier, and the steps to configure and install it. The Hadoop Applier integrates MySQL with Hadoop, providing real-time replication of INSERTs to HDFS, so that the data can be consumed by data stores working on top of Hadoop. You can learn more about the design rationale and prerequisites in the previous post.
Design and Implementation:
The Hadoop Applier replicates rows inserted into a table in MySQL to the Hadoop Distributed File System (HDFS). It uses an API provided by libhdfs, a C library to manipulate files in HDFS. The library comes pre-compiled with Hadoop distributions.
It connects to the MySQL master (or reads a binary log generated by MySQL) and:
- fetches the row insert events occurring on the master,
- decodes these events and extracts the data inserted into each field of the row,
- uses content handlers to get the data into the required format and appends it to a text file in HDFS.
Schema equivalence is a simple mapping: databases are mapped as separate directories, with their tables as sub-directories. Data inserted into each table is written into text files (named datafile1.txt) in HDFS. The data can be written in comma-separated format, or with any other delimiter; this is configurable by command line arguments.
The diagram explains the mapping between the MySQL and HDFS schema. The file in which the data is stored is named datafile1.txt here; you can name it anything you want. The working directory where this data file goes is base_dir/db_name.db/tb_name. The timestamp at which the event occurs is included as the first field in each row inserted into the text file.
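To illustrate the mapping, here is a minimal sketch of how the target path and a data line could be assembled. It is illustrative only: the helper names and the way the base directory is passed in are assumptions made for this example, not part of the Applier's code.

#include <sstream>
#include <string>
#include <vector>

/* Illustrative only: build the HDFS path base_dir/db_name.db/tb_name/datafile1.txt */
std::string hdfs_target_path(const std::string &base_dir,
                             const std::string &db_name,
                             const std::string &tb_name)
{
  return base_dir + "/" + db_name + ".db/" + tb_name + "/datafile1.txt";
}

/* Illustrative only: format one row with the event timestamp as the first field,
   the remaining fields separated by the field delimiter (ctrl-A, '\001', by default)
   and the row terminated by the row delimiter (LINE FEED by default). */
std::string hdfs_data_line(long int timestamp,
                           const std::vector<std::string> &fields,
                           char field_delim= '\001',
                           char row_delim= '\n')
{
  std::ostringstream line;
  line << timestamp;
  for (std::vector<std::string>::const_iterator f= fields.begin();
       f != fields.end(); ++f)
    line << field_delim << *f;
  line << row_delim;
  return line.str();
}

With this mapping, a row inserted into table t of database db ends up appended to base_dir/db.db/t/datafile1.txt, timestamp first, followed by the field values.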
The implementation follows these steps:
- Connect to the MySQL master using the interfaces to the binary log:

#include "binlog_api.h"

Binary_log binlog(create_transport(mysql_uri.c_str()));
binlog.connect();
- Register content handlers:

/* Table_index is a subclass of the Content_handler class in the Binlog API */
Table_index table_event_hdlr;
Applier replay_hndlr(&table_event_hdlr, &sqltohdfs_obj);

binlog.content_handler_pipeline()->push_back(&table_event_hdlr);
binlog.content_handler_pipeline()->push_back(&replay_hndlr);
- Start an event loop and wait for the events to occur on the master:

while (true)
{
  /*
    Pull events from the master. This is the heart beat of the event listener.
  */
  Binary_log_event *event;
  binlog.wait_for_next_event(&event);
}
- Decode the event using the content handler interfaces:

class Applier : public mysql::Content_handler
{
public:
  Applier(Table_index *index, HDFSSchema *mysqltohdfs_obj)
  {
    m_table_index= index;
    m_hdfs_schema= mysqltohdfs_obj;
  }

  mysql::Binary_log_event *process_event(mysql::Row_event *rev)
  {
    int table_id= rev->table_id;
    typedef std::map<long int, mysql::Table_map_event *> Int2event_map;
    Int2event_map::iterator ti_it= m_table_index->find(table_id);
- Each row event contains multiple rows and fields. Iterate over them one row at a time using Row_iterator:

mysql::Row_event_set rows(rev, ti_it->second);
mysql::Row_event_set::iterator it= rows.begin();
do
{
  mysql::Row_of_fields fields= *it;
  long int timestamp= rev->header()->timestamp;
  if (rev->get_event_type() == mysql::WRITE_ROWS_EVENT)
    table_insert(db_name, table_name, fields, timestamp, m_hdfs_schema);
} while (++it != rows.end());
- Get the field data separated by field delimiters and row delimiters. Each row contains a vector of Value objects. The converter allows us to transform each value into another representation:

mysql::Row_of_fields::const_iterator field_it= fields.begin();
mysql::Converter converter;
std::ostringstream data;
data << timestamp;
do
{
  field_index_counter++;
  std::string str;
  converter.to(str, *field_it);
  data << sqltohdfs_obj->hdfs_field_delim;
  data << str;
} while (++field_it != fields.end());
data << sqltohdfs_obj->hdfs_row_delim;
- Connect to the HDFS file system. If not provided, the connection information (user name, password, host and port) is read from the XML configuration file, hadoop-site.xml:

hdfsFS m_fs= hdfsConnect(host.c_str(), port);
- Create the directory structure in HDFS. Set the working directory and open the file in append mode:

hdfsSetWorkingDirectory(m_fs, (stream_dir_path.str()).c_str());
const char* write_path= "datafile1.txt";
hdfsFile writeFile;
- Append data at the end of the file:

writeFile= hdfsOpenFile(m_fs, write_path, O_WRONLY|O_APPEND, 0, 0, 0);
tSize num_written_bytes= hdfsWrite(m_fs, writeFile,
                                   (void*) data.str().c_str(),
                                   data.str().length());
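For reference, the libhdfs calls above can be put together as follows. This is a sketch only, with error checks and the close/disconnect calls added; the function name and its parameters are assumptions made for this example, and the real Applier may structure this differently.

#include <fcntl.h>
#include <string>
#include <hdfs.h>

/* Sketch: append one delimited row to datafile1.txt under stream_dir_path.
   Returns false if any HDFS call fails. */
bool append_row(const char *host, tPort port,
                const std::string &stream_dir_path,
                const std::string &row)
{
  hdfsFS fs= hdfsConnect(host, port);
  if (fs == NULL)
    return false;

  /* Create the db_name.db/tb_name directory tree if it does not exist yet,
     then make it the working directory. */
  if (hdfsExists(fs, stream_dir_path.c_str()) != 0 &&
      hdfsCreateDirectory(fs, stream_dir_path.c_str()) != 0)
    return false;
  hdfsSetWorkingDirectory(fs, stream_dir_path.c_str());

  /* Open the data file in append mode and write the row. */
  hdfsFile writeFile= hdfsOpenFile(fs, "datafile1.txt",
                                   O_WRONLY | O_APPEND, 0, 0, 0);
  if (writeFile == NULL)
    return false;

  tSize written= hdfsWrite(fs, writeFile,
                           (void*) row.c_str(), (tSize) row.length());
  hdfsFlush(fs, writeFile);
  hdfsCloseFile(fs, writeFile);
  hdfsDisconnect(fs);
  return written == (tSize) row.length();
}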
Install and Configure:
Follow these steps to install and run the Applier:
1. Download a Hadoop release (I am using 1.0.4); configure and install it (for the purpose of the demo, install it in pseudo-distributed mode). The flag 'dfs.support.append' must be set to true while configuring HDFS (hdfs-site.xml). Since append is not supported in Hadoop 1.x, also set the flag 'dfs.support.broken.append' to true.
2. Set the environment variable $HADOOP_HOME to point to the Hadoop installation directory.
3. CMake does not come with a 'find' module for libhdfs. Ensure that 'FindHDFS.cmake' is in the CMAKE_MODULE_PATH. You can download a copy here.
4. Edit the file 'FindHDFS.cmake', if necessary, to have HDFS_LIB_PATHS set as a path to libhdfs.so, and HDFS_INCLUDE_DIRS set as the path pointing to the location of hdfs.h. For 1.x versions, the library path is $ENV{HADOOP_HOME}/c++/Linux-i386-32/lib, and the header files are contained in $ENV{HADOOP_HOME}/src/c++/libhdfs. For 2.x releases, the libraries and header files can be found in $ENV{HADOOP_HOME}/lib/native and $ENV{HADOOP_HOME}/include respectively.
For versions 1.x, this patch will fix the paths:
--- a/cmake_modules/FindHDFS.cmake
+++ b/cmake_modules/FindHDFS.cmake
@@ -11,6 +11,7 @@ exec_program(hadoop ARGS version OUTPUT_VARIABLE Hadoop_VERSION
# currently only looking in HADOOP_HOME
find_path(HDFS_INCLUDE_DIR hdfs.h PATHS
$ENV{HADOOP_HOME}/include/
+ $ENV{HADOOP_HOME}/src/c++/libhdfs/
# make sure we don't accidentally pick up a different version
NO_DEFAULT_PATH
)
@@ -26,9 +27,9 @@ endif()
message(STATUS "Architecture: ${arch_hint}")
if ("${arch_hint}" STREQUAL "x64")
- set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/lib/native)
+ set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/c++/Linux-amd64-64/lib)
else ()
- set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/lib/native)
+ set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/c++/Linux-i386-32/lib)
endif ()
message(STATUS "HDFS_LIB_PATHS: ${HDFS_LIB_PATHS}")
5. Since libhdfs is a JNI based API, it requires JNI header files and libraries to build. If a module FindJNI.cmake exists in the CMAKE_MODULE_PATH and JAVA_HOME is set, the headers will be included and the libraries will be linked; if not, you will need to include the headers and load the libraries separately (modify LD_LIBRARY_PATH).
6. Build and install the library 'libreplication', to be used by the Hadoop Applier, using CMake.
- The 'mysqlclient' library is required to be installed in the default library paths. You can either download and install it (you can get a copy here), or set the environment variable $MYSQL_DIR to point to the parent directory of the MySQL source code. Make sure to run cmake on the MySQL source directory.

$export MYSQL_DIR=/usr/local/mysql-5.6

Run the 'cmake' command on the parent directory of the Hadoop Applier source. This will generate the necessary Makefiles. Make sure to set the cmake option ENABLE_DOWNLOADS=1, which will install Google Test, required to run the unit tests.

$cmake . -DENABLE_DOWNLOADS=1

Run 'make' and 'make install' to build and install. This will install the library 'libreplication', which is to be used by the Hadoop Applier.
7. Make sure to set the CLASSPATH to all the Hadoop jars needed to run Hadoop itself.

$export PATH=$HADOOP_HOME/bin:$PATH
$export CLASSPATH=$(hadoop classpath)
8. The code for the Hadoop Applier can be found in /examples/mysql2hdfs, in the Hadoop Applier repository. To compile, you can simply load the libraries (modify LD_LIBRARY_PATH if required) and run the command 'make happlier' on your terminal. This will create an executable file in the mysql2hdfs directory.

.. and then you are done!
Now run Hadoop dfs (namenode and datanode), start a MySQL server as master with row based replication (you can use the mtr rpl suite for testing purposes: $MySQL-5.6/mysql-test$ ./mtr --start --suite=rpl --mysqld=--binlog_format='ROW' --mysqld=--binlog_checksum=NONE), start Hive (optional) and run the executable ./happlier, optionally providing MySQL and HDFS URIs and other available command line options (./happlier --help for more info).
There are useful filters available as command line options to the Hadoop Applier.

| Option | Description | Use |
| --- | --- | --- |
| -r, --field-delimiter=DELIM | Use DELIM instead of ctrl-A as the field delimiter. DELIM can be a string or an ASCII value in the format '\nnn'. Escape sequences are not allowed. | Provide the string by which the fields in a row will be separated. By default, it is set to ctrl-A. |
| -w, --row-delimiter=DELIM | Use DELIM instead of LINE FEED as the row delimiter. DELIM can be a string or an ASCII value in the format '\nnn'. Escape sequences are not allowed. | Provide the string by which the rows of a table will be separated. By default, it is set to LINE FEED (\n). |
| -d, --databases=DB_LIST | DB_LIST is made up of one database name, or many names separated by commas. Each database name can be optionally followed by table names. The table names must follow the database name, separated by hyphens. Example: -d=db_name1-table1-table2,dbname2-table1,dbname3 | Import entries for some databases, optionally including only the specified tables. |
| -f, --fields=LIST | Similar to the cut command, LIST is made up of one range, or many ranges separated by commas. Each range is one of: N (N'th byte, character or field, counted from 1); N- (from the N'th byte, character or field to the end of line); N-M (from the N'th to the M'th (included) byte, character or field); -M (from the first to the M'th (included) byte, character or field). | Import entries for only some fields of a table. |
| -h, --help | Display help. | |
Integration with HIVE:
Hive runs on top of Hadoop, and it is sufficient to install Hive only on the Hadoop master node. Take note of the default data warehouse directory, set as a property in the hive-default.xml.template configuration file. This must be the same as the base directory into which the Hadoop Applier writes.
Since the Applier does not import DDL statements, you have to create similar schemas on both MySQL and Hive, i.e. set up a similar database in Hive using HiveQL (Hive Query Language). Since timestamps are inserted as the first field in HDFS files, you must take this into account while creating tables in Hive.
| SQL Query | Hive Query |
| --- | --- |
| CREATE TABLE t (i INT); | CREATE TABLE t (time_stamp INT, i INT) [ROW FORMAT DELIMITED] STORED AS TEXTFILE; |
Now, when any row is inserted into a table on the MySQL databases, a corresponding entry is made in the Hive tables. Watch the demo to get a better understanding. The demo has no audio and is meant to be followed in conjunction with this blog. You can also create an external table in Hive and load data into the tables; it's your choice!

Watch the Hadoop Applier Demo >>
Limitations of the Applier:
In the first version we support WRITE_ROWS_EVENTs, i.e. only insert statements are replicated. We have considered adding support for deletes, updates and DDLs as well, but they are more complicated to handle and we are not sure how much interest there is in this.
We would very much appreciate your feedback on requirements - please use the comments section of this blog to let us know!
The Hadoop Applier is compatible with MySQL 5.6; however, it does not import events if binlog checksums are enabled. Make sure to set them to NONE on the master, and run the server in row based replication mode.
This innovation includes dedicated contributions from Neha Kumari, Mats Kindahl and Narayanan Venkateswaran. Without them, this project would not be a success.