Python >> python チュートリアル >  >> Python

Git リポジトリを Azure Data Lake Storage (ADLS Gen 2) に複製する方法は?

Azure Data Factory は、Microsoft Azure のスケーラブルなデータ統合ワークフロー サービスです。 ADF には git 統合機能があります。ユーザーは、パイプラインを git リポジトリに保存できます。パイプラインのメタデータと構成は、バージョン管理システムに保存されます。 ADF には、カスタム コードまたはスクリプトを含む Spark、Hive、SQL アクティビティを使用してワークフローを作成する機能があります。コードまたはスクリプトは ADLS に保存され、パイプライン内で参照されます。 ADLS は、バージョン管理のためにコードを保持するのに最適な場所ではありません。そのため、ベスト プラクティスとして、コードを git に保持し、git パイプラインを使用して git から ADLS への同期を有​​効にする必要があります。このようにして、開発者はコードを git と ADLS にコミットします。ここでは、Git と ADLS 間の同期を有​​効にするために使用できるサンプル コードについて説明します。

コードを実行するための詳細は、コード自体内のコメントとして提供されます。このプログラムには、2 つの依存する python パッケージが必要です。

pip install gitpython
pip install azure-storage-blob

gitpython パッケージには git の依存関係があります クライアント。 Windows の場合は git bash をインストールでき、Linux の場合は git クライアントを使用できます。

GIT_PYTHON_GIT_EXECUTABLE を設定する必要があります git 実行可能ファイルがシステム パスに追加されていない場合。この環境変数は、gitpython パッケージのインポートを行う前に設定する必要があります。

os のインポート
# 実行環境に合わせてこのパスを変更
os.environ['GIT_PYTHON_GIT_EXECUTABLE'] =r"C:\Program Files\Git\ bin\git.exe"
uuid のインポート
git のインポート
import shutil
git import RemoteProgress から
azure.storage.blob から BlobServiceClient をインポート
class CloneProgress(RemoteProgress):
"""
これはクローン プロセスの進行状況を出力するためのものです
"""
def update(self, op_code, cur_count, max_count=None, message=''):
メッセージの場合:
print(メッセージ)
def clean_up_local(directory):
"""
ローカル マシンのディレクトリをクリーンアップする関数
この関数は、ローカルの一時ディレクトリをクリーンアップします
:param ディレクトリ:
:return:
"""
os.path.exists(ディレクトリ) の場合:
shutil.rmtree(ディレクトリ、ignore_errors=True)
hidden_​​path =os.path.join(ディレクトリ, '.git')
if os.path.exists(hidden_​​path):
shutil.rmtree('.unwanted', ignore_errors=True)
試してください:
os.mkdir(ディレクトリ)
エラーとして OSError を除く:
印刷 (エラー)
pass
temp_path =str(uuid.uuid4())[0:6]
return temp_path
def clone_git(git_url, git_branch, git_local_clone_path):
"""
git リポジトリをローカル ディスクに複製する関数。
:return:status – ステータスに基づく True/False。
"""
git.Repo.clone_from(git_url, git_local_clone_path, branch=git_branch, progress=CloneProgress() )
True を返す<​​/td>
def delete_adls_directory(connect_str, container_name, prefix):
"""
ADLS でディレクトリを削除する関数
:param connect_str:
:param container_name:
:param プレフィックス:
:return:
"""
blob_service_client =BlobServiceClient.from_connection_string(connect_str)
container_client =blob_service_client.get_container_client(container=container_name)
blob_list =container_client.list_blobs(name_starts_with=prefix)
new_blob_list =[]
blob_list 内の blob の場合:
new_blob_list.append(str(blob.name))
print("長さ —>", len(new_blob_list), type(new_blob_list))
逆方向の blb の場合 (new_blob_list):
print("削除中 –>", blb)
container_client.delete_blob(blb)
def upload_data_to_adls(connect_str, container_name, path_to_remove, local_path, target_base_path):
"""
ローカル ディレクトリを ADLS にアップロードする関数
:return:
"""
print("ADLS Container Base Path —>", target_base_path)
blob_service_client =BlobServiceClient.from_connection_string(connect_str)
os.walk(local_path) の r、d、f:
f:の場合
f:のファイル
file_path_on_azure =os.path.join(r, file).replace(path_to_remove, "" )
file_path_on_azure =file_path_on_azure.lstrip("\\").lstrip('/')
file_path_on_azure =os.path.join(target_base_path, file_path_on_azure)
print("Azure 上のファイル パス ———>", file_path_on_azure)
print(file_path_on_azure)
file_path_on_local =os.path.join(r, file)
blob_client =blob_service_client.get_blob_client(container=container_name, blob=file_path_on_azure)
データとして open(file_path_on_local, "rb") を使用:
blob_client.upload_blob(データ)
print("ファイルのアップロード —->", file_path_on_local)
blob_client.close()
def main():
# Git / Repos の詳細
# 形式の Git URL –> https://username:password@giturl
git_url =""
# Git ブランチ
git_branch =""
# Git プロジェクト名。これは、git プロジェクトのベース フォルダー名になります
git_project_name =""
# Base path in the execution environment to store temporary data
temp_base_path ="localtemp"
# The relative directory of the git project to upload
upload_src_directory =""
# Azure Storage account connection string
connect_str =""
# Name of the Azure container
container_name =""
# Base path in the ADLS container. Keep this empty if you want to upload to the root path of the container
container_base_path =""
temp_path =clean_up_local(temp_base_path)
git_local_clone_path =os.path.join(temp_base_path, temp_path, git_project_name)
clone_git(git_url, git_branch, git_local_clone_path)
# The path to be removed from the local directory path while uploading it to ADLS
path_to_remove =os.path.join(temp_base_path, temp_path, git_project_name)
# The local directory to upload to ADLS
azure_upload_src_directory =os.path.join(temp_base_path, temp_path, upload_src_directory)
adls_target_path =os.path.join(container_base_path, azure_upload_src_directory.replace(path_to_remove, "").lstrip("\\").lstrip("/"))
print("ADLS Location to upload the files –>", adls_target_path)
print("Checking and cleaning up ADLS")
delete_adls_directory(connect_str, container_name, adls_target_path)
print("Uploading files to ADLS")
upload_data_to_adls(connect_str, container_name, path_to_remove, azure_upload_src_directory, container_base_path)
if __name__ =='__main__':
# Main invoke.
main()
view raw python_git_to_adls_sync.py hosted with ❤ by GitHub