En tant que plombiers de données glorifiés, nous sommes souvent chargés de charger des données extraites d'une source distante dans nos systèmes. Si nous avons de la chance, les données sont sérialisées au format JSON ou YAML. Lorsque nous sommes moins chanceux, nous obtenons une feuille de calcul Excel ou un fichier CSV qui est toujours cassé d'une manière ou d'une autre, je ne peux pas l'expliquer.
Les données des grandes entreprises ou des anciens systèmes sont en quelque sorte toujours encodées de manière étrange, et les administrateurs système pensent toujours qu'ils nous rendent service en compressant les fichiers (veuillez gzip) ou en les décomposant en fichiers plus petits avec des noms aléatoires.
Les services modernes peuvent fournir une API décente, mais le plus souvent, nous devons récupérer un fichier à partir d'un FTP, SFTP, S3 ou d'un coffre-fort propriétaire qui ne fonctionne que sous Windows.
Dans cet article, nous explorons la meilleure façon d'importer des données désordonnées depuis une source distante dans PostgreSQL.
Pour fournir une solution pratique et réaliste, nous définissons les rôles fondamentaux suivants :
- Les données sont extraites d'une source distante.
- Les données sont sales et doivent être transformées.
- Les données sont volumineuses.
Configuration :une brasserie de bière
J'ai trouvé cette excellente API publique pour les bières, nous allons donc importer des données dans une table de bière dans la base de données.
Les données
Une seule bière de l'API ressemble à ceci :
$ curl https://api.punkapi.com/v2/beers/?per_page=1&page=1 [ { "id": 1, "name": "Buzz", "tagline": "A Real Bitter Experience.", "first_brewed": "09/2007", "description": "A light, crisp and bitter IPA ...", "image_url": "https://images.punkapi.com/v2/keg.png", "abv": 4.5, "ibu": 60, "target_fg": 1010, "target_og": 1044, "ebc": 20, "srm": 10, "ph": 4.4, "attenuation_level": 75, "volume": { "value": 20, "unit": "litres" }, "contributed_by": "Sam Mason <samjbmason>" "brewers_tips": "The earthy and floral aromas from...", "boil_volume": {}, "method": {}, "ingredients": {}, "food_pairing": [], } ]
J'ai coupé la sortie par souci de brièveté, mais il y a beaucoup d'informations sur les bières ici. Dans cet article, nous voulons importer tous les champs avant brewers_tips
à une table de la base de données.
Le champ volume
est imbriqué. Nous voulons extraire uniquement les value
du champ et enregistrez-le dans un champ appelé volume
dans le tableau.
volume = beer['volume']['value']
Le champ first_brewed
contient uniquement l'année et le mois, et dans certains cas, uniquement l'année. Nous voulons transformer la valeur en une date valide. Par exemple, la valeur 09/2007
sera transformé à ce jour 2007-09-01
. La valeur 2006
sera transformé à ce jour 2016-01-01
.
Écrivons une fonction simple pour transformer la valeur de texte dans le champ, en Python datetime.date
:
import datetime def parse_first_brewed(text: str) -> datetime.date: parts = text.split('/') if len(parts) == 2: return datetime.date(int(parts[1]), int(parts[0]), 1) elif len(parts) == 1: return datetime.date(int(parts[0]), 1, 1) else: assert False, 'Unknown date format'
Assurons-nous rapidement que cela fonctionne :
>>> parse_first_brewed('09/2007') datetime.date(2007, 9, 1) >>> parse_first_brewed('2006') datetime.date(2006, 1, 1)
Dans la vraie vie, les transformations peuvent être beaucoup plus compliquées. Mais pour notre objectif, c'est plus que suffisant.
Récupérer les données
L'API fournit des résultats paginés. Pour encapsuler la pagination, nous créons un générateur qui produit des bières une par une :
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 1
Et pour utiliser la fonction générateur, nous l'appelons et l'itérons :
>>> beers = iter_beers_from_api() >>> next(beers) {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': 'https://images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next(beers) {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': 'https://images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,
Vous remarquerez que le premier résultat de chaque page prend un peu plus de temps. C'est parce qu'il fait une requête réseau pour récupérer la page.
Créer une table dans la base de données
L'étape suivante consiste à créer une table dans la base de données dans laquelle importer les données.
Créer une base de données :
$ createdb -O haki testload
Changer haki
dans l'exemple à votre utilisateur local.
Pour se connecter de Python à une base de données PostgreSQL, nous utilisons psycopg :
$ python -m pip install psycopg2
À l'aide de psycopg, créez une connexion à la base de données :
import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True
Nous définissons autocommit=True
Ainsi, chaque commande que nous exécutons prendra effet immédiatement. Pour les besoins de cet article, c'est très bien.
Maintenant que nous avons une connexion, nous pouvons écrire une fonction pour créer une table :
def create_staging_table(cursor) -> None: cursor.execute(""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers ( id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ); """)
La fonction reçoit un curseur et crée une table non journalisée appelée staging_beers
.
TABLE NON ENREGISTRÉE
Les données écrites dans une table non consignée ne seront pas consignées dans le journal d'écriture anticipée (WAL), ce qui en fait l'outil idéal pour les tables intermédiaires. Notez que UNLOGGED
les tables ne seront pas restaurées en cas de plantage et ne seront pas répliquées.
En utilisant la connexion que nous avons créée précédemment, voici comment la fonction est utilisée :
>>> with connection.cursor() as cursor: >>> create_staging_table(cursor)
Nous sommes maintenant prêts à passer à la partie suivante.
Métriques
Tout au long de cet article, nous nous intéressons à deux métriques principales :le temps et la mémoire.
Mesurer le temps
Pour mesurer le temps pour chaque méthode, nous utilisons le time
intégré modules :
>>> import time >>> start = time.perf_counter() >>> time.sleep(1) # do work >>> elapsed = time.perf_counter() - start >>> print(f'Time {elapsed:0.4}') Time 1.001
La fonction perf_counter
fournit à l'horloge la plus haute résolution disponible, ce qui la rend idéale pour nos besoins.
Mesurer la mémoire
Pour mesurer la consommation mémoire, nous allons utiliser le package memory-profiler.
$ python -m pip install memory-profiler
Ce package fournit l'utilisation de la mémoire et l'utilisation de la mémoire incrémentielle pour chaque ligne du code. Ceci est très utile lors de l'optimisation de la mémoire. Pour illustrer, voici l'exemple fourni dans PyPI :
$ python -m memory_profiler example.py Line # Mem usage Increment Line Contents ============================================== 3 @profile 4 5.97 MB 0.00 MB def my_func(): 5 13.61 MB 7.64 MB a = [1] * (10 ** 6) 6 166.20 MB 152.59 MB b = [2] * (2 * 10 ** 7) 7 13.61 MB -152.59 MB del b 8 13.61 MB 0.00 MB return a
La partie intéressante est le Increment
colonne qui montre la mémoire supplémentaire allouée par le code dans chaque ligne.
Dans cet article, nous nous intéressons au pic de mémoire utilisé par la fonction. Le pic de mémoire est la différence entre la valeur de départ de la colonne « Utilisation de la mémoire » et la valeur la plus élevée (également appelée « high watermark »).
Pour obtenir la liste "Mem usage" nous utilisons la fonction memory_usage
à partir de memory_profiler
:
>>> from memory_profiler import memory_usage >>> mem, retval = memory_usage((fn, args, kwargs), retval=True, interval=1e-7)
Utilisée ainsi, la fonction memory_usage
exécute la fonction fn
avec le args
fourni et kwargs
, mais lance également un autre processus en arrière-plan pour surveiller l'utilisation de la mémoire tous les interval
secondes.
Pour des opérations très rapides la fonction fn
peut être exécuté plus d'une fois. En définissant interval
à une valeur inférieure à 1e-6, nous le forçons à s'exécuter une seule fois.
L'argument retval
indique à la fonction de renvoyer le résultat de fn
.
profile
Décorateur
Pour tout mettre ensemble, nous créons le décorateur suivant pour mesurer et rapporter le temps et la mémoire :
import time from functools import wraps from memory_profiler import memory_usage def profile(fn): @wraps(fn) def inner(*args, **kwargs): fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items()) print(f'\n{fn.__name__}({fn_kwargs_str})') # Measure time t = time.perf_counter() retval = fn(*args, **kwargs) elapsed = time.perf_counter() - t print(f'Time {elapsed:0.4}') # Measure memory mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7) print(f'Memory {max(mem) - min(mem)}') return retval return inner
Pour éliminer les effets mutuels de la synchronisation sur la mémoire et vice versa, nous exécutons la fonction deux fois. D'abord pour chronométrer, ensuite pour mesurer l'utilisation de la mémoire.
Le décorateur imprimera le nom de la fonction et tous les arguments de mots-clés, et rapportera le temps et la mémoire utilisés :
>>> @profile >>> def work(n): >>> for i in range(n): >>> 2 ** n >>> work(10) work() Time 0.06269 Memory 0.0 >>> work(n=10000) work(n=10000) Time 0.3865 Memory 0.0234375
Seuls les arguments de mots-clés sont imprimés. C'est intentionnel, nous allons l'utiliser dans des tests paramétrés.
Référence
Au moment de la rédaction, l'API des bières ne contient que 325 bières. Pour travailler sur un grand ensemble de données, nous le dupliquons 100 fois et le stockons en mémoire. L'ensemble de données résultant contient 32 500 bières :
>>> beers = list(iter_beers_from_api()) * 100 >>> len(beers) 32,500
Pour imiter une API distante, nos fonctions accepteront des itérateurs similaires à la valeur de retour de iter_beers_from_api
:
def process(beers: Iterator[Dict[str, Any]])) -> None: # Process beers...
Pour le benchmark, nous allons importer les données de la bière dans la base de données. Pour éliminer les influences externes telles que le réseau, nous récupérons les données de l'API à l'avance et les diffusons localement.
Pour obtenir un timing précis, nous "truquons" l'API distante :
>>> beers = list(iter_beers_from_api()) * 100 >>> process(beers)
Dans une situation réelle, vous utiliseriez la fonction iter_beers_from_api
directement :
>>> process(iter_beers_from_api())
Nous sommes maintenant prêts à commencer !
Insérer des lignes une par une
Pour établir une ligne de base, nous commençons par l'approche la plus simple, insérez les lignes une par une :
@profile def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None: with connection.cursor() as cursor: create_staging_table(cursor) for beer in beers: cursor.execute(""" INSERT INTO staging_beers VALUES ( %(id)s, %(name)s, %(tagline)s, %(first_brewed)s, %(description)s, %(image_url)s, %(abv)s, %(ibu)s, %(target_fg)s, %(target_og)s, %(ebc)s, %(srm)s, %(ph)s, %(attenuation_level)s, %(brewers_tips)s, %(contributed_by)s, %(volume)s ); """, { **beer, 'first_brewed': parse_first_brewed(beer['first_brewed']), 'volume': beer['volume']['value'], })
Notez que lorsque nous itérons les bières, nous transformons le first_brewed
à un datetime.date
et extrait la valeur de volume du volume
imbriqué champ.
L'exécution de cette fonction produit la sortie suivante :
>>> insert_one_by_one(connection, beers) insert_one_by_one() Time 128.8 Memory 0.08203125
La fonction a pris 129 secondes pour importer 32 000 lignes. Le profileur de mémoire montre que la fonction a consommé très peu de mémoire.
Intuitivement, insérer des lignes une par une ne semble pas très efficace. Le changement constant de contexte entre le programme et la base de données doit le ralentir.
Exécuter plusieurs
Psycopg2 fournit un moyen d'insérer plusieurs lignes à la fois en utilisant executemany
. À partir de la documentation :
Cela semble prometteur !
Essayons d'importer les données en utilisant executemany
:
@profile def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None: with connection.cursor() as cursor: create_staging_table(cursor) all_beers = [{ **beer, 'first_brewed': parse_first_brewed(beer['first_brewed']), 'volume': beer['volume']['value'], } for beer in beers] cursor.executemany(""" INSERT INTO staging_beers VALUES ( %(id)s, %(name)s, %(tagline)s, %(first_brewed)s, %(description)s, %(image_url)s, %(abv)s, %(ibu)s, %(target_fg)s, %(target_og)s, %(ebc)s, %(srm)s, %(ph)s, %(attenuation_level)s, %(brewers_tips)s, %(contributed_by)s, %(volume)s ); """, all_beers)
La fonction ressemble beaucoup à la fonction précédente et les transformations sont les mêmes. La principale différence ici est que nous transformons d'abord toutes les données en mémoire, puis que nous les importons dans la base de données.
L'exécution de cette fonction produit la sortie suivante :
>>> insert_executemany(connection, beers) insert_executemany() Time 124.7 Memory 2.765625
C'est décevant. Le timing est juste un peu meilleur, mais la fonction consomme maintenant 2,7 Mo de mémoire.
Pour mettre l'utilisation de la mémoire en perspective, un fichier JSON contenant uniquement les données que nous importons pèse 25 Mo sur disque. Compte tenu de la proportion, l'utilisation de cette méthode pour importer un fichier de 1 Go nécessitera 110 Mo de mémoire.
Exécuter plusieurs depuis l'itérateur
La méthode précédente consommait beaucoup de mémoire car les données transformées étaient stockées en mémoire avant d'être traitées par psycopg.
Voyons si nous pouvons utiliser un itérateur pour éviter de stocker les données en mémoire :
@profile def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None: with connection.cursor() as cursor: create_staging_table(cursor) cursor.executemany(""" INSERT INTO staging_beers VALUES ( %(id)s, %(name)s, %(tagline)s, %(first_brewed)s, %(description)s, %(image_url)s, %(abv)s, %(ibu)s, %(target_fg)s, %(target_og)s, %(ebc)s, %(srm)s, %(ph)s, %(attenuation_level)s, %(brewers_tips)s, %(contributed_by)s, %(volume)s ); """, ({ **beer, 'first_brewed': parse_first_brewed(beer['first_brewed']), 'volume': beer['volume']['value'], } for beer in beers))
La différence ici est que les données transformées sont "diffusées" en executemany
en utilisant un itérateur.
Cette fonction produit le résultat suivant :
>>> insert_executemany_iterator(connection, beers) insert_executemany_iterator() Time 129.3 Memory 0.0
Notre solution de "streaming" a fonctionné comme prévu et nous avons réussi à ramener la mémoire à zéro. Cependant, le timing reste à peu près le même, même par rapport à la méthode un par un.
Exécuter le lot
La documentation de psycopg contient une note très intéressante sur executemany
dans la section "aides à l'exécution rapide" :
Donc, nous nous sommes trompés depuis le début !
La fonction juste en dessous de cette section est execute_batch
:
Implémentons la fonction de chargement en utilisant execute_batch
:
import psycopg2.extras @profile def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]]) -> None: with connection.cursor() as cursor: create_staging_table(cursor) all_beers = [{ **beer, 'first_brewed': parse_first_brewed(beer['first_brewed']), 'volume': beer['volume']['value'], } for beer in beers] psycopg2.extras.execute_batch(cursor, """ INSERT INTO staging_beers VALUES ( %(id)s, %(name)s, %(tagline)s, %(first_brewed)s, %(description)s, %(image_url)s, %(abv)s, %(ibu)s, %(target_fg)s, %(target_og)s, %(ebc)s, %(srm)s, %(ph)s, %(attenuation_level)s, %(brewers_tips)s, %(contributed_by)s, %(volume)s ); """, all_beers)
Exécution de la fonction :
>>> insert_execute_batch(connection, beers) insert_execute_batch() Time 3.917 Memory 2.50390625
Ouah! C'est un énorme bond en avant. La fonction terminée en un peu moins de 4 secondes. C'est environ 33 fois plus rapide que les 129 secondes avec lesquelles nous avons commencé.
Exécuter le lot à partir de l'itérateur
La fonction execute_batch
utilisé moins de mémoire que executemany
fait pour les mêmes données. Essayons d'éliminer la mémoire en "streamant" les données dans execute_batch
en utilisant un itérateur :
@profile def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None: with connection.cursor() as cursor: create_staging_table(cursor) iter_beers = ({ **beer, 'first_brewed': parse_first_brewed(beer['first_brewed']), 'volume': beer['volume']['value'], } for beer in beers) psycopg2.extras.execute_batch(cursor, """ INSERT INTO staging_beers VALUES ( %(id)s, %(name)s, %(tagline)s, %(first_brewed)s, %(description)s, %(image_url)s, %(abv)s, %(ibu)s, %(target_fg)s, %(target_og)s, %(ebc)s, %(srm)s, %(ph)s, %(attenuation_level)s, %(brewers_tips)s, %(contributed_by)s, %(volume)s ); """, iter_beers)
Exécution de la fonction
>>> insert_execute_batch_iterator(connection, beers) insert_execute_batch_iterator() Time 4.333 Memory 0.2265625
Nous avons à peu près le même temps, mais avec moins de mémoire.
Exécuter un lot à partir de l'itérateur avec la taille de la page
Lors de la lecture de la documentation pour execute_batch
, l'argument page_size
a attiré mon attention :
La documentation indiquait précédemment que la fonction fonctionnait mieux car elle effectuait moins d'allers-retours vers la base de données. Si tel est le cas, une taille de page plus grande devrait réduire le nombre d'allers-retours et entraîner un temps de chargement plus rapide.
Ajoutons un argument pour la taille de la page à notre fonction afin que nous puissions expérimenter :
@profile def insert_execute_batch_iterator( connection, beers: Iterator[Dict[str, Any]], page_size: int = 100, ) -> None: with connection.cursor() as cursor: create_staging_table(cursor) iter_beers = ({ **beer, 'first_brewed': parse_first_brewed(beer['first_brewed']), 'volume': beer['volume']['value'], } for beer in beers) psycopg2.extras.execute_batch(cursor, """ INSERT INTO staging_beers VALUES ( %(id)s, %(name)s, %(tagline)s, %(first_brewed)s, %(description)s, %(image_url)s, %(abv)s, %(ibu)s, %(target_fg)s, %(target_og)s, %(ebc)s, %(srm)s, %(ph)s, %(attenuation_level)s, %(brewers_tips)s, %(contributed_by)s, %(volume)s ); """, iter_beers, page_size=page_size)
La taille de page par défaut est de 100. Comparons différentes valeurs et comparons les résultats :
>>> insert_execute_batch_iterator(connection, iter(beers), page_size=1) insert_execute_batch_iterator(page_size=1) Time 130.2 Memory 0.0 >>> insert_execute_batch_iterator(connection, iter(beers), page_size=100) insert_execute_batch_iterator(page_size=100) Time 4.333 Memory 0.0 >>> insert_execute_batch_iterator(connection, iter(beers), page_size=1000) insert_execute_batch_iterator(page_size=1000) Time 2.537 Memory 0.2265625 >>> insert_execute_batch_iterator(connection, iter(beers), page_size=10000) insert_execute_batch_iterator(page_size=10000) Time 2.585 Memory 25.4453125
Nous avons obtenu des résultats intéressants, décomposons-les :
- 1 :Les résultats sont similaires aux résultats que nous avons obtenus en insérant les lignes une par une.
- 100 :il s'agit du
page_size
par défaut , les résultats sont donc similaires à ceux de notre benchmark précédent. - 1 000 :la synchronisation est ici environ 40 % plus rapide et la mémoire est faible.
- 10 000 :la synchronisation n'est pas beaucoup plus rapide qu'avec une taille de page de 1 000, mais la mémoire est nettement plus élevée.
Les résultats montrent qu'il existe un compromis entre la mémoire et la vitesse. Dans ce cas, il semble que le sweet spot corresponde à une taille de page de 1 000.
Exécuter les valeurs
Les joyaux de la documentation de psycopg ne se terminent pas par execute_batch
. En parcourant la documentation, une autre fonction appelée execute_values
a attiré mon attention :
La fonction execute_values
fonctionne en générant une énorme liste de VALEURS à la requête.
Essayons :
import psycopg2.extras @profile def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None: with connection.cursor() as cursor: create_staging_table(cursor) psycopg2.extras.execute_values(cursor, """ INSERT INTO staging_beers VALUES %s; """, [( beer['id'], beer['name'], beer['tagline'], parse_first_brewed(beer['first_brewed']), beer['description'], beer['image_url'], beer['abv'], beer['ibu'], beer['target_fg'], beer['target_og'], beer['ebc'], beer['srm'], beer['ph'], beer['attenuation_level'], beer['brewers_tips'], beer['contributed_by'], beer['volume']['value'], ) for beer in beers])
Importer des bières avec la fonction :
>>> insert_execute_values(connection, beers) insert_execute_values() Time 3.666 Memory 4.50390625
Donc, dès la sortie de la boîte, nous obtenons une légère accélération par rapport à execute_batch
. Cependant, la mémoire est légèrement supérieure.
Exécuter les valeurs de l'itérateur
Tout comme nous l'avons fait auparavant, pour réduire la consommation de mémoire, nous essayons d'éviter de stocker des données en mémoire en utilisant un itérateur au lieu d'une liste :
@profile def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None: with connection.cursor() as cursor: create_staging_table(cursor) psycopg2.extras.execute_values(cursor, """ INSERT INTO staging_beers VALUES %s; """, (( beer['id'], beer['name'], beer['tagline'], parse_first_brewed(beer['first_brewed']), beer['description'], beer['image_url'], beer['abv'], beer['ibu'], beer['target_fg'], beer['target_og'], beer['ebc'], beer['srm'], beer['ph'], beer['attenuation_level'], beer['brewers_tips'], beer['contributed_by'], beer['volume']['value'], ) for beer in beers))
L'exécution de la fonction a produit les résultats suivants :
>>> insert_execute_values_iterator(connection, beers) insert_execute_values_iterator() Time 3.677 Memory 0.0
Ainsi, le timing est presque le même, mais la mémoire est de retour à zéro.
Exécuter les valeurs de l'itérateur avec la taille de la page
Tout comme execute_batch
, la fonction execute_values
accepter également un page_size
argument :
@profile def insert_execute_values_iterator( connection, beers: Iterator[Dict[str, Any]], page_size: int = 100, ) -> None: with connection.cursor() as cursor: create_staging_table(cursor) psycopg2.extras.execute_values(cursor, """ INSERT INTO staging_beers VALUES %s; """, (( beer['id'], beer['name'], beer['tagline'], parse_first_brewed(beer['first_brewed']), beer['description'], beer['image_url'], beer['abv'], beer['ibu'], beer['target_fg'], beer['target_og'], beer['ebc'], beer['srm'], beer['ph'], beer['attenuation_level'], beer['brewers_tips'], beer['contributed_by'], beer['volume']['value'], ) for beer in beers), page_size=page_size)
Exécution avec différentes tailles de page :
>>> insert_execute_values_iterator(connection, iter(beers), page_size=1) insert_execute_values_iterator(page_size=1) Time 127.4 Memory 0.0 >>> insert_execute_values_iterator(connection, iter(beers), page_size=100) insert_execute_values_iterator(page_size=100) Time 3.677 Memory 0.0 >>> insert_execute_values_iterator(connection, iter(beers), page_size=1000) insert_execute_values_iterator(page_size=1000) Time 1.468 Memory 0.0 >>> insert_execute_values_iterator(connection, iter(beers), page_size=10000) insert_execute_values_iterator(page_size=10000) Time 1.503 Memory 2.25
Tout comme execute_batch
, nous voyons un compromis entre la mémoire et la vitesse. Ici aussi, le sweet spot se situe autour de la taille de page 1000. Cependant, en utilisant execute_values
nous avons obtenu des résultats environ 20 % plus rapides par rapport à la même taille de page en utilisant execute_batch
.
Copier
La documentation officielle de PostgreSQL contient une section entière sur le remplissage d'une base de données. Selon la documentation, la meilleure façon de charger des données dans une base de données est d'utiliser le copy
commande.
Pour utiliser copy
à partir de Python, psycopg fournit une fonction spéciale appelée copy_from
. Le copy
La commande nécessite un fichier CSV. Voyons si nous pouvons transformer nos données en CSV et les charger dans la base de données en utilisant copy_from
:
import io def clean_csv_value(value: Optional[Any]) -> str: if value is None: return r'\N' return str(value).replace('\n', '\\n') @profile def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None: with connection.cursor() as cursor: create_staging_table(cursor) csv_file_like_object = io.StringIO() for beer in beers: csv_file_like_object.write('|'.join(map(clean_csv_value, ( beer['id'], beer['name'], beer['tagline'], parse_first_brewed(beer['first_brewed']), beer['description'], beer['image_url'], beer['abv'], beer['ibu'], beer['target_fg'], beer['target_og'], beer['ebc'], beer['srm'], beer['ph'], beer['attenuation_level'], beer['contributed_by'], beer['brewers_tips'], beer['volume']['value'], ))) + '\n') csv_file_like_object.seek(0) cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')
Décomposons-le :
clean_csv_value
:Transforme une seule valeur- Échappez aux nouvelles lignes :certains des champs de texte incluent des retours à la ligne, nous échappons donc à
\n
->\\n
. - Les valeurs vides sont transformées en
\N
:La chaîne"\N"
est la chaîne par défaut utilisée par PostgreSQL pour indiquer NULL dans COPY (cela peut être changé en utilisant leNULL
option).
- Échappez aux nouvelles lignes :certains des champs de texte incluent des retours à la ligne, nous échappons donc à
csv_file_like_object
:Générer un fichier comme un objet en utilisantio.StringIO
. UnStringIO
objet contient une chaîne qui peut être utilisée comme un fichier. Dans notre cas, un fichier CSV.csv_file_like_object.write
:Transformer une bière en ligne CSV- Transformez les données :transformations sur
first_brewed
etvolume
sont exécutés ici. - Choisissez un délimiteur :Certains des champs du jeu de données contiennent du texte libre avec des virgules. Pour éviter les conflits, nous choisissons "|" comme délimiteur (une autre option consiste à utiliser
QUOTE
).
- Transformez les données :transformations sur
Voyons maintenant si tout ce travail acharné a porté ses fruits :
>>> copy_stringio(connection, beers) copy_stringio() Time 0.6274 Memory 99.109375
Le copy
commande est la plus rapide que nous ayons vue jusqu'à présent ! Utilisation de COPY
, le processus terminé en moins d'une seconde. Cependant, il semble que cette méthode soit beaucoup plus coûteuse en termes d'utilisation de la mémoire. La fonction consomme 99 Mo, soit plus du double de la taille de notre fichier JSON sur le disque.
Copier les données d'un itérateur de chaîne
L'un des principaux inconvénients de l'utilisation de la copie avec StringIO
est que le fichier entier est créé en mémoire. Et si au lieu de créer le fichier entier en mémoire, nous créons un objet semblable à un fichier qui agira comme un tampon entre la source distante et le COPY
commande. Le tampon consommera JSON via l'itérateur, nettoiera et transformera les données, et produira un CSV propre.
Inspirés par cette réponse de débordement de pile, nous avons créé un objet qui se nourrit d'un itérateur et fournit une interface de type fichier :
from typing import Iterator, Optional import io class StringIteratorIO(io.TextIOBase): def __init__(self, iter: Iterator[str]): self._iter = iter self._buff = '' def readable(self) -> bool: return True def _read1(self, n: Optional[int] = None) -> str: while not self._buff: try: self._buff = next(self._iter) except StopIteration: break ret = self._buff[:n] self._buff = self._buff[len(ret):] return ret def read(self, n: Optional[int] = None) -> str: line = [] if n is None or n < 0: while True: m = self._read1() if not m: break line.append(m) else: while n > 0: m = self._read1(n) if not m: break n -= len(m) line.append(m) return ''.join(line)
Pour montrer comment cela fonctionne, voici comment un objet semblable à un fichier CSV peut être généré à partir d'une liste de nombres :
>>> gen = (f'{i},{i**2}\n' for i in range(3)) >>> gen <generator object <genexpr> at 0x7f58bde7f5e8> >>> f = StringIteratorIO(gen) >>> print(f.read()) 0,0 1,1 2,4
Notez que nous avons utilisé f
comme un fichier. En interne, il a récupéré les lignes de gen
uniquement lorsque son tampon de ligne interne était vide.
La fonction de chargement utilisant StringIteratorIO
ressemble à ceci :
@profile def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None: with connection.cursor() as cursor: create_staging_table(cursor) beers_string_iterator = StringIteratorIO(( '|'.join(map(clean_csv_value, ( beer['id'], beer['name'], beer['tagline'], parse_first_brewed(beer['first_brewed']).isoformat(), beer['description'], beer['image_url'], beer['abv'], beer['ibu'], beer['target_fg'], beer['target_og'], beer['ebc'], beer['srm'], beer['ph'], beer['attenuation_level'], beer['brewers_tips'], beer['contributed_by'], beer['volume']['value'], ))) + '\n' for beer in beers )) cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|')
La principale différence est que le fichier CSV beers est consommé à la demande et que les données ne sont pas stockées en mémoire après leur utilisation.
Exécutons la fonction et voyons les résultats :
>>> copy_string_iterator(connection, beers) copy_string_iterator() Time 0.4596 Memory 0.0
Super! Le temps est faible et la mémoire est de retour à zéro.
Copier les données d'un itérateur de chaîne avec une taille de tampon
Dans une tentative de presser une dernière goutte de performance, nous remarquons que tout comme page_size
, le copy
La commande accepte également un argument similaire appelé size
:
Ajoutons un size
argument de la fonction :
@profile def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None: with connection.cursor() as cursor: create_staging_table(cursor) beers_string_iterator = StringIteratorIO(( '|'.join(map(clean_csv_value, ( beer['id'], beer['name'], beer['tagline'], parse_first_brewed(beer['first_brewed']).isoformat(), beer['description'], beer['image_url'], beer['abv'], beer['ibu'], beer['target_fg'], beer['target_og'], beer['ebc'], beer['srm'], beer['ph'], beer['attenuation_level'], beer['brewers_tips'], beer['contributed_by'], beer['volume']['value'], ))) + '\n' for beer in beers )) cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)
La valeur par défaut pour la taille est 8192, qui est 2 ** 13
, nous conserverons donc les tailles en puissances de 2 :
>>> copy_string_iterator(connection, iter(beers), size=1024) copy_string_iterator(size=1024) Time 0.4536 Memory 0.0 >>> copy_string_iterator(connection, iter(beers), size=8192) copy_string_iterator(size=8192) Time 0.4596 Memory 0.0 >>> copy_string_iterator(connection, iter(beers), size=16384) copy_string_iterator(size=16384) Time 0.4649 Memory 0.0 >>> copy_string_iterator(connection, iter(beers), size=65536) copy_string_iterator(size=65536) Time 0.6171 Memory 0.0
Contrairement aux exemples précédents, il semble qu'il n'y ait pas de compromis entre la vitesse et la mémoire. Cela a du sens car cette méthode a été conçue pour ne consommer aucune mémoire. Cependant, nous obtenons un timing différent lors de la modification de la taille de la page. Pour notre ensemble de données, la valeur par défaut 8192 est le point idéal.
Résumé des résultats
Un résumé des résultats :
Fonction | Temps (secondes) | Mémoire (Mo) |
---|---|---|
insert_one_by_one() | 128,8 | 0.08203125 |
insert_executemany() | 124,7 | 2.765625 |
insert_executemany_iterator() | 129.3 | 0.0 |
insert_execute_batch() | 3.917 | 2.50390625 |
insert_execute_batch_iterator(page_size=1) | 130.2 | 0.0 |
insert_execute_batch_iterator(page_size=100) | 4.333 | 0.0 |
insert_execute_batch_iterator(page_size=1000) | 2.537 | 0.2265625 |
insert_execute_batch_iterator(page_size=10000) | 2.585 | 25.4453125 |
insert_execute_values() | 3.666 | 4.50390625 |
insert_execute_values_iterator(page_size=1) | 127,4 | 0.0 |
insert_execute_values_iterator(page_size=100) | 3.677 | 0.0 |
insert_execute_values_iterator(page_size=1000) | 1.468 | 0.0 |
insert_execute_values_iterator(page_size=10000) | 1.503 | 2.25 |
copy_stringio() | 0.6274 | 99.109375 |
copy_string_iterator(size=1024) | 0.4536 | 0.0 |
copy_string_iterator(size=8192) | 0,4596 | 0.0 |
copy_string_iterator(size=16384) | 0.4649 | 0.0 |
copy_string_iterator(size=65536) | 0.6171 | 0.0 |
Résumé
La grande question est maintenant Que dois-je utiliser ? comme toujours, la réponse est Cela dépend .
Chaque méthode a ses propres avantages et inconvénients, et est adaptée à différentes circonstances :
A emporter
Préférez les approches intégrées pour les types de données complexes.
Exécuter plusieurs, exécuter des valeurs et traiter par lots la conversion entre les types de données Python en types de base de données. Les approches CSV nécessitaient une fuite.
A emporter
Préférez les approches intégrées pour les petits volumes de données.
Les approches intégrées sont plus lisibles et moins susceptibles de se casser à l'avenir. Si la mémoire et le temps ne sont pas un problème, restez simple !
A emporter
Préférez les approches de copie pour les gros volumes de données.
L'approche de copie est plus adaptée aux grandes quantités de données où la mémoire peut devenir un problème.
Code source
Le code source de ce benchmark peut être trouvé ici.