Python >> Tutoriel Python >  >> Python

Le moyen le plus rapide de charger des données dans PostgreSQL à l'aide de Python

En tant que plombiers de données glorifiés, nous sommes souvent chargés de charger des données extraites d'une source distante dans nos systèmes. Si nous avons de la chance, les données sont sérialisées au format JSON ou YAML. Lorsque nous sommes moins chanceux, nous obtenons une feuille de calcul Excel ou un fichier CSV qui est toujours cassé d'une manière ou d'une autre, je ne peux pas l'expliquer.

Les données des grandes entreprises ou des anciens systèmes sont en quelque sorte toujours encodées de manière étrange, et les administrateurs système pensent toujours qu'ils nous rendent service en compressant les fichiers (veuillez gzip) ou en les décomposant en fichiers plus petits avec des noms aléatoires.

Les services modernes peuvent fournir une API décente, mais le plus souvent, nous devons récupérer un fichier à partir d'un FTP, SFTP, S3 ou d'un coffre-fort propriétaire qui ne fonctionne que sous Windows.

Dans cet article, nous explorons la meilleure façon d'importer des données désordonnées depuis une source distante dans PostgreSQL.

Pour fournir une solution pratique et réaliste, nous définissons les rôles fondamentaux suivants :

  1. Les données sont extraites d'une source distante.
  2. Les données sont sales et doivent être transformées.
  3. Les données sont volumineuses.

Configuration :une brasserie de bière

J'ai trouvé cette excellente API publique pour les bières, nous allons donc importer des données dans une table de bière dans la base de données.

Les données

Une seule bière de l'API ressemble à ceci :

$ curl https://api.punkapi.com/v2/beers/?per_page=1&page=1
[
 {
 "id": 1,
 "name": "Buzz",
 "tagline": "A Real Bitter Experience.",
 "first_brewed": "09/2007",
 "description": "A light, crisp and bitter IPA ...",
 "image_url": "https://images.punkapi.com/v2/keg.png",
 "abv": 4.5,
 "ibu": 60,
 "target_fg": 1010,
 "target_og": 1044,
 "ebc": 20,
 "srm": 10,
 "ph": 4.4,
 "attenuation_level": 75,
 "volume": {
 "value": 20,
 "unit": "litres"
 },
 "contributed_by": "Sam Mason <samjbmason>"
 "brewers_tips": "The earthy and floral aromas from...",
 "boil_volume": {},
 "method": {},
 "ingredients": {},
 "food_pairing": [],
 }
]

J'ai coupé la sortie par souci de brièveté, mais il y a beaucoup d'informations sur les bières ici. Dans cet article, nous voulons importer tous les champs avant brewers_tips à une table de la base de données.

Le champ volume est imbriqué. Nous voulons extraire uniquement les value du champ et enregistrez-le dans un champ appelé volume dans le tableau.

volume = beer['volume']['value']

Le champ first_brewed contient uniquement l'année et le mois, et dans certains cas, uniquement l'année. Nous voulons transformer la valeur en une date valide. Par exemple, la valeur 09/2007 sera transformé à ce jour 2007-09-01 . La valeur 2006 sera transformé à ce jour 2016-01-01 .

Écrivons une fonction simple pour transformer la valeur de texte dans le champ, en Python datetime.date :

import datetime

def parse_first_brewed(text: str) -> datetime.date:
 parts = text.split('/')
 if len(parts) == 2:
 return datetime.date(int(parts[1]), int(parts[0]), 1)
 elif len(parts) == 1:
 return datetime.date(int(parts[0]), 1, 1)
 else:
 assert False, 'Unknown date format'

Assurons-nous rapidement que cela fonctionne :

>>> parse_first_brewed('09/2007')
datetime.date(2007, 9, 1)

>>> parse_first_brewed('2006')
datetime.date(2006, 1, 1)

Dans la vraie vie, les transformations peuvent être beaucoup plus compliquées. Mais pour notre objectif, c'est plus que suffisant.

Récupérer les données

L'API fournit des résultats paginés. Pour encapsuler la pagination, nous créons un générateur qui produit des bières une par une :

from typing import Iterator, Dict, Any
from urllib.parse import urlencode
import requests


def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]:
 session = requests.Session()
 page = 1
 while True:
 response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({
 'page': page,
 'per_page': page_size
 }))
 response.raise_for_status()

 data = response.json()
 if not data:
 break

 yield from data

 page += 1

Et pour utiliser la fonction générateur, nous l'appelons et l'itérons :

