-- Register Advance Matching Module[AMM] Hive UDF jar
ADD JAR <Directory path>/amm.hive.${project.version}.jar;
-- Provide alias to UDF class (optional).
String in quotes represent class names needed for this job to run.
CREATE TEMPORARY FUNCTION rowid as 'com.pb.bdq.hive.common.RowIDGeneratorUDF';
-- Filter is implemented as a UDAF (User Defined Aggregation function).
It processes one group of rows at a time based on join
field and generates the result for that group of rows.
CREATE TEMPORARY FUNCTION filter as
'com.pb.bdq.amm.process.hive.consolidation.filter.FilterUDAF';
-- This rowid is needed by filter to maintain the order of
rows while creating groups. This is a UDF (User Defined Function)
and associates an incremental unique integer number to each row of the data.
-- Disable map side aggregation
set hive.map.aggr = false;
-- Set the rule using configuration property 'hivevar:rule'
set hivevar:rule='{"consolidationConditions": [{"consolidationRule":
{"conditionClass":"simpleRule", "operation":"HIGHEST", "fieldName":
"column2", "value":null, "valueFromField":false, "valueNumeric":true},
"actions":[]}], "removeDuplicates":true}';
-- Set header (along with the id field alias used in the query)
using configuration property 'hivevar:header'
set hivevar:header='column1,column2,column3,column4,column5,id';
-- Set sort field name to alias used in query using
configuration property 'hivevar:sortfield'
set hivevar:sortfield='id';
-- Execute Query on the desired table. The query uses a UDF rowid,
which must be present in the query to maintain the ordering of the data while reading.
SELECT tmp2.record["column1"],
tmp2.record["column2"],
tmp2.record["column3"],
tmp2.record["column4"],
tmp2.record["column5"]
FROM (
SELECT filter (${hivevar:rule},
${hivevar:sortfield},
${hivevar:header},
innerRowID.column1,
innerRowID.column2,
innerRowID.column3,
innerRowID.column4,
innerRowID.column5,
innerRowID.id
) AS matchgroup
FROM (
SELECT column1, column2, column3, column4, column5, rowid(*)
AS id
FROM data
) innerRowID
GROUP BY column3
) AS innerResult
LATERAL VIEW explode(innerResult.matchgroup) tmp2 AS record ;
-- Query to dump the output to a file
INSERT OVERWRITE LOCAL DIRECTORY '/home/hadoop/HiveUDF/filter/'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
collection items terminated by '||' map keys terminated by ':'
SELECT tmp2.record["column1"],
tmp2.record["column2"],
tmp2.record["column3"],
tmp2.record["column4"],
tmp2.record["column5"]
FROM (
SELECT filter (innerRowID.column1,
innerRowID.column2,
innerRowID.column3,
innerRowID.column4,
innerRowID.column5,
innerRowID.id
) AS matchgroup
FROM (
SELECT column1, column2, column3, column4, column5, rowid(*)
AS id
FROM data
) innerRowID
GROUP BY column3
) AS innerResult
LATERAL VIEW explode(innerResult.matchgroup) tmp2 AS record ;
--sample input data
--+----------+----------+----------+----------+----------+
--| column1 | column2 | column3 | column4 | column5 |
--+----------+----------+----------+----------+----------+
--| Duplicate| 80 | 98 | | EUNICE L |
--| Suspect | | 98 | | ERIC L BR|
--+----------+----------+----------+----------+----------+
--sample output data
--+----------+----------+----------+----------+----------+
--| column1 | column2 | column3 | column4 | column5 |
--+----------+----------+----------+----------+----------+
--| Suspect | | 98 | | ERIC L BR|
--+----------+----------+----------+----------+----------+