Tuesday 12 April 2016

Sqoop Learning...

Sqoop is a tool designed to transfer data between Hadoop and relational databases. You can use Sqoop to import data from a relational database management system (RDBMS) such as MySQL or Oracle into the Hadoop Distributed File System (HDFS), transform the data in Hadoop MapReduce, and then export the data back into an RDBMS.

With Sqoop, you can import data from a relational database system into HDFS. The input to the import process is a database table. Sqoop will read the table row-by-row into HDFS. The output of this import process is a set of files containing a copy of the imported table. The import process is performed in parallel. For this reason, the output will be in multiple files. These files may be delimited text files (for example, with commas or tabs separating each field), or binary Avro or SequenceFiles containing serialized record data.

A by-product of the import process is a generated Java class which can encapsulate one row of the imported table. This class is used during the import process by Sqoop itself. The Java source code for this class is also provided to you, for use in subsequent MapReduce processing of the data. This class can serialize and deserialize data to and from the SequenceFile format.
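If you only want this generated class without running a full import, the codegen tool (listed among the available commands below) can produce it on its own. A minimal sketch, reusing the connection details from the examples later in this post; the --outdir path is just a placeholder:

sqoop codegen --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --outdir /tmp/sqoop-codegen;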

Sqoop includes some other commands which allow you to inspect the database you are working with.
For example,
1. List the available database schemas (with the sqoop-list-databases tool):

sqoop-list-databases --connect jdbc:mysql://localhost:3306/ --username root --password hr;

2. List the available tables within a schema (with the sqoop-list-tables tool):

sqoop-list-tables --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr;

3. Sqoop also includes a primitive SQL execution shell (the sqoop-eval tool):

sqoop eval --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --query "select * from employees";

Most aspects of the import, code generation, and export processes can be customized. You can control the specific row range or columns imported. You can specify particular delimiters and escape characters for the file-based representation of the data, as well as the file format used. You can also control the class or package names used in generated code.


Sqoop ships with a help tool. To display a list of all available tools, type the following command:

$ sqoop help

usage: sqoop COMMAND [ARGS]

Available commands:
  codegen            Generate code to interact with database records
  create-hive-table  Import a table definition into Hive
  eval               Evaluate a SQL statement and display the results
  export             Export an HDFS directory to a database table
  help               List available commands
  import             Import a table from a database to HDFS
  import-all-tables  Import tables from a database to HDFS
  job                Work with saved jobs
  list-databases     List available databases on a server
  list-tables        List available tables in a database
  merge              Merge results of incremental imports
  metastore          Run a standalone Sqoop metastore
  version            Display version information


To display help for a specific tool, enter: sqoop help (tool-name);
for example,

$ sqoop help import


usage: sqoop import [GENERIC-ARGS] [TOOL-ARGS]

Common arguments:
   --connect <jdbc-uri>                         Specify JDBC connect String

   --connection-manager <class-name>            Specify connection manager class name

   --connection-param-file <properties-file>    Specify connection parameters file

   --driver <class-name>                        Manually specify JDBC driver class to use

   --hadoop-home <hdir>                         Override $HADOOP_MAPRED_HOME_ARG

   --hadoop-mapred-home <dir>                   Override $HADOOP_MAPRED_HOME_ARG

   --password <password>                        Set authentication password

   --password-file <password-file>              Set authentication password file path

   --username <username>                        Set authentication username


Using Options Files to Pass Arguments

Arguments that are used repeatedly can be placed in an options file and passed with --options-file. For example, the following import command:

sqoop import --connect jdbc:mysql://localhost/db --username root --table TEST

can be specified alternatively as:

$ sqoop --options-file /users/homer/work/import.txt --table TEST

where the options file /users/homer/work/import.txt contains the following:

import
--connect
jdbc:mysql://localhost/db
--username
root

The options file can have empty lines and comments for readability purposes. Comments within options files must begin with the hash (#) character and must appear on a new line.
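For example, a commented version of the options file above might look like the following sketch (the layout mirrors the file shown earlier; the comment text itself is only illustrative):

#
# Options file for a Sqoop import
#

# Specifies the tool being invoked
import

# Connect parameter and value
--connect
jdbc:mysql://localhost/db

# Username parameter and value
--username
root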


sqoop-import : The import tool imports an individual table from an RDBMS to HDFS.

Table 1. Common arguments
Argument Description
--connect <jdbc-uri> Specify JDBC connect string
--connection-manager <class-name> Specify connection manager class to use
--driver <class-name> Manually specify JDBC driver class to use
--hadoop-mapred-home <dir> Override $HADOOP_MAPRED_HOME
--help Print usage instructions
--password-file Set path for a file containing the authentication password
-P Read password from console
--password <password> Set authentication password
--username <username> Set authentication username
--verbose Print more information while working
--connection-param-file <filename> Optional properties file that provides connection parameters


Table 2. Import control arguments:

Argument Description
--append Append data to an existing dataset in HDFS
--as-avrodatafile Imports data to Avro Data Files
--as-sequencefile Imports data to SequenceFiles
--as-textfile Imports data as plain text (default)
--boundary-query <statement> Boundary query to use for creating splits
--columns <col,col,col…> Columns to import from table
--delete-target-dir Delete the import target directory if it exists
--direct Use direct import fast path
--direct-split-size <n> Split the input stream every n bytes when importing in direct mode
--fetch-size <n> Number of entries to read from database at once.
--inline-lob-limit <n> Set the maximum size for an inline LOB
-m,--num-mappers <n> Use n map tasks to import in parallel
-e,--query <statement> Import the results of statement.
--split-by <column-name> Column of the table used to split work units
--table <table-name> Table to read
--target-dir <dir> HDFS destination dir
--warehouse-dir <dir> HDFS parent for table destination
--where <where clause> WHERE clause to use during import
-z,--compress Enable compression
--compression-codec <c> Use Hadoop codec (default gzip)
--null-string <null-string> The string to be written for a null value for string columns
--null-non-string <null-string> The string to be written for a null value for non-string columns


Selecting the Data to Import
--table : argument to select the table to import
--columns : argument to select a subset of columns and control their ordering
--where : control which rows are imported by adding a SQL WHERE clause to the import statement
--query : Instead of using the --table, --columns and --where arguments, you can specify a SQL statement with the --query argument.

$CONDITIONS :
If you want to import the results of a query in parallel, then each map task will need to execute a copy of the query, with results partitioned by bounding conditions inferred by Sqoop. Your query must include the token $CONDITIONS which each Sqoop process will replace with a unique condition expression. You must also select a splitting column with --split-by.

Alternatively, the query can be executed once and imported serially by specifying a single map task with -m 1.
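As a sketch (the table, column names, and salary filter mirror the examples later in this post; the target directories are placeholders), a parallel free-form query import and its serial equivalent might look like this:

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --query 'SELECT employee_id, first_name, salary FROM employees WHERE salary > 10000 AND $CONDITIONS' --split-by employee_id --target-dir /abhijit/query_import;

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --query 'SELECT employee_id, first_name, salary FROM employees WHERE salary > 10000 AND $CONDITIONS' -m 1 --target-dir /abhijit/query_serial;

Note that the $CONDITIONS token is still required even with -m 1, and --target-dir is mandatory when using --query.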

-m : Sqoop imports data in parallel from most database sources. You can specify the number of map tasks (parallel processes) to use to perform the import by using the -m or --num-mappers argument.
When performing parallel imports, Sqoop needs a criterion by which it can split the workload. Sqoop uses a splitting column to split the workload. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column.
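For example, a sketch that imports the employees table with four map tasks and explicitly splits the work on employee_id (the task count is arbitrary; connection details mirror the examples later in this post):

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --split-by employee_id -m 4;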

--warehouse-dir : By default, Sqoop will import a table named foo to a directory named foo inside your home directory in HDFS. For example, if your username is someuser, then the import tool will write to /user/someuser/foo/(files). You can adjust the parent directory of the import with the --warehouse-dir argument.
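For example, the following sketch (the parent directory is a placeholder) writes the imported files under /abhijit/warehouse/employees rather than under the home directory:

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --warehouse-dir /abhijit/warehouse;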

Incremental Imports : Sqoop provides an incremental import mode that can be used to retrieve only rows newer than some previously imported set of rows.
--check-column (col) Specifies the column to be examined when determining which rows to import. (The column should not be of type CHAR/NCHAR/VARCHAR/VARNCHAR/LONGVARCHAR/LONGNVARCHAR.)
--incremental (mode) Specifies how Sqoop determines which rows are new. Legal values for mode include append and lastmodified.
--last-value (value) Specifies the maximum value of the check column from the previous import.
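A sketch of a lastmodified incremental import, assuming the table has a hypothetical timestamp column named last_updated (the last value and target directory are placeholders):

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --check-column last_updated --incremental lastmodified --last-value '2016-04-01 00:00:00' --target-dir /abhijit/incremental;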

File Formats : Sqoop can import data as delimited text, SequenceFiles, or Avro data files.
1.Delimited text is the default import format. You can also specify it explicitly by using the --as-textfile argument.
2.SequenceFiles are a binary format that store individual records in custom record-specific data types.
3.Avro data files are a compact, efficient binary format that provides interoperability with applications written in other programming languages. Avro also supports versioning, so that when, e.g., columns are added or removed from a table, previously imported data files can be processed along with new ones.


-z or --compress : You can compress your data using the deflate (gzip) algorithm with the -z or --compress argument, or select a different Hadoop codec with the --compression-codec argument.
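For example (target directories are placeholders; the second command simply sketches how an alternative Hadoop codec such as Snappy could be selected):

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --target-dir /abhijit/compressed -z;

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --target-dir /abhijit/compressed_snappy -z --compression-codec org.apache.hadoop.io.compress.SnappyCodec;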


Examples :

1. Basic import of a table named employees:

 sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees;

2. Basic import requiring a login (the -P flag reads the password from the console):

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root -P --table employees;

3. Selecting specific columns from the employees table:

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --columns "employee_id, first_name, last_name";

4. Controlling the import parallelism (using 8 parallel tasks):

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --columns "employee_id, first_name, last_name" -m 8;

5. Specifying the delimiters to use in a text-mode import:

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --columns "employee_id, first_name, last_name" --fields-terminated-by '\t' --lines-terminated-by '\n' --optionally-enclosed-by '\"';

6. Importing data to a target directory /abhijit:

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --columns "employee_id, first_name, last_name" --target-dir /abhijit;

7. Importing a subset of data (rows with salary > 10000):
sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --where 'salary>10000' --target-dir /abhijit/subset;

8. Importing a subset with specific columns:

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --where 'salary>10000' --columns "employee_id, first_name, last_name" --target-dir /abhijit/subset;

9. Importing a subset while specifying the delimiters:

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --where 'salary>10000' --target-dir /abhijit/subset/terminated --fields-terminated-by '\t' --lines-terminated-by '\n';

10. An incremental import of new data (appending rows with employee_id greater than 208):

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --check-column employee_id --incremental append --last-value 208;

11. Importing to SequenceFile format:

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --where 'salary>10000' --target-dir /abhijit/subset/sequenceFile --as-sequencefile;

12. Importing to Avro data file format:

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --where 'salary>10000' --target-dir /abhijit/subset/avroFile --as-avrodatafile;


13. sqoop-import-all-tables
The import-all-tables tool imports a set of tables from an RDBMS to HDFS. Data from each table is stored in a separate directory in HDFS.
For the import-all-tables tool to be useful, the following conditions must be met (an example invocation appears after the list):

    Each table must have a single-column primary key.
    You must intend to import all columns of each table.
    You must not intend to use a non-default splitting column, nor impose any conditions via a WHERE clause.
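A minimal sketch of an import-all-tables run (the warehouse directory is a placeholder; each table lands in its own subdirectory underneath it):

sqoop import-all-tables --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --warehouse-dir /abhijit/all_tables;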

===============================================================================================================================

Importing Data Into Hive

Argument Description
--hive-home <dir> Override $HIVE_HOME
--hive-import Import tables into Hive (Uses Hive’s default delimiters if none are set.)
--hive-overwrite Overwrite existing data in the Hive table.
--create-hive-table If set, then the job will fail if the target Hive table exists. By default this property is false.

--hive-table <table-name> Sets the table name to use when importing to Hive.
--hive-drop-import-delims Drops \n, \r, and \01 from string fields when importing to Hive.
--hive-delims-replacement Replace \n, \r, and \01 from string fields with user defined string when importing to Hive.
--hive-partition-key Name of the Hive field on which the imported data is partitioned
--hive-partition-value <v> String value that serves as the partition key for the data imported into Hive in this job.
--map-column-hive <map> Override default mapping from SQL type to Hive type for configured columns.


Sqoop will by default import NULL values as the string null. Hive, however, uses the string \N to denote NULL values, and therefore predicates dealing with NULL (such as IS NULL) will not work correctly. You should append the parameters --null-string and --null-non-string for an import job, or --input-null-string and --input-null-non-string for an export job, if you wish to properly preserve NULL values.
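For example, a Hive import that preserves NULL values might look like this sketch (the Hive table name is a placeholder; the extra backslash is needed so that Hive's \N is written correctly):

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --hive-import --hive-table employees_hive --null-string '\\N' --null-non-string '\\N';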

Hive can put data into partitions for more efficient query performance. You can tell a Sqoop job to import data for Hive into a particular partition by specifying the --hive-partition-key and --hive-partition-value arguments. The partition value must be a string.
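A sketch of a partitioned Hive import, assuming a hypothetical static partition column named load_date (all rows from this job land in the single partition value given):

sqoop import --connect jdbc:mysql://localhost:3306/vaibhav --username root --password hr --table employees --hive-import --hive-table employees_partitioned --hive-partition-key load_date --hive-partition-value '2016-04-12';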

Wednesday 6 April 2016

How does count work using apache-pig?

I am learning Pig and came across questions such as how grouping works and how to get a count out of it.
I thought I would put together an example that could be useful for others when they need to group data and count the results.

I am not going to cover what Pig is or its advantages. I will jump directly into an example and elaborate on it.

Let's start with it.

Here is sample data for the example.

1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1932,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1991,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1985,3.8,5333
7,Muriel's Wedding,1994,3.5,6323
8,Mother's Boys,1994,3.4,5733
9,Nosferatu: Original Version,1929,3.5,5651
10,Nick of Time,1995,3.4,5333


The data describes movies: an ID, the movie name, the release year, the rating for the movie, and the number of likes.

Our assignment is to get the count of movies per year.
Below are the steps to analyse the data.

1. Loading data : We will use the LOAD operator to read the data into Pig. LOAD is just a pointer and does not load the data immediately; Pig starts execution only when STORE or DUMP is requested.

movies = load '/home/abhijit/Downloads/movies.txt' Using PigStorage(',');

2. To verify the output, or for testing/debugging your changes, you can use DUMP.
DUMP displays the results on the terminal. (Never use it in production; use STORE instead. A STORE example appears at the end of this post.)

DUMP movies;

The result on the terminal is:

(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)


3. Group the data by the key year. We will use the GROUP operator for this. It groups together the tuples that have the same key, which is called the group key. In our case the release year is the group key.

groupByYear = group movies by $2;

$2 here is the position that represents the release year. (We are using positional references because we have not defined any schema. We will talk about schemas later.)

Again call DUMP to verify the result.

DUMP groupByYear;

The result on the terminal is:

(1921,{(3,Orphans of the Storm,1921,3.2,9062)})
(1929,{(9,Nosferatu: Original Version,1929,3.5,5651)})
(1932,{(2,The Mummy,1932,3.5,4388)})
(1963,{(5,Night Tide,1963,2.8,5126)})
(1985,{(6,One Magic Christmas,1985,3.8,5333)})
(1991,{(4,The Object of Beauty,1991,2.8,6150)})
(1993,{(1,The Nightmare Before Christmas,1993,3.9,4568)})
(1994,{(7,Muriel's Wedding,1994,3.5,6323),(8,Mother's Boys,1994,3.4,5733)})
(1995,{(10,Nick of Time,1995,3.4,5333)})


4. From the output it is now clear why we group before we count.

(1921,{(3,Orphans of the Storm,1921,3.2,9062)})

1921 is the group key, and (3,Orphans of the Storm,1921,3.2,9062) is its value. Here we have a single tuple of the form (3,Orphans of the Storm,1921,3.2,9062).

A tuple is an ordered set of fields:
(3,Orphans of the Storm,1921,3.2,9062)

A bag is a collection of tuples, represented as:
{(3,Orphans of the Storm,1921,3.2,9062)}

Now we want to get the count of the tuples (the values) for each key.

movieCountOfYear = foreach groupByYear generate $0, COUNT($1);

$0 represents the group key (the year) and $1 represents the bag of tuples for that key; COUNT($1) returns the number of tuples in each bag.

To verify the result, call DUMP.

DUMP movieCountOfYear;

(1921,1)
(1929,1)
(1932,1)
(1963,1)
(1985,1)
(1991,1)
(1993,1)
(1994,2)
(1995,1)

In the result set you can see each year and its respective movie count.
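As mentioned in step 2, DUMP is only for debugging. To persist the result, a STORE statement along these lines could be used (the output path is just a placeholder and must not already exist):

STORE movieCountOfYear INTO '/home/abhijit/Downloads/movie_count_by_year' USING PigStorage(',');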