>>> beers = iter_beers_from_api()
>>> next(beers)
{'id': 1,
 'name': 'Buzz',
 'tagline': 'A Real Bitter Experience.',
 'first_brewed': '09/2007',
 'description': 'A light, crisp and bitter IPA brewed...',
 'image_url': 'https://images.punkapi.com/v2/keg.png',
 'abv': 4.5,
 'ibu': 60,
 'target_fg': 1010,
...
}
>>> next(beers)
{'id': 2,
 'name': 'Trashy Blonde',
 'tagline': "You Know You Shouldn't",
 'first_brewed': '04/2008',
 'description': 'A titillating, ...',
 'image_url': 'https://images.punkapi.com/v2/2.png',
 'abv': 4.1,
 'ibu': 41.5,

Vous remarquerez que le premier résultat de chaque page prend un peu plus de temps. C'est parce qu'il fait une requête réseau pour récupérer la page.

Créer une table dans la base de données

L'étape suivante consiste à créer une table dans la base de données dans laquelle importer les données.

Créer une base de données :

$ createdb -O haki testload

Changer haki dans l'exemple à votre utilisateur local.

Pour se connecter de Python à une base de données PostgreSQL, nous utilisons psycopg :

$ python -m pip install psycopg2

À l'aide de psycopg, créez une connexion à la base de données :

import psycopg2

connection = psycopg2.connect(
 host="localhost",
 database="testload",
 user="haki",
 password=None,
)
connection.autocommit = True

Nous définissons autocommit=True Ainsi, chaque commande que nous exécutons prendra effet immédiatement. Pour les besoins de cet article, c'est très bien.

Maintenant que nous avons une connexion, nous pouvons écrire une fonction pour créer une table :

def create_staging_table(cursor) -> None:
 cursor.execute("""
 DROP TABLE IF EXISTS staging_beers;
 CREATE UNLOGGED TABLE staging_beers (
 id INTEGER,
 name TEXT,
 tagline TEXT,
 first_brewed DATE,
 description TEXT,
 image_url TEXT,
 abv DECIMAL,
 ibu DECIMAL,
 target_fg DECIMAL,
 target_og DECIMAL,
 ebc DECIMAL,
 srm DECIMAL,
 ph DECIMAL,
 attenuation_level DECIMAL,
 brewers_tips TEXT,
 contributed_by TEXT,
 volume INTEGER
 );
 """)

La fonction reçoit un curseur et crée une table non journalisée appelée staging_beers .

TABLE NON ENREGISTRÉE

Les données écrites dans une table non consignée ne seront pas consignées dans le journal d'écriture anticipée (WAL), ce qui en fait l'outil idéal pour les tables intermédiaires. Notez que UNLOGGED les tables ne seront pas restaurées en cas de plantage et ne seront pas répliquées.

En utilisant la connexion que nous avons créée précédemment, voici comment la fonction est utilisée :

>>> with connection.cursor() as cursor:
>>> create_staging_table(cursor)

Nous sommes maintenant prêts à passer à la partie suivante.

Métriques

Tout au long de cet article, nous nous intéressons à deux métriques principales :le temps et la mémoire.

Mesurer le temps

Pour mesurer le temps pour chaque méthode, nous utilisons le time intégré modules :

>>> import time
>>> start = time.perf_counter()
>>> time.sleep(1) # do work
>>> elapsed = time.perf_counter() - start
>>> print(f'Time {elapsed:0.4}')
Time 1.001

La fonction perf_counter fournit à l'horloge la plus haute résolution disponible, ce qui la rend idéale pour nos besoins.

Mesurer la mémoire

Pour mesurer la consommation mémoire, nous allons utiliser le package memory-profiler.

$ python -m pip install memory-profiler

Ce package fournit l'utilisation de la mémoire et l'utilisation de la mémoire incrémentielle pour chaque ligne du code. Ceci est très utile lors de l'optimisation de la mémoire. Pour illustrer, voici l'exemple fourni dans PyPI :

$ python -m memory_profiler example.py

Line # Mem usage Increment Line Contents
==============================================
 3 @profile
 4 5.97 MB 0.00 MB def my_func():
 5 13.61 MB 7.64 MB a = [1] * (10 ** 6)
 6 166.20 MB 152.59 MB b = [2] * (2 * 10 ** 7)
 7 13.61 MB -152.59 MB del b
 8 13.61 MB 0.00 MB return a

La partie intéressante est le Increment colonne qui montre la mémoire supplémentaire allouée par le code dans chaque ligne.

Dans cet article, nous nous intéressons au pic de mémoire utilisé par la fonction. Le pic de mémoire est la différence entre la valeur de départ de la colonne « Utilisation de la mémoire » et la valeur la plus élevée (également appelée « high watermark »).

Pour obtenir la liste "Mem usage" nous utilisons la fonction memory_usage à partir de memory_profiler :

>>> from memory_profiler import memory_usage
>>> mem, retval = memory_usage((fn, args, kwargs), retval=True, interval=1e-7)

Utilisée ainsi, la fonction memory_usage exécute la fonction fn avec le args fourni et kwargs , mais lance également un autre processus en arrière-plan pour surveiller l'utilisation de la mémoire tous les interval secondes.

Pour des opérations très rapides la fonction fn peut être exécuté plus d'une fois. En définissant interval à une valeur inférieure à 1e-6, nous le forçons à s'exécuter une seule fois.

L'argument retval indique à la fonction de renvoyer le résultat de fn .

profile Décorateur

Pour tout mettre ensemble, nous créons le décorateur suivant pour mesurer et rapporter le temps et la mémoire :

import time
from functools import wraps
from memory_profiler import memory_usage

def profile(fn):
 @wraps(fn)
 def inner(*args, **kwargs):
 fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
 print(f'\n{fn.__name__}({fn_kwargs_str})')

 # Measure time
 t = time.perf_counter()
 retval = fn(*args, **kwargs)
 elapsed = time.perf_counter() - t
 print(f'Time {elapsed:0.4}')

 # Measure memory
 mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)

 print(f'Memory {max(mem) - min(mem)}')
 return retval

 return inner

