User Tools

Site Tools


google_analytics
# -*- coding: utf-8 -*-
"""Download Google Analytics data per country and load it into PostgreSQL."""
# NOTE(review): these metadata fields were previously written *inside* the
# module docstring, so they were plain text and never became real module
# attributes. They are now genuine assignments.
__author__ = "Paulo de Melo"
__license__ = "GPL"
__version__ = "1.0"
__maintainer__ = ""
__date__ = "2015-01-19"
__email__ = "paulo.demelo@carmudi.com"
__status__ = "Development"
 
import sys
import argparse
from datetime import timedelta
from datetime import datetime
from time import sleep
 
import pandas.io.ga as ga
import psycopg2
import pandas as pd
import pandas.io.gbq as gbq
from sqlalchemy import create_engine
from googleapiclient.errors import HttpError
 
from library.config import Config
 
 
 
 
# Google Analytics account id shared by all country profiles queried below.
GENERAL_ACCOUNT_ID = '48959041'
# Path to the Google API OAuth client secrets file (passed to ga.read_ga).
SECRETS = 'library/configuration/client_secrets.json'
 
# Path to the cached Google API OAuth token file (passed to ga.read_ga as
# token_file_name).
CERTIFICATE = 'library/configuration/analytics.dat'
 
 
def str2bool(v):
    """
    Interpret a string as a boolean flag.
    :param v: the string to interpret
    :return: True when the lower-cased string is one of the accepted
             truthy spellings, False otherwise
    """
    truthy = ("yes", "true", "t", "1")
    return v.lower() in truthy
 
 
def valid_date(day):
    """
    Parse a string holding a date in the format YYYY-mm-dd.

    Note: despite the old docstring, this never returned True — it returns
    the parsed datetime (truthy) on success, used directly by argparse as
    the option value.
    :param day: string with the date
    :return: datetime object when the string is valid, False otherwise
    """
    try:
        # ValueError: bad format/value; TypeError: non-string input.
        return datetime.strptime(day, '%Y-%m-%d')
    except (ValueError, TypeError):
        return False
 
 
def get_ga_id(cfg, server, country_iso):
    """
    Fetch the Google Analytics identifiers for a country from il.dwh_country.
    :param cfg: configuration object exposing cfg.psql connection details
    :param server: server name (kept for interface compatibility; unused here)
    :param country_iso: the country iso code
    :return: tuple (ga_id, ga_property_id)
    """
    con = psycopg2.connect(database=cfg.psql.database,
                           user=cfg.psql.username,
                           host=cfg.psql.host,
                           port=cfg.psql.port,
                           password=cfg.psql.password)
    try:
        # Bound parameter instead of string interpolation: avoids SQL
        # injection through country_iso.
        data = pd.read_sql_query(
            "SELECT ga_id, ga_property_id FROM il.dwh_country WHERE country_iso = %(iso)s",
            con, params={'iso': country_iso})
    except Exception as e:
        print('Error %s connecting psql to obtain GA data' % e)
        sys.exit(1)
    finally:
        # Previously the connection leaked on the error path; close it always.
        con.close()
    return data['ga_id'].values[0], data['ga_property_id'].values[0]
 
 
def check_and_convert(begin_date, end_date):
    """
    Put two date strings (format YYYY-mm-dd) into chronological order.
    :param begin_date: string with begin date
    :param end_date: string with end date
    :return: tuple (earlier_date, later_date)
    """
    # Lexicographic comparison is chronological for YYYY-mm-dd strings.
    if end_date < begin_date:
        begin_date, end_date = end_date, begin_date
    return begin_date, end_date
 
 
