Gathering Data from the Steam Store API using Python
This post forms part of a larger series on downloading, processing and analysing data from the Steam store. See all posts here.
View original notebook on github (Download), with an interactive version available on Google Colaboratory. Datasets available on Kaggle.
The first time I used Steam was with an account I can't remember the name of, playing a game which could well have been Dark Messiah of Might & Magic, on a computer that would easily be blown away by the processing power of my phone today.
It was sometime in 2006, when every game I bought came on a disk in a box that now gathers dust somewhere in my parents' house. Surprisingly, my current PC has a disk drive, though I can't remember the last time I used it.
To play the multiplayer component of the game, I was annoyed to find I had to install a piece of third-party software I had never heard of called Steam, creating the account I have long since lost in the process. Here's what that software probably looked like at the time:
Fast-forward a little over 10 years and the Steam Store is huge, ubiquitous as the home of PC gaming and distribution. Whilst physical copies still just about feel at home on consoles, the PC market has long since moved digital. In case you are not familiar, Steam is a digital store for purchasing, downloading and playing video games. It hosts a variety of community features, allows pushing game updates to users automatically, and gathers news stories relevant to each title. It's a bit like Google's Play Store or Apple's App Store for phones.
A large part of Steam's success as a platform is due to its use of frequent sales, convenience as a unified digital game library, and the aforementioned shift to digital over physical. Whilst other platforms are emerging and gaining traction, there is likely no better resource for examining gaming over the last decade. With that in mind, if we can construct a dataset from Steam's data, we will have access to a wealth of information about nearly 30,000 games released since 2003, when Steam first launched.
Project Goals
The motivation for this project is to download, process and analyse a data set of Steam apps (games) from the Steam store, and gain insights into what makes a game more successful in terms of sales, play-time and ratings. We will imagine that we have been approached by a company hoping to develop and release a new title, using the findings we provide them to inform decisions about how best to manage their budget and hopefully increase the success of their next release.
The first step will be tackling data collection - the actual retrieval of data from Steam's servers and databases. In the future we'll look at cleaning the data, transforming it into a more useful state, then on to data exploration and analysis. Finally we'll summarise our findings in a non-technical report which would be sent to the fictional company in question.
At the end of the data collection and cleaning stages, we'd like to end up with a table or database like this:
name | id | information | owners | price | rating |
---|---|---|---|---|---|
awesome game | 100 | genres, descriptors, variables | 100,000 | £9.99 | 9/10 |
generic shooter 4 | 200 | definitely the best shooter | 50,000 | £39.99 | 6/10 |
... | ... | ... | ... | ... | ... |
We can then interrogate the data, and investigate whether particular attributes tend to result in more successful games. Metrics like ownership and ratings should help define the success of a title.
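To make that target concrete, here is a minimal sketch of how such a table could be held in pandas and interrogated, using made-up rows mirroring the illustrative table above (not real Steam data):
import pandas as pd

# hypothetical rows mirroring the illustrative table above - not real Steam data
games = pd.DataFrame({
    'name': ['awesome game', 'generic shooter 4'],
    'id': [100, 200],
    'owners': [100_000, 50_000],
    'price': [9.99, 39.99],
    'rating': [9, 6],
})

# example question: do higher-rated games tend to have more owners?
print(games.groupby(games['rating'] >= 7)['owners'].mean())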
Data Acquisition
There are a number of ways to get this information. Obviously we could search the web (and especially Kaggle) for existing datasets; however, to avoid letting someone else get away with all that hard work (and mainly for the purposes of learning), we'll be acquiring all the data ourselves from scratch.
Often when gathering data the best place to start is to check for an API. Fortunately Valve (the company behind Steam) make one available at https://partner.steamgames.com/. An API like this lets anyone interface with the data on a website in a controlled way, and is typically how developers expose databases and server-side information to end-users. Unfortunately the official documentation doesn't cover every endpoint, but others have documented the missing parts for us. This documentation of the StorefrontAPI will be particularly useful.
We'll be able to get good information about the details of each game from the Steam API, however we're still missing information about popularity and sales. Luckily we can easily get this data from another website, SteamSpy.
SteamSpy is a Steam stats-gathering service and crucially has data easily available through its own API (documentation here). It provides a number of useful metrics including an estimation for total owners of each game.
We'll be retrieving data from both APIs and combining them to form our dataset. For the purposes of this project, we'll be performing as little data cleaning as possible at this stage, providing 'dirty' data for data cleaning, the next step in this project.
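As a quick preview of what talking to these two APIs looks like, here is a minimal sketch requesting details for a single title from each endpoint (the robust request logic, with retries and rate-limiting, is developed properly below; the appid used here, 570 for Dota 2, is just an example):
import requests

appid = 570  # example app ID only (Dota 2)

# Steam storefront API: detailed store-page information for one app
steam = requests.get(
    "http://store.steampowered.com/api/appdetails/",
    params={"appids": appid}
).json()

# SteamSpy API: ownership and play-time estimates for the same app
steamspy = requests.get(
    "https://steamspy.com/api.php",
    params={"request": "appdetails", "appid": appid}
).json()

print(steam[str(appid)]['success'], steamspy.get('name'))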
Section outline:
- Create an app list from SteamSpy API using 'all' request
- Retrieve individual app data from Steam API, by iterating through app list
- Retrieve individual app data from SteamSpy API, by iterating through app list
- Export app list, Steam data and SteamSpy data to csv files
API references:
- https://partner.steamgames.com/doc/webapi
- https://wiki.teamfortress.com/wiki/User:RJackson/StorefrontAPI
- https://steamapi.xpaw.me/#
- https://steamspy.com/api.php
Import Libraries
We begin by importing the libraries we will be using. We start with standard library imports, or those available by default in Python, then import the third-party packages. We'll be using requests to handle interacting with the APIs, then the popular pandas and numpy libraries for handling the downloaded data.
# standard library imports
import csv
import datetime as dt
import json
import os
import statistics
import time

# third-party imports
import numpy as np
import pandas as pd
import requests
from requests.exceptions import SSLError

# customisations - ensure tables show all columns
pd.set_option("display.max_columns", 100)
Next, we define a general, all-purpose function to process GET requests from an API, supplied through a URL parameter. A dictionary of parameters can be supplied, which is passed into the request automatically, depending on the requirements of the API.
Rather than simply returning the response, we handle a couple of scenarios to help automation. Occasionally we encounter an SSL Error, in which case we simply wait a few seconds then try again (by recursively calling the function). When this happens, and generally throughout this project, we provide quite verbose feedback to show when these errors are encountered and how they are handled.
Sometimes there is no response when a request is made (returns None). This usually happens when too many requests are made in a short period of time, and the polling limit has been reached. We try to avoid this by pausing briefly between requests, as we'll see later, but in case we breach the polling limit we wait 10 seconds then try again.
Handling these errors in this way ensures that our function almost always returns the desired response, which we return in json format to make processing easier.
def get_request(url, parameters=None):
    """Return json-formatted response of a get request using optional parameters.

    Parameters
    ----------
    url : string
    parameters : {'parameter': 'value'}
        parameters to pass as part of get request

    Returns
    -------
    json_data
        json-formatted response (dict-like)
    """
    try:
        response = requests.get(url=url, params=parameters)
    except SSLError as s:
        print('SSL Error:', s)

        for i in range(5, 0, -1):
            print('\rWaiting... ({})'.format(i), end='')
            time.sleep(1)
        print('\rRetrying.' + ' '*10)

        # recursively try again
        return get_request(url, parameters)

    if response:
        return response.json()
    else:
        # no response usually means too many requests, so wait and try again
        print('No response, waiting 10 seconds...')
        time.sleep(10)
        print('Retrying.')
        return get_request(url, parameters)
Generate List of App IDs
Every app on the Steam store has a unique app ID. Whilst different apps can have the same name, they can't have the same ID. This will be very useful to us for identifying apps and eventually merging our tables of data.
Before we get to that, we need to generate a list of app IDs which we can use to build our data sets. It's possible to generate one from the Steam API, however this has over 70,000 entries, many of which are demos and videos with no way to tell them apart. Instead, SteamSpy provides an 'all' request, supplying some information about the apps it tracks. It doesn't supply everything we need about each app, so we still have to request that information individually, but it provides a good starting point.
Because many of the return fields are strings containing commas and other punctuation, it is easiest to read the response into a pandas dataframe, and export the required appid and name fields to a csv. We could keep only the appid column as a list or pandas series, but it may be useful to keep the app name at this stage.
url = "https://steamspy.com/api.php"
parameters = {"request": "all"}
# request 'all' from steam spy and parse into dataframe
json_data = get_request(url, parameters=parameters)
steam_spy_all = pd.DataFrame.from_dict(json_data, orient='index')
# generate sorted app_list from steamspy data
app_list = steam_spy_all[['appid', 'name']].sort_values('appid').reset_index(drop=True)
# export disabled to keep consistency across download sessions
# app_list.to_csv('../data/download/app_list.csv', index=False)
# instead read from stored csv
app_list = pd.read_csv('../data/download/app_list.csv')
# display first few rows
app_list.head()
Define Download Logic
Now that we have the app_list dataframe, we can iterate over the app IDs and request individual app data from the servers. Here we set out our logic to retrieve and process this information, then finally store the data as a csv file.
Because it takes a long time to retrieve the data, it would be dangerous to attempt it all in one go as any errors or connection time-outs could cause the loss of all our data. For this reason we define a function to download and process the requests in batches, appending each batch to an external file and keeping track of the highest index written in a separate file.
This not only provides security, allowing us to easily restart the process if an error is encountered, but also means we can complete the download across multiple sessions.
Again, we provide verbose output for rows exported, batches complete, time taken and estimated time remaining.
def get_app_data(start, stop, parser, pause):
    """Return list of app data generated from parser.

    parser : function to handle request
    """
    app_data = []

    # iterate through each row of app_list, confined by start and stop
    for index, row in app_list[start:stop].iterrows():
        print('Current index: {}'.format(index), end='\r')

        appid = row['appid']
        name = row['name']

        # retrieve app data for a row, handled by supplied parser, and append to list
        data = parser(appid, name)
        app_data.append(data)

        time.sleep(pause)  # prevent overloading api with requests

    return app_data
def process_batches(parser, app_list, download_path, data_filename, index_filename,
                    columns, begin=0, end=-1, batchsize=100, pause=1):
    """Process app data in batches, writing directly to file.

    parser : custom function to format request
    app_list : dataframe of appid and name
    download_path : path to store data
    data_filename : filename to save app data
    index_filename : filename to store highest index written
    columns : column names for file

    Keyword arguments:

    begin : starting index (get from index_filename, default 0)
    end : index to finish (defaults to end of app_list)
    batchsize : number of apps to write in each batch (default 100)
    pause : time to wait after each api request (default 1)

    returns: none
    """
    print('Starting at index {}:\n'.format(begin))

    # by default, process all apps in app_list
    if end == -1:
        end = len(app_list) + 1

    # generate array of batch begin and end points
    batches = np.arange(begin, end, batchsize)
    batches = np.append(batches, end)

    apps_written = 0
    batch_times = []

    for i in range(len(batches) - 1):
        start_time = time.time()

        start = batches[i]
        stop = batches[i+1]

        app_data = get_app_data(start, stop, parser, pause)

        rel_path = os.path.join(download_path, data_filename)

        # writing app data to file
        with open(rel_path, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=columns, extrasaction='ignore')

            for j in range(3, 0, -1):
                print("\rAbout to write data, don't stop script! ({})".format(j), end='')
                time.sleep(0.5)

            writer.writerows(app_data)
            print('\rExported lines {}-{} to {}.'.format(start, stop-1, data_filename), end=' ')

        apps_written += len(app_data)

        idx_path = os.path.join(download_path, index_filename)

        # writing last index to file
        with open(idx_path, 'w') as f:
            index = stop
            print(index, file=f)

        # logging time taken
        end_time = time.time()
        time_taken = end_time - start_time

        batch_times.append(time_taken)
        mean_time = statistics.mean(batch_times)

        est_remaining = (len(batches) - i - 2) * mean_time

        remaining_td = dt.timedelta(seconds=round(est_remaining))
        time_td = dt.timedelta(seconds=round(time_taken))
        mean_td = dt.timedelta(seconds=round(mean_time))

        print('Batch {} time: {} (avg: {}, remaining: {})'.format(i, time_td, mean_td, remaining_td))

    print('\nProcessing batches complete. {} apps written'.format(apps_written))
Next we define some functions to handle and prepare the external files.
We use reset_index for testing and demonstration, allowing us to easily reset the index in the stored file to 0, effectively restarting the entire download process.
We define get_index to retrieve the index from file, maintaining persistence across sessions. Every time a batch of information (app data) is written to file, we write the highest index within app_data that was retrieved. As stated, this is partially for security, ensuring that if there is an error during the download we can read the index from file and continue from the end of the last successful batch. Keeping track of the index also allows us to pause the download and continue at a later time.
Finally, the prepare_data_file function readies the csv for storing the data. If the index we retrieved is 0, it means we are either starting for the first time or starting over. In either case, we want a blank csv file with only the header row to begin writing to, so we wipe the file (by opening it in write mode) and write the header. Conversely, if the index is anything other than 0, it means we have already downloaded information, and can leave the csv file alone.
def reset_index(download_path, index_filename):
    """Reset index in file to 0."""
    rel_path = os.path.join(download_path, index_filename)

    with open(rel_path, 'w') as f:
        print(0, file=f)


def get_index(download_path, index_filename):
    """Retrieve index from file, returning 0 if file not found."""
    try:
        rel_path = os.path.join(download_path, index_filename)

        with open(rel_path, 'r') as f:
            index = int(f.readline())

    except FileNotFoundError:
        index = 0

    return index


def prepare_data_file(download_path, filename, index, columns):
    """Create file and write headers if index is 0."""
    if index == 0:
        rel_path = os.path.join(download_path, filename)

        with open(rel_path, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=columns)
            writer.writeheader()
Download Steam Data
Now we are ready to start downloading data and writing to file. We define our logic particular to handling the Steam API - in fact, if no data is returned we simply return the name and appid - then begin setting some parameters. We define the files we will write our data and index to, and the columns for the csv file. The API doesn't return every column for every app, so it is best to set these explicitly.
Next we run our functions to set up the files, and make a call to process_batches to begin the process. Some additional parameters have been added for demonstration, to constrain the download to just a few rows and smaller batches. Removing these would allow the entire download process to be repeated.
def parse_steam_request(appid, name):
    """Unique parser to handle data from Steam Store API.

    Returns : json formatted data (dict-like)
    """
    url = "http://store.steampowered.com/api/appdetails/"
    parameters = {"appids": appid}

    json_data = get_request(url, parameters=parameters)
    json_app_data = json_data[str(appid)]

    if json_app_data['success']:
        data = json_app_data['data']
    else:
        data = {'name': name, 'steam_appid': appid}

    return data
# Set file parameters
download_path = '../data/download'
steam_app_data = 'steam_app_data.csv'
steam_index = 'steam_index.txt'
steam_columns = [
'type', 'name', 'steam_appid', 'required_age', 'is_free', 'controller_support',
'dlc', 'detailed_description', 'about_the_game', 'short_description', 'fullgame',
'supported_languages', 'header_image', 'website', 'pc_requirements', 'mac_requirements',
'linux_requirements', 'legal_notice', 'drm_notice', 'ext_user_account_notice',
'developers', 'publishers', 'demos', 'price_overview', 'packages', 'package_groups',
'platforms', 'metacritic', 'reviews', 'categories', 'genres', 'screenshots',
'movies', 'recommendations', 'achievements', 'release_date', 'support_info',
'background', 'content_descriptors'
]
# Overwrites last index for demonstration (would usually store highest index so can continue across sessions)
reset_index(download_path, steam_index)
# Retrieve last index downloaded from file
index = get_index(download_path, steam_index)
# Wipe or create data file and write headers if index is 0
prepare_data_file(download_path, steam_app_data, index, steam_columns)
# Set end and chunksize for demonstration - remove to run through entire app list
process_batches(
parser=parse_steam_request,
app_list=app_list,
download_path=download_path,
data_filename=steam_app_data,
index_filename=steam_index,
columns=steam_columns,
begin=index,
end=10,
batchsize=5
)
# inspect downloaded data
pd.read_csv('../data/download/steam_app_data.csv').head()
Download SteamSpy Data
To retrieve data from SteamSpy we perform a very similar process. Our parse function is a little simpler because of how the data is returned, and the maximum polling rate of this API is higher, so we can set a lower value for pause in the process_batches function and download more quickly. Apart from that, we set the new variables and make a call to the process_batches function once again.
def parse_steamspy_request(appid, name):
    """Parser to handle SteamSpy API data."""
    url = "https://steamspy.com/api.php"
    parameters = {"request": "appdetails", "appid": appid}

    json_data = get_request(url, parameters)
    return json_data
# set files and columns
download_path = '../data/download'
steamspy_data = 'steamspy_data.csv'
steamspy_index = 'steamspy_index.txt'
steamspy_columns = [
'appid', 'name', 'developer', 'publisher', 'score_rank', 'positive',
'negative', 'userscore', 'owners', 'average_forever', 'average_2weeks',
'median_forever', 'median_2weeks', 'price', 'initialprice', 'discount',
'languages', 'genre', 'ccu', 'tags'
]
reset_index(download_path, steamspy_index)
index = get_index(download_path, steamspy_index)
# Wipe data file if index is 0
prepare_data_file(download_path, steamspy_data, index, steamspy_columns)
process_batches(
parser=parse_steamspy_request,
app_list=app_list,
download_path=download_path,
data_filename=steamspy_data,
index_filename=steamspy_index,
columns=steamspy_columns,
begin=index,
end=20,
batchsize=5,
pause=0.3
)
# inspect downloaded steamspy data
pd.read_csv('../data/download/steamspy_data.csv').head()
Next Steps
Here we have defined and demonstrated the download process used to generate the data sets. This was completed separately but the full, raw data can be found on Kaggle.
We now have two tables of data with a variety of information about apps on the Steam store. From the Steam data it looks like there are some useful columns like required_age, developers and genres which we can eventually turn into features for analysis, and a price_overview column which may inform the success and sales of each game. The owners column of the SteamSpy data could be useful, however its margin of error means the data may not be accurate enough for meaningful analysis; we'll have to see what we can manage after cleaning. Instead we may have to use the positive and negative ratings or average play-time to create our metrics. There is also a tags column which appears to cross over with the categories and genres columns in the Steam data. We may wish to merge these, or keep one over the other.
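As a very rough preview of where those decisions might lead (a sketch only, assuming the two csv files produced above, leaving proper cleaning to the next post, and using review_score as a hypothetical metric name), we could merge the two tables on the app ID and derive a simple rating from the positive and negative counts:
import pandas as pd

steam_data = pd.read_csv('../data/download/steam_app_data.csv')
steamspy_data = pd.read_csv('../data/download/steamspy_data.csv')

# join the two tables on the shared app ID (steam_appid in the Steam data, appid in SteamSpy)
merged = steam_data.merge(
    steamspy_data, left_on='steam_appid', right_on='appid', suffixes=('', '_steamspy')
)

# crude review-score metric from SteamSpy's positive/negative counts
# (apps with no reviews come out as NaN - something to handle during cleaning)
merged['total_reviews'] = merged['positive'] + merged['negative']
merged['review_score'] = merged['positive'] / merged['total_reviews']

print(merged[['name', 'steam_appid', 'review_score']].head())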
These are all decisions we'll come to in later stages of the project. With the data downloaded, this stage is now complete. In the next step, we'll take care of preparing and cleaning the data, readying a complete data set to use for analysis.
Thanks for joining me, and I welcome any feedback or suggestions you may have in the comments below.