Pour éliminer les effets mutuels de la synchronisation sur la mémoire et vice versa, nous exécutons la fonction deux fois. D'abord pour chronométrer, ensuite pour mesurer l'utilisation de la mémoire.

Le décorateur imprimera le nom de la fonction et tous les arguments de mots-clés, et rapportera le temps et la mémoire utilisés :

>>> @profile
>>> def work(n):
>>> for i in range(n):
>>> 2 ** n

>>> work(10)
work()
Time 0.06269
Memory 0.0

>>> work(n=10000)
work(n=10000)
Time 0.3865
Memory 0.0234375

Seuls les arguments de mots-clés sont imprimés. C'est intentionnel, nous allons l'utiliser dans des tests paramétrés.

Référence

Au moment de la rédaction, l'API des bières ne contient que 325 bières. Pour travailler sur un grand ensemble de données, nous le dupliquons 100 fois et le stockons en mémoire. L'ensemble de données résultant contient 32 500 bières :

>>> beers = list(iter_beers_from_api()) * 100
>>> len(beers)
32,500

Pour imiter une API distante, nos fonctions accepteront des itérateurs similaires à la valeur de retour de iter_beers_from_api :

def process(beers: Iterator[Dict[str, Any]])) -> None:
 # Process beers...

Pour le benchmark, nous allons importer les données de la bière dans la base de données. Pour éliminer les influences externes telles que le réseau, nous récupérons les données de l'API à l'avance et les diffusons localement.

Pour obtenir un timing précis, nous "truquons" l'API distante :

>>> beers = list(iter_beers_from_api()) * 100
>>> process(beers)

Dans une situation réelle, vous utiliseriez la fonction iter_beers_from_api directement :

>>> process(iter_beers_from_api())

Nous sommes maintenant prêts à commencer !

Insérer des lignes une par une

Pour établir une ligne de base, nous commençons par l'approche la plus simple, insérez les lignes une par une :

@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
 with connection.cursor() as cursor:
 create_staging_table(cursor)
 for beer in beers:
 cursor.execute("""
 INSERT INTO staging_beers VALUES (
 %(id)s,
 %(name)s,
 %(tagline)s,
 %(first_brewed)s,
 %(description)s,
 %(image_url)s,
 %(abv)s,
 %(ibu)s,
 %(target_fg)s,
 %(target_og)s,
 %(ebc)s,
 %(srm)s,
 %(ph)s,
 %(attenuation_level)s,
 %(brewers_tips)s,
 %(contributed_by)s,
 %(volume)s
 );
 """, {
 **beer,
 'first_brewed': parse_first_brewed(beer['first_brewed']),
 'volume': beer['volume']['value'],
 })

Notez que lorsque nous itérons les bières, nous transformons le first_brewed à un datetime.date et extrait la valeur de volume du volume imbriqué champ.

L'exécution de cette fonction produit la sortie suivante :

>>> insert_one_by_one(connection, beers)
insert_one_by_one()
Time 128.8
Memory 0.08203125

La fonction a pris 129 secondes pour importer 32 000 lignes. Le profileur de mémoire montre que la fonction a consommé très peu de mémoire.

Intuitivement, insérer des lignes une par une ne semble pas très efficace. Le changement constant de contexte entre le programme et la base de données doit le ralentir.

Exécuter plusieurs

Psycopg2 fournit un moyen d'insérer plusieurs lignes à la fois en utilisant executemany . À partir de la documentation :

Cela semble prometteur !

