Techno Blender
Digitally Yours.

How To Read/Write DataFrames From/To SQL over HTTP with FastAPI

0 37


In Less Than 10 Minutes

Photo thanks to growtika from Unsplash.

Dealing with data in industry is a pain. This is because data can be scattered across multiple projects, stored in different formats and maintained to different degrees. Typically, this leads to different teams developing different internal tools for processing their data. The whole thing is a resulting mess as you are left with no standard method of reading and writing data.

One solution to this is to develop data ingestion microservices. In essence, these are apps that enable data transfer over HTTP or RPC calls. They attempt to provide a unified format for reading from and writing to different data sources (e.g. Google BigQuery, Postgres, etc…). The idea is that other apps (think dashboards) would use the data ingestion app to load data. A simple use case is described in the figure below:

In this tutorial, I will describe the process of developing a simple data ingestion app for reading from and writing to any Postgres database using FastAPI.

Ingest Data

The ingest endpoint will be used for reading data. For this, we require parameters for connecting to our database. These will be the db_user, db_password, db_port, db_name and db_host. We also require a way to query the database, which in this case can simply be a string sql_query.

We define our app using fastapi and start building the endpoint:

from fastapi import FastAPI
from pydantic import BaseModel

# initialise app
app = FastAPI()
# pydantic class for collecting our parameters into a json object
class IngestionParams(BaseModel):
sql_query: str
username: str
password: str
port: int
host: str
database_name: str
# define POST endpoint "ingest"
@app.post("/ingest")
def ingest(
ingestion_params: IngestionParams
):
# retrieve ingestion params as dictionary
ingestion_params = ingestion_params.dict()
db_user = ingestion_params['username']
db_password = ingestion_params['password']
db_host = ingestion_params['host']
db_port = ingestion_params['port']
db_name = ingestion_params['database_name']
sql_query = ingestion_params['sql_query']

An important note: according to REST API design patterns, there should be a distinction between get and post requests. Typically, get is used for reading data, and post is used for sending data. However, in this context, using post is preferred to get because: 1) browser logs typically contain URL history (meaning that sensitive parameters such as the DB credentials would be exposed), 2) there is a limit on the URL length [1]

Now, we need to connect to our client and execute the query. For this, we can use sqlalchemy :

import sqlalchemy as db
import pandas as pd

# connect to db
db_uri = f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}'
engine = db.create_engine(db_uri)
con = engine.connect()
# query db
query_result = con.execute(sql_query)
data = query_result.fetchall()
# convert to dataframe
columns = query_result.keys()
df = pd.DataFrame(data, columns=columns)

The final thing that remains is how we’re going to send the DataFrame back over the HTTP request. There are multiple ways that we can do this, however in order to preserve the dtype, we convert the DataFrame to a Parquet file and send it over as a binary file using the content type application/octer-stream.

# added import
from fastapi import Response
def ingest(
response: Response, # add response parameter
ingestion_params: IngestionParams
)

...
# return parquet file in Response object
return Response(df.to_parquet(engine='pyarrow', index=False), media_type='application/octet-stream')

Insert Data

For inserting data, we need the same parameters as before for connecting to the database. However, we also need the table_name, the dataset_name, and the conflict_resolution_strategy.

We also need a file parameter for sending the data. With this, we begin designing the insert endpoint:

from fastapi import File, UploadFile
from pydantic import Json
# pydantic class for collecting our parameters into a json object
class InsertionParams(BaseModel):
username: str
password: str
port: int = 5432
host: str
database_name: str
table_name: str
conflict_resolution_strategy: str = 'replace' # default value

# define POST endpoint "insert" using async
@app.post("/insert")
async def insert(
response: Response, # for returning the dataframe that we insert
insertion_params: Json[InsertionParams],
file: UploadFile = File(...)
):
# retrieve insertion params as dictionary
insertion_params = insertion_params.dict()
db_user = insertion_params['username']
db_password = insertion_params['password']
db_host = insertion_params['host']
db_port = insertion_params['port']
db_name = insertion_params['database_name']
table_name = insertion_params['table_name']
conflict_resolution_strategy = insertion_params['conflict_resolution_strategy']
dataset_name = insertion_params.get('dataset_name', None)
content = await file.read()

Now, we want to design our API to enable different file types as inputs. We can use the content_type attribute of the file object to determine the file type, and then read it appropriately.

