Golang and Hive/Impala – Thrift
This started out as a quick project to see whether we could take a component of our service and migrate it from Python to Go. We’ve been talking about breaking our semi-monolithic system into more loosely coupled services; the general idea is to move to Thrift-oriented services. One core component of our system uses Impala as a key backend. It’s a very stable service that could logically be broken out.
As a weekend exercise, I started looking at Go and what it would take to migrate that component. I can’t share the full code or details, but as I was putting this together I found that the resources for Go and Thrift are weak at best.
The first thing is that you need to get your “thrift build” (aka the Makefile) right:
thrift:
	thrift -r -gen go:package_prefix=github.com/koblas/test/services/ interfaces/ImpalaService.thrift
	rm -rf ./services
	mv gen-go services
It turns out that **package_prefix** is a big deal. Because the thrift build (run with -r, so included files are generated too) produces a collection of interfaces rather than just one, we need a way to make sure they’re all available and namespaced.
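To make the namespacing concrete, here is a minimal sketch of how the prefix from the Makefile maps onto the import paths of the generated packages. It assumes the generator joins the prefix onto each generated package name; `importPath` is a hypothetical helper written for this illustration, not part of Thrift.

```go
package main

import (
	"fmt"
	"strings"
)

// importPath is a hypothetical helper: it joins a package_prefix value and a
// generated package name into the import path the Go code would use.
// Assumption: the prefix is simply prepended to the package name.
func importPath(prefix, pkg string) string {
	return strings.TrimSuffix(prefix, "/") + "/" + pkg
}

func main() {
	prefix := "github.com/koblas/test/services/"
	// ImpalaService and beeswax are the two generated packages the example
	// program imports; the real build may generate more.
	for _, pkg := range []string{"ImpalaService", "beeswax"} {
		fmt.Println(importPath(prefix, pkg))
	}
}
```

The prefix has to match wherever the generated code ends up living (here, the services directory the Makefile moves gen-go into), otherwise the generated packages cannot import each other.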
Also worth noting: HiveThing is a useful reference, though it uses HiveServer2 rather than Beeswax.
The example code I ended up writing is here. Hopefully it helps somebody figure things out just a bit faster in the future.
package main

import (
	"fmt"
	"log"
	"time"

	"git.apache.org/thrift.git/lib/go/thrift"
	impala "github.com/Tubular/audience-also-watches/services/ImpalaService"
	"github.com/Tubular/audience-also-watches/services/beeswax"
)

func main() {
	addr := "impala-cluster"
	port := 21000 // Impala's Beeswax port

	log.Println("Starting")

	socket, err := thrift.NewTSocket(fmt.Sprintf("%s:%d", addr, port))
	if err != nil {
		log.Fatalf("Error opening socket: %v", err)
	}

	log.Println("Factory Land")

	transportFactory := thrift.NewTBufferedTransportFactory(24 * 1024 * 1024)
	protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()

	transport := transportFactory.GetTransport(socket)
	defer transport.Close()

	log.Println("Doing Open")

	if err := transport.Open(); err != nil {
		log.Fatalf("Unable to open transport: %v", err)
	}

	log.Println("Getting Client")

	client := impala.NewImpalaServiceClientFactory(transport, protocolFactory)
	if client == nil {
		log.Fatal("Unable to establish client")
	}

	log.Println("Before ping")
	if err := client.PingImpalaService(); err != nil {
		log.Fatalf("Ping failed: %v", err)
	}
	log.Println("DONE")

	log.Println("Building Query")

	query := beeswax.Query{}
	query.Query = "SELECT * FROM engagements LIMIT 10000"
	query.Configuration = []string{}

	handle, err := client.Query(&query)
	if err != nil {
		log.Fatalf("Query failed: %v", err)
	}
	log.Println("Query sent")

	// Poll until the query finishes or fails.
	for {
		time.Sleep(100 * time.Millisecond)

		state, err := client.GetState(handle)
		if err != nil {
			// Don't fall through to the fetch loop with an unknown state.
			log.Fatalf("GetState error: %v", err)
		}

		log.Println("State =", state)

		if state == beeswax.QueryState_FINISHED {
			log.Println("Got finish state of", state)
			break
		}
		if state == beeswax.QueryState_EXCEPTION {
			log.Fatalf("Query ended in state %v", state)
		}
	}

	log.Println("Fetching results")

	// Fetch in batches until the server reports there is nothing left.
	total := 0
	startTime := time.Now()
	for {
		result, err := client.Fetch(handle, false, 1000000)
		if err != nil {
			log.Fatalf("Error in fetch: %v", err)
		}

		// Uncomment to inspect the column names or the rows themselves:
		// fmt.Println(result.Columns)
		// for _, row := range result.Data {
		// 	fmt.Println(row)
		// }

		total += len(result.Data)

		if !result.HasMore {
			break
		}
	}
	log.Printf("Fetched %d row(s) in %.2fs", total, time.Since(startTime).Seconds())

	log.Println("All Done")
}