Essayons d'importer les données en utilisant executemany :

@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
 with connection.cursor() as cursor:
 create_staging_table(cursor)

 all_beers = [{
 **beer,
 'first_brewed': parse_first_brewed(beer['first_brewed']),
 'volume': beer['volume']['value'],
 } for beer in beers]

 cursor.executemany("""
 INSERT INTO staging_beers VALUES (
 %(id)s,
 %(name)s,
 %(tagline)s,
 %(first_brewed)s,
 %(description)s,
 %(image_url)s,
 %(abv)s,
 %(ibu)s,
 %(target_fg)s,
 %(target_og)s,
 %(ebc)s,
 %(srm)s,
 %(ph)s,
 %(attenuation_level)s,
 %(brewers_tips)s,
 %(contributed_by)s,
 %(volume)s
 );
 """, all_beers)

La fonction ressemble beaucoup à la fonction précédente et les transformations sont les mêmes. La principale différence ici est que nous transformons d'abord toutes les données en mémoire, puis que nous les importons dans la base de données.

L'exécution de cette fonction produit la sortie suivante :

>>> insert_executemany(connection, beers)
insert_executemany()
Time 124.7
Memory 2.765625

C'est décevant. Le timing est juste un peu meilleur, mais la fonction consomme maintenant 2,7 Mo de mémoire.

Pour mettre l'utilisation de la mémoire en perspective, un fichier JSON contenant uniquement les données que nous importons pèse 25 Mo sur disque. Compte tenu de la proportion, l'utilisation de cette méthode pour importer un fichier de 1 Go nécessitera 110 Mo de mémoire.

Exécuter plusieurs depuis l'itérateur

La méthode précédente consommait beaucoup de mémoire car les données transformées étaient stockées en mémoire avant d'être traitées par psycopg.

Voyons si nous pouvons utiliser un itérateur pour éviter de stocker les données en mémoire :

@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
 with connection.cursor() as cursor:
 create_staging_table(cursor)
 cursor.executemany("""
 INSERT INTO staging_beers VALUES (
 %(id)s,
 %(name)s,
 %(tagline)s,
 %(first_brewed)s,
 %(description)s,
 %(image_url)s,
 %(abv)s,
 %(ibu)s,
 %(target_fg)s,
 %(target_og)s,
 %(ebc)s,
 %(srm)s,
 %(ph)s,
 %(attenuation_level)s,
 %(brewers_tips)s,
 %(contributed_by)s,
 %(volume)s
 );
 """, ({
 **beer,
 'first_brewed': parse_first_brewed(beer['first_brewed']),
 'volume': beer['volume']['value'],
 } for beer in beers))

La différence ici est que les données transformées sont "diffusées" en executemany en utilisant un itérateur.

Cette fonction produit le résultat suivant :

>>> insert_executemany_iterator(connection, beers)
insert_executemany_iterator()
Time 129.3
Memory 0.0

Notre solution de "streaming" a fonctionné comme prévu et nous avons réussi à ramener la mémoire à zéro. Cependant, le timing reste à peu près le même, même par rapport à la méthode un par un.

Exécuter le lot

La documentation de psycopg contient une note très intéressante sur executemany dans la section "aides à l'exécution rapide" :

Donc, nous nous sommes trompés depuis le début !

La fonction juste en dessous de cette section est execute_batch :

Implémentons la fonction de chargement en utilisant execute_batch :

import psycopg2.extras

@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]]) -> None:
 with connection.cursor() as cursor:
 create_staging_table(cursor)

 all_beers = [{
 **beer,
 'first_brewed': parse_first_brewed(beer['first_brewed']),
 'volume': beer['volume']['value'],
 } for beer in beers]

 psycopg2.extras.execute_batch(cursor, """
 INSERT INTO staging_beers VALUES (
 %(id)s,
 %(name)s,
 %(tagline)s,
 %(first_brewed)s,
 %(description)s,
 %(image_url)s,
 %(abv)s,
 %(ibu)s,
 %(target_fg)s,
 %(target_og)s,
 %(ebc)s,
 %(srm)s,
 %(ph)s,
 %(attenuation_level)s,
 %(brewers_tips)s,
 %(contributed_by)s,
 %(volume)s
 );
 """, all_beers)

Exécution de la fonction :

>>> insert_execute_batch(connection, beers)
insert_execute_batch()
Time 3.917
Memory 2.50390625

Ouah! C'est un énorme bond en avant. La fonction terminée en un peu moins de 4 secondes. C'est environ 33 fois plus rapide que les 129 secondes avec lesquelles nous avons commencé.

Exécuter le lot à partir de l'itérateur

