Azure Data Factory は、Microsoft Azure のスケーラブルなデータ統合ワークフロー サービスです。 ADF には git 統合機能があります。ユーザーは、パイプラインを git リポジトリに保存できます。パイプラインのメタデータと構成は、バージョン管理システムに保存されます。 ADF には、カスタム コードまたはスクリプトを含む Spark、Hive、SQL アクティビティを使用してワークフローを作成する機能があります。コードまたはスクリプトは ADLS に保存され、パイプライン内で参照されます。 ADLS は、バージョン管理のためにコードを保持するのに最適な場所ではありません。そのため、ベスト プラクティスとして、コードを git に保持し、git パイプラインを使用して git から ADLS への同期を有効にする必要があります。このようにして、開発者はコードを git と ADLS にコミットします。ここでは、Git と ADLS 間の同期を有効にするために使用できるサンプル コードについて説明します。
コードを実行するための詳細は、コード自体内のコメントとして提供されます。このプログラムには、2 つの依存する python パッケージが必要です。
pip install gitpython
pip install azure-storage-blob
gitpython パッケージには git の依存関係があります クライアント。 Windows の場合は git bash をインストールでき、Linux の場合は git クライアントを使用できます。
GIT_PYTHON_GIT_EXECUTABLE を設定する必要があります git 実行可能ファイルがシステム パスに追加されていない場合。この環境変数は、gitpython パッケージのインポートを行う前に設定する必要があります。
このファイルには双方向の Unicode テキストが含まれており、以下に示すものとは異なる方法で解釈またはコンパイルされる可能性があります。確認するには、非表示の Unicode 文字を表示するエディターでファイルを開きます。双方向 Unicode 文字の詳細 隠し文字を表示するos のインポート | |
# 実行環境に合わせてこのパスを変更 | |
os.environ['GIT_PYTHON_GIT_EXECUTABLE'] =r"C:\Program Files\Git\ bin\git.exe" | |
uuid のインポート | |
git のインポート | |
import shutil | |
git import RemoteProgress から | |
azure.storage.blob から BlobServiceClient をインポート | |
class CloneProgress(RemoteProgress): | |
""" | |
これはクローン プロセスの進行状況を出力するためのものです | |
""" | |
def update(self, op_code, cur_count, max_count=None, message=''): | |
メッセージの場合: | |
print(メッセージ) | |
def clean_up_local(directory): | |
""" | |
ローカル マシンのディレクトリをクリーンアップする関数 | |
この関数は、ローカルの一時ディレクトリをクリーンアップします | |
:param ディレクトリ: | |
:return: | |
""" | |
os.path.exists(ディレクトリ) の場合: | |
shutil.rmtree(ディレクトリ、ignore_errors=True) | |
hidden_path =os.path.join(ディレクトリ, '.git') | |
if os.path.exists(hidden_path): | |
shutil.rmtree('.unwanted', ignore_errors=True) | |
試してください: | |
os.mkdir(ディレクトリ) | |
エラーとして OSError を除く: | |
印刷 (エラー) | |
pass | |
temp_path =str(uuid.uuid4())[0:6] | |
return temp_path | |
def clone_git(git_url, git_branch, git_local_clone_path): | |
""" | |
git リポジトリをローカル ディスクに複製する関数。 | |
:return:status – ステータスに基づく True/False。 | |
""" | |
git.Repo.clone_from(git_url, git_local_clone_path, branch=git_branch, progress=CloneProgress() ) | |
True を返す</td> | |
def delete_adls_directory(connect_str, container_name, prefix): | |
""" | |
ADLS でディレクトリを削除する関数 | |
:param connect_str: | |
:param container_name: | |
:param プレフィックス: | |
:return: | |
""" | |
blob_service_client =BlobServiceClient.from_connection_string(connect_str) | |
container_client =blob_service_client.get_container_client(container=container_name) | |
blob_list =container_client.list_blobs(name_starts_with=prefix) | |
new_blob_list =[] | |
blob_list 内の blob の場合: | |
new_blob_list.append(str(blob.name)) | |
print("長さ —>", len(new_blob_list), type(new_blob_list)) | |
逆方向の blb の場合 (new_blob_list): | |
print("削除中 –>", blb) | |
container_client.delete_blob(blb) | |
def upload_data_to_adls(connect_str, container_name, path_to_remove, local_path, target_base_path): | |
""" | |
ローカル ディレクトリを ADLS にアップロードする関数 | |
:return: | |
""" | |
print("ADLS Container Base Path —>", target_base_path) | |
blob_service_client =BlobServiceClient.from_connection_string(connect_str) | |
os.walk(local_path) の r、d、f: | |
f:の場合 | |
f:のファイル | |
file_path_on_azure =os.path.join(r, file).replace(path_to_remove, "" ) | |
file_path_on_azure =file_path_on_azure.lstrip("\\").lstrip('/') | |
file_path_on_azure =os.path.join(target_base_path, file_path_on_azure) | |
print("Azure 上のファイル パス ———>", file_path_on_azure) | |
print(file_path_on_azure) | |
file_path_on_local =os.path.join(r, file) | |
blob_client =blob_service_client.get_blob_client(container=container_name, blob=file_path_on_azure) | |
データとして open(file_path_on_local, "rb") を使用: | |
blob_client.upload_blob(データ) | |
print("ファイルのアップロード —->", file_path_on_local) | |
blob_client.close() | |
def main(): | |
# Git / Repos の詳細 | |
# 形式の Git URL –> https://username:password@giturl | |
git_url ="" | |
# Git ブランチ | |
git_branch ="" | |
# Git プロジェクト名。これは、git プロジェクトのベース フォルダー名になります | |
git_project_name ="" | |
# Base path in the execution environment to store temporary data | |
temp_base_path ="localtemp" | |
# The relative directory of the git project to upload | |
upload_src_directory ="" | |
# Azure Storage account connection string | |
connect_str ="" | |
# Name of the Azure container | |
container_name ="" | |
# Base path in the ADLS container. Keep this empty if you want to upload to the root path of the container | |
container_base_path ="" | |
temp_path =clean_up_local(temp_base_path) | |
git_local_clone_path =os.path.join(temp_base_path, temp_path, git_project_name) | |
clone_git(git_url, git_branch, git_local_clone_path) | |
# The path to be removed from the local directory path while uploading it to ADLS | |
path_to_remove =os.path.join(temp_base_path, temp_path, git_project_name) | |
# The local directory to upload to ADLS | |
azure_upload_src_directory =os.path.join(temp_base_path, temp_path, upload_src_directory) | |
adls_target_path =os.path.join(container_base_path, azure_upload_src_directory.replace(path_to_remove, "").lstrip("\\").lstrip("/")) | |
print("ADLS Location to upload the files –>", adls_target_path) | |
print("Checking and cleaning up ADLS") | |
delete_adls_directory(connect_str, container_name, adls_target_path) | |
print("Uploading files to ADLS") | |
upload_data_to_adls(connect_str, container_name, path_to_remove, azure_upload_src_directory, container_base_path) | |
if __name__ =='__main__': | |
# Main invoke. | |
main() |