Golang and Hive/Impala – Thrift

This started out as a quick project to see about taking a component of our service and migrating it from Python to Go. We've been talking about moving from our semi-monolithic architecture to more loosely coupled services; the general idea is to move to Thrift-oriented services. One core component of our system uses Impala as its key backend, and since it is a very stable service, it could be logically broken out.

As a weekend exercise, I started looking at Go and at how feasible it would be to migrate this component. I can't share the full code or details, but while putting this together I found that the resources for using Go with Thrift are weak at best.

The first thing is to get your "thrift build" (i.e. the Makefile) right:

```makefile
thrift:
	thrift -r -gen go:package_prefix=github.com/koblas/test/services/ interfaces/ImpalaService.thrift
	rm -rf ./services
	mv gen-go services
```

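It turns out that **package_prefix** is a big deal. With -r, the Thrift compiler generates a whole collection of interfaces (not just one), one Go package per included .thrift file, and those packages import each other. **package_prefix** is prepended to those generated import paths, so it has to match where the generated packages actually live for them all to resolve and stay properly namespaced.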

Also worth noting: HiveThing is a useful reference, though it uses HiveServer2 rather than Beeswax.

The example code I ended up writing is below. Hopefully it helps somebody in the future figure things out a bit faster.

```go
package main

import (
	"fmt"
	"log"
	"time"

	// Thrift's Go library at its old import path. It now lives at
	// github.com/apache/thrift/lib/go/thrift, and newer releases change
	// some signatures (for example, client calls take a context.Context).
	"git.apache.org/thrift.git/lib/go/thrift"

	impala "github.com/Tubular/audience-also-watches/services/ImpalaService"
	"github.com/Tubular/audience-also-watches/services/beeswax"
)

func main() {
	addr := "impala-cluster"
	port := 21000 // impalad's Beeswax port

	log.Println("Starting")

	socket, err := thrift.NewTSocket(fmt.Sprintf("%s:%d", addr, port))
	if err != nil {
		log.Fatalf("Error opening socket: %v", err)
	}

	log.Println("Factory Land")

	// Buffered transport (24 MB buffer) speaking the binary protocol.
	transportFactory := thrift.NewTBufferedTransportFactory(24 * 1024 * 1024)
	protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()

	transport := transportFactory.GetTransport(socket)
	defer transport.Close()

	log.Println("Doing Open")

	if err := transport.Open(); err != nil {
		log.Fatalf("Unable to open transport: %v", err)
	}

	log.Println("Getting Client")

	// The generated constructor always returns a client, so no nil check is needed.
	client := impala.NewImpalaServiceClientFactory(transport, protocolFactory)

	log.Println("Before ping")
	client.PingImpalaService() // return values, including any error, are ignored here
	log.Println("DONE")

	log.Println("Building Query")

	query := beeswax.Query{
		Query:         "SELECT * FROM engagements LIMIT 10000",
		Configuration: []string{},
	}

	handle, err := client.Query(&query)
	if err != nil {
		log.Fatalf("Query failed: %v", err)
	}
	log.Println("Query sent")

	// Beeswax queries run asynchronously: poll until FINISHED or EXCEPTION.
	for {
		time.Sleep(100 * time.Millisecond)

		state, err := client.GetState(handle)
		if err != nil {
			log.Fatalf("GetState error: %v", err)
		}

		log.Println("State =", state)

		if state == beeswax.QueryState_FINISHED {
			log.Println("Got finish state of", state)
			break
		}
		if state == beeswax.QueryState_EXCEPTION {
			log.Println("Exception", state)
			return
		}
	}

	log.Println("Fetching results")

	total := 0
	startTime := time.Now()
	for {
		// start_over=false continues from the previous fetch; 1000000 is the
		// requested batch size (rows per call).
		result, err := client.Fetch(handle, false, 1000000)
		if err != nil {
			log.Fatalf("Error in fetch: %v", err)
		}

		// To inspect the data, uncomment:
		// fmt.Println(result.Columns)
		// for _, row := range result.Data {
		// 	fmt.Println(row)
		// }

		total += len(result.Data)

		if !result.HasMore {
			break
		}
	}
	log.Printf("Fetched %d row(s) in %.2fs", total, time.Since(startTime).Seconds())

	log.Println("All Done")
}
```
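The fetch loop in the example only counts rows. To use them, each entry in result.Data has to be split into columns. The sketch below assumes that each row arrives as a single tab-delimited string (my understanding of what Impala's Beeswax endpoint returns, but verify it against your server) and that you already have the column names; whether result.Columns is populated depends on the server, so treat that as an assumption too. The parseRows helper and the sample column names are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// parseRows splits tab-delimited rows (the assumed shape of beeswax
// Results.Data) into maps keyed by column name. It returns an error if a
// row's field count does not match the number of columns.
func parseRows(columns, data []string) ([]map[string]string, error) {
	rows := make([]map[string]string, 0, len(data))
	for i, line := range data {
		fields := strings.Split(line, "\t")
		if len(fields) != len(columns) {
			return nil, fmt.Errorf("row %d: got %d fields, want %d", i, len(fields), len(columns))
		}
		row := make(map[string]string, len(columns))
		for j, name := range columns {
			row[name] = fields[j]
		}
		rows = append(rows, row)
	}
	return rows, nil
}

func main() {
	cols := []string{"video_id", "views"} // hypothetical column names
	data := []string{"abc\t10", "def\t20"} // hypothetical rows
	rows, err := parseRows(cols, data)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, r := range rows {
		fmt.Println(r["video_id"], r["views"])
	}
}
```

In this format a value that itself contains a tab cannot be told apart from a column boundary, so the field-count check is the only safeguard the sketch offers.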