La fonction execute_batch utilisé moins de mémoire que executemany fait pour les mêmes données. Essayons d'éliminer la mémoire en "streamant" les données dans execute_batch en utilisant un itérateur :

@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
 with connection.cursor() as cursor:
 create_staging_table(cursor)

 iter_beers = ({
 **beer,
 'first_brewed': parse_first_brewed(beer['first_brewed']),
 'volume': beer['volume']['value'],
 } for beer in beers)

 psycopg2.extras.execute_batch(cursor, """
 INSERT INTO staging_beers VALUES (
 %(id)s,
 %(name)s,
 %(tagline)s,
 %(first_brewed)s,
 %(description)s,
 %(image_url)s,
 %(abv)s,
 %(ibu)s,
 %(target_fg)s,
 %(target_og)s,
 %(ebc)s,
 %(srm)s,
 %(ph)s,
 %(attenuation_level)s,
 %(brewers_tips)s,
 %(contributed_by)s,
 %(volume)s
 );
 """, iter_beers)

Exécution de la fonction

>>> insert_execute_batch_iterator(connection, beers)
insert_execute_batch_iterator()
Time 4.333
Memory 0.2265625

Nous avons à peu près le même temps, mais avec moins de mémoire.

Exécuter un lot à partir de l'itérateur avec la taille de la page

Lors de la lecture de la documentation pour execute_batch , l'argument page_size a attiré mon attention :

La documentation indiquait précédemment que la fonction fonctionnait mieux car elle effectuait moins d'allers-retours vers la base de données. Si tel est le cas, une taille de page plus grande devrait réduire le nombre d'allers-retours et entraîner un temps de chargement plus rapide.

Ajoutons un argument pour la taille de la page à notre fonction afin que nous puissions expérimenter :

@profile
def insert_execute_batch_iterator(
 connection,
 beers: Iterator[Dict[str, Any]],
 page_size: int = 100,
) -> None:
 with connection.cursor() as cursor:
 create_staging_table(cursor)

 iter_beers = ({
 **beer,
 'first_brewed': parse_first_brewed(beer['first_brewed']),
 'volume': beer['volume']['value'],
 } for beer in beers)

 psycopg2.extras.execute_batch(cursor, """
 INSERT INTO staging_beers VALUES (
 %(id)s,
 %(name)s,
 %(tagline)s,
 %(first_brewed)s,
 %(description)s,
 %(image_url)s,
 %(abv)s,
 %(ibu)s,
 %(target_fg)s,
 %(target_og)s,
 %(ebc)s,
 %(srm)s,
 %(ph)s,
 %(attenuation_level)s,
 %(brewers_tips)s,
 %(contributed_by)s,
 %(volume)s
 );
 """, iter_beers, page_size=page_size)

La taille de page par défaut est de 100. Comparons différentes valeurs et comparons les résultats :

>>> insert_execute_batch_iterator(connection, iter(beers), page_size=1)
insert_execute_batch_iterator(page_size=1)
Time 130.2
Memory 0.0

>>> insert_execute_batch_iterator(connection, iter(beers), page_size=100)
insert_execute_batch_iterator(page_size=100)
Time 4.333
Memory 0.0

>>> insert_execute_batch_iterator(connection, iter(beers), page_size=1000)
insert_execute_batch_iterator(page_size=1000)
Time 2.537
Memory 0.2265625

>>> insert_execute_batch_iterator(connection, iter(beers), page_size=10000)
insert_execute_batch_iterator(page_size=10000)
Time 2.585
Memory 25.4453125

Nous avons obtenu des résultats intéressants, décomposons-les :

  • 1 :Les résultats sont similaires aux résultats que nous avons obtenus en insérant les lignes une par une.
  • 100 :il s'agit du page_size par défaut , les résultats sont donc similaires à ceux de notre benchmark précédent.
  • 1 000 :la synchronisation est ici environ 40 % plus rapide et la mémoire est faible.
  • 10 000 :la synchronisation n'est pas beaucoup plus rapide qu'avec une taille de page de 1 000, mais la mémoire est nettement plus élevée.

Les résultats montrent qu'il existe un compromis entre la mémoire et la vitesse. Dans ce cas, il semble que le sweet spot corresponde à une taille de page de 1 000.

Exécuter les valeurs

Les joyaux de la documentation de psycopg ne se terminent pas par execute_batch . En parcourant la documentation, une autre fonction appelée execute_values a attiré mon attention :

La fonction execute_values fonctionne en générant une énorme liste de VALEURS à la requête.

Essayons :

import psycopg2.extras