import io
with io.BytesIO(content) as data:
if 'csv' in file.content_type:
df = pd.read_csv(data)
if file.content_type == 'text/tab-separated-values':
df = pd.read_csv(data, delimiter='\t')
if file.content_type == 'application/octet-stream': # TODO can you have other 'octet-stream'?
df = pd.read_parquet(data, engine='pyarrow')

Similar to before, we connect to the database by initialising the client, and then we write to postgres using the .to_sql method from pandas. However, we must make sure to pass the data types when using this method, as otherwise your table would be populated incorrectly. Therefore:

# import types
from sqlalchemy import INTEGER, FLOAT, TIMESTAMP, VARCHAR, BOOLEAN
from pandas.api.types import is_datetime64tz_dtype
# connect to database
...
DTYPE_MAP = {
'int64': INTEGER,
'float64': FLOAT,
'datetime64[ns]': TIMESTAMP,
'datetime64[ns, UTC]': TIMESTAMP(timezone=True),
'bool': BOOLEAN,
'object': VARCHAR
}
def _get_pg_datatypes(df):
dtypes = {}
for col, dtype in df.dtypes.items():
if is_datetime64tz_dtype(dtype):
dtypes[col] = DTYPE_MAP['datetime64[ns, UTC]']
else:
dtypes[col] = DTYPE_MAP[str(dtype)]
return dtypes
dtypes = _get_pg_datatypes(df)
df.to_sql(table_name, con, schema=dataset_name, if_exists=conflict_resolution_strategy, index=False, method='multi', dtype=dtypes)
response.status_code = 201
return "Created table"

Overall, the code should look like as follows:

from fastapi import FastAPI, Response, File, UploadFile
from pydantic import BaseModel, Json
import sqlalchemy as db
import pandas as pd
import io
from sqlalchemy import INTEGER, FLOAT, TIMESTAMP, VARCHAR, BOOLEAN
from pandas.api.types import is_datetime64tz_dtype
# initialise app
app = FastAPI()
# pydantic class for collecting our parameters into a json object
class IngestionParams(BaseModel):
sql_query: str
username: str
password: str
port: int
host: str
database_name: str
class InsertionParams(BaseModel):
username: str
password: str
port: int = 5432
host: str
database_name: str
table_name: str
conflict_resolution_strategy: str = 'replace' # default value
def _connect_to_db(user, password, host, port, name):
db_uri = f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{name}'
engine = db.create_engine(db_uri)
con = engine.connect()
return con
# define POST endpoint "ingest"
@app.post("/ingest")
def ingest(
response: Response,
ingestion_params: IngestionParams
):
# retrieve ingestion params as dictionary
ingestion_params = ingestion_params.dict()
db_user = ingestion_params['username']
db_password = ingestion_params['password']
db_host = ingestion_params['host']
db_port = ingestion_params['port']
db_name = ingestion_params['database_name']
sql_query = ingestion_params['sql_query']
# connect to db
con = _connect_to_db(db_user, db_password, db_host, db_port, db_name)
# query db
query_result = con.execute(sql_query)
data = query_result.fetchall()
# convert to dataframe
columns = query_result.keys()
df = pd.DataFrame(data, columns=columns)
# return parquet file in Response object
return Response(df.to_parquet(engine='pyarrow', index=False), media_type='application/octet-stream')

# define POST endpoint "insert" using async
@app.post("/insert")
async def insert(
response: Response, # for returning the dataframe that we insert
insertion_params: Json[InsertionParams],
file: UploadFile = File(...)
):
# retrieve insertion params as dictionary
insertion_params = insertion_params.dict()
db_user = insertion_params['username']
db_password = insertion_params['password']
db_host = insertion_params['host']
db_port = insertion_params['port']
db_name = insertion_params['database_name']
table_name = insertion_params['table_name']
conflict_resolution_strategy = insertion_params['conflict_resolution_strategy']
dataset_name = insertion_params.get('dataset_name', None)
content = await file.read()

