Multi-Table Datasets

Accessing multi-table datasets on Dewey via API

Currently, accessing multi-table products is not available in the deweydatapy (Python) or deweydataR (R) libraries. To work around this, you can use the provided code snippets to:

  • Python: Create a Multitable class with methods to download data from tables in a multi-table dataset.
  • R: Utilize separate functions that perform similar tasks to handle multi-table datasets.
import random
import requests
import time
import os
import pandas as pd

class Multitable:
    """
    A helper class to interact with multi-table products through APIs.
    """
    def __init__(self, apikey: str, pp_: str):
        """
        Initialize the Multitable class with an API key and the base URL for the multi-table product.

        :param apikey: Your API key created on Dewey
        :param pp_: The base URL for the multi-table product
        """
        self.apikey = apikey
        self.pp_ = pp_
    
    def make_get_request(self, url: str, params: dict = None, attempt_count: int = 1, max_attempts: int = 5):
        """
        Call API using GET request, including retry logic for rate limiting.

        :param url: API endpoint.
        :param params: Parameters for the API call.
        :param attempt_count: Current retry attempt.
        :return: Response object from the GET request.
        """

        headers = {'X-API-KEY': self.apikey,
                   'Content-Type': 'application/json',
                   'accept': 'application/json'}

        response = requests.get(url, params=params, headers=headers)

        if response.status_code == 429 and attempt_count <= max_attempts:
            sleep_time = (2 ** attempt_count) + random.uniform(0, 2)
            print(f'Ratelimit reached, retrying after {sleep_time:.1f} seconds')
            time.sleep(sleep_time)
            return self.make_get_request(url, params=params, attempt_count=attempt_count+1)

        if response.status_code == 400:
            raise Exception(f'Request failed with {response.status_code}')

        if not (response.status_code >= 200 and response.status_code < 300):
            raise Exception(f'Request failed with {response.status_code} and {response.json()}')

        return response
    
    
    def get_table_list(self):
        """
        Get a list of tables available for download from a multi-table product.
        """
        results = self.make_get_request(url=self.pp_+'/multi-table-product-metadata')
        table_options = [table_dict['table_name'] for table_dict in results.json()['items']]
        return table_options
    
    
    def get_mt_meta(self, tables: list = 'all', df: bool = False):
        """
        Get metadata for a multi-tables.

        :param tables: List of tables to get metadata for. Call get_table_list() to get a list of available tables.
        :param df: Return metadata as a pandas DataFrame.
        """
        if tables == 'all':
            tables = self.get_table_list()

        metadata_list = []

        for table in tables:
            results = self.make_get_request(url=f'{self.pp_}/metadata?table_name={table}')
            meta_data = results.json()

            partition = meta_data.get('partition_column')
            min_partition_key = meta_data.get('min_partition_key')
            max_partition_key = meta_data.get('max_partition_key')
            total_files = meta_data.get('total_files')
            total_size = meta_data.get('total_size')

            metadata_list.append({
                "table_name": table,
                "partition": partition,
                "min_partition_key": min_partition_key,
                "max_partition_key": max_partition_key,
                "total_files": total_files,
                "total_size": total_size

            })
        
        if df:
            return pd.DataFrame(metadata_list)
        else:
            return metadata_list

    
    
    def get_multi_tables(self, table_list: list, directory: str, skip_exists: bool = True, date_start: str = None, date_end: str = None):
        """
        Download all files from a list of tables from a multi-table product.
        
        :param table_list: List of tables to download. Call get_table_list() to get a list of available tables.
        :param directory: Directory to save downloaded files
        :param date_start: Start date for partitioned tables if applicable
        :param date_end: End date for partitioned tables if applicable
        """
        def determine_partition_params(metadata, date_start, date_end):
            """
            Determine partition parameters based on metadata and provided dates.
            """
            partition = metadata.get('partition')
            if partition:
                beg_date = date_start or metadata.get('min_partition_key')
                end_date = date_end or metadata.get('max_partition_key')
                return {'partition_key_before': end_date, 'partition_key_after': beg_date}
            return {}


        for table_name in table_list:
            table_dir = os.path.join(directory, table_name)
            os.makedirs(table_dir, exist_ok=True)
        
        # loop through all API result pages, keeping track of number of downloaded files
        for table_name in table_list:
            page = 1
            download_count = 0

            metadata = self.get_mt_meta(tables=[table_name])[0]
            partition_params = determine_partition_params(metadata, date_start, date_end)


            while True:
                params = {'page': page, 'table_name': table_name, **partition_params}

                # get results from API endpoint, using API key for authentication, for a specific page
                results = self.make_get_request(
                    url=self.pp_,
                    params=params
                )
                response_json = results.json()

                # for each result page, loop through download links and save to your computer
                for link_data in response_json['download_links']:
                        file_name = link_data['file_name']
                        file_path = os.path.join(directory, table_name, file_name)

                        if skip_exists and os.path.exists(file_path):
                            print(f"File {file_name} already exists. Skipping download.")
                            continue


                        print(f"Downloading file {file_name} to folder {table_name}...")
                        data = self.make_get_request(link_data['link'])

                        with open(file_path, 'wb') as file:
                            file.write(data.content)
                        download_count += 1
                
                # only continue if there are more result pages to process
                total_pages = response_json['total_pages']
                if page >= total_pages:
                    break
                page += 1
            
            print(f"Successfully downloaded {download_count} files to folder {table_name}.")
