I am loading about 2 - 2.5 million records into a Postgres database every day.
I then read this data with pd.read_sql to turn it into a dataframe and then I do some column manipulation and some minor merging. I am saving this modified data as a separate table for other people to use.
When I do pd.to_sql it takes forever. If I save a csv file and use COPY FROM in Postgres, the whole thing only takes a few minutes but the server is on a separate machine and it is a pain to transfer files there.
Using psycopg2, it looks like I can use copy_expert to benefit from the bulk copying, but still use python. I want to, if possible, avoid writing an actual csv file. Can I do this in memory with a pandas dataframe?
Here is an example of my pandas code. I would like to add the copy_expert or something to make saving this data much faster if possible.
for date in required_date_range:
df = pd.read_sql(sql=query, con=pg_engine, params={'x' : date})
...
do stuff to the columns
...
df.to_sql('table_name', pg_engine, index=False, if_exists='append', dtype=final_table_dtypes)
Can someone help me with example code? I would prefer to use pandas still and it would be nice to do it in memory. If not, I will just write a csv temporary file and do it that way.
Edit- here is my final code which works. It only takes a couple of hundred seconds per date (millions of rows) instead of a couple of hours.
to_sql = """COPY %s FROM STDIN WITH CSV HEADER"""
def process_file(conn, table_name, file_object):
fake_conn = cms_dtypes.pg_engine.raw_connection()
fake_cur = fake_conn.cursor()
fake_cur.copy_expert(sql=to_sql % table_name, file=file_object)
fake_conn.commit()
fake_cur.close()
#after doing stuff to the dataframe
s_buf = io.StringIO()
df.to_csv(s_buf)
process_file(cms_dtypes.pg_engine, 'fact_cms_employee', s_buf)
Python module io
(docs) has necessary tools for file-like objects.
import io
# text buffer
s_buf = io.StringIO()
# saving a data frame to a buffer (same as with a regular file):
df.to_csv(s_buf)
Edit. (I forgot) In order to read from the buffer afterwards, its position should be set to the beginning:
s_buf.seek(0)
I'm not familiar with psycopg2
but according to docs both copy_expert
and copy_from
can be used, for example:
cur.copy_from(s_buf, table)
(For Python 2, see StringIO.)