Writing an echo server in libev and c++
Looking at event IO frameworks and found some really good comments about the “ev” library, the challenge is that I would rather work in C++ than pure C – I like methods.. Found that the documentation is lacking for the C++ side of the library and needed to build a test harness.
This is a very basic TCP Echo server using libev in c++. The C++ side is only using std::list and the ev side is playing with loop, signals and basic socket IO. It should be a useful bread crumb for anybody trying to build their own service, which of course is my next big activity.
Update: Fixed the small file descriptor leak and now available as a Gist – https://gist.github.com/3364414
1#include <unistd.h>
2#include <fcntl.h>
3#include <string.h>
4#include <stdlib.h>
5#include <ev++.h>
6#include <netinet/in.h>
7#include <sys/socket.h>
8#include <resolv.h>
9#include <errno.h>
10#include <list>
11
12//
13// Buffer class - allow for output buffering such that it can be written out
14// into async pieces
15//
16struct Buffer {
17 char *data;
18 ssize_t len;
19 ssize_t pos;
20
21 Buffer(const char *bytes, ssize_t nbytes) {
22 pos = 0;
23 len = nbytes;
24 data = new char[nbytes];
25 memcpy(data, bytes, nbytes);
26 }
27
28 virtual ~Buffer() {
29 delete [] data;
30 }
31
32 char *dpos() {
33 return data + pos;
34 }
35
36 ssize_t nbytes() {
37 return len - pos;
38 }
39};
40
41//
42// A single instance of a non-blocking Echo handler
43//
44class EchoInstance {
45private:
46 ev::io io;
47 static int total_clients;
48 int sfd;
49
50 // Buffers that are pending write
51 std::list<Buffer*> write_queue;
52
53 // Generic callback
54 void callback(ev::io &watcher, int revents) {
55 if (EV_ERROR & revents) {
56 perror("got invalid event");
57 return;
58 }
59
60 if (revents & EV_READ)
61 read_cb(watcher);
62
63 if (revents & EV_WRITE)
64 write_cb(watcher);
65
66 if (write_queue.empty()) {
67 io.set(ev::READ);
68 } else {
69 io.set(ev::READ|ev::WRITE);
70 }
71 }
72
73 // Socket is writable
74 void write_cb(ev::io &watcher) {
75 if (write_queue.empty()) {
76 io.set(ev::READ);
77 return;
78 }
79
80 Buffer* buffer = write_queue.front();
81
82 ssize_t written = write(watcher.fd, buffer->dpos(), buffer->nbytes());
83 if (written < 0) {
84 perror("read error");
85 return;
86 }
87
88 buffer->pos += written;
89 if (buffer->nbytes() == 0) {
90 write_queue.pop_front();
91 delete buffer;
92 }
93 }
94
95 // Receive message from client socket
96 void read_cb(ev::io &watcher) {
97 char buffer[1024];
98
99 ssize_t nread = recv(watcher.fd, buffer, sizeof(buffer), 0);
100
101 if (nread < 0) {
102 perror("read error");
103 return;
104 }
105
106 if (nread == 0) {
107 // Gack - we're deleting ourself inside of ourself!
108 delete this;
109 } else {
110 // Send message bach to the client
111 write_queue.push_back(new Buffer(buffer, nread));
112 }
113 }
114
115 // effictivly a close and a destroy
116 virtual ~EchoInstance() {
117 // Stop and free watcher if client socket is closing
118 io.stop();
119
120 close(sfd);
121
122 printf("%d client(s) connected.\n", --total_clients);
123 }
124
125public:
126 EchoInstance(int s) : sfd(s) {
127 fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK);
128
129 printf("Got connection\n");
130 total_clients++;
131
132 io.set<EchoInstance, &EchoInstance::callback>(this);
133
134 io.start(s, ev::READ);
135 }
136};
137
138class EchoServer {
139private:
140 ev::io io;
141 ev::sig sio;
142 int s;
143
144public:
145
146 void io_accept(ev::io &watcher, int revents) {
147 if (EV_ERROR & revents) {
148 perror("got invalid event");
149 return;
150 }
151
152 struct sockaddr_in client_addr;
153 socklen_t client_len = sizeof(client_addr);
154
155 int client_sd = accept(watcher.fd, (struct sockaddr *)&client_addr, &client_len);
156
157 if (client_sd < 0) {
158 perror("accept error");
159 return;
160 }
161
162 EchoInstance *client = new EchoInstance(client_sd);
163 }
164
165 static void signal_cb(ev::sig &signal, int revents) {
166 signal.loop.break_loop();
167 }
168
169 EchoServer(int port) {
170 printf("Listening on port %d\n", port);
171
172 struct sockaddr_in addr;
173
174 s = socket(PF_INET, SOCK_STREAM, 0);
175
176 addr.sin_family = AF_INET;
177 addr.sin_port = htons(port);
178 addr.sin_addr.s_addr = INADDR_ANY;
179
180 if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
181 perror("bind");
182 }
183
184 fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK);
185
186 listen(s, 5);
187
188 io.set<EchoServer, &EchoServer::io_accept>(this);
189 io.start(s, ev::READ);
190
191 sio.set<&EchoServer::signal_cb>();
192 sio.start(SIGINT);
193 }
194
195 virtual ~EchoServer() {
196 shutdown(s, SHUT_RDWR);
197 close(s);
198 }
199};
200
201int EchoInstance::total_clients = 0;
202
203int main(int argc, char **argv)
204{
205 int port = 8192;
206
207 if (argc > 1)
208 port = atoi(argv[1]);
209
210 ev::default_loop loop;
211 EchoServer echo(port);
212
213 loop.run(0);
214
215 return 0;
216}