@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
 with connection.cursor() as cursor:
 create_staging_table(cursor)
 psycopg2.extras.execute_values(cursor, """
 INSERT INTO staging_beers VALUES %s;
 """, [(
 beer['id'],
 beer['name'],
 beer['tagline'],
 parse_first_brewed(beer['first_brewed']),
 beer['description'],
 beer['image_url'],
 beer['abv'],
 beer['ibu'],
 beer['target_fg'],
 beer['target_og'],
 beer['ebc'],
 beer['srm'],
 beer['ph'],
 beer['attenuation_level'],
 beer['brewers_tips'],
 beer['contributed_by'],
 beer['volume']['value'],
 ) for beer in beers])

Importer des bières avec la fonction :

>>> insert_execute_values(connection, beers)
insert_execute_values()
Time 3.666
Memory 4.50390625

Donc, dès la sortie de la boîte, nous obtenons une légère accélération par rapport à execute_batch . Cependant, la mémoire est légèrement supérieure.

Exécuter les valeurs de l'itérateur

Tout comme nous l'avons fait auparavant, pour réduire la consommation de mémoire, nous essayons d'éviter de stocker des données en mémoire en utilisant un itérateur au lieu d'une liste :

@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
 with connection.cursor() as cursor:
 create_staging_table(cursor)
 psycopg2.extras.execute_values(cursor, """
 INSERT INTO staging_beers VALUES %s;
 """, ((
 beer['id'],
 beer['name'],
 beer['tagline'],
 parse_first_brewed(beer['first_brewed']),
 beer['description'],
 beer['image_url'],
 beer['abv'],
 beer['ibu'],
 beer['target_fg'],
 beer['target_og'],
 beer['ebc'],
 beer['srm'],
 beer['ph'],
 beer['attenuation_level'],
 beer['brewers_tips'],
 beer['contributed_by'],
 beer['volume']['value'],
 ) for beer in beers))

L'exécution de la fonction a produit les résultats suivants :

>>> insert_execute_values_iterator(connection, beers)
insert_execute_values_iterator()
Time 3.677
Memory 0.0

Ainsi, le timing est presque le même, mais la mémoire est de retour à zéro.

Exécuter les valeurs de l'itérateur avec la taille de la page

Tout comme execute_batch , la fonction execute_values accepter également un page_size argument :

@profile
def insert_execute_values_iterator(
 connection,
 beers: Iterator[Dict[str, Any]],
 page_size: int = 100,
) -> None:
 with connection.cursor() as cursor:
 create_staging_table(cursor)
 psycopg2.extras.execute_values(cursor, """
 INSERT INTO staging_beers VALUES %s;
 """, ((
 beer['id'],
 beer['name'],
 beer['tagline'],
 parse_first_brewed(beer['first_brewed']),
 beer['description'],
 beer['image_url'],
 beer['abv'],
 beer['ibu'],
 beer['target_fg'],
 beer['target_og'],
 beer['ebc'],
 beer['srm'],
 beer['ph'],
 beer['attenuation_level'],
 beer['brewers_tips'],
 beer['contributed_by'],
 beer['volume']['value'],
 ) for beer in beers), page_size=page_size)

Exécution avec différentes tailles de page :

>>> insert_execute_values_iterator(connection, iter(beers), page_size=1)
insert_execute_values_iterator(page_size=1)
Time 127.4
Memory 0.0

>>> insert_execute_values_iterator(connection, iter(beers), page_size=100)
insert_execute_values_iterator(page_size=100)
Time 3.677
Memory 0.0

>>> insert_execute_values_iterator(connection, iter(beers), page_size=1000)
insert_execute_values_iterator(page_size=1000)
Time 1.468
Memory 0.0

>>> insert_execute_values_iterator(connection, iter(beers), page_size=10000)
insert_execute_values_iterator(page_size=10000)
Time 1.503
Memory 2.25

Tout comme execute_batch , nous voyons un compromis entre la mémoire et la vitesse. Ici aussi, le sweet spot se situe autour de la taille de page 1000. Cependant, en utilisant execute_values nous avons obtenu des résultats environ 20 % plus rapides par rapport à la même taille de page en utilisant execute_batch .

Copier

La documentation officielle de PostgreSQL contient une section entière sur le remplissage d'une base de données. Selon la documentation, la meilleure façon de charger des données dans une base de données est d'utiliser le copy commande.

Pour utiliser copy à partir de Python, psycopg fournit une fonction spéciale appelée copy_from . Le copy La commande nécessite un fichier CSV. Voyons si nous pouvons transformer nos données en CSV et les charger dans la base de données en utilisant copy_from :

import io

def clean_csv_value(value: Optional[Any]) -> str:
 if value is None:
 return r'\N'
 return str(value).replace('\n', '\\n')

