これは datatech-jp Advent Calendar 2024 の20日目の記事です。
こんにちは chanyou です。
先日、 datatech-jp の Data Contract事例共有会 というイベントに登壇させていただきました。 Data Contract の導入の難しさを語り合う貴重な機会でした。
イベントではそこまで触れていなかったですが Data Contract CLI というオープンソースのツールがあります。 Data Contract CLI を使うことで、Data Contract の YAML ファイルの読み書きやカタログ生成が容易に行えます。
Data Contract CLI の実装を読む機会がありましたので、この記事で触れていきたいと思います。
Data Contract CLI とは? #
ドキュメントは以下から確認できます。
ソースコードは以下で、MIT ライセンスで公開されています。
Enforce Data Contracts
Data Contract CLI の基本的な使い方 #
インストール #
pip でインストールできます。
pip install 'datacontract-cli'
依存関係が切り出されており、使いたい機能に応じて追加モジュールをインストールできます。
pip install datacontract-cli[bigquery]
インストールすると datacontract コマンドが使えるようになっています。
$ datacontract
Usage: datacontract [OPTIONS] COMMAND [ARGS]...
The datacontract CLI is an open source command-line tool for working with Data Contracts (https://datacontract.com).
It uses data contract YAML files to lint the data contract, connect to data sources and execute schema and quality tests, detect breaking changes, and export to different
formats.
╭─ Options ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ --version Prints the current version. │
│ --help Show this message and exit. │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Commands ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ init Download a datacontract.yaml template and write it to file. │
│ lint Validate that the datacontract.yaml is correctly formatted. │
│ test Run schema and quality tests on configured servers. │
│ export Convert data contract to a specific format. Saves to file specified by `output` option if present, otherwise prints to stdout. │
│ import Create a data contract from the given source location. Saves to file specified by `output` option if present, otherwise prints to stdout. │
│ publish Publish the data contract to the Data Mesh Manager. │
│ catalog Create an html catalog of data contracts. │
│ breaking Identifies breaking changes between data contracts. Prints to stdout. │
│ changelog Generate a changelog between data contracts. Prints to stdout. │
│ diff PLACEHOLDER. Currently works as 'changelog' does. │
│ serve Start the datacontract web server. │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
できることは出力の通りですが、一通り触ってみましょう。
Data Contract を生成する #
init
で Data Contract ファイルのテンプレートを生成できます。
$ datacontract init
📄 data contract written to datacontract.yaml
import
で dbt のモデルファイルや dbml から Data Contract ファイルを生成することができます。
datacontract import --format dbml --source sample.dbml
権限を渡せば、BigQuery のテーブルなどのクラウドサービス上のリソースから Data Contract ファイルを生成することもできます。
Data Contract から出力する #
export
で Data Contract ファイルからコードベースやスキーマファイルなどの多様な形式での出力が可能です。
$ datacontract export --format sql
-- Data Contract: urn:datacontract:checkout:orders-latest
-- SQL Dialect: snowflake
CREATE TABLE orders (
order_id TEXT not null,
order_timestamp TIMESTAMP_TZ not null,
order_total NUMBER not null,
customer_id TEXT,
customer_email_address TEXT not null,
processed_timestamp TIMESTAMP_TZ not null
);
CREATE TABLE line_items (
line_item_id TEXT not null,
order_id TEXT,
sku TEXT
);
変更ログを出力する #
changelog
で Data Contract の差分を出力できます。
$ datacontract changelog datacontract_old.yaml datacontract_new.yaml
1 changes: 0 error, 0 warning, 1 info
info [model_added] at datacontract_new.yaml
in models.line_items
added the model
breaking
で2つの Data Contract に破壊的変更があった場合に終了ステータスを1で返します。
Data Contract をコード管理しており、 CI で破壊的変更が発生した場合にマージさせたくない場合に有用だと思います。
カタログを出力する #
catalog
で Data Contract から HTML ファイルを生成できます。
$ datacontract catalog
$ tree catalog
catalog
├── datacontract.html
└── index.html
1 directory, 2 files
出力された HTML ファイルのサンプルは以下です。
https://datacontract.com/examples/orders-latest/datacontract.html
テストを実行する #
test
で Data Contract の情報をもとにテストを実施できます。
$ datacontract test
Testing datacontract.yaml
╭────────┬─────────────────────────────────────────────────────────────────────┬───────────────────────────────┬─────────────────────────────╮
│ Result │ Check │ Field │ Details │
├────────┼─────────────────────────────────────────────────────────────────────┼───────────────────────────────┼─────────────────────────────┤
│ passed │ Check that JSON has valid schema │ line_items │ All JSON entries are valid. │
│ passed │ Check that field line_item_id is present │ line_items │ │
│ passed │ Check that field order_id is present │ line_items │ │
│ passed │ Check that field sku is present │ line_items │ │
│ passed │ Check that required field line_item_id has no null values │ line_items.line_item_id │ │
│ passed │ Check that field sku matches regex pattern ^[A-Za-z0-9]{8,14}$ │ line_items.sku │ │
│ passed │ Check that JSON has valid schema │ orders │ All JSON entries are valid. │
│ passed │ Check that field order_id is present │ orders │ │
│ passed │ Check that field order_timestamp is present │ orders │ │
│ passed │ Check that field order_total is present │ orders │ │
│ passed │ Check that field customer_id is present │ orders │ │
│ passed │ Check that field customer_email_address is present │ orders │ │
│ passed │ Check that field processed_timestamp is present │ orders │ │
│ passed │ Check that required field customer_email_address has no null values │ orders.customer_email_address │ │
│ passed │ Check that field customer_id has a min length of 10 │ orders.customer_id │ │
│ passed │ Check that field customer_id has a max length of 20 │ orders.customer_id │ │
│ passed │ Check that required field order_id has no null values │ orders.order_id │ │
│ passed │ Check that unique field order_id has no duplicate values │ orders.order_id │ │
│ passed │ Check that required field order_timestamp has no null values │ orders.order_timestamp │ │
│ passed │ Check that required field order_total has no null values │ orders.order_total │ │
│ passed │ Check that required field processed_timestamp has no null values │ orders.processed_timestamp │ │
╰────────┴─────────────────────────────────────────────────────────────────────┴───────────────────────────────┴─────────────────────────────╯
🟢 data contract is valid. Run 21 checks. Took 38.441876 seconds.
その他 #
この他にも以下が可能です。
lint
による Data Contract ファイルのバリデーションpublish
による Data Mesh Manager への公開
実装を読み解く #
一通り Data Contract CLI を触ったところで、実装を見ていきましょう。
Data Contract CLI は Python で実装されています。
Typer が採用されている #
CLI 構築ライブラリとして Typer が採用されています。
Typer, build great CLIs. Easy to code. Based on Python type hints.
Typer とは FastAPI of CLIs とも呼ばれている、開発体験の良い CLI 構築ライブラリです。
FastAPI の兄弟ライブラリということもあって、型ヒントによる直感的な実装が可能となっています。
インターフェイス部分は datacontract/cli.py に定義されているので、このファイルから処理を追えば読み解きやすいです。
例えば、以下は init
の help 出力とサブコマンドの実装です。
$ datacontract init --help
Usage: datacontract init [OPTIONS] [LOCATION]
Download a datacontract.yaml template and write it to file.
╭─ Arguments ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ location [LOCATION] The location (url or path) of the data contract yaml to create. [default: datacontract.yaml] │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Options ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ --template TEXT URL of a template or data contract [default: https://datacontract.com/datacontract.init.yaml] │
│ --overwrite --no-overwrite Replace the existing datacontract.yaml [default: no-overwrite] │
│ --help Show this message and exit. │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
@app.command()
def init(
location: Annotated[
str,
typer.Argument(help="The location (url or path) of the data contract yaml to create."),
] = "datacontract.yaml",
template: Annotated[
str, typer.Option(help="URL of a template or data contract")
] = "https://datacontract.com/datacontract.init.yaml",
overwrite: Annotated[bool, typer.Option(help="Replace the existing datacontract.yaml")] = False,
):
"""
Download a datacontract.yaml template and write it to file.
"""
try:
download_datacontract_file(location, template, overwrite)
except FileExistsException:
console.print("File already exists, use --overwrite to overwrite")
raise typer.Exit(code=1)
else:
console.print("📄 data contract written to " + location)
コマンド引数が init
関数の引数として定義されていて typer.Argument
でヘルプテキストの定義が行われています。 docstring もコマンドの概要として help に出力されていますね。
Data Contract ファイルをダウンロードする処理は download_datacontract_file
関数で定義されているようです。
シンプルに requests
で指定された URL から取得する実装でした。
def download_datacontract_file(file_path: str, from_url: str, overwrite_file: bool):
if not overwrite_file and os.path.exists(file_path):
raise FileExistsException()
with requests.get(from_url) as response:
response.raise_for_status()
with open(file_path, "w") as f:
f.write(response.text)
型ヒントがしっかり定義されていて読みやすいです。
import を読み解く #
init の例はシンプルでしたが、Data Contract ファイルのパースが必要そうな import はどうでしょうか。
@app.command(name="import")
def import_(...): # 引数は長いので省略
result = DataContract().import_from_source(
format=format,
source=source,
glue_table=glue_table,
bigquery_table=bigquery_table,
bigquery_project=bigquery_project,
bigquery_dataset=bigquery_dataset,
unity_table_full_name=unity_table_full_name,
dbt_model=dbt_model,
dbml_schema=dbml_schema,
dbml_table=dbml_table,
iceberg_table=iceberg_table,
)
if output is None:
console.print(result.to_yaml(), markup=False, soft_wrap=True)
else:
with output.open("w") as f:
f.write(result.to_yaml())
console.print(f"Written result to {output}")
DataContract().import_from_source()
が呼ばれているようです。その返り値に対して to_yaml()
メソッドを実行して出力していますね。
DataContract
クラスは datacontract/data_contract.py で定義されています。
class DataContract:
...
def import_from_source(
self, format: str, source: typing.Optional[str] = None, **kwargs
) -> DataContractSpecification:
data_contract_specification_initial = DataContract.init()
return importer_factory.create(format).import_source(
data_contract_specification=data_contract_specification_initial, source=source, import_args=kwargs
)
import_from_source()
メソッドは DataContractSpecification
を返しているようです。
これは datacontract/model/data_contract_specification.py で定義されている Pydantic の BaseModel を継承したクラスです。 Pydantic の説明は割愛しますが、これを使うことによって型ヒントによるフィールドのバリデーションや入出力を容易に行うことができます。
class DataContractSpecification(pyd.BaseModel):
dataContractSpecification: str = None
id: str = None
info: Info = None
servers: Dict[str, Server] = {}
terms: Terms = None
models: Dict[str, Model] = {}
definitions: Dict[str, Definition] = {}
examples: List[Example] = pyd.Field(
default_factory=list,
deprecated="Removed in Data Contract Specification " "v1.1.0. Use models.examples instead.",
)
quality: DeprecatedQuality = pyd.Field(
default=None,
deprecated="Removed in Data Contract Specification v1.1.0. Use " "model-level and field-level quality instead.",
)
servicelevels: Optional[ServiceLevel] = None
links: Dict[str, str] = {}
tags: List[str] = []
@classmethod
def from_file(cls, file):
if not os.path.exists(file):
raise FileNotFoundError(f"The file '{file}' does not exist.")
with open(file, "r") as file:
file_content = file.read()
return DataContractSpecification.from_string(file_content)
@classmethod
def from_string(cls, data_contract_str):
data = yaml.safe_load(data_contract_str)
return DataContractSpecification(**data)
def to_yaml(self):
return yaml.dump(
self.model_dump(exclude_defaults=True, exclude_none=True, by_alias=True),
sort_keys=False,
allow_unicode=True,
)
つまり Data Contract ファイルのパースには Pydantic の肩の上に乗っかっていたわけですね。
DataContract().import_from_source()
の実装に戻ります。
importer_factory.create(format).import_source(...)
とあり、ファクトリから DataContractSpecification
が生成されていそうな気配を感じます。
datacontract/imports ディレクトリに各 Importer の実装がなされています。興味や馴染のある Importer を覗いてみてください。
Impoerter の実装まで踏み込むと長くなってしまうので触れませんが、どの Importer もライブラリでメタデータを読むなどして DataContractSpecification
を生成しています。
export を読み解く #
import と同様に export を軽くみてみましょう。datacontract/cli.py から見ていきます。
Data Contract のパスを受け取って、結果を標準出力なりファイルなりに書き出しています。
@app.command()
def export(...): # 引数は長いので省略
"""
Convert data contract to a specific format. Saves to file specified by `output` option if present, otherwise prints to stdout.
"""
# TODO exception handling
result = DataContract(data_contract_file=location, schema_location=schema, server=server).export(
export_format=format,
model=model,
server=server,
rdf_base=rdf_base,
sql_server_type=sql_server_type,
engine=engine,
)
# Don't interpret console markup in output.
if output is None:
console.print(result, markup=False, soft_wrap=True)
else:
with output.open("w") as f:
f.write(result)
console.print(f"Written result to {output}")
出力処理は DataContract().export()
で実行されているようです。
datacontract/data_contract.py で export()
メソッドが定義されています。
class DataContract:
...
def export(self, export_format: ExportFormat, model: str = "all", sql_server_type: str = "auto", **kwargs) -> str:
data_contract = resolve.resolve_data_contract(
self._data_contract_file,
self._data_contract_str,
self._data_contract,
schema_location=self._schema_location,
inline_definitions=self._inline_definitions,
inline_quality=self._inline_quality,
)
return exporter_factory.create(export_format).export(
data_contract=data_contract,
model=model,
server=self._server,
sql_server_type=sql_server_type,
export_args=kwargs,
)
import と同様にフォーマットごとに Exporter が定義されており、ファクトリを経由して出力していますね。
ここまでくれば Expoter の実装を読んでいけば OK です。
dbt の Exporter であれば datacontract/export/dbt_converter.py で以下のように実装にされています。
class DbtExporter(Exporter):
def export(self, data_contract, model, server, sql_server_type, export_args) -> dict:
return to_dbt_models_yaml(data_contract)
def to_dbt_models_yaml(data_contract_spec: DataContractSpecification):
dbt = {
"version": 2,
"models": [],
}
for model_key, model_value in data_contract_spec.models.items():
dbt_model = _to_dbt_model(model_key, model_value, data_contract_spec)
dbt["models"].append(dbt_model)
return yaml.dump(dbt, indent=2, sort_keys=False, allow_unicode=True)
DataContractSpecification
から YAML を生成しているシンプルな作りとなっています。
まとめ #
この記事では Data Contract CLI の一通りの機能に触れながら、基本となる実装を読み解きました。
Typer と Pydantic のフレームワークに乗っかった、型ヒントが整備されたモダンな Python ライブラリという印象を受けました。
自分はまだ Data Contract CLI にコントリビュートできていませんが、 TODO コメントが散見されるなどコントリビュートしがいのあるプロジェクトです。
Data Contract による堅牢なデータ基盤を作るためにも、使うだけでなく開発にも関わっていきたいと思います。