with io.BytesIO(content) as data:
if 'csv' in file.content_type:
df = pd.read_csv(data)
if file.content_type == 'text/tab-separated-values':
df = pd.read_csv(data, delimiter='\t')
if file.content_type == 'application/octet-stream': # TODO can you have other 'octet-stream'?
df = pd.read_parquet(data, engine='pyarrow')
DTYPE_MAP = {
'int64': INTEGER,
'float64': FLOAT,
'datetime64[ns]': TIMESTAMP,
'datetime64[ns, UTC]': TIMESTAMP(timezone=True),
'bool': BOOLEAN,
'object': VARCHAR
}
def _get_pg_datatypes(df):
dtypes = {}
for col, dtype in df.dtypes.items():
if is_datetime64tz_dtype(dtype):
dtypes[col] = DTYPE_MAP['datetime64[ns, UTC]']
else:
dtypes[col] = DTYPE_MAP[str(dtype)]
return dtypes
dtypes = _get_pg_datatypes(df)
# connect to db
con = _connect_to_db(db_user, db_password, db_host, db_port, db_name)
df.to_sql(table_name, con, schema=dataset_name, if_exists=conflict_resolution_strategy, index=False, method='multi', dtype=dtypes)
response.status_code = 201
return "Created table"

Note that we have cleaned things up a bit!

We can save this into a file called main.py and then run the following command in the terminal:

uvicorn main:app --reload

You should now be able to see the app in your browser by going to the following url:

localhost:8000/docs

This is what you should see on localhost:8000/docs

Setup

In order to test that our app is working, we need a postgres instance to test against. For the purpose of this article, we’ll be using a Docker Postgres instance (although you can use any of your liking).

For this, you need to have docker installed. Then you can run the following command in your terminal:

docker run -p 5432:5432 -e POSTGRES_PASSWORD=postgres -d postgres

You should now have a locally running postgres instance with a connection on the 5432 port. You can view this using database viewing software. In my case, I use DBeaver [2].

The connection parameters for the database are:

  • username: postgres
  • host: localhost
  • port: 5432
  • password: postgres
  • database_name: postgres

Reading Files Using Requests

Using the UI to read/write data is relatively intuitive. This is less so when you want to call the endpoints from within Python. For that, we use the requests module.

import io
import pandas as pd
import requests
BASE_URL = 'http://localhost:8000' # url for app comprised of host and port
# headers for request
headers = {
'Content-Type': 'application/json',
'Accept': 'application/octet-stream'
}
# function for validating successful request
def _is_status_code_valid(status_code):
if str(status_code).startswith('2'):
return True
def read_data(
ingestion_param,
):
#
url = f'{BASE_URL}/ingest'

resp = requests.post(url, json=ingestion_param, headers=headers)
status_code = resp.status_code
if _is_status_code_valid(status_code):
df = pd.read_parquet(io.BytesIO(resp.content), engine='pyarrow')
return df, status_code

The key aspect above is that we read the response as bytes, but we need our data as a dataframe. As a result, we convert the response content into a BytesIO object before reading it as parquet. We conserve the dtypes because parquet files contain the table dtype information.

Sending Files Using Requests

Sending files is slightly more complicated than reading them. This is because our request parameters for the post request are not so simple, and we have to find a way to convert our dataframe into an acceptable format.

import io
import json
import requests

BASE_URL = 'http://localhost:8000' # url for app comprised of host and port
def write_data(
database_name,
table_name,
database_type='pg',
df=pd.DataFrame(),
content_type='csv',
conflict_resolution_strategy='fail',
username='postgres',
password='postgres',
port=5432,
host='localhost',
):
url = f'{BASE_URL}/insert'

# in principle, it is possible to add converters for any content_type
if content_type == 'parquet':
memory_buffer = io.BytesIO()
df.to_parquet(
memory_buffer,
engine='pyarrow'
)
memory_buffer.seek(0)
# need to encode parameters as json string
data = {
'insertion_params': json.dumps(dict(
username=username,
password=password,
port=port,
database_name=database_name,
table_name=table_name,
conflict_resolution_strategy=conflict_resolution_strategy,
host=host
))
}
# need to send files separately
files = {
'file': ('Test', memory_buffer, 'application/octet-stream')
}
resp = requests.post(url, data=data, files=files)
return resp.text, resp.status_code

In this article we went through designing a data ingestion app using FastAPI for Postgres. Here is a quick summary of the key takeaways:

  • A data ingestion app is preferable to building ingestion functions in separate projects because it is standardised, more maintainable, and adaptable to changes
  • Converting dataframes to Parquet is useful for sending them over HTTP because we preserve data type information
  • The current app can be easily extended to support other data sources, such as BigQuery, Google Drive, etc…

Limitations

  • Support only exists for postgres
  • At present, the method of using .to_sql does not allow for upsert functionality

The entire code used for this article can be found in the following repository: https://github.com/namiyousef/in-n-out.

All images and code above are by author unless specified otherwise


In Less Than 10 Minutes

Photo thanks to growtika from Unsplash.