@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
 with connection.cursor() as cursor:
 create_staging_table(cursor)
 csv_file_like_object = io.StringIO()
 for beer in beers:
 csv_file_like_object.write('|'.join(map(clean_csv_value, (
 beer['id'],
 beer['name'],
 beer['tagline'],
 parse_first_brewed(beer['first_brewed']),
 beer['description'],
 beer['image_url'],
 beer['abv'],
 beer['ibu'],
 beer['target_fg'],
 beer['target_og'],
 beer['ebc'],
 beer['srm'],
 beer['ph'],
 beer['attenuation_level'],
 beer['contributed_by'],
 beer['brewers_tips'],
 beer['volume']['value'],
 ))) + '\n')
 csv_file_like_object.seek(0)
 cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')

Décomposons-le :

  • clean_csv_value :Transforme une seule valeur
    • Échappez aux nouvelles lignes  :certains des champs de texte incluent des retours à la ligne, nous échappons donc à \n -> \\n .
    • Les valeurs vides sont transformées en \N :La chaîne "\N" est la chaîne par défaut utilisée par PostgreSQL pour indiquer NULL dans COPY (cela peut être changé en utilisant le NULL option).
  • csv_file_like_object :Générer un fichier comme un objet en utilisant io.StringIO . Un StringIO objet contient une chaîne qui peut être utilisée comme un fichier. Dans notre cas, un fichier CSV.
  • csv_file_like_object.write :Transformer une bière en ligne CSV
    • Transformez les données :transformations sur first_brewed et volume sont exécutés ici.
    • Choisissez un délimiteur :Certains des champs du jeu de données contiennent du texte libre avec des virgules. Pour éviter les conflits, nous choisissons "|" comme délimiteur (une autre option consiste à utiliser QUOTE ).

Voyons maintenant si tout ce travail acharné a porté ses fruits :

>>> copy_stringio(connection, beers)
copy_stringio()
Time 0.6274
Memory 99.109375

Le copy commande est la plus rapide que nous ayons vue jusqu'à présent ! Utilisation de COPY , le processus terminé en moins d'une seconde. Cependant, il semble que cette méthode soit beaucoup plus coûteuse en termes d'utilisation de la mémoire. La fonction consomme 99 Mo, soit plus du double de la taille de notre fichier JSON sur le disque.

Copier les données d'un itérateur de chaîne

L'un des principaux inconvénients de l'utilisation de la copie avec StringIO est que le fichier entier est créé en mémoire. Et si au lieu de créer le fichier entier en mémoire, nous créons un objet semblable à un fichier qui agira comme un tampon entre la source distante et le COPY commande. Le tampon consommera JSON via l'itérateur, nettoiera et transformera les données, et produira un CSV propre.

Inspirés par cette réponse de débordement de pile, nous avons créé un objet qui se nourrit d'un itérateur et fournit une interface de type fichier :

from typing import Iterator, Optional
import io

class StringIteratorIO(io.TextIOBase):
 def __init__(self, iter: Iterator[str]):
 self._iter = iter
 self._buff = ''

 def readable(self) -> bool:
 return True

 def _read1(self, n: Optional[int] = None) -> str:
 while not self._buff:
 try:
 self._buff = next(self._iter)
 except StopIteration:
 break
 ret = self._buff[:n]
 self._buff = self._buff[len(ret):]
 return ret

 def read(self, n: Optional[int] = None) -> str:
 line = []
 if n is None or n < 0:
 while True:
 m = self._read1()
 if not m:
 break
 line.append(m)
 else:
 while n > 0:
 m = self._read1(n)
 if not m:
 break
 n -= len(m)
 line.append(m)
 return ''.join(line)

Pour montrer comment cela fonctionne, voici comment un objet semblable à un fichier CSV peut être généré à partir d'une liste de nombres :

>>> gen = (f'{i},{i**2}\n' for i in range(3))
>>> gen
<generator object <genexpr> at 0x7f58bde7f5e8>
>>> f = StringIteratorIO(gen)
>>> print(f.read())
0,0
1,1
2,4

Notez que nous avons utilisé f comme un fichier. En interne, il a récupéré les lignes de gen uniquement lorsque son tampon de ligne interne était vide.

La fonction de chargement utilisant StringIteratorIO ressemble à ceci :

@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
 with connection.cursor() as cursor:
 create_staging_table(cursor)
 beers_string_iterator = StringIteratorIO((
 '|'.join(map(clean_csv_value, (
 beer['id'],
 beer['name'],
 beer['tagline'],
 parse_first_brewed(beer['first_brewed']).isoformat(),
 beer['description'],
 beer['image_url'],
 beer['abv'],
 beer['ibu'],
 beer['target_fg'],
 beer['target_og'],
 beer['ebc'],
 beer['srm'],
 beer['ph'],
 beer['attenuation_level'],
 beer['brewers_tips'],
 beer['contributed_by'],
 beer['volume']['value'],
 ))) + '\n'
 for beer in beers
 ))
 cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|')

