import ibis
import os
hdfs_port = int(os.environ.get('IBIS_WEBHDFS_PORT', 50070))  # environment values are strings
hdfs = ibis.hdfs_connect(host='quickstart.cloudera', port=hdfs_port)
con = ibis.impala.connect(host='quickstart.cloudera', database='ibis_testing',
                          hdfs_client=hdfs)
ibis.options.interactive = True

“Top-K” Filtering

A common analytical pattern involves subsetting based on some method of ranking. For example, “the 5 most frequently occurring widgets in a dataset”. By choosing the right metric, you can obtain the most (or least) important items from some dimension, for whatever definition of importance suits your problem.

Carrying out this pattern by hand involves the following steps:

  • Choose a ranking metric
  • Aggregate, computing the ranking metric, by the target dimension
  • Order by the ranking metric and take the highest K values
  • Use those values as a set filter (either with semi_join or isin) in your next query

For example, let’s look at the TPC-H tables and find the 5 customers who placed the most orders over their lifetime:

orders = con.table('tpch_orders')

top_orders = (orders
              .group_by('o_custkey')
              .size()
              .sort_by(('count', False))
              .limit(5))
top_orders
   o_custkey  count
0       3451     41
1     102004     41
2     102022     41
3      79300     40
4     122623     40

Now, we could use these customer keys as a filter in some other analysis:

# Among the top 5 most frequent customers, what's the histogram of their order statuses?
analysis = (orders[orders.o_custkey.isin(top_orders.o_custkey)]
            .group_by('o_orderstatus')
            .size())
analysis
  o_orderstatus  count
0             O    107
1             P      8
2             F     88
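The step list above also mentioned semi_join as the other way to apply a filter set. As a rough sketch of that alternative (the isin form above is what the rest of this tutorial uses), the same histogram could be phrased as a semi-join between orders and top_orders:

# Sketch: the same filter expressed as a semi-join instead of isin.
# semi_join keeps the rows of orders whose o_custkey has a match in
# top_orders, without pulling in any columns from the right-hand table.
frequent = orders.semi_join(top_orders, orders.o_custkey == top_orders.o_custkey)
frequent.group_by('o_orderstatus').size()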

This is such a common pattern that Ibis provides a high-level topk primitive, which can be used directly as a filter:

top_orders = orders.o_custkey.topk(5)
orders[top_orders].group_by('o_orderstatus').size()
  o_orderstatus  count
0             O    107
1             P      8
2             F     88
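If you’re curious what query Ibis builds for this, you can compile the expression to SQL rather than executing it. A sketch, assuming the Impala backend’s compile function is available (the exact SQL text varies by Ibis version):

# Compile the topk-filtered expression to a SQL string instead of running it
expr = orders[orders.o_custkey.topk(5)].group_by('o_orderstatus').size()
print(ibis.impala.compile(expr))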

We can take this a step further. Suppose now we want to rank customers by their total spending instead of the number of orders, perhaps a more meaningful metric. By hand, that looks like this:

total_spend = orders.o_totalprice.sum().name('total')
top_spenders = (orders
                .group_by('o_custkey')
                .aggregate(total_spend)
                .sort_by(('total', False))
                .limit(5))
top_spenders
   o_custkey       total
0     143500  7012696.48
1      95257  6563511.23
2      87115  6457526.26
3     131113  6311428.86
4     103834  6306524.23

To use another metric, just pass it to the by argument in topk:

top_spenders = orders.o_custkey.topk(5, by=total_spend)
orders[top_spenders].group_by('o_orderstatus').size()
  o_orderstatus  count
0             O     98
1             P      1
2             F     78

Self joins

If you’re a relational data guru, you may have wondered how it’s possible to join tables with themselves, because join clauses involve column references back to the original table.

Consider the SQL

SELECT t1.key, sum(t1.value - t2.value) AS metric
FROM my_table t1
  JOIN my_table t2
    ON t1.key = t2.subkey
GROUP BY 1

Here, we have an unambiguous way to refer to each of the tables through aliasing.
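Ibis has no table aliases to lean on, so the trick is to create a second, distinct handle to the same table. Just as a sketch of how the SQL above might translate (my_table is hypothetical, with key, subkey, and value columns; the view method used here is explained in more detail below):

# t1 and t2 point at the same underlying table but are distinct expressions,
# playing the role of the t1/t2 aliases in the SQL above
t1 = con.table('my_table')   # hypothetical table
t2 = t1.view()

diff = (t1.value - t2.value).name('diff')
joined = t1.join(t2, t1.key == t2.subkey)[t1.key, diff]

metric = joined.diff.sum().name('metric')
expr = joined.group_by('key').aggregate(metric)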

Let’s consider the TPC-H database, and suppose we want to compute year-over-year change in total order amounts by region using joins.

region = con.table('tpch_region')
nation = con.table('tpch_nation')
customer = con.table('tpch_customer')
orders = con.table('tpch_orders')

orders.limit(5)
   o_orderkey  o_custkey o_orderstatus o_totalprice o_orderdate
0           1      36901             O    173665.47  1996-01-02
1           2      78002             O     46929.18  1996-12-01
2           3     123314             F    193846.25  1993-10-14
3           4     136777             O     32151.78  1995-10-11
4           5      44485             F    144659.20  1994-07-30

  o_orderpriority          o_clerk  o_shippriority
0           5-LOW  Clerk#000000951               0
1        1-URGENT  Clerk#000000880               0
2           5-LOW  Clerk#000000955               0
3           5-LOW  Clerk#000000124               0
4           5-LOW  Clerk#000000925               0

                                           o_comment
0                 nstructions sleep furiously among
1   foxes. pending accounts at the pending, silen...
2  sly final accounts boost. carefully regular id...
3  sits. slyly regular warthogs cajole. regular, ...
4  quickly. bold deposits sleep slyly. packages u...

First, let’s join all the things and select the fields we care about:

fields_of_interest = [region.r_name.name('region'),
                      nation.n_name.name('nation'),
                      orders.o_totalprice.name('amount'),
                      orders.o_orderdate.cast('timestamp').name('odate')  # o_orderdate is stored as a string
                      ]

joined_all = (region.join(nation, region.r_regionkey == nation.n_regionkey)
              .join(customer, customer.c_nationkey == nation.n_nationkey)
              .join(orders, orders.o_custkey == customer.c_custkey)
              [fields_of_interest])

Okay, great, let’s have a look:

joined_all.limit(5)
        region         nation     amount      odate
0      AMERICA  UNITED STATES  160843.35 1992-06-22
1  MIDDLE EAST           IRAN   78307.91 1996-04-19
2       EUROPE         FRANCE  103237.90 1994-10-12
3       EUROPE         FRANCE  201463.59 1997-09-12
4         ASIA          JAPAN  166098.86 1995-09-12

Sweet, now let’s aggregate by year and region:

year = joined_all.odate.year().name('year')

total = joined_all.amount.sum().cast('double').name('total')

annual_amounts = (joined_all
                  .group_by(['region', year])
                  .aggregate(total))
annual_amounts
         region  year         total
0        EUROPE  1996  7.015421e+09
1       AMERICA  1995  6.905139e+09
2       AMERICA  1992  6.834349e+09
3        EUROPE  1998  4.113448e+09
4        EUROPE  1994  6.979473e+09
5   MIDDLE EAST  1998  4.025011e+09
6   MIDDLE EAST  1997  6.814699e+09
7          ASIA  1994  6.957170e+09
8          ASIA  1997  6.910663e+09
9          ASIA  1993  6.864540e+09
10         ASIA  1998  4.058824e+09
11       EUROPE  1992  6.926705e+09
12       AFRICA  1997  6.848983e+09
13  MIDDLE EAST  1995  6.830827e+09
14       AFRICA  1995  6.908429e+09
15      AMERICA  1996  6.883057e+09
16       AFRICA  1993  6.859733e+09
17      AMERICA  1997  6.922465e+09
18  MIDDLE EAST  1996  6.877095e+09
19  MIDDLE EAST  1992  6.761499e+09
20  MIDDLE EAST  1993  6.797943e+09
21  MIDDLE EAST  1994  6.778384e+09
22       EUROPE  1995  6.970001e+09
23         ASIA  1992  6.934801e+09
24       AFRICA  1994  6.837587e+09
25      AMERICA  1998  3.991377e+09
26       EUROPE  1993  6.911395e+09
27       AFRICA  1992  6.873319e+09
28       AFRICA  1998  4.024061e+09
29       EUROPE  1997  6.876824e+09
30         ASIA  1996  6.955679e+09
31         ASIA  1995  6.931738e+09
32       AFRICA  1996  6.878112e+09
33      AMERICA  1994  6.863756e+09
34      AMERICA  1993  6.906800e+09

Looking good so far. Now we need to join this table to itself, subtracting 1 from one of the year columns so that adjacent years line up.

We do this by creating a “joinable” view of the table, which Ibis treats as a distinct object. To create one, use the view method:

current = annual_amounts
prior = annual_amounts.view()

yoy_change = (current.total - prior.total).name('yoy_change')

results = (current.join(prior, ((current.region == prior.region) &
                                (current.year == (prior.year - 1))))
           [current.region, current.year, yoy_change])
df = results.execute()
df['yoy_pretty'] = df.yoy_change.map(lambda x: '$%.2fmm' % (x / 1000000.))
df
         region  year    yoy_change  yoy_pretty
0        AFRICA  1994 -7.084172e+07   $-70.84mm
1        AFRICA  1996  2.912979e+07    $29.13mm
2          ASIA  1997  2.851839e+09  $2851.84mm
3          ASIA  1993 -9.262979e+07   $-92.63mm
4        EUROPE  1997  2.763376e+09  $2763.38mm
5       AMERICA  1992 -7.245078e+07   $-72.45mm
6       AMERICA  1995  2.208216e+07    $22.08mm
7        EUROPE  1995 -4.542062e+07   $-45.42mm
8       AMERICA  1996 -3.940791e+07   $-39.41mm
9   MIDDLE EAST  1996  6.239623e+07    $62.40mm
10  MIDDLE EAST  1997  2.789688e+09  $2789.69mm
11       AFRICA  1992  1.358699e+07    $13.59mm
12       AFRICA  1997  2.824921e+09  $2824.92mm
13      AMERICA  1994 -4.138320e+07   $-41.38mm
14       AFRICA  1995  3.031631e+07    $30.32mm
15       EUROPE  1993 -6.807773e+07   $-68.08mm
16  MIDDLE EAST  1995 -4.626817e+07   $-46.27mm
17  MIDDLE EAST  1994 -5.244317e+07   $-52.44mm
18  MIDDLE EAST  1993  1.955937e+07    $19.56mm
19  MIDDLE EAST  1992 -3.644384e+07   $-36.44mm
20       EUROPE  1994  9.471985e+06     $9.47mm
21       EUROPE  1996  1.385975e+08   $138.60mm
22         ASIA  1996  4.501570e+07    $45.02mm
23         ASIA  1994  2.543198e+07    $25.43mm
24       AFRICA  1993  2.214559e+07    $22.15mm
25         ASIA  1995 -2.394126e+07   $-23.94mm
26         ASIA  1992  7.026156e+07    $70.26mm
27      AMERICA  1993  4.304359e+07    $43.04mm
28      AMERICA  1997  2.931088e+09  $2931.09mm
29       EUROPE  1992  1.531005e+07    $15.31mm
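The rows come back in whatever order the backend produced them. Since df is now an ordinary pandas DataFrame, you can reorder it locally; for instance, sorting by region and year groups each region’s history together:

# Sort the executed result locally with pandas for easier scanning
df.sort_values(['region', 'year'])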

If you’re being fastidious and want every year to appear even when there is no matching row on the prior side of the join, you will instead need to do an outer join and treat nulls on the prior side as zero:

yoy_change = (current.total - prior.total.zeroifnull()).name('yoy_change')
results = (current.outer_join(prior, ((current.region == prior.region) &
                                      (current.year == (prior.year - 1))))
           [current.region, current.year, current.total,
            prior.total.zeroifnull().name('prior_total'),
            yoy_change])

results.limit(10)
    region  year         total   prior_total    yoy_change
0   AFRICA  1994  6.837587e+09  6.908429e+09 -7.084172e+07
1   AFRICA  1996  6.878112e+09  6.848983e+09  2.912979e+07
2   AFRICA  1998  4.024061e+09  0.000000e+00  4.024061e+09
3     ASIA  1997  6.910663e+09  4.058824e+09  2.851839e+09
4     ASIA  1998  4.058824e+09  0.000000e+00  4.058824e+09
5     ASIA  1993  6.864540e+09  6.957170e+09 -9.262979e+07
6  AMERICA  1996  6.883057e+09  6.922465e+09 -3.940791e+07
7   AFRICA  1992  6.873319e+09  6.859733e+09  1.358699e+07
8   EUROPE  1997  6.876824e+09  4.113448e+09  2.763376e+09
9  AMERICA  1992  6.834349e+09  6.906800e+09 -7.245078e+07
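Alternatively, you can push the ordering into the query itself by attaching a sort to the Ibis expression before executing, for example:

# Let the backend return the rows already ordered by region and year
results.sort_by(['region', 'year']).limit(10)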