Zapisz Dataframe do csv bezpośrednio do s3 Python

126

Mam pandy DataFrame, które chcę przesłać do nowego pliku CSV. Problem w tym, że nie chcę zapisywać pliku lokalnie przed przesłaniem go do s3. Czy istnieje metoda, taka jak to_csv, do bezpośredniego zapisu ramki danych do s3? Używam boto3.
Oto, co mam do tej pory:

import boto3
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
read_file = s3.get_object(Bucket, Key)
df = pd.read_csv(read_file['Body'])

# Make alterations to DataFrame

# Then export DataFrame to CSV through direct transfer to s3
user2494275
źródło
3
df.to_csv('s3://mybucket/dfs/somedf.csv'). stackoverflow.com/a/56275519/908886, aby uzyskać więcej informacji.
Peter Berg,

Odpowiedzi:

160

Możesz użyć:

from io import StringIO # python3; python2: BytesIO 
import boto3

bucket = 'my_bucket_name' # already created on S3
csv_buffer = StringIO()
df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, 'df.csv').put(Body=csv_buffer.getvalue())
Stefan
źródło
9
Jeśli jest to duży plik, co to robi z pamięcią ...?
citynorman
2
Jeśli plik jest większy, to dostępna pamięć RAM zakończy się niepowodzeniem i będzie wyjątkiem wyjątku (nie wiem, który). Należy to zaakceptować jako odpowiedź
Eran Moshe
5
Wystąpił TypeError: unicode argument expected, got 'str'błąd podczas używania StringIO. Użyłem BytesIOi działało idealnie. Uwaga: to było w Pythonie 2.7
Abhishek Upadhyaya
1
co to jest bucketprzedmiot jak to stworzyłeś?
Charles Chow,
1
bucketto miejsce, w którym przechowujesz obiekty na S3. Kod zakłada, że ​​już utworzyłeś miejsce docelowe (myśl: katalog), w którym to ma być przechowywane. Zobacz dokumentację S3
Stefan
67

Możesz bezpośrednio użyć ścieżki S3. Używam Pandy 0.24.1

In [1]: import pandas as pd

In [2]: df = pd.DataFrame( [ [1, 1, 1], [2, 2, 2] ], columns=['a', 'b', 'c'])

In [3]: df
Out[3]:
   a  b  c
0  1  1  1
1  2  2  2

In [4]: df.to_csv('s3://experimental/playground/temp_csv/dummy.csv', index=False)

In [5]: pd.__version__
Out[5]: '0.24.1'

In [6]: new_df = pd.read_csv('s3://experimental/playground/temp_csv/dummy.csv')

In [7]: new_df
Out[7]:
   a  b  c
0  1  1  1
1  2  2  2

Informacje o wersji:

Obsługa plików S3

pandy używają teraz s3fs do obsługi połączeń S3. To nie powinno złamać żadnego kodu. Jednak ponieważ s3fs nie jest wymaganą zależnością, będziesz musiał zainstalować go osobno, tak jak boto we wcześniejszych wersjach pand. GH11915 .

miernik 17
źródło
7
to zdecydowanie najłatwiejsza odpowiedź teraz, używa s3fs za kulisami, więc musisz to dodać do swoich wymagań.txt
JD D
1
Podoba mi się, że jest to łatwe, ale wydaje się, że tak naprawdę nie działa, ponieważ ciągle otrzymuję następujący błąd NoCredentialsError: Unable to locate credentials. Jakieś sugestie?
CathyQian
1
Mogę potwierdzić, że to nie działa z pandami <= 0,23,4, więc pamiętaj, aby zaktualizować do pandy 0,24
Guido
1
To jest błąd, który widzę, gdy próbuję użyć polecenia to_csv TypeError: argument write () 1 musi być unicode, a nie str
Raj
13
Używam pandy 0.24.2 i otrzymuję to NotImplementedError: Text mode not supported, use mode='wb' and manage bytes. jakieś sugestie?
Binyamin Nawet
57

Lubię s3fs, który pozwala ci używać s3 (prawie) jak lokalnego systemu plików.

Możesz to zrobić:

import s3fs

bytes_to_write = df.to_csv(None).encode()
fs = s3fs.S3FileSystem(key=key, secret=secret)
with fs.open('s3://bucket/path/to/file.csv', 'wb') as f:
    f.write(bytes_to_write)