def get_ga_data(profile_id, property_id, metrics, dimensions, day, filters=None):
    """
    Download one day of data from Google Analytics.
    :param profile_id: string with profile_id
    :param property_id: string with property_id
    :param metrics: array with ga metrics
    :param dimensions: array with ga dimensions
    :param day: string with day in format 'yyyy-mm-dd'
    :param filters: array with ga filters
    :return: pandas DataFrame (empty when there is no data or an
             unrecoverable error occurred), or an int HTTP status code
             when the API call failed — the caller decides on retries
    """
    try:
        df = ga.read_ga(account_id=GENERAL_ACCOUNT_ID,  # common carmudi account
                        profile_id=str(profile_id),
                        property_id=str(property_id),
                        metrics=metrics,
                        dimensions=dimensions,
                        start_date=day,
                        end_date=day,
                        index_col=0,
                        secrets=SECRETS,
                        filters=filters,
                        max_results=10000,
                        token_file_name=CERTIFICATE)
    except StopIteration:
        # The GA reader raises StopIteration when the query yields no rows.
        print("\t\t\tNote: No data to load for %s from GA. " % day)
        return pd.DataFrame()
    except HttpError as e:
        # Surface the HTTP status so the caller can retry transient errors.
        return e.resp.status
    except Exception as e:
        print("\t\t\tError: Loading day %s data from GA. %s " % (day, e))
        return pd.DataFrame()
    # Tag the rows with their profile so several GA profiles can share
    # one destination table.
    df['ga_id'] = str(profile_id)
    return df
 
 
def get_psql_connection(hostname, port, database, username, password):
    """
    Build a sqlalchemy engine for a PostgreSQL database, as pandas requires.
    :param hostname: string with hostname
    :param port: string with port number
    :param database: string with database name
    :param username: string with username
    :param password: string with password
    :return: sqlalchemy engine object
    """
    template = 'postgresql://{username}:{password}@{hostname}:{port}/{database}'
    url = template.format(username=username,
                          password=password,
                          hostname=hostname,
                          port=port,
                          database=database)
    return create_engine(url)
 
 
def save_data_psql(hostname, port, database, username, password, schema, table, data, truncate=False):
    """
    Persist a pandas dataframe into a PostgreSQL table.
    :param hostname: string with hostname
    :param port: string with port number
    :param database: string with database name
    :param username: string with username
    :param password: string with password
    :param schema: string with schema name
    :param table: string with table name
    :param data: dataframe object to store
    :param truncate: boolean, True replaces the table instead of appending
    :return: Nothing
    """
    engine = get_psql_connection(hostname=hostname,
                                 port=port,
                                 database=database,
                                 username=username,
                                 password=password)
    # 'replace' drops and recreates the table; 'append' adds rows.
    mode = 'replace' if truncate else 'append'
    data.to_sql(schema=schema, name=table, con=engine, if_exists=mode)
    return
 
 
def bq_schema(data):
    """
    Produce the BigQuery-specific json schema for a pandas dataframe.
    :param data: dataframe object
    :return: json schema
    """
    # Columns whose dtype has no direct mapping fall back to 'JSON'.
    schema = gbq.generate_bq_schema(data, default_type='JSON')
    return schema
 
 
def purge_database(cfg, server, ga_id, begin_date, end_date, schema, table):
    """
    Delete rows of one GA profile within a date range from schema.table.
    :param cfg: the configuration file object
    :param server: the specific server configuration (kept for interface
                   compatibility; unused here)
    :param ga_id: GA profile id whose rows are purged
    :param begin_date: begin date (string or datetime)
    :param end_date: end date (string or datetime)
    :param schema: string with schema name
    :param table: string with table name
    :return: None
    """
    con = get_psql_connection(hostname=cfg.psql.host,
                              port=cfg.psql.port,
                              database=cfg.psql.database,
                              username=cfg.psql.username,
                              password=cfg.psql.password)
    # schema/table are SQL identifiers and cannot be bound as parameters;
    # the date and ga_id VALUES are bound to avoid SQL injection (they were
    # previously interpolated straight into the statement).
    query = ("DELETE FROM {schema}.{table} "
             "WHERE date >= %s::DATE AND date <= %s::DATE "
             "AND ga_id = %s").format(schema=schema, table=table)
    con.execute(query, (begin_date, end_date, ga_id))
    return None
 
 