library(httr)
library(jsonlite)
library(stringr)
library(fs)
library(dplyr)
library(purrr)

# Helper function for making GET request with retry logic
make_get_request <- function(apikey, url, params = list(), attempt_count = 1, max_attempts = 5) {
  headers <- add_headers(
    `X-API-KEY` = apikey,
    `Content-Type` = "application/json",
    `accept` = "application/json"
  )

  response <- GET(url, headers, query = params)

  if (status_code(response) == 429 && attempt_count <= max_attempts) {
    sleep_time <- (2 ^ attempt_count) + runif(1, 0, 2)
    message(sprintf("Rate limit reached, retrying after %.1f seconds", sleep_time))
    Sys.sleep(sleep_time)
    return(make_get_request(apikey, url, params, attempt_count + 1))
  }

  if (status_code(response) == 400) {
    stop(sprintf("Request failed with %d", status_code(response)))
  }

  if (!(status_code(response) >= 200 && status_code(response) < 300)) {
    stop(sprintf("Request failed with %d and %s", 
                 status_code(response), 
                 content(response, "text", encoding = "UTF-8")))
  }

  return(response)
}


# Function to fetch list of avaiable tables
get_table_list <- function(apikey, base_url) {
  url <- paste0(base_url, "/multi-table-product-metadata")
  response <- make_get_request(apikey, url)
  response_json <- content(response, as = "parsed")
  
  table_options <- purrr::map_chr(response_json$items, "table_name")
  return(table_options)
}


# Function to fetch metadata for specific tables
get_table_metadata <- function(apikey, base_url, tables = "all") {
  if (length(tables) == 1 && tables == "all") {
    tables <- get_table_list(apikey, base_url)
  }
  
  if (is.character(tables)) {
    tables <- as.character(tables)
  }

  metadata_list <- map(tables, function(table_name) {
    url <- paste0(base_url, "/metadata?table_name=", table_name)
    response <- make_get_request(apikey, url)
    response_json <- content(response, as = "parsed")
    
    tibble(
      table_name = table_name,
      total_partitions = response_json$total_partitions,
      partition = response_json$partition_column,
      min_partition_key = response_json$min_partition_key,
      max_partition_key = response_json$max_partition_key
    )
  })
  
  return(bind_rows(metadata_list))
}