s3fsobsługuje tylko rbi wbtryby otwierania pliku, dlatego zrobiłem to bytes_to_write.

michcio1234
źródło
Wspaniały! Jak mogę uzyskać adres URL pliku przy użyciu tego samego modułu s3fs?
M.Zaman
Szukałem adresu URL, z którego mogę pobrać zapisany plik, w każdym razie otrzymuję go za pośrednictwem S3FileSystem. Dzięki
M.Zaman
to jest to, czego używam; dzięki. Jestem ciekaw, dlaczego pd.read_csv (<s3path>) działa zgodnie z oczekiwaniami, ale do pisania musimy użyć tego obejścia .. z wyjątkiem przypadku, gdy piszę bezpośrednio do wiadra s3, w którym znajduje się mój jupyter.
Renée
@ michcio1234 Jak mogę zrobić to samo w trybie dołączania? Muszę dołączyć dane do istniejącego pliku csv na s3
j '
@j ' s3fsnie wydaje się obsługiwać trybu dołączania.
michcio1234
43

Oto bardziej aktualna odpowiedź:

import s3fs

s3 = s3fs.S3FileSystem(anon=False)

# Use 'w' for py3, 'wb' for py2
with s3.open('<bucket-name>/<filename>.csv','w') as f:
    df.to_csv(f)

Problem z StringIO polega na tym, że zżera twoją pamięć. Dzięki tej metodzie przesyłasz plik do s3, zamiast konwertować go na łańcuch, a następnie zapisywać do s3. Trzymanie ramki danych pandy i jej kopii w pamięci wydaje się bardzo nieefektywne.

Jeśli pracujesz w trybie ec2 natychmiast, możesz nadać mu rolę IAM, aby umożliwić zapisywanie go do s3, dzięki czemu nie musisz bezpośrednio przekazywać poświadczeń. Możesz jednak również połączyć się z zasobnikiem, przekazując dane uwierzytelniające do S3FileSystem()funkcji. Zobacz dokumentację: https://s3fs.readthedocs.io/en/latest/

erncyp
źródło
Z jakiegoś powodu, kiedy to zrobiłem, każda linia została pominięta w wyjściowym
pliku
hmm. nie wiem, dlaczego tak się stało. może spróbuj z inną pandą df, aby sprawdzić, czy nadal masz problem? Jeśli Twoja wersja pandy to obsługuje, wypróbuj odpowiedź @ amit-kushwaha, w której przekażesz adres URL s3 bezpośrednio do to_csv(). wydaje się czystszą implementacją.
Erncyp
@erncyp Wydaje mi się, że dostaję tam błąd: botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied ... wykonałem nawet PUBLICZNY ODCZYT zasobnika i dodałem następujące akcje, w ramach mojego konkretnego konta IAM, w Polityce "Action": [ "s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:GetObjectAcl", "s3:DeleteObject" ]
zasobnika
wygląda na to, że nie masz uprawnień? Upewnij się, że przypisałeś uprawnienia do odczytu zapisu S3 do roli IAM, której używasz
erncyp
@erncyp Mam politykę AdministratorAccess dołączoną do mojego użytkownika IAM, więc teoretycznie powinienem być w stanie dobrze czytać / pisać ... Co dziwne, jestem w stanie pisać dobrze, gdy używam następującej funkcji, którą stworzyłem, używając innego użytkownika StackOverflow rada (średniki fyi to koniec linii, ponieważ nie wiem, jak sformatować w sekcji komentarzy):def send_to_bucket(df, fn_out, bucketname): csv_buffer = StringIO(); df.to_csv(csv_buffer); s3_resource = boto3.resource('s3'); s3_resource.Object(bucketname, fn_out).put(Body=csv_buffer.getvalue());
ajoros
13

Jeśli przekażesz Nonejako pierwszy argument, to_csv()dane zostaną zwrócone jako ciąg. Stamtąd łatwo jest przesłać to do S3 za jednym razem.

Powinno być również możliwe przekazanie StringIOobiektu to_csv(), ale użycie łańcucha będzie łatwiejsze.

