Filter Spark ジョブの使用

  1. AdvanceMatchFactory のインスタンスを、その静的メソッド getInstance() を使用して作成します。
  2. Filter ジョブの入力と出力の詳細を指定します。以下の手順に従って、FilterDetail を指定する ProcessType のインスタンスを作成することによって、これを行います。このインスタンスは、SparkProcessType タイプを使用する必要があります。
    1. GroupbyOption のインスタンスを作成することによって、レコードのグループ化に使用する列を指定します。
      GroupbySparkOption のインスタンスを使用して、Group-By 列を指定します。
    2. FilterConfiguration のインスタンスを作成することによって、ジョブの統合ルールを生成します。このインスタンスの中で、ConsolidationCondition のインスタンスを使用して統合条件を定義し、論理演算子を使用して条件を組み合わせます。
      ConsolidationCondition の各インスタンスは、ConsolidationRule インスタンスとそれに対応する ConsolidationAction インスタンスを使用して定義されます。
      注: ConsolidationRule の各インスタンスは、SimpleRule の 1 つのインスタンスによって定義するか、または、子の SimpleRule インスタンスとネストされた ConjoinedRule インスタンスが、論理演算子で結合された階層を使用して定義できます。列挙 JoinType、および列挙 Operationを参照してください。
    3. FilterDetail のインスタンスを作成します。JobConfig タイプのインスタンスと、上で作成した GroupbyOption インスタンスおよび FilterConfiguration インスタンスを、コンストラクタの引数として渡します。
      JobConfig パラメータは、SparkJobConfig タイプのインスタンスである必要があります。
    4. inputPath インスタンスの FilterDetail フィールドを使用して、入力ファイルの詳細を設定します。
      • テキスト入力ファイルの場合は、適切なコンストラクタを呼び出して、関連する詳細な入力ファイル情報を指定してFilePath のインスタンスを作成します。
      • ORC 入力ファイルの場合、ORC 入力ファイルのパスを引数に指定して OrcFilePath のインスタンスを作成します。
      • PARQUET 入力ファイルの場合、PARQUET 入力ファイルのパスを引数に指定して ParquetFilePath のインスタンスを作成します。
    5. FilterDetail インスタンスの outputPath フィールドを使用して、出力ファイルの詳細を設定します。
      • テキスト出力ファイルの場合は、適切なコンストラクタを呼び出して、関連する詳細な出力ファイル情報を指定してFilePath のインスタンスを作成します。
      • ORC 出力ファイルの場合、ORC 出力ファイルのパスを引数に指定して OrcFilePath のインスタンスを作成します。
      • PARQUET 出力ファイルの場合、PARQUET 出力ファイルのパスを引数に指定して ParquetFilePath のインスタンスを作成します。
    6. jobName インスタンスの FilterDetail フィールドを使用して、ジョブの名前を設定します。
    7. FilterDetail インスタンスの compressOutput フラグを true に設定して、ジョブの出力を圧縮します。
  3. Spark ジョブを作成して実行するには、先ほど作成した AdvanceMatchFactory のインスタンスを使用してそのメソッド runSparkJob() を呼び出します。ここで、上の FilterDetail のインスタンスを引数として渡します。
    runSparkJob() メソッドはジョブを実行し、ジョブのレポート カウンタの Map を返します。
  4. カウンタを表示することにより、ジョブに対する統計レポートを表示します。