Advanced Topics: Top-K and Self Joins

Setup

In [1]:
import ibis
import os
hdfs_port = os.environ.get('IBIS_WEBHDFS_PORT', 50070)
hdfs = ibis.hdfs_connect(host='quickstart.cloudera', port=hdfs_port)
con = ibis.impala.connect(host='quickstart.cloudera', database='ibis_testing',
                          hdfs_client=hdfs)
ibis.options.interactive = True

“Top-K” Filtering

A common analytical pattern involves subsetting based on some method of ranking. For example, “the 5 most frequently occurring widgets in a dataset”. By choosing the right metric, you can extract the most (or least) important items along some dimension, for whatever definition of “important” suits your analysis.

Carrying out this pattern by hand involves the following steps:

  • Choose a ranking metric
  • Aggregate, computing the ranking metric, by the target dimension
  • Order by the ranking metric and take the highest K values
  • Use those values as a set filter (either with semi_join or isin) in your next query
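Before running this in Ibis, the four steps can be sketched end-to-end in pandas on a toy frame (hypothetical data, not the tutorial's tables):

```python
import pandas as pd

# Hypothetical orders: a customer key plus an order status
orders_df = pd.DataFrame({'custkey': [1, 1, 1, 2, 2, 3, 4],
                          'status':  ['O', 'F', 'O', 'F', 'O', 'O', 'F']})

# Steps 1-3: the ranking metric is order count; aggregate by customer,
# order by the count, and keep the top K = 2 keys
top_keys = orders_df.groupby('custkey').size().nlargest(2).index

# Step 4: use those keys as a set filter in the next query
top_status_counts = (orders_df[orders_df['custkey'].isin(top_keys)]
                     .groupby('status').size())
```

The rest of this section shows how Ibis compresses these steps into a single expression.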

For example, let’s look at the TPC-H tables and find the 5 customers who placed the most orders over their lifetime:

In [2]:
orders = con.table('tpch_orders')

top_orders = (orders
              .group_by('o_custkey')
              .size()
              .sort_by(('count', False))  # False sorts descending
              .limit(5))
top_orders

Now, we could use these customer keys as a filter in some other analysis:

In [3]:
# Among the top 5 most frequent customers, what's the histogram of their order statuses?
analysis = (orders[orders.o_custkey.isin(top_orders.o_custkey)]
            .group_by('o_orderstatus')
            .size())
analysis
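The step list above offers semi_join as an alternative to isin. A semi join keeps the rows of the left table that have a match in the right table, without bringing over any of the right table's columns or duplicating left rows. In pandas terms (a sketch with hypothetical data, not the tutorial's tables):

```python
import pandas as pd

orders_df = pd.DataFrame({'custkey': [1, 1, 2, 3, 3, 3],
                          'status':  ['O', 'F', 'O', 'F', 'F', 'O']})
top_df = pd.DataFrame({'custkey': [1, 3]})

# Semi join: inner-merge on the key against the deduplicated right
# side, so only the left table's columns (and rows) survive
semi = orders_df.merge(top_df[['custkey']].drop_duplicates(),
                       on='custkey')
```

Either formulation produces the same filtered set; which one the backend executes more efficiently depends on the engine.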

This is such a common pattern that Ibis supports a high-level topk operation, which can be used directly as a filter:

In [4]:
top_orders = orders.o_custkey.topk(5)
orders[top_orders].group_by('o_orderstatus').size()

We can take this a step further. Suppose we now want to rank customers by their total spending instead of their number of orders, perhaps a more meaningful metric:

In [5]:
total_spend = orders.o_totalprice.sum().name('total')
top_spenders = (orders
                .group_by('o_custkey')
                .aggregate(total_spend)
                .sort_by(('total', False))
                .limit(5))
top_spenders

To rank by another metric, pass it as the by argument to topk:

In [6]:
top_spenders = orders.o_custkey.topk(5, by=total_spend)
orders[top_spenders].group_by('o_orderstatus').size()

Self joins

If you’re a relational data guru, you may have wondered how it’s possible to join a table with itself, since join clauses contain column references back to the original tables.

Consider the SQL

SELECT t1.key, sum(t1.value - t2.value) AS metric
FROM my_table t1
  JOIN my_table t2
    ON t1.key = t2.subkey
GROUP BY 1

Here, we have an unambiguous way to refer to each of the tables through aliasing.
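Ibis faces the same ambiguity problem, and solves it with table views, shown below. As a rough pandas analogue of the SQL, the merge suffixes play the role of the t1/t2 aliases (hypothetical data):

```python
import pandas as pd

my_table = pd.DataFrame({'key':    [1, 2],
                         'subkey': [2, 1],
                         'value':  [10.0, 4.0]})

# The suffixes act as the t1/t2 aliases in the SQL above
joined = my_table.merge(my_table, left_on='key', right_on='subkey',
                        suffixes=('_t1', '_t2'))

# SELECT t1.key, sum(t1.value - t2.value) ... GROUP BY 1
metric = (joined.assign(diff=joined['value_t1'] - joined['value_t2'])
                .groupby('key_t1')['diff'].sum())
```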

Let’s consider the TPC-H database, and suppose we want to compute the year-over-year change in total order amounts by region using joins.

In [7]:
region = con.table('tpch_region')
nation = con.table('tpch_nation')
customer = con.table('tpch_customer')
orders = con.table('tpch_orders')

orders.limit(5)

First, let’s join all the things and select the fields we care about:

In [8]:
fields_of_interest = [region.r_name.name('region'),
                      nation.n_name.name('nation'),
                      orders.o_totalprice.name('amount'),
                      # o_orderdate is stored as a string, so cast it
                      orders.o_orderdate.cast('timestamp').name('odate')]

joined_all = (region.join(nation, region.r_regionkey == nation.n_regionkey)
              .join(customer, customer.c_nationkey == nation.n_nationkey)
              .join(orders, orders.o_custkey == customer.c_custkey)
              [fields_of_interest])

Okay, great, let’s have a look:

In [9]:
joined_all.limit(5)
Out[9]:
        region        nation     amount      odate
0  MIDDLE EAST        JORDAN  173665.47 1996-01-02
1  MIDDLE EAST          IRAN   46929.18 1996-12-01
2       AFRICA       MOROCCO  193846.25 1993-10-14
3  MIDDLE EAST          IRAN   32151.78 1995-10-11
4  MIDDLE EAST  SAUDI ARABIA  144659.20 1994-07-30

Sweet, now let’s aggregate by year and region:

In [10]:
year = joined_all.odate.year().name('year')

total = joined_all.amount.sum().cast('double').name('total')

annual_amounts = (joined_all
                  .group_by(['region', year])
                  .aggregate(total))
annual_amounts
Out[10]:
         region  year         total
0        EUROPE  1994  6.979473e+09
1        EUROPE  1996  7.015421e+09
2          ASIA  1997  6.910663e+09
3          ASIA  1998  4.058824e+09
4        EUROPE  1992  6.926705e+09
5       AMERICA  1995  6.905139e+09
6       AMERICA  1992  6.834349e+09
7        EUROPE  1998  4.113448e+09
8       AMERICA  1996  6.883057e+09
9        AFRICA  1993  6.859733e+09
10  MIDDLE EAST  1998  4.025011e+09
11  MIDDLE EAST  1997  6.814699e+09
12  MIDDLE EAST  1996  6.877095e+09
13         ASIA  1994  6.957170e+09
14         ASIA  1993  6.864540e+09
15       AFRICA  1997  6.848983e+09
16  MIDDLE EAST  1992  6.761499e+09
17  MIDDLE EAST  1993  6.797943e+09
18  MIDDLE EAST  1994  6.778384e+09
19  MIDDLE EAST  1995  6.830827e+09
20       AFRICA  1995  6.908429e+09
21         ASIA  1992  6.934801e+09
22      AMERICA  1998  3.991377e+09
23      AMERICA  1997  6.922465e+09
24       EUROPE  1993  6.911395e+09
25       EUROPE  1997  6.876824e+09
26       EUROPE  1995  6.970001e+09
27         ASIA  1995  6.931738e+09
28       AFRICA  1994  6.837587e+09
29       AFRICA  1996  6.878112e+09
30      AMERICA  1994  6.863756e+09
31       AFRICA  1992  6.873319e+09
32       AFRICA  1998  4.024061e+09
33      AMERICA  1993  6.906800e+09
34         ASIA  1996  6.955679e+09

Looking good so far. Now we need to join this table on itself, subtracting 1 from one of the year columns to line up adjacent years.

We do this by creating a “joinable” view of the table, which Ibis treats as a distinct object. To create one, use the view method:

In [11]:
current = annual_amounts
prior = annual_amounts.view()

yoy_change = (current.total - prior.total).name('yoy_change')

results = (current.join(prior, ((current.region == prior.region) &
                                (current.year == (prior.year - 1))))
           [current.region, current.year, yoy_change])
df = results.execute()
In [12]:
df['yoy_pretty'] = df.yoy_change.map(lambda x: '$%.2fmm' % (x / 1000000.))
df
Out[12]:
         region  year     yoy_change  yoy_pretty
0        AFRICA  1994  -7.084172e+07   $-70.84mm
1       AMERICA  1996  -3.940791e+07   $-39.41mm
2        AFRICA  1996   2.912979e+07    $29.13mm
3        AFRICA  1992   1.358699e+07    $13.59mm
4          ASIA  1997   2.851839e+09  $2851.84mm
5          ASIA  1993  -9.262979e+07   $-92.63mm
6        EUROPE  1993  -6.807773e+07   $-68.08mm
7   MIDDLE EAST  1995  -4.626817e+07   $-46.27mm
8   MIDDLE EAST  1994  -5.244317e+07   $-52.44mm
9   MIDDLE EAST  1993   1.955937e+07    $19.56mm
10  MIDDLE EAST  1992  -3.644384e+07   $-36.44mm
11       EUROPE  1997   2.763376e+09  $2763.38mm
12      AMERICA  1992  -7.245078e+07   $-72.45mm
13      AMERICA  1995   2.208216e+07    $22.08mm
14       EUROPE  1995  -4.542062e+07   $-45.42mm
15         ASIA  1994   2.543198e+07    $25.43mm
16  MIDDLE EAST  1996   6.239623e+07    $62.40mm
17  MIDDLE EAST  1997   2.789688e+09  $2789.69mm
18       AFRICA  1993   2.214559e+07    $22.15mm
19         ASIA  1995  -2.394126e+07   $-23.94mm
20         ASIA  1992   7.026156e+07    $70.26mm
21       AFRICA  1997   2.824921e+09  $2824.92mm
22      AMERICA  1993   4.304359e+07    $43.04mm
23      AMERICA  1994  -4.138320e+07   $-41.38mm
24       AFRICA  1995   3.031631e+07    $30.32mm
25      AMERICA  1997   2.931088e+09  $2931.09mm
26       EUROPE  1994   9.471985e+06     $9.47mm
27       EUROPE  1996   1.385975e+08   $138.60mm
28         ASIA  1996   4.501570e+07    $45.02mm
29       EUROPE  1992   1.531005e+07    $15.31mm
If you’re being fastidious and want the first year appearing in the dataset for each region to have a prior-year total of 0, you will instead need to do an outer join and treat nulls on the prior side of the join as zero:

In [13]:
yoy_change = (current.total - prior.total.zeroifnull()).name('yoy_change')
results = (current.outer_join(prior, ((current.region == prior.region) &
                                      (current.year == (prior.year - 1))))
           [current.region, current.year, current.total,
            prior.total.zeroifnull().name('prior_total'),
            yoy_change])

results.limit(10)
Out[13]:
        region  year         total   prior_total    yoy_change
0         ASIA  1998  4.058824e+09  0.000000e+00  4.058824e+09
1       AFRICA  1994  6.837587e+09  6.908429e+09 -7.084172e+07
2      AMERICA  1996  6.883057e+09  6.922465e+09 -3.940791e+07
3       AFRICA  1996  6.878112e+09  6.848983e+09  2.912979e+07
4       AFRICA  1992  6.873319e+09  6.859733e+09  1.358699e+07
5       AFRICA  1998  4.024061e+09  0.000000e+00  4.024061e+09
6         ASIA  1997  6.910663e+09  4.058824e+09  2.851839e+09
7         ASIA  1993  6.864540e+09  6.957170e+09 -9.262979e+07
8       EUROPE  1993  6.911395e+09  6.979473e+09 -6.807773e+07
9  MIDDLE EAST  1995  6.830827e+09  6.877095e+09 -4.626817e+07
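The null-filling part of this pattern can also be sketched in pandas (hypothetical data; a left join is enough to show the zeroifnull behavior): shift a copy of the table by a year, mirroring the current.year == prior.year - 1 predicate, then fill missing totals with zero.

```python
import pandas as pd

annual = pd.DataFrame({'region': ['ASIA', 'ASIA', 'ASIA'],
                       'year':   [1996, 1997, 1998],
                       'total':  [7.0e9, 6.5e9, 4.0e9]})

# Build the "prior" side: shift the year so rows line up under the
# join predicate current.year == prior.year - 1
prior = annual.rename(columns={'total': 'prior_total'})
prior['year'] = prior['year'] - 1

# Join, then treat missing prior totals as zero (like zeroifnull)
joined = annual.merge(prior[['region', 'year', 'prior_total']],
                      on=['region', 'year'], how='left')
joined['prior_total'] = joined['prior_total'].fillna(0.0)
joined['yoy_change'] = joined['total'] - joined['prior_total']
```

As in the Ibis version above, the year with no matching row ends up with a prior total of 0 rather than being dropped.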