Advanced Matching モジュールの Hive UDF の使用

Hive UDF の各ジョブを実行する場合は、Hive クライアントでの以下のステップの個別実行を 1 つのセッションの範囲内で行うことも、必要なすべてのステップをまとめた HQL ファイルを作成してそれを一度に実行することもできます。

  1. Hive クライアントで、必要な Hive データベースにログインします。
  2. Spectrum™ Data & Address Quality for Big Data SDK AMM モジュールの JAR ファイルを登録します。
    ADD JAR <Directory path>/amm.hive.${project.version}.jar;
  3. 実行するデータ品質ジョブの Hive UDF のエイリアスを作成します。
    注: 引用符で囲まれている文字列は、このジョブの実行に必要なクラス名です。
    例:
    CREATE TEMPORARY FUNCTION bestofbreed as 'com.pb.bdq.amm.process.hive.consolidation.bestofbreed.BestOfBreedUDAF';
  4. 次の例に示すように、Hive.Map.Aggr 環境変数の設定を false にして、Reducer と Mapper の間でのデータの集約をオフにします。
    set hive.map.aggr = false;
    注: この設定はすべての UDF で必要です。
  5. ジョブの環境設定と詳細情報を指定して、それぞれの変数または設定プロパティに代入します。
    注: ルールは JSON 形式である必要があります。

    例を次に示します。

    set hivevar:rule='{"consolidationConditions": [{"consolidationRule":{"conditionClass":"simpleRule",
    "operation":"HIGHEST", "fieldName":"column2", "value":null, "valueFromField":false, "valueNumeric":true},
    "actions":[]}], "removeDuplicates":true}';
    注: それぞれのジョブ環境設定の設定プロパティを使用してください。例えば、それぞれのサンプル HQL ファイルに記載されている pb.bdq.match.rulepb.bdq.match.express.columnpb.bdq.consolidation.sort.field などです。
  6. 入力テーブルのヘッダー フィールドをカンマ区切り形式で指定し、変数または設定プロパティに割り当てます。
    set hivevar:header ='column1,column2,column3,column4,column5,id';
    注: 記載されている設定プロパティを使用してください。例えば、それぞれのサンプル HQL ファイルに記載されている pb.bdq.match.headerpb.bdq.consolidation.header などです。
  7. 環境設定プロパティ 'hivevar:sortfield' を使用して、ソートのパラメータをクエリで使用するエイリアスに設定します。
    set hivevar:sortfield='id';
  8. ジョブを実行してジョブ出力をコンソールに表示するには、次の例に示すようにクエリを記述します。
    SELECT tmp2.record["column1"],
    	tmp2.record["column2"],
    	tmp2.record["column3"],
    	tmp2.record["column4"],
    	tmp2.record["column5"]
    FROM (
    	SELECT  filter (${hivevar:rule},
    			${hivevar:sortfield},
    			${hivevar:header},
    			innerRowID.column1,
    			innerRowID.column2,
    			innerRowID.column3,
    			innerRowID.column4,
    			innerRowID.column5,
    			innerRowID.id
    	) AS matchgroup  
    	FROM (
    		SELECT column1, column2, column3, column4, column5, rowid(*) 
    		AS id 
    		FROM data
    		) innerRowID 
    	GROUP BY column3 
    	) AS innerResult 
    LATERAL VIEW explode(innerResult.matchgroup) tmp2 AS record ;
    ジョブを実行してジョブ出力を指定されたファイルに書き出すには、以下の例に示すようにクエリを記述します。
    INSERT OVERWRITE LOCAL DIRECTORY '/home/hadoop/HiveUDF/filter/' 
    ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY ',' 
    collection items terminated by '||' map keys terminated by ':'
    SELECT tmp2.record["column1"],
    	tmp2.record["column2"],
    	tmp2.record["column3"],
    	tmp2.record["column4"],
    	tmp2.record["column5"]
    FROM (
    	SELECT  filter (innerRowID.column1,
    			innerRowID.column2,
    			innerRowID.column3,
    			innerRowID.column4,
    			innerRowID.column5,
    			innerRowID.id
    	) AS matchgroup  
    	FROM (
    		SELECT column1, column2, column3, column4, column5, rowid(*) 
    		AS id 
    		FROM data
    		) innerRowID 
    	GROUP BY column3 
    	) AS innerResult 
    LATERAL VIEW explode(innerResult.matchgroup) tmp2 AS record ;
    注: 先ほど UDF に対して定義したエイリアスを使用してください。