メインコンテンツへスキップ

Data Contract CLI の実装を読み解く

·
tech data
目次

これは datatech-jp Advent Calendar 2024 の20日目の記事です。

こんにちは chanyou です。

先日、 datatech-jp の Data Contract事例共有会 というイベントに登壇させていただきました。 Data Contract の導入の難しさを語り合う貴重な機会でした。

イベントではそこまで触れていなかったですが Data Contract CLI というオープンソースのツールがあります。 Data Contract CLI を使うことで、Data Contract の YAML ファイルの読み書きやカタログ生成が容易に行えます。

Data Contract CLI の実装を読む機会がありましたので、この記事で触れていきたいと思います。

Data Contract CLI とは?
#

ドキュメントは以下から確認できます。

https://cli.datacontract.com

ソースコードは以下で、MIT ライセンスで公開されています。

datacontract/datacontract-cli

Enforce Data Contracts

Python
522
100

Data Contract CLI の基本的な使い方
#

インストール
#

pip でインストールできます。

pip install 'datacontract-cli'

依存関係が切り出されており、使いたい機能に応じて追加モジュールをインストールできます。

pip install datacontract-cli[bigquery]

インストールすると datacontract コマンドが使えるようになっています。

$ datacontract
                                                                                                                                                                                  
 Usage: datacontract [OPTIONS] COMMAND [ARGS]...                                                                                                                                  
                                                                                                                                                                                  
 The datacontract CLI is an open source command-line tool for working with Data Contracts (https://datacontract.com).                                                             
 It uses data contract YAML files to lint the data contract, connect to data sources and execute schema and quality tests, detect breaking changes, and export to different       
 formats.                                                                                                                                                                         
                                                                                                                                                                                  
╭─ Options ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ --version          Prints the current version.                                                                                                                                 │
│ --help             Show this message and exit.                                                                                                                                 │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Commands ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ init        Download a datacontract.yaml template and write it to file.                                                                                                        │
│ lint        Validate that the datacontract.yaml is correctly formatted.                                                                                                        │
test        Run schema and quality tests on configured servers.                                                                                                                │
export      Convert data contract to a specific format. Saves to file specified by `output` option if present, otherwise prints to stdout.                                     │
│ import      Create a data contract from the given source location. Saves to file specified by `output` option if present, otherwise prints to stdout.                          │
│ publish     Publish the data contract to the Data Mesh Manager.                                                                                                                │
│ catalog     Create an html catalog of data contracts.                                                                                                                          │
│ breaking    Identifies breaking changes between data contracts. Prints to stdout.                                                                                              │
│ changelog   Generate a changelog between data contracts. Prints to stdout.                                                                                                     │
│ diff        PLACEHOLDER. Currently works as 'changelog' does.                                                                                                                  │
│ serve       Start the datacontract web server.                                                                                                                                 │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯

できることは出力の通りですが、一通り触ってみましょう。

Data Contract を生成する
#

init で Data Contract ファイルのテンプレートを生成できます。

$ datacontract init
📄 data contract written to datacontract.yaml

import で dbt のモデルファイルや dbml から Data Contract ファイルを生成することができます。

datacontract import --format dbml --source sample.dbml

権限を渡せば、BigQuery のテーブルなどのクラウドサービス上のリソースから Data Contract ファイルを生成することもできます。

Data Contract から出力する
#

export で Data Contract ファイルからコードベースやスキーマファイルなどの多様な形式での出力が可能です。

$ datacontract export --format sql  
-- Data Contract: urn:datacontract:checkout:orders-latest
-- SQL Dialect: snowflake
CREATE TABLE orders (
  order_id TEXT not null,
  order_timestamp TIMESTAMP_TZ not null,
  order_total NUMBER not null,
  customer_id TEXT,
  customer_email_address TEXT not null,
  processed_timestamp TIMESTAMP_TZ not null
);
CREATE TABLE line_items (
  line_item_id TEXT not null,
  order_id TEXT,
  sku TEXT
);

変更ログを出力する
#

changelog で Data Contract の差分を出力できます。

$ datacontract changelog datacontract_old.yaml datacontract_new.yaml 
1 changes: 0 error, 0 warning, 1 info
info    [model_added] at datacontract_new.yaml
        in models.line_items
            added the model

breaking で2つの Data Contract に破壊的変更があった場合に終了ステータスを1で返します。

Data Contract をコード管理しており、 CI で破壊的変更が発生した場合にマージさせたくない場合に有用だと思います。

カタログを出力する
#

catalog で Data Contract から HTML ファイルを生成できます。

$ datacontract catalog

$ tree catalog 
catalog
├── datacontract.html
└── index.html

1 directory, 2 files

出力された HTML ファイルのサンプルは以下です。

https://datacontract.com/examples/orders-latest/datacontract.html

テストを実行する
#

test で Data Contract の情報をもとにテストを実施できます。

$ datacontract test
Testing datacontract.yaml
╭────────┬─────────────────────────────────────────────────────────────────────┬───────────────────────────────┬─────────────────────────────╮
│ Result │ Check                                                               │ Field                         │ Details                     │
├────────┼─────────────────────────────────────────────────────────────────────┼───────────────────────────────┼─────────────────────────────┤
│ passed │ Check that JSON has valid schema                                    │ line_items                    │ All JSON entries are valid. │
│ passed │ Check that field line_item_id is present                            │ line_items                    │                             │
│ passed │ Check that field order_id is present                                │ line_items                    │                             │
│ passed │ Check that field sku is present                                     │ line_items                    │                             │
│ passed │ Check that required field line_item_id has no null values           │ line_items.line_item_id       │                             │
│ passed │ Check that field sku matches regex pattern ^[A-Za-z0-9]{8,14}$      │ line_items.sku                │                             │
│ passed │ Check that JSON has valid schema                                    │ orders                        │ All JSON entries are valid. │
│ passed │ Check that field order_id is present                                │ orders                        │                             │
│ passed │ Check that field order_timestamp is present                         │ orders                        │                             │
│ passed │ Check that field order_total is present                             │ orders                        │                             │
│ passed │ Check that field customer_id is present                             │ orders                        │                             │
│ passed │ Check that field customer_email_address is present                  │ orders                        │                             │
│ passed │ Check that field processed_timestamp is present                     │ orders                        │                             │
│ passed │ Check that required field customer_email_address has no null values │ orders.customer_email_address │                             │
│ passed │ Check that field customer_id has a min length of 10                 │ orders.customer_id            │                             │
│ passed │ Check that field customer_id has a max length of 20                 │ orders.customer_id            │                             │
│ passed │ Check that required field order_id has no null values               │ orders.order_id               │                             │
│ passed │ Check that unique field order_id has no duplicate values            │ orders.order_id               │                             │
│ passed │ Check that required field order_timestamp has no null values        │ orders.order_timestamp        │                             │
│ passed │ Check that required field order_total has no null values            │ orders.order_total            │                             │
│ passed │ Check that required field processed_timestamp has no null values    │ orders.processed_timestamp    │                             │
╰────────┴─────────────────────────────────────────────────────────────────────┴───────────────────────────────┴─────────────────────────────╯
🟢 data contract is valid. Run 21 checks. Took 38.441876 seconds.

その他
#

この他にも以下が可能です。

  • lint による Data Contract ファイルのバリデーション
  • publish による Data Mesh Manager への公開

実装を読み解く
#

一通り Data Contract CLI を触ったところで、実装を見ていきましょう。

Data Contract CLI は Python で実装されています。

Typer が採用されている
#

CLI 構築ライブラリとして Typer が採用されています。

fastapi/typer

Typer, build great CLIs. Easy to code. Based on Python type hints.

Python
16054
682

Typer とは FastAPI of CLIs とも呼ばれている、開発体験の良い CLI 構築ライブラリです。

FastAPI の兄弟ライブラリということもあって、型ヒントによる直感的な実装が可能となっています。

インターフェイス部分は datacontract/cli.py に定義されているので、このファイルから処理を追えば読み解きやすいです。

例えば、以下は init の help 出力とサブコマンドの実装です。

$ datacontract init --help          
                                                                                                                                                                                  
 Usage: datacontract init [OPTIONS] [LOCATION]                                                                                                                                    
                                                                                                                                                                                  
 Download a datacontract.yaml template and write it to file.                                                                                                                      
                                                                                                                                                                                  
╭─ Arguments ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│   location      [LOCATION]  The location (url or path) of the data contract yaml to create. [default: datacontract.yaml]╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Options ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ --template                       TEXT  URL of a template or data contract [default: https://datacontract.com/datacontract.init.yaml]│ --overwrite    --no-overwrite          Replace the existing datacontract.yaml [default: no-overwrite]│ --help                                 Show this message and exit.                                                                                                             │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
@app.command()
def init(
    location: Annotated[
        str,
        typer.Argument(help="The location (url or path) of the data contract yaml to create."),
    ] = "datacontract.yaml",
    template: Annotated[
        str, typer.Option(help="URL of a template or data contract")
    ] = "https://datacontract.com/datacontract.init.yaml",
    overwrite: Annotated[bool, typer.Option(help="Replace the existing datacontract.yaml")] = False,
):
    """
    Download a datacontract.yaml template and write it to file.
    """
    try:
        download_datacontract_file(location, template, overwrite)
    except FileExistsException:
        console.print("File already exists, use --overwrite to overwrite")
        raise typer.Exit(code=1)
    else:
        console.print("📄 data contract written to " + location)

コマンド引数が init 関数の引数として定義されていて typer.Argument でヘルプテキストの定義が行われています。 docstring もコマンドの概要として help に出力されていますね。

Data Contract ファイルをダウンロードする処理は download_datacontract_file 関数で定義されているようです。

シンプルに requests で指定された URL から取得する実装でした。

def download_datacontract_file(file_path: str, from_url: str, overwrite_file: bool):
    if not overwrite_file and os.path.exists(file_path):
        raise FileExistsException()

    with requests.get(from_url) as response:
        response.raise_for_status()
        with open(file_path, "w") as f:
            f.write(response.text)

https://github.com/datacontract/datacontract-cli/blob/main/datacontract/init/download_datacontract_file.py

型ヒントがしっかり定義されていて読みやすいです。

import を読み解く
#

init の例はシンプルでしたが、Data Contract ファイルのパースが必要そうな import はどうでしょうか。

@app.command(name="import")
def import_(...):  # 引数は長いので省略
    result = DataContract().import_from_source(
        format=format,
        source=source,
        glue_table=glue_table,
        bigquery_table=bigquery_table,
        bigquery_project=bigquery_project,
        bigquery_dataset=bigquery_dataset,
        unity_table_full_name=unity_table_full_name,
        dbt_model=dbt_model,
        dbml_schema=dbml_schema,
        dbml_table=dbml_table,
        iceberg_table=iceberg_table,
    )
    if output is None:
        console.print(result.to_yaml(), markup=False, soft_wrap=True)
    else:
        with output.open("w") as f:
            f.write(result.to_yaml())
        console.print(f"Written result to {output}")

DataContract().import_from_source() が呼ばれているようです。その返り値に対して to_yaml() メソッドを実行して出力していますね。

DataContract クラスは datacontract/data_contract.py で定義されています。

class DataContract:
    ...

    def import_from_source(
        self, format: str, source: typing.Optional[str] = None, **kwargs
    ) -> DataContractSpecification:
        data_contract_specification_initial = DataContract.init()

        return importer_factory.create(format).import_source(
            data_contract_specification=data_contract_specification_initial, source=source, import_args=kwargs
        )

import_from_source() メソッドは DataContractSpecification を返しているようです。

これは datacontract/model/data_contract_specification.py で定義されている Pydantic の BaseModel を継承したクラスです。 Pydantic の説明は割愛しますが、これを使うことによって型ヒントによるフィールドのバリデーションや入出力を容易に行うことができます。

class DataContractSpecification(pyd.BaseModel):
    dataContractSpecification: str = None
    id: str = None
    info: Info = None
    servers: Dict[str, Server] = {}
    terms: Terms = None
    models: Dict[str, Model] = {}
    definitions: Dict[str, Definition] = {}
    examples: List[Example] = pyd.Field(
        default_factory=list,
        deprecated="Removed in Data Contract Specification " "v1.1.0. Use models.examples instead.",
    )
    quality: DeprecatedQuality = pyd.Field(
        default=None,
        deprecated="Removed in Data Contract Specification v1.1.0. Use " "model-level and field-level quality instead.",
    )
    servicelevels: Optional[ServiceLevel] = None
    links: Dict[str, str] = {}
    tags: List[str] = []

    @classmethod
    def from_file(cls, file):
        if not os.path.exists(file):
            raise FileNotFoundError(f"The file '{file}' does not exist.")
        with open(file, "r") as file:
            file_content = file.read()
        return DataContractSpecification.from_string(file_content)

    @classmethod
    def from_string(cls, data_contract_str):
        data = yaml.safe_load(data_contract_str)
        return DataContractSpecification(**data)

    def to_yaml(self):
        return yaml.dump(
            self.model_dump(exclude_defaults=True, exclude_none=True, by_alias=True),
            sort_keys=False,
            allow_unicode=True,
        )

つまり Data Contract ファイルのパースには Pydantic の肩の上に乗っかっていたわけですね。

DataContract().import_from_source() の実装に戻ります。

importer_factory.create(format).import_source(...) とあり、ファクトリから DataContractSpecification が生成されていそうな気配を感じます。

datacontract/imports ディレクトリに各 Importer の実装がなされています。興味や馴染のある Importer を覗いてみてください。

Impoerter の実装まで踏み込むと長くなってしまうので触れませんが、どの Importer もライブラリでメタデータを読むなどして DataContractSpecification を生成しています。

export を読み解く
#

import と同様に export を軽くみてみましょう。datacontract/cli.py から見ていきます。

Data Contract のパスを受け取って、結果を標準出力なりファイルなりに書き出しています。

@app.command()
def export(...):  # 引数は長いので省略
    """
    Convert data contract to a specific format. Saves to file specified by `output` option if present, otherwise prints to stdout.
    """
    # TODO exception handling
    result = DataContract(data_contract_file=location, schema_location=schema, server=server).export(
        export_format=format,
        model=model,
        server=server,
        rdf_base=rdf_base,
        sql_server_type=sql_server_type,
        engine=engine,
    )
    # Don't interpret console markup in output.
    if output is None:
        console.print(result, markup=False, soft_wrap=True)
    else:
        with output.open("w") as f:
            f.write(result)
        console.print(f"Written result to {output}")

出力処理は DataContract().export() で実行されているようです。

datacontract/data_contract.pyexport() メソッドが定義されています。

class DataContract:
    ...

    def export(self, export_format: ExportFormat, model: str = "all", sql_server_type: str = "auto", **kwargs) -> str:
        data_contract = resolve.resolve_data_contract(
            self._data_contract_file,
            self._data_contract_str,
            self._data_contract,
            schema_location=self._schema_location,
            inline_definitions=self._inline_definitions,
            inline_quality=self._inline_quality,
        )

        return exporter_factory.create(export_format).export(
            data_contract=data_contract,
            model=model,
            server=self._server,
            sql_server_type=sql_server_type,
            export_args=kwargs,
        )

import と同様にフォーマットごとに Exporter が定義されており、ファクトリを経由して出力していますね。

ここまでくれば Expoter の実装を読んでいけば OK です。

dbt の Exporter であれば datacontract/export/dbt_converter.py で以下のように実装にされています。

class DbtExporter(Exporter):
    def export(self, data_contract, model, server, sql_server_type, export_args) -> dict:
        return to_dbt_models_yaml(data_contract)


def to_dbt_models_yaml(data_contract_spec: DataContractSpecification):
    dbt = {
        "version": 2,
        "models": [],
    }
    for model_key, model_value in data_contract_spec.models.items():
        dbt_model = _to_dbt_model(model_key, model_value, data_contract_spec)
        dbt["models"].append(dbt_model)
    return yaml.dump(dbt, indent=2, sort_keys=False, allow_unicode=True)

DataContractSpecification から YAML を生成しているシンプルな作りとなっています。

まとめ
#

この記事では Data Contract CLI の一通りの機能に触れながら、基本となる実装を読み解きました。

Typer と Pydantic のフレームワークに乗っかった、型ヒントが整備されたモダンな Python ライブラリという印象を受けました。

自分はまだ Data Contract CLI にコントリビュートできていませんが、 TODO コメントが散見されるなどコントリビュートしがいのあるプロジェクトです。

Data Contract による堅牢なデータ基盤を作るためにも、使うだけでなく開発にも関わっていきたいと思います。

このエントリーをはてなブックマークに追加

関連記事

connpass API ではじめるデータ分析基盤入門
tech data
これは 呉高専 Advent Calendar 2022 - Adventar の14日目の記事です。
Palworld サーバーを Kubernetes で立ててみた
tech homelab
だいぶ前にこういうポストをしていて、諸々環境が整ってきたので実際にやってみました。
Kubernetes でブログを配信するようにした
tech homelab
Kubespray 使ってみた # 呉高専 Advent Calendar 2022 の14日目の記事を担当しているんだけど、せっかくなのでこのブログにポストしたい。