Dealing with data in industry is a pain. This is because data can be scattered across multiple projects, stored in different formats and maintained to different degrees. Typically, this leads to different teams developing different internal tools for processing their data. The whole thing is a resulting mess as you are left with no standard method of reading and writing data.

One solution to this is to develop data ingestion microservices. In essence, these are apps that enable data transfer over HTTP or RPC calls. They attempt to provide a unified format for reading from and writing to different data sources (e.g. Google BigQuery, Postgres, etc…). The idea is that other apps (think dashboards) would use the data ingestion app to load data. A simple use case is described in the figure below:

In this tutorial, I will describe the process of developing a simple data ingestion app for reading from and writing to any Postgres database using FastAPI.

Ingest Data

The ingest endpoint will be used for reading data. For this, we require parameters for connecting to our database. These will be the db_user, db_password, db_port, db_name and db_host. We also require a way to query the database, which in this case can simply be a string sql_query.

We define our app using fastapi and start building the endpoint:

from fastapi import FastAPI
from pydantic import BaseModel

# initialise app
app = FastAPI()
# pydantic class for collecting our parameters into a json object
class IngestionParams(BaseModel):
sql_query: str
username: str
password: str
port: int
host: str
database_name: str
# define POST endpoint "ingest"
@app.post("/ingest")
def ingest(
ingestion_params: IngestionParams
):
# retrieve ingestion params as dictionary
ingestion_params = ingestion_params.dict()
db_user = ingestion_params['username']
db_password = ingestion_params['password']
db_host = ingestion_params['host']
db_port = ingestion_params['port']
db_name = ingestion_params['database_name']
sql_query = ingestion_params['sql_query']

An important note: according to REST API design patterns, there should be a distinction between get and post requests. Typically, get is used for reading data, and post is used for sending data. However, in this context, using post is preferred to get because: 1) browser logs typically contain URL history (meaning that sensitive parameters such as the DB credentials would be exposed), 2) there is a limit on the URL length [1]

Now, we need to connect to our client and execute the query. For this, we can use sqlalchemy :

import sqlalchemy as db
import pandas as pd

# connect to db
db_uri = f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}'
engine = db.create_engine(db_uri)
con = engine.connect()
# query db
query_result = con.execute(sql_query)
data = query_result.fetchall()
# convert to dataframe
columns = query_result.keys()
df = pd.DataFrame(data, columns=columns)

The final thing that remains is how we’re going to send the DataFrame back over the HTTP request. There are multiple ways that we can do this, however in order to preserve the dtype, we convert the DataFrame to a Parquet file and send it over as a binary file using the content type application/octer-stream.

# added import
from fastapi import Response
def ingest(
response: Response, # add response parameter
ingestion_params: IngestionParams
)

...
# return parquet file in Response object
return Response(df.to_parquet(engine='pyarrow', index=False), media_type='application/octet-stream')

Insert Data

For inserting data, we need the same parameters as before for connecting to the database. However, we also need the table_name, the dataset_name, and the conflict_resolution_strategy.

We also need a file parameter for sending the data. With this, we begin designing the insert endpoint:

from fastapi import File, UploadFile
from pydantic import Json
# pydantic class for collecting our parameters into a json object
class InsertionParams(BaseModel):
username: str
password: str
port: int = 5432
host: str
database_name: str
table_name: str
conflict_resolution_strategy: str = 'replace' # default value

# define POST endpoint "insert" using async
@app.post("/insert")
async def insert(
response: Response, # for returning the dataframe that we insert
insertion_params: Json[InsertionParams],
file: UploadFile = File(...)
):
# retrieve insertion params as dictionary
insertion_params = insertion_params.dict()
db_user = insertion_params['username']
db_password = insertion_params['password']
db_host = insertion_params['host']
db_port = insertion_params['port']
db_name = insertion_params['database_name']
table_name = insertion_params['table_name']
conflict_resolution_strategy = insertion_params['conflict_resolution_strategy']
dataset_name = insertion_params.get('dataset_name', None)
content = await file.read()

Now, we want to design our API to enable different file types as inputs. We can use the content_type attribute of the file object to determine the file type, and then read it appropriately.

import io
with io.BytesIO(content) as data:
if 'csv' in file.content_type:
df = pd.read_csv(data)
if file.content_type == 'text/tab-separated-values':
df = pd.read_csv(data, delimiter='\t')
if file.content_type == 'application/octet-stream': # TODO can you have other 'octet-stream'?
df = pd.read_parquet(data, engine='pyarrow')