mhawke
źródło
W jaki sposób będzie łatwiej? Jaki jest właściwy sposób, aby to zrobić?
Eran Moshe,
@EranMoshe: tak czy inaczej będzie działać poprawnie, ale oczywiście łatwiej jest przejść Nonedo to_csv()i korzystania zwrócony ciąg, niż jest, aby stworzyć StringIOobiekt, a następnie odczytać dane z powrotem na zewnątrz.
mhawke,
Jako leniwy programista to właśnie zrobiłem. A miałeś na myśli łatwiejsze dla programisty, który pisze mniej kodu:>
Eran Moshe,
3

Odkryłem, że można to zrobić clientrównież, a nie tylko resource.

from io import StringIO
import boto3
s3 = boto3.client("s3",\
                  region_name=region_name,\
                  aws_access_key_id=aws_access_key_id,\
                  aws_secret_access_key=aws_secret_access_key)
csv_buf = StringIO()
df.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
s3.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key='path/test.csv')
Harry_pb
źródło
2

Możesz również skorzystać z AWS Data Wrangler :

import awswrangler

session = awswrangler.Session()
session.pandas.to_csv(
    dataframe=df,
    path="s3://...",
)

Zauważ, że podzieli się na kilka części, ponieważ przesyła go równolegle.

Gabra
źródło
0

ponieważ używasz boto3.client(), spróbuj:

import boto3
from io import StringIO #python3 
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
def copy_to_s3(client, df, bucket, filepath):
    csv_buf = StringIO()
    df.to_csv(csv_buf, header=True, index=False)
    csv_buf.seek(0)
    client.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key=filepath)
    print(f'Copy {df.shape[0]} rows to S3 Bucket {bucket} at {filepath}, Done!')

copy_to_s3(client=s3, df=df_to_upload, bucket='abc', filepath='def/test.csv')
jerrytim
źródło
-1

Znalazłem bardzo proste rozwiązanie, które wydaje się działać:

s3 = boto3.client("s3")

s3.put_object(
    Body=open("filename.csv").read(),
    Bucket="your-bucket",
    Key="your-key"
)

Mam nadzieję, że to pomoże!

Antoine Krajnc
źródło
-5

Czytałem csv z dwiema kolumnami z wiadra s3, a zawartość pliku csv umieściłem w pandas dataframe.

Przykład:

config.json

{
  "credential": {
    "access_key":"xxxxxx",
    "secret_key":"xxxxxx"
}
,
"s3":{
       "bucket":"mybucket",
       "key":"csv/user.csv"
   }
}

cls_config.json

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import json

class cls_config(object):

    def __init__(self,filename):

        self.filename = filename


    def getConfig(self):

        fileName = os.path.join(os.path.dirname(__file__), self.filename)
        with open(fileName) as f:
        config = json.load(f)
        return config

cls_pandas.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pandas as pd
import io

class cls_pandas(object):

    def __init__(self):
        pass

    def read(self,stream):

        df = pd.read_csv(io.StringIO(stream), sep = ",")
        return df

cls_s3.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import boto3
import json

class cls_s3(object):

    def  __init__(self,access_key,secret_key):

        self.s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    def getObject(self,bucket,key):

        read_file = self.s3.get_object(Bucket=bucket, Key=key)
        body = read_file['Body'].read().decode('utf-8')
        return body

test.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from cls_config import *
from cls_s3 import *
from cls_pandas import *

class test(object):

    def __init__(self):
        self.conf = cls_config('config.json')

    def process(self):

        conf = self.conf.getConfig()

        bucket = conf['s3']['bucket']
        key = conf['s3']['key']

        access_key = conf['credential']['access_key']
        secret_key = conf['credential']['secret_key']

        s3 = cls_s3(access_key,secret_key)
        ob = s3.getObject(bucket,key)

        pa = cls_pandas()
        df = pa.read(ob)

        print df

if __name__ == '__main__':
    test = test()
    test.process()
Jamir Josimar Huamán Campos
źródło
4
proszę, nie publikuj rozwiązania, dodaj też jego wyjaśnienie.
sjaustirni
Czy jest jakaś korzyść z tworzenia tak złożonego (dla początkującego w Pythonie) rozwiązania?
Javier López Tomás
1
To czyta plik z s3, pytanie brzmi, jak zapisać df do s3.
Damian Satterthwaite-Phillips