La principale différence est que le fichier CSV beers est consommé à la demande et que les données ne sont pas stockées en mémoire après leur utilisation.

Exécutons la fonction et voyons les résultats :

>>> copy_string_iterator(connection, beers)
copy_string_iterator()
Time 0.4596
Memory 0.0

Super! Le temps est faible et la mémoire est de retour à zéro.

Copier les données d'un itérateur de chaîne avec une taille de tampon

Dans une tentative de presser une dernière goutte de performance, nous remarquons que tout comme page_size , le copy La commande accepte également un argument similaire appelé size :

Ajoutons un size argument de la fonction :

@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
 with connection.cursor() as cursor:
 create_staging_table(cursor)
 beers_string_iterator = StringIteratorIO((
 '|'.join(map(clean_csv_value, (
 beer['id'],
 beer['name'],
 beer['tagline'],
 parse_first_brewed(beer['first_brewed']).isoformat(),
 beer['description'],
 beer['image_url'],
 beer['abv'],
 beer['ibu'],
 beer['target_fg'],
 beer['target_og'],
 beer['ebc'],
 beer['srm'],
 beer['ph'],
 beer['attenuation_level'],
 beer['brewers_tips'],
 beer['contributed_by'],
 beer['volume']['value'],
 ))) + '\n'
 for beer in beers
 ))
 cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)

La valeur par défaut pour la taille est 8192, qui est 2 ** 13 , nous conserverons donc les tailles en puissances de 2 :

>>> copy_string_iterator(connection, iter(beers), size=1024)
copy_string_iterator(size=1024)
Time 0.4536
Memory 0.0

>>> copy_string_iterator(connection, iter(beers), size=8192)
copy_string_iterator(size=8192)
Time 0.4596
Memory 0.0

>>> copy_string_iterator(connection, iter(beers), size=16384)
copy_string_iterator(size=16384)
Time 0.4649
Memory 0.0

>>> copy_string_iterator(connection, iter(beers), size=65536)
copy_string_iterator(size=65536)
Time 0.6171
Memory 0.0

Contrairement aux exemples précédents, il semble qu'il n'y ait pas de compromis entre la vitesse et la mémoire. Cela a du sens car cette méthode a été conçue pour ne consommer aucune mémoire. Cependant, nous obtenons un timing différent lors de la modification de la taille de la page. Pour notre ensemble de données, la valeur par défaut 8192 est le point idéal.

Résumé des résultats

Un résumé des résultats :

Fonction Temps (secondes) Mémoire (Mo)
insert_one_by_one() 128,8 0.08203125
insert_executemany() 124,7 2.765625
insert_executemany_iterator() 129.3 0.0
insert_execute_batch() 3.917 2.50390625
insert_execute_batch_iterator(page_size=1) 130.2 0.0
insert_execute_batch_iterator(page_size=100) 4.333 0.0
insert_execute_batch_iterator(page_size=1000) 2.537 0.2265625
insert_execute_batch_iterator(page_size=10000) 2.585 25.4453125
insert_execute_values() 3.666 4.50390625
insert_execute_values_iterator(page_size=1) 127,4 0.0
insert_execute_values_iterator(page_size=100) 3.677 0.0
insert_execute_values_iterator(page_size=1000) 1.468 0.0
insert_execute_values_iterator(page_size=10000) 1.503 2.25
copy_stringio() 0.6274 99.109375
copy_string_iterator(size=1024) 0.4536 0.0
copy_string_iterator(size=8192) 0,4596 0.0
copy_string_iterator(size=16384) 0.4649 0.0
copy_string_iterator(size=65536) 0.6171 0.0

Résumé

La grande question est maintenant Que dois-je utiliser ? comme toujours, la réponse est Cela dépend .

Chaque méthode a ses propres avantages et inconvénients, et est adaptée à différentes circonstances :

A emporter

Préférez les approches intégrées pour les types de données complexes.

Exécuter plusieurs, exécuter des valeurs et traiter par lots la conversion entre les types de données Python en types de base de données. Les approches CSV nécessitaient une fuite.

A emporter

Préférez les approches intégrées pour les petits volumes de données.

Les approches intégrées sont plus lisibles et moins susceptibles de se casser à l'avenir. Si la mémoire et le temps ne sont pas un problème, restez simple !

A emporter

Préférez les approches de copie pour les gros volumes de données.

L'approche de copie est plus adaptée aux grandes quantités de données où la mémoire peut devenir un problème.

Code source

Le code source de ce benchmark peut être trouvé ici.