Similar to before, we connect to the database by initialising the client, and then we write to postgres using the .to_sql method from pandas. However, we must make sure to pass the data types when using this method, as otherwise your table would be populated incorrectly. Therefore:

# import types
from sqlalchemy import INTEGER, FLOAT, TIMESTAMP, VARCHAR, BOOLEAN
from pandas.api.types import is_datetime64tz_dtype
# connect to database
...
DTYPE_MAP = {
'int64': INTEGER,
'float64': FLOAT,
'datetime64[ns]': TIMESTAMP,
'datetime64[ns, UTC]': TIMESTAMP(timezone=True),
'bool': BOOLEAN,
'object': VARCHAR
}
def _get_pg_datatypes(df):
dtypes = {}
for col, dtype in df.dtypes.items():
if is_datetime64tz_dtype(dtype):
dtypes[col] = DTYPE_MAP['datetime64[ns, UTC]']
else:
dtypes[col] = DTYPE_MAP[str(dtype)]
return dtypes
dtypes = _get_pg_datatypes(df)
df.to_sql(table_name, con, schema=dataset_name, if_exists=conflict_resolution_strategy, index=False, method='multi', dtype=dtypes)
response.status_code = 201
return "Created table"

Overall, the code should look like as follows:

from fastapi import FastAPI, Response, File, UploadFile
from pydantic import BaseModel, Json
import sqlalchemy as db
import pandas as pd
import io
from sqlalchemy import INTEGER, FLOAT, TIMESTAMP, VARCHAR, BOOLEAN
from pandas.api.types import is_datetime64tz_dtype
# initialise app
app = FastAPI()
# pydantic class for collecting our parameters into a json object
class IngestionParams(BaseModel):
sql_query: str
username: str
password: str
port: int
host: str
database_name: str
class InsertionParams(BaseModel):
username: str
password: str
port: int = 5432
host: str
database_name: str
table_name: str
conflict_resolution_strategy: str = 'replace' # default value
def _connect_to_db(user, password, host, port, name):
db_uri = f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{name}'
engine = db.create_engine(db_uri)
con = engine.connect()
return con
# define POST endpoint "ingest"
@app.post("/ingest")
def ingest(
response: Response,
ingestion_params: IngestionParams
):
# retrieve ingestion params as dictionary
ingestion_params = ingestion_params.dict()
db_user = ingestion_params['username']
db_password = ingestion_params['password']
db_host = ingestion_params['host']
db_port = ingestion_params['port']
db_name = ingestion_params['database_name']
sql_query = ingestion_params['sql_query']
# connect to db
con = _connect_to_db(db_user, db_password, db_host, db_port, db_name)
# query db
query_result = con.execute(sql_query)
data = query_result.fetchall()
# convert to dataframe
columns = query_result.keys()
df = pd.DataFrame(data, columns=columns)
# return parquet file in Response object
return Response(df.to_parquet(engine='pyarrow', index=False), media_type='application/octet-stream')

# define POST endpoint "insert" using async
@app.post("/insert")
async def insert(
response: Response, # for returning the dataframe that we insert
insertion_params: Json[InsertionParams],
file: UploadFile = File(...)
):
# retrieve insertion params as dictionary
insertion_params = insertion_params.dict()
db_user = insertion_params['username']
db_password = insertion_params['password']
db_host = insertion_params['host']
db_port = insertion_params['port']
db_name = insertion_params['database_name']
table_name = insertion_params['table_name']
conflict_resolution_strategy = insertion_params['conflict_resolution_strategy']
dataset_name = insertion_params.get('dataset_name', None)
content = await file.read()

with io.BytesIO(content) as data:
if 'csv' in file.content_type:
df = pd.read_csv(data)
if file.content_type == 'text/tab-separated-values':
df = pd.read_csv(data, delimiter='\t')
if file.content_type == 'application/octet-stream': # TODO can you have other 'octet-stream'?
df = pd.read_parquet(data, engine='pyarrow')
DTYPE_MAP = {
'int64': INTEGER,
'float64': FLOAT,
'datetime64[ns]': TIMESTAMP,
'datetime64[ns, UTC]': TIMESTAMP(timezone=True),
'bool': BOOLEAN,
'object': VARCHAR
}
def _get_pg_datatypes(df):
dtypes = {}
for col, dtype in df.dtypes.items():
if is_datetime64tz_dtype(dtype):
dtypes[col] = DTYPE_MAP['datetime64[ns, UTC]']
else:
dtypes[col] = DTYPE_MAP[str(dtype)]
return dtypes
dtypes = _get_pg_datatypes(df)
# connect to db
con = _connect_to_db(db_user, db_password, db_host, db_port, db_name)
df.to_sql(table_name, con, schema=dataset_name, if_exists=conflict_resolution_strategy, index=False, method='multi', dtype=dtypes)
response.status_code = 201
return "Created table"