def main(server, country_iso, metrics, dimensions, begin_date, end_date, schema, table, filters=None, truncate=False):
    """
    Download GA data day by day for one country and store it in PostgreSQL.
    :param server: server configuration name for Config
    :param country_iso: country iso code (case-insensitive)
    :param metrics: comma-separated GA metrics string
    :param dimensions: comma-separated GA dimensions string
    :param begin_date: begin date (string 'YYYY-mm-dd' or datetime)
    :param end_date: end date (string 'YYYY-mm-dd' or datetime)
    :param schema: destination schema name
    :param table: destination table name
    :param filters: comma-separated GA filters string, or None
    :param truncate: True to replace the destination table instead of appending
    :return: Nothing
    """
    # load configuration file
    cfg = Config(server)

    # get country_iso ga properties
    profile_id, property_id = get_ga_id(cfg=cfg, server=server, country_iso=country_iso.upper())

    # make sure begin_date <= end_date
    begin_date, end_date = check_and_convert(begin_date, end_date)

    if not profile_id or not property_id:
        print("Error: could not obtain GA data for country_iso = '%s'" % country_iso)
        sys.exit(1)

    print("Ready to save:\n \t\tmetrics:%s\n\t\tdimensions:%s\n\t\tfilters:%s" % (metrics, dimensions, filters))

    metrics = metrics.split(',')
    dimensions = dimensions.split(',')
    if filters:
        filters = filters.split(',')

    # Normalize both ends of the range to datetime for the day loop.
    day = begin_date
    if not isinstance(day, datetime):
        day = datetime.strptime(day, '%Y-%m-%d')
    if not isinstance(end_date, datetime):
        end_date = datetime.strptime(end_date, '%Y-%m-%d')

    # counter of retries for the current day
    number_of_tries = 0
    while day <= end_date:
        data = get_ga_data(profile_id, property_id, metrics, dimensions, day.strftime('%Y-%m-%d'), filters)
        # an int means GA answered with an HTTP error status
        if isinstance(data, int):
            if data in (403, 500, 503):
                # Rate-limit / transient server errors: wait and retry.
                print("\t\t\tReloading: trying to reload data for %s from GA. Try # %s" % (day, number_of_tries))
                sleep(5)
                number_of_tries += 1
                # give up on this day once the retry budget is spent
                if number_of_tries >= 3:
                    day = day + timedelta(days=1)
                    number_of_tries = 0
            else:
                # BUG FIX: a non-retryable status previously fell through
                # without advancing `day` or counting a try, so the loop
                # spun forever. Report it and move on to the next day.
                print("\t\t\tError: HTTP status %s loading %s from GA; skipping day." % (data, day))
                day = day + timedelta(days=1)
                number_of_tries = 0
        else:
            if not data.empty:
                print('\t\t\tSave day % s to database' % day.strftime('%Y-%m-%d'))
                # first remove any old data for this profile/day (idempotent reload)
                purge_database(cfg=cfg, server=server, ga_id=profile_id, begin_date=day, end_date=day, schema=schema, table=table)
                # then insert the fresh data
                save_data_psql(hostname=cfg.psql.host,
                               port=cfg.psql.port,
                               database=cfg.psql.database,
                               username=cfg.psql.username,
                               password=cfg.psql.password,
                               schema=schema,
                               table=table,
                               truncate=truncate,
                               data=data)
            day = day + timedelta(days=1)
            number_of_tries = 0
 
if __name__ == '__main__':
    # Command-line entry point: each option maps 1:1 onto a main() parameter,
    # so the parsed namespace can be splatted straight into main().
    parser = argparse.ArgumentParser(description="GA data")
    parser.add_argument('--schema', help="Schema's name")
    parser.add_argument('--table', help="Table's name")
    parser.add_argument('--server', help='Server name')
    parser.add_argument('--country_iso', help='Country iso')
    parser.add_argument('--metrics', help='GA metrics as a json string')
    parser.add_argument('--dimensions', help='GA dimensions as a json string')
    parser.add_argument('--filters', help='GA filters as a json string')
    parser.add_argument('--begin_date', help='GA begin date to process', type=valid_date)
    parser.add_argument('--end_date', help='GA end date to process', type=valid_date)
    parser.add_argument('--truncate', help='Truncate table', type=str2bool)

    main(**vars(parser.parse_args()))
google_analytics.txt · Last modified: 2015/07/10 15:38 by vincenzo