-- Register the Advanced Matching Module (AMM) Hive UDF jar
ADD JAR <Directory path>/amm.hive.${project.version}.jar;
-- Provide an alias for the UDF class (optional). The string in quotes is the class name needed for this job to run.
CREATE TEMPORARY FUNCTION rowid as 'com.pb.bdq.hive.common.RowIDGeneratorUDF';
-- This rowid is needed by Duplicate Synchronization to maintain the order of rows while creating groups. It is a UDF (User Defined Function) that assigns a unique, incrementing integer to each row of the data.
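-- For illustration, rowid in isolation on the sample table shown at the end of this script (the output is schematic; ids follow read order):
--   SELECT rowid(*) AS id, column1 FROM databob;
--   -- 1  Duplicate
--   -- 2  Duplicate
--   -- 3  Suspect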
CREATE TEMPORARY FUNCTION dupsync as 'com.pb.bdq.amm.process.hive.consolidation.duplicatesync.DuplicateSyncUDAF';
-- Duplicate Sync is implemented as a UDAF (User Defined Aggregation Function). It processes one group of rows at a time and generates the result for that group.
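-- Schematically, for one group the UDAF returns an array of maps, one map per input row, keyed by the configured header fields, e.g.:
--   [ {"column1":"Duplicate", ..., "id":"1"}, {"column1":"Duplicate", ..., "id":"2"}, {"column1":"Suspect", ..., "id":"3"} ]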
-- Disable map-side aggregation so that each group's rows are aggregated together on the reducer
set hive.map.aggr = false;
-- Set the rule using the configuration property 'pb.bdq.consolidation.rule'
set pb.bdq.consolidation.rule={"consolidationConditions": [{"consolidationRule":
{"conditionClass":"conjoinedRule", "joinType":"AND", "consolidationRules":[{"conditionClass":"simpleRule", "operation":"HIGHEST", "fieldName":"column2", "value":null, "valueFromField":false, "valueNumeric":true}]},
"actions":[{"accumulate":false, "copyFromField":true, "sourceData":"column5", "destinationFieldName":"column5"}]}]};
-- Set the header (along with the id field alias used in the query) using the configuration property 'pb.bdq.consolidation.header'
set pb.bdq.consolidation.header=column1,column2,column3,column4,column5,id;
-- Set the sort field name to the alias used in the query, using the configuration property 'pb.bdq.consolidation.sort.field'
set pb.bdq.consolidation.sort.field=id;
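-- Optionally echo a property back to verify it took effect; in the Hive CLI, 'set <name>;' without a value prints the current setting:
set pb.bdq.consolidation.sort.field;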
-- Execute the query on the desired table. The query uses the rowid UDF, which must be present in the query to preserve the ordering of the data while it is read.
-- Duplicate Sync returns a list of maps of <key=value> pairs; each map in the list corresponds to a row in the group. The query below explodes that list of maps and fetches fields from each map by key.
SELECT tmp2.record["column1"],
       tmp2.record["column2"],
       tmp2.record["column3"],
       tmp2.record["column4"],
       tmp2.record["column5"]
FROM (
       SELECT dupsync(innerRowID.column1,
                      innerRowID.column2,
                      innerRowID.column3,
                      innerRowID.column4,
                      innerRowID.column5,
                      innerRowID.id) AS matchgroup
       FROM (
              SELECT column1, column2, column3, column4, column5, rowid(*) AS id
              FROM databob
            ) innerRowID
       GROUP BY column3
     ) AS innerResult
LATERAL VIEW explode(innerResult.matchgroup) tmp2 AS record;
-- The same query, writing its output to a local directory as delimited text
INSERT OVERWRITE LOCAL DIRECTORY '/home/hadoop/dupsync/'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '||'
MAP KEYS TERMINATED BY ':'
SELECT tmp2.record["column1"],
       tmp2.record["column2"],
       tmp2.record["column3"],
       tmp2.record["column4"],
       tmp2.record["column5"]
FROM (
       SELECT dupsync(innerRowID.column1,
                      innerRowID.column2,
                      innerRowID.column3,
                      innerRowID.column4,
                      innerRowID.column5,
                      innerRowID.id) AS matchgroup
       FROM (
              SELECT column1, column2, column3, column4, column5, rowid(*) AS id
              FROM databob
            ) innerRowID
       GROUP BY column3
     ) AS innerResult
LATERAL VIEW explode(innerResult.matchgroup) tmp2 AS record;
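-- The target directory now holds the delimited result as one or more part files (a name like 000000_0
-- is typical, but the exact file names are an assumption). From the Hive CLI, a shell escape can list them:
!ls /home/hadoop/dupsync/;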
-- Sample input data
--+-----------+---------+---------+---------+------------+
--| column1   | column2 | column3 | column4 | column5    |
--+-----------+---------+---------+---------+------------+
--| Duplicate | 87      | 1       |         | ANNA ABNEY |
--| Duplicate | 77      | 1       |         | ANNA A ANN |
--| Suspect   |         | 1       |         | ANNA A ABN |
--+-----------+---------+---------+---------+------------+
-- Sample output data
--+-----------+---------+---------+---------+------------+
--| column1   | column2 | column3 | column4 | column5    |
--+-----------+---------+---------+---------+------------+
--| Duplicate | 87      | 1       |         | ANNA ABNEY |
--| Duplicate | 77      | 1       |         | ANNA A ANN |
--| Suspect   |         | 1       |         | ANNA ABNEY |
--+-----------+---------+---------+---------+------------+
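-- A minimal sketch for reproducing this walkthrough, commented out because it is not part of the job
-- itself. The table name comes from the queries above; the column types and the local file path are
-- assumptions (empty CSV fields load as NULL for the INT column):
--   CREATE TABLE databob (column1 STRING, column2 INT, column3 INT, column4 STRING, column5 STRING)
--   ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
--   -- /tmp/databob.csv contains:
--   --   Duplicate,87,1,,ANNA ABNEY
--   --   Duplicate,77,1,,ANNA A ANN
--   --   Suspect,,1,,ANNA A ABN
--   LOAD DATA LOCAL INPATH '/tmp/databob.csv' INTO TABLE databob;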