Note that we have cleaned things up a bit!

We can save this into a file called main.py and then run the following command in the terminal:

uvicorn main:app --reload

You should now be able to see the app in your browser by going to the following url:

localhost:8000/docs

This is what you should see on localhost:8000/docs

Setup

In order to test that our app is working, we need a postgres instance to test against. For the purpose of this article, we’ll be using a Docker Postgres instance (although you can use any of your liking).

For this, you need to have docker installed. Then you can run the following command in your terminal:

docker run -p 5432:5432 -e POSTGRES_PASSWORD=postgres -d postgres

You should now have a locally running postgres instance with a connection on the 5432 port. You can view this using database viewing software. In my case, I use DBeaver [2].

The connection parameters for the database are:

  • username: postgres
  • host: localhost
  • port: 5432
  • password: postgres
  • database_name: postgres

Reading Files Using Requests

Using the UI to read/write data is relatively intuitive. This is less so when you want to call the endpoints from within Python. For that, we use the requests module.

import io
import pandas as pd
import requests
BASE_URL = 'http://localhost:8000' # url for app comprised of host and port
# headers for request
headers = {
'Content-Type': 'application/json',
'Accept': 'application/octet-stream'
}
# function for validating successful request
def _is_status_code_valid(status_code):
if str(status_code).startswith('2'):
return True
def read_data(
ingestion_param,
):
#
url = f'{BASE_URL}/ingest'

resp = requests.post(url, json=ingestion_param, headers=headers)
status_code = resp.status_code
if _is_status_code_valid(status_code):
df = pd.read_parquet(io.BytesIO(resp.content), engine='pyarrow')
return df, status_code

The key aspect above is that we read the response as bytes, but we need our data as a dataframe. As a result, we convert the response content into a BytesIO object before reading it as parquet. We conserve the dtypes because parquet files contain the table dtype information.

Sending Files Using Requests

Sending files is slightly more complicated than reading them. This is because our request parameters for the post request are not so simple, and we have to find a way to convert our dataframe into an acceptable format.

import io
import json
import requests

BASE_URL = 'http://localhost:8000' # url for app comprised of host and port
def write_data(
database_name,
table_name,
database_type='pg',
df=pd.DataFrame(),
content_type='csv',
conflict_resolution_strategy='fail',
username='postgres',
password='postgres',
port=5432,
host='localhost',
):
url = f'{BASE_URL}/insert'

# in principle, it is possible to add converters for any content_type
if content_type == 'parquet':
memory_buffer = io.BytesIO()
df.to_parquet(
memory_buffer,
engine='pyarrow'
)
memory_buffer.seek(0)
# need to encode parameters as json string
data = {
'insertion_params': json.dumps(dict(
username=username,
password=password,
port=port,
database_name=database_name,
table_name=table_name,
conflict_resolution_strategy=conflict_resolution_strategy,
host=host
))
}
# need to send files separately
files = {
'file': ('Test', memory_buffer, 'application/octet-stream')
}
resp = requests.post(url, data=data, files=files)
return resp.text, resp.status_code

In this article we went through designing a data ingestion app using FastAPI for Postgres. Here is a quick summary of the key takeaways:

  • A data ingestion app is preferable to building ingestion functions in separate projects because it is standardised, more maintainable, and adaptable to changes
  • Converting dataframes to Parquet is useful for sending them over HTTP because we preserve data type information
  • The current app can be easily extended to support other data sources, such as BigQuery, Google Drive, etc…

Limitations

  • Support only exists for postgres
  • At present, the method of using .to_sql does not allow for upsert functionality

The entire code used for this article can be found in the following repository: https://github.com/namiyousef/in-n-out.

All images and code above are by author unless specified otherwise

FOLLOW US ON GOOGLE NEWS

Read original article here

Denial of responsibility! Techno Blender is an automatic aggregator of the all world’s media. In each content, the hyperlink to the primary source is specified. All trademarks belong to their rightful owners, all materials to their authors. If you are the owner of the content and do not want us to publish your materials, please contact us by email – [email protected]. The content will be deleted within 24 hours.
Leave a comment