-- Register the Advanced Matching Module (AMM) Hive UDF jar so its classes
-- are available to this session.
ADD JAR <Directory path>/amm.hive.${project.version}.jar;
-- Alias the UDF classes (optional). The quoted strings are the fully
-- qualified class names needed for this job to run.
-- rowid is a UDF (User Defined Function) that associates an incremental
-- unique integer with each row of the data; Filter needs it to maintain
-- the order of rows while creating groups.
CREATE TEMPORARY FUNCTION rowid AS 'com.pb.bdq.hive.common.RowIDGeneratorUDF';
-- Filter is implemented as a UDAF (User Defined Aggregation Function). It
-- processes one group of rows at a time and generates the result for that
-- group of rows.
CREATE TEMPORARY FUNCTION filter AS 'com.pb.bdq.amm.process.hive.consolidation.filter.FilterUDAF';
-- Disable map-side aggregation so every row of a group reaches the UDAF
-- together on the reduce side.
SET hive.map.aggr = false;
-- Consolidation rule (property 'pb.bdq.consolidation.rule'): per the JSON,
-- a simpleRule with operation HIGHEST on column2 (numeric), with duplicate
-- removal enabled. The JSON value must be kept verbatim.
SET pb.bdq.consolidation.rule={"consolidationConditions": [{"consolidationRule":{"conditionClass":"simpleRule", "operation":"HIGHEST", "fieldName":"column2", "value":null, "valueFromField":false, "valueNumeric":true}, "actions":[]}], "removeDuplicates":true};
-- Header (property 'pb.bdq.consolidation.header'): the field names, plus the
-- id alias used in the query, in the order the UDAF receives them.
SET pb.bdq.consolidation.header=column1,column2,column3,column4,column5,id;
-- Sort field (property 'pb.bdq.consolidation.sort.field'): set to the alias
-- given to rowid(*) in the query.
SET pb.bdq.consolidation.sort.field=id;
-- Execute Query on the desired table. The query uses a UDF rowid, which must be present in the query to maintain the ordering of the data while reading.
-- Outer SELECT: read each consolidated record back out; tmp2.record is a
-- map indexed by the field names declared in 'pb.bdq.consolidation.header'.
SELECT tmp2.record["column1"],
tmp2.record["column2"],
tmp2.record["column3"],
tmp2.record["column4"],
tmp2.record["column5"]
FROM (
-- The filter UDAF consumes every row of a group (grouped on column3 below)
-- and returns the consolidated result for that group as 'matchgroup'.
-- The id argument carries the rowid value so the UDAF can restore the
-- original read order within each group (see 'pb.bdq.consolidation.sort.field').
SELECT filter (innerRowID.column1,
innerRowID.column2,
innerRowID.column3,
innerRowID.column4,
innerRowID.column5,
innerRowID.id
) AS matchgroup
FROM (
-- rowid(*) must be present here: it tags every row with an incremental
-- unique integer, preserving the ordering of the data as it is read.
SELECT column1, column2, column3, column4, column5, rowid(*)
AS id
FROM data
) innerRowID
GROUP BY column3
) AS innerResult
-- explode() flattens the matchgroup collection so each surviving record
-- becomes one output row.
LATERAL VIEW explode(innerResult.matchgroup) tmp2 AS record ;
-- Query to dump the output to a file
-- Dump the consolidated output to a local directory as delimited text.
-- NOTE: INSERT OVERWRITE replaces any existing contents of the target
-- directory.
INSERT OVERWRITE LOCAL DIRECTORY '/home/hadoop/HiveUDF/filter/'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '||' MAP KEYS TERMINATED BY ':'
-- Same consolidation query as above: tag rows with rowid(*), group on
-- column3, consolidate each group with the filter UDAF, then explode the
-- resulting matchgroup collection into one row per surviving record.
SELECT tmp2.record["column1"],
tmp2.record["column2"],
tmp2.record["column3"],
tmp2.record["column4"],
tmp2.record["column5"]
FROM (
SELECT filter (innerRowID.column1,
innerRowID.column2,
innerRowID.column3,
innerRowID.column4,
innerRowID.column5,
innerRowID.id
) AS matchgroup
FROM (
-- rowid(*) preserves read order; its alias must match
-- 'pb.bdq.consolidation.sort.field'.
SELECT column1, column2, column3, column4, column5, rowid(*)
AS id
FROM data
) innerRowID
GROUP BY column3
) AS innerResult
LATERAL VIEW explode(innerResult.matchgroup) tmp2 AS record ;
--sample input data
--+----------+----------+----------+----------+----------+
--| column1 | column2 | column3 | column4 | column5 |
--+----------+----------+----------+----------+----------+
--| Duplicate| 80 | 98 | | EUNICE L |
--| Suspect | | 98 | | ERIC L BR|
--+----------+----------+----------+----------+----------+
--sample output data
--+----------+----------+----------+----------+----------+
--| column1 | column2 | column3 | column4 | column5 |
--+----------+----------+----------+----------+----------+
--| Suspect | | 98 | | ERIC L BR|
--+----------+----------+----------+----------+----------+