# Function to download tables based on metadata and optional date filters
download_tables <- function(apikey, base_url, table_list, directory, skip_exists = TRUE, date_start = NULL, date_end = NULL) {
  determine_partition_params <- function(metadata, date_start, date_end) {
    if (metadata$total_partitions > 0) {
      start_date <- ifelse(!is.null(date_start), date_start, metadata$min_partition_key)
      end_date <- ifelse(!is.null(date_end), date_end, metadata$max_partition_key)
      return(list(partition_key_before = end_date, partition_key_after = start_date))
    }
    return(list())
  }

  dir_create(directory)

  walk(table_list, function(table_name) {
    
    
    table_dir <- path(directory, table_name)
    dir_create(table_dir)
    
    metadata <- get_table_metadata(apikey, base_url, tables = table_name)
    
    partition_params <- determine_partition_params(metadata, date_start, date_end)

    page <- 1
    download_count <- 0

    repeat {
      params <- modifyList(
        list(page = page, table_name = table_name), 
        partition_params)
      response <- make_get_request(apikey, base_url, params)
      response_json <- content(response, as = "parsed")

      walk(response_json$download_links, function(link_info) {
        file_name <- link_info$file_name
        file_path <- path(table_dir, file_name)

        if (skip_exists && file_exists(file_path)) {
          message(sprintf("File %s already exists. Skipping download.", file_name))
        } else {
          message(sprintf("Downloading file %s to folder %s...", file_name, table_name))
          data <- make_get_request(apikey, link_info$link)
          writeBin(content(data, "raw"), file_path)
          download_count <<- download_count + 1
        }
      })

      if (page >= response_json$total_pages) {
        break
      }

      page <- page + 1
    }
    
    message(sprintf("Successfully downloaded %d files to folder %s.", download_count, table_name))
  })
}

Example use

Set up

# API Key
apikey = "<API_KEY_STEP_1>"

# Product path
pp_ = "<PRODUCT_PATH_STEP_2>"

# Initialize the class and store in variable mtp
mtp = Multitable(apikey, pp_)
# API Key
api_key <- "<API_KEY_STEP_1>"

# Product path
pp_ <- "<PRODUCT_PATH_STEP_2>"

Get meta information on each table in the product

Returns partition type, min key, max_key, total files, and total size

# Get meta information
mtp.get_mt_meta(df = True)
# Get meta data information for all tables
get_table_metadata(api_key, pp_)

Set df = True to return a dataframe

Get a list of available tables and download

# Show all tables available
tables =  mtp.get_table_list()
print(tables)

# Download all tables in the list
mtp.get_multi_tables(tables, "<YOUR_DIRECTORY>")
# Show all tables available
tables <-  get_table_list(api_key, pp_)
tables

# Download all tables in the list
get_multi_tables(api_key, pp_, tables, "<YOUR_DIRECTORY>")

Download select tables from the list

# Define a subset from the list of tables available
table_subset = ['TABLE_1', 'TABLE_2']

# Download the tables in the list
mtp.get_multi_tables(table_subset, "<YOUR_DIRECTORY>")
# Define a subset from the list of tables available
table_subset <-  ['TABLE_1', 'TABLE_2']

# Download the tables in the list
download_tables(api_key, pp_, table_subset, "<YOUR_DIRECTORY>")

Set date partition

If the table has a defined date partition column, set the start and end date to filter accordingly. If the file is not partitioned, the entire table will be returned. The minimum and maximum dates are applied as the default if none are specified.

# Set partition dates to apply to applly to partioned tables
mtp.get_multi_tables(table_subset, "<YOUR_DIRECTORY>", date_start="<DATE>", date_end="<DATE>")
# Set partition dates to apply to applly to partioned tables
download_tables(api_key, pp_, table_subset, date_start="<DATE>", date_end="<DATE>", "<YOUR_DIRECTORY>"

Overwrite current files

By default skip_exists is set to True. To overwrite previously downloaded files, set the skip_exists parameter to False

# Overwrite previously downloaded files
mtp.get_multi_tables(table_subset, "<YOUR_DIRECTORY>", skip_exists = False)
# Overwrite previously downloaded files
download_tables(api_key, pp_, table_subset, "<YOUR_DIRECTORY>", skip_exists = FALSE)