Using Python for Streaming Data with Generators

Continuing with the same dataset from the World Bank's World Development Indicators, this post builds on earlier ones in which I wrote about using iterators to load data chunk by chunk and using generators to load a file line by line.

Generators are well suited to streaming data, where new lines are written to a source over time. A generator can read and process the data until it reaches the end of the file and there are no more lines to process. This matters because data sources can be so large that storing the entire dataset in memory becomes too resource-intensive.

In this exercise from DataCamp’s tutorials, I process the first 1000 rows of a file line by line to build a dictionary that counts how many times each country appears in a column of the dataset. Below are the details of how to open and process the file.

To begin, I need to open a connection to a file using what is known as a context manager. For example, the command,

with open('datacamp.csv') as datacamp:

binds the csv file 'datacamp.csv' to the variable datacamp inside the context manager.

Here, the with statement invokes the context manager, and its purpose is to ensure that the file connection is properly opened and then released (closed) when the block exits, even if an error occurs along the way.
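
To see this guarantee in action, here is a minimal sketch of mine (assuming the dataset file 'world_dev_ind.csv' is in the working directory): the file object reports itself as closed as soon as the block exits.

# Open a connection inside the with block and read the header
with open('world_dev_ind.csv') as file:
    header = file.readline()

# The context manager has already closed the file on exit
print(file.closed)  # True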

The sample code below uses the .readline() method to read a line from the file object, and then splits the line into a list with the .split() method.

# Open a connection to the file
with open('world_dev_ind.csv') as file:

    # Skip the column names
    file.readline()

    # Initialize an empty dictionary: counts_dict
    counts_dict = {}

    # Process only the first 1000 rows
    for j in range(1000):

        # Split the current line into a list: line
        line = file.readline().split(',')

        # Get the value for the first column: first_col
        first_col = line[0]

        # If the column value is in the dict, increment its value
        if first_col in counts_dict:
            counts_dict[first_col] += 1

        # Else, add to the dict and set value to 1
        else:
            counts_dict[first_col] = 1

# Print the resulting dictionary
print(counts_dict)

The output looks like this:

{'Arab World': 80, 'Caribbean small states': 77, 'Central Europe and the Baltics': 71, 'East Asia & Pacific (all income levels)': 122, 'East Asia & Pacific (developing only)': 123, 'Euro area': 119, 'Europe & Central Asia (all income levels)': 109, 'Europe & Central Asia (developing only)': 89, 'European Union': 116, 'Fragile and conflict affected situations': 76, 'Heavily indebted poor countries (HIPC)': 18}
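
As an aside, the counting pattern above can be written more compactly with collections.Counter from the standard library. This variation is mine, not part of the tutorial; it assumes the same file and column layout.

from collections import Counter

# Counter behaves like a dict whose missing keys default to 0
counts_dict = Counter()

# Open a connection to the file
with open('world_dev_ind.csv') as file:

    # Skip the column names
    file.readline()

    # Process only the first 1000 rows
    for _ in range(1000):
        counts_dict[file.readline().split(',')[0]] += 1

print(counts_dict)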

Using a generator to load data
Generators allow users to lazily evaluate data. This concept of lazy evaluation is useful when you have to deal with very large datasets because it lets you generate values in an efficient manner by yielding only chunks of data at a time instead of the whole thing at once.
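
To make lazy evaluation concrete, compare a list comprehension with the equivalent generator expression. This quick sketch (with illustrative names of my own) shows that the generator computes values only when asked:

# The list materializes all one million squares in memory at once
squares_list = [n ** 2 for n in range(1_000_000)]

# The generator expression computes each square only on demand
squares_gen = (n ** 2 for n in range(1_000_000))

print(next(squares_gen))  # 0
print(next(squares_gen))  # 1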

The tutorial asks me to define a generator function read_large_file() that produces a generator object yielding a single line from a file each time next() is called on it.

# Define read_large_file()
def read_large_file(file_object):
    """A generator function to read a large file lazily."""

    # Loop indefinitely until the end of the file
    while True:

        # Read a line from the file: data
        data = file_object.readline()

        # Break if this is the end of the file
        if not data:
            break

        # Yield the line of data
        yield data
        
# Open a connection to the file
with open('world_dev_ind.csv') as file:

    # Create a generator object for the file: gen_file
    gen_file = read_large_file(file)

    # Print the first three lines of the file
    print(next(gen_file))
    print(next(gen_file))
    print(next(gen_file))
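
As an aside, a Python file object is itself a lazy iterator over lines, so next() also works on it directly; read_large_file() simply spells out the mechanics with an explicit loop and yield. A minimal sketch of the shortcut:

# A file object already yields one line per next() call
with open('world_dev_ind.csv') as file:
    print(next(file))
    print(next(file))
    print(next(file))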

Next, use the generator function created above to read the file line by line, build a dictionary of how many times each country appears in a column of the dataset, process all the rows in the file, and print out the result. See the sample code below:

# Initialize an empty dictionary: counts_dict
counts_dict = {}

# Open a connection to the file
with open('world_dev_ind.csv') as file:

    # Iterate over the generator from read_large_file()
    for line in read_large_file(file):

        row = line.split(',')
        first_col = row[0]

        if first_col in counts_dict:
            counts_dict[first_col] += 1
        else:
            counts_dict[first_col] = 1

# Print the resulting dictionary
print(counts_dict)

Again, the output looks like this:

{'CountryName': 1, 'Arab World': 80, 'Caribbean small states': 77, 'Central Europe and the Baltics': 71, 'East Asia & Pacific (all income levels)': 122, 'East Asia & Pacific (developing only)': 123, 'Euro area': 119, 'Europe & Central Asia (all income levels)': 109, 'Europe & Central Asia (developing only)': 89, 'European Union': 116, 'Fragile and conflict affected situations': 76, 'Heavily indebted poor countries (HIPC)': 99, 'High income': 131, 'High income: nonOECD': 68, 'High income: OECD': 127, 'Latin America & Caribbean (all income levels)': 130, 'Latin America & Caribbean (developing only)': 133, 'Least developed countries: UN classification': 78, 'Low & middle income': 138, 'Low income': 80, 'Lower middle income': 126, 'Middle East & North Africa (all income levels)': 89, 'Middle East & North Africa (developing only)': 94, 'Middle income': 138, 'North America': 123, 'OECD members': 130, 'Other small states': 63, 'Pacific island small states': 66, 'Small states': 69, 'South Asia': 36}

This shows more entries than before because the processing is no longer limited to the first 1000 rows. Note also the 'CountryName': 1 entry: this version never skips the header line, so the column name itself gets counted.
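
If the header should not be counted, consume it from the generator before looping. This is a small variation of mine on the code above, using dict.get() to shorten the counting branch:

# Initialize an empty dictionary: counts_dict
counts_dict = {}

# Open a connection to the file
with open('world_dev_ind.csv') as file:

    # Create a generator object for the file: gen_file
    gen_file = read_large_file(file)

    # Consume the header row so 'CountryName' is not counted
    next(gen_file)

    for line in gen_file:
        first_col = line.split(',')[0]
        counts_dict[first_col] = counts_dict.get(first_col, 0) + 1

# Print the resulting dictionary
print(counts_dict)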

Summary:

  • Generators are a natural fit for streaming data.
  • A context manager (the with statement) opens and safely closes a file connection.
  • A generator function loads data lazily, line by line.
  • A dictionary stores the resulting counts.