Overview
Comment: Initial import. Compiles and tests clean on Linux, FreeBSD.
SHA1: ea36ea13eb77f34dbdb8f80cb68f4227
User & Date: aspect 2015-03-10 04:51:24.857
Context
2015-03-10 04:51  Initial import. Compiles and tests clean on Linux, FreeBSD. Leaf check-in: ea36ea13eb user: aspect tags: trunk
Changes
Added CHANGES.
This file is uninteresting unless you are interested in the silly decisions I had to revise while building this.

Bad idea #1: line-oriented communication. Works best with FILE*s, but they don't play well with select. Length+value comms are actually better here so that's good.

Bad idea #2: storing thread state in a contiguous array, indexed from 0 to num_workers. Works lovely until you want to prune threads, at which point trying to memmove() entries back into contiguity becomes a terrible idea. Oops. Fortunately the only state the thread needs is a nice little fd which fits in a void*.

Bad idea #3: not reporting errors, and not leaving any scope for exceptional communication. Okay that isn't a bad idea yet, and it's actually not true because I can put an invalid character at the start of a message. But it might be a bad idea.

Bad idea #4: trying to use select()/read() with line buffering on stdin in the test program. Messes up input redirection. Simple answer is to use a thread (I never thought I'd say that!) which does blocking fgets() on stdin and packets the lines up for an uncooked socket to the select() loop.
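The length+value framing that replaced the line-oriented idea can be sketched on a plain byte buffer. This is a minimal illustration, not code from this check-in: `sv_encode` and `sv_decode` are hypothetical names, and the only details taken from the project are the `uint16_t` length in host byte order followed by an unterminated value. The point of the sketch is that a reader can tell from the bytes alone whether a whole message has arrived, which is what makes the format comfortable to use with select().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Hypothetical helper: append one length+value frame to out[0..cap).
 * Returns the number of bytes written, or 0 if the frame does not fit.
 */
size_t sv_encode(unsigned char *out, size_t cap, const void *value, uint16_t len)
{
    assert(out != NULL && value != NULL);
    if (cap < sizeof(len) + (size_t) len) {
        return 0;
    }
    memcpy(out, &len, sizeof(len));          /* length, host byte order */
    memcpy(out + sizeof(len), value, len);   /* value, no terminator */
    return sizeof(len) + (size_t) len;
}

/*
 * Hypothetical helper: try to decode one frame from in[0..avail).
 * On success, stores the length and a pointer to the value (which points
 * into `in`) and returns the number of bytes consumed.
 * Returns 0 if the buffer does not yet hold a complete frame.
 */
size_t sv_decode(const unsigned char *in, size_t avail,
                 uint16_t *len, const unsigned char **value)
{
    uint16_t n;

    assert(in != NULL && len != NULL && value != NULL);
    if (avail < sizeof(n)) {
        return 0;                            /* length prefix incomplete */
    }
    memcpy(&n, in, sizeof(n));
    if (avail - sizeof(n) < n) {
        return 0;                            /* value incomplete */
    }
    *len = n;
    *value = in + sizeof(n);
    return sizeof(n) + (size_t) n;
}
```

A zero-length frame still consumes two bytes, so it remains distinguishable from "need more data", which leaves room for zero-length messages to carry a special meaning.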
Added Makefile.
DEBUG_CFLAGS = -g -O0 -DDEBUG
CFLAGS = -Wall
#CFLAGS += $(DEBUG_CFLAGS)
LDFLAGS = -lpthread

ares: ares.o

ares.o: ares.c

clean:
	rm ares ares.o

test: ares test.out
	./ares www.tcl.tk nonexist localhost < /dev/null | diff -u - test.out
Added README.
"ares" is a lightweight asynchronous resolver for POSIX systems, designed as
a first step to getting this functionality into portable scripting languages
(notably Tcl) without large code footprint or overhead.  By which I mean
around 500 loc and 10k ELF executable.

The basic design uses pthreads.  One dispatcher thread, managing N resolver
threads.  Communication with the dispatcher is over a (AF_LOCAL) socket.
The dispatcher sits in a select() loop, forwarding requests to (spawned on
demand) threads.  Resolvers use getaddrinfo() and getnameinfo() to resolve
the given hostname to a set of IP address strings.  Once spawned, resolver
threads wait around to be reused, or to be terminated by a special request
to the dispatcher.

Client code only needs to be aware of a single socket, which can be
trivially integrated into a select() loop in your main program.

Threads are all created detached, so they can be cleaned up by closing file
descriptors without having to call pthread_join().

== Tested on ==

 * Linux (Debian jessie, gcc + glibc)
 * FreeBSD (10.1, clang)

== System Requirements ==

The following POSIX interfaces are assumed:

 * libpthread, including pthread_attr_setdetachstate() so we don't have to
   wait
 * socketpair
 * getnameinfo/getaddrinfo

== Internal Communication ==

Requests and responses are exchanged in length+value encoding, with the
length a uint16_t in host byte order and the value an (unterminated) ASCII
string.

Two layers of communication are used: host program <-> dispatcher and
dispatcher <-> resolver.  Both request and response strings must be no
longer than NI_MAXHOST.

Dispatcher request:
    | length | hostname |

Dispatcher response:
    | length | hostname | length | ipaddr |

Resolver request:
    | length | hostname |

Resolver responses:
    | length | ipaddr |

Special cases:

 * a 0-length request tells the dispatcher to kill all idle threads
 * a 0-length ipaddr follows the final result for a given request
 * dispatcher signals resolvers to shut down by closing their fd
 * main signals dispatcher to shut down by closing its fd

== Out of Scope ==

This utility isn't very general, so it makes certain assumptions:

 * responses are always strings from getnameinfo(), and include IPv4 and
   v6 addresses without discrimination.
 * getaddrinfo hints are fixed at AF_UNSPEC + IPPROTO_TCP.  The latter only
   to ensure we do not get duplicate IP address responses for a hostname.
 * getaddrinfo flags are fixed at AI_ADDRCONFIG.
 * no support for IDN.

== Rejected Alternatives ==

 * http://c-ares.haxx.se/: rolls its own resolv.conf parser, ignores
   nsswitch, loads of code
 * getaddrinfo_a: glibc-only, though it builds on POSIX sigevent/lio_listio
 * asr.h: BSD's version.  Also non-portable
 * OpenMP: much, much nicer than pthread, and almost as available, but
   unlike Tcl_Thread

Tcl users may be interested in Treso, which is more mature and complete and
knows how to use <asr.h> on BSD.  When not using asr, Treso uses a single
thread (and pipe) per request and makes no attempt to cap the number of
threads spawned.

== Usage ==

Input parameters are always const.

`int ares_init(int)`: takes a maximum number of resolver threads as a
parameter, and returns a file descriptor (negative on error).

`void ares_request(int fd, const char *hostname, size_t hlen)`
Added TODO.
 * figure out if I can bear to make an ares_wait(), which will only have use for testing
 * make READSV a macro.  And WRITESV?  for variadic dispatch
 * librify
 * test on multiple platforms
 * Brace for Tcl'ing!
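One possible shape for the ares_wait() mentioned in the first item is a thin wrapper around select() on the resolver socket. The TODO does not say what the function should look like, so the signature, the millisecond timeout and the return values below are all assumptions made for illustration.

```c
#include <assert.h>
#include <errno.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Hypothetical ares_wait(): block until fd is readable or timeout_ms
 * milliseconds have passed.  A negative timeout waits indefinitely.
 * Returns 1 if fd is readable, 0 on timeout, -1 on error.
 * If select() is interrupted by a signal, the wait restarts with the
 * full timeout, which is good enough for a test helper.
 */
int ares_wait(int fd, int timeout_ms)
{
    fd_set rfds;
    struct timeval tv;
    struct timeval *tvp = NULL;
    int rc;

    assert(fd >= 0 && fd < FD_SETSIZE);
    do {
        FD_ZERO(&rfds);
        FD_SET(fd, &rfds);
        if (timeout_ms >= 0) {
            tv.tv_sec = timeout_ms / 1000;
            tv.tv_usec = (timeout_ms % 1000) * 1000;
            tvp = &tv;
        }
        rc = select(fd + 1, &rfds, NULL, NULL, tvp);
    } while (rc < 0 && errno == EINTR);

    if (rc < 0) {
        return -1;
    }
    return rc > 0 ? 1 : 0;
}
```

In a test this would sit between issuing a request and reading the response, so the test can fail with a timeout instead of hanging when no answer arrives.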
Added ares.c.
/*
 * FIXMEs:
 *  - check eof
 *  - paranoid rc checking?  Search "(void) "
 *  - READSV as a macro?
 */
#ifdef DEBUG
#define DPRINT(...) fprintf(stderr, "DEBUG:" __VA_ARGS__)
#else
#define DPRINT(...) /* ... */
#endif

#include <assert.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h> // needed for IPPROTO_TCP on BSD
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/select.h>
#include <stdint.h>
#include <fcntl.h>

struct worker_state {
    int state;
    pthread_t threadid;
    int fd;
    char *hostname;     // resolving, or null
};

struct dispatcher_state {
    pthread_t masterid;
    int masterfd;
    int max_workers;
    int n_workers;
    struct worker_state workers[];  // C99 flexible array member
};

void *dispatcher_thread(void *arg);
void dispatcher_cleanup(void *arg);
void *resolver_thread(void *arg);
void resolver_cleanup(void *arg);
int do_resolve(int fd, const char *hostname);

/*
 * Helpers
 */
void set_nonblock(const int fd) {
    int flags;
    // O_NONBLOCK is a file status flag: F_GETFL/F_SETFL, not F_GETFD/F_SETFD
    flags = fcntl(fd, F_GETFL);
    flags |= O_NONBLOCK;
    fcntl(fd, F_SETFL, flags);
}

int writesv(int fd, uint16_t len, void *buf) {
    DPRINT("writesv %d %d '%.*s'\n", fd, len, len, (char *) buf);
    int rc;
    rc = write(fd, &len, sizeof(len));
    if(rc != sizeof(len)) {
        return -1;
    }
    rc = write(fd, buf, len);
    if(rc != len) {
        return -1;
    }
    return 0;
}

int readsv(int fd, uint16_t *len, void *buf) {
    int rc;
    rc = read(fd, len, sizeof(*len));
    if(rc != sizeof(*len)) {
        return (rc == 0) ? -1 : rc;
    }
    rc = read(fd, buf, *len);
    if(rc != *len) {
        return (rc == 0) ? -1 : rc;
    }
    // log only once len and buf actually hold the message
    DPRINT("readsv %d %d '%.*s'\n", fd, *len, *len, (char *) buf);
    return 0;
}

// as a macro, we can assert on sizeof(msgbuf)
#define READSV(fd, msglen, msgbuf) \
    do { \
        int rc; \
        rc = read(fd, &msglen, sizeof(msglen)); \
        assert( rc == sizeof(msglen) ); \
        assert( msglen <= sizeof(msgbuf) ); \
        rc = read(fd, msgbuf, msglen); \
        assert( rc == msglen ); \
    } while (0)

int max(const int a, const int b) {
    return a > b ? a : b;
}

int min(const int a, const int b) {
    return a < b ? a : b;
}

/*
 * "Generic" setup for socketpair-driven thread pool
 * Not generic enough, as it understands dispatcher_state
 */
int sockdrawer_init(int fd, int max_workers, void *(*worker)(void*)) {
    struct dispatcher_state *state;
    pthread_attr_t attr;
    int rc;
    size_t ssize = sizeof(struct dispatcher_state) + max_workers * sizeof(struct worker_state);

    state = calloc(1, ssize);
    if(state == NULL) {
        return -1;
    }
    state->masterfd = fd;
    state->max_workers = max_workers;

    (void) pthread_attr_init(&attr);
    (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // don't have to clean up

    rc = pthread_create(&state->masterid, &attr, worker, state);
    pthread_attr_destroy(&attr);
    if(rc) {
        free(state);
        return -rc;
    }
    return 0;
}

/*
 * Main loop for dispatcher thread
 */
void *dispatcher_thread(void *arg) {
    struct dispatcher_state *state = arg;
    fd_set rfds;
    int masterfd;
    int maxfd;
    int nfds;
    int i;
    int rc;
    uint16_t msglen;
    char msgbuf[NI_MAXHOST];

    pthread_cleanup_push(&dispatcher_cleanup, state);

    masterfd = state->masterfd;
    state->n_workers = 0;

    while(1) {
        FD_ZERO(&rfds);
        nfds = 0;
        maxfd = 0;
        // watch every busy worker; nfds counts them
        for(i=0; i < state->n_workers; ++i) {
            if(state->workers[i].hostname) {
                FD_SET(state->workers[i].fd, &rfds);
                ++nfds;
                maxfd = max(maxfd, state->workers[i].fd);
            }
        }
        // only accept new requests while a worker slot is free
        if(nfds < state->max_workers) {
            FD_SET(masterfd, &rfds);
            maxfd = max(maxfd, masterfd);
        } else {
            DPRINT("dispatcher: saturated!  Ignoring masterfd\n");
        }

        DPRINT("dispatcher: entering select for %d up to %d\n", nfds, maxfd);
        nfds = select(maxfd+1, &rfds, NULL, NULL, NULL);
        DPRINT("dispatcher: %d fds ready\n", nfds);
        if(nfds < 0) {
            perror("dispatcher select");
            break;
        }

        if(FD_ISSET(masterfd, &rfds)) {
            DPRINT("dispatcher: reading on masterfd\n");
            // read a message
            rc = read(masterfd, &msglen, sizeof(msglen));
            if(rc != sizeof(msglen)) {
                DPRINT("dispatcher: read returned %d (not %d), bailing!\n", rc, (int) sizeof(msglen));
                break;
            }
            assert(msglen <= sizeof(msgbuf));
            rc = read(masterfd, msgbuf, msglen);
            if(rc != msglen) {
                DPRINT("dispatcher: read returned %d (not %d), bailing!\n", rc, msglen);
                break;
            }

            if(msglen == 0) {
                DPRINT("dispatcher: pruning threads\n");
                for(i = state->n_workers-1; i >= 0; --i) {
                    if(state->workers[i].hostname == NULL) {
                        DPRINT("dispatcher: shutting down worker %d\n", i);
                        close(state->workers[i].fd);
                        memmove(
                            &state->workers[i],
                            &state->workers[i+1],
                            sizeof(state->workers[0]) * (state->n_workers-1 - i));
                        --state->n_workers;
                    } else {
                        DPRINT("dispatcher: not shutting down worker %d\n", i);
                    }
                }
                DPRINT("dispatcher: pruning finished, %d workers left\n", state->n_workers);
                continue; // back to select
            }

            // is there an available worker?
            for(i = 0; i < state->n_workers; ++i) {
                if(state->workers[i].hostname == NULL) {
                    break;
                }
            }
            DPRINT("dispatcher: sending '%.*s' to worker %d\n", msglen, msgbuf, i);
            if(i == state->n_workers) {
                // no idle worker: spawn one.  The saturation check above
                // guarantees a free slot, so this won't be reached otherwise
                assert(i < state->max_workers);
                pthread_attr_t attr;
                int fds[2];
                (void) pthread_attr_init(&attr);
                (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // don't have to clean up

                rc = socketpair(AF_LOCAL, SOCK_STREAM, 0, fds);
                if(rc) {
                    perror("dispatcher socketpair");
                    exit(rc);
                }
                state->workers[i].fd = fds[0];
                rc = pthread_create(&state->workers[i].threadid, &attr, &resolver_thread, (void*) (intptr_t) fds[1]);
                if(rc) {
                    perror("dispatcher pthread_create");
                    exit(rc);
                }
                pthread_attr_destroy(&attr);
                state->n_workers ++;
            }
            // mark the worker busy
            state->workers[i].hostname = strndup(msgbuf, msglen);
            // send on the message
            writesv(state->workers[i].fd, msglen, msgbuf);
            DPRINT("dispatcher: worker %d is resolving '%.*s'\n", i, msglen, msgbuf);

            --nfds;
        }

        for(i=0; nfds > 0 && i < state->n_workers; ++i) {
            if(FD_ISSET(state->workers[i].fd, &rfds)) {
                DPRINT("dispatcher: reading from worker %d\n", i);
                // read the result
                (void) read(state->workers[i].fd, &msglen, 2);
                DPRINT("dispatcher: reading from worker %d: %d bytes\n", i, msglen);
                assert(msglen <= sizeof(msgbuf));
                (void) read(state->workers[i].fd, msgbuf, msglen);
                DPRINT("dispatcher: read from worker %d: '%.*s'\n", i, msglen, msgbuf);
                // write it back, prefixed with the hostname
                DPRINT("dispatcher: writing back to %d: '%s %.*s'\n", state->masterfd, state->workers[i].hostname, msglen, msgbuf);
                (void) writesv(state->masterfd, strlen(state->workers[i].hostname), state->workers[i].hostname);
                (void) writesv(state->masterfd, msglen, msgbuf);
                if(msglen == 0) {
                    // mark the worker available
                    free(state->workers[i].hostname);
                    state->workers[i].hostname = NULL;
                }
                if(--nfds < 1) {
                    break;
                }
            }
        }
    }
    pthread_cleanup_pop(1); // yep, go POSIX macros!
    // the thread is detached, so nobody collects an exit status
    return NULL;
}

void dispatcher_cleanup(void *arg) {
    struct dispatcher_state *state = arg;
    int i;
    for(i=0; i < state->n_workers; ++i) {
        (void) pthread_cancel(state->workers[i].threadid); // strictly unnecessary - the close will do
        close(state->workers[i].fd);
        free(state->workers[i].hostname);   // NULL when idle, which free() accepts
    }
    free(state);
}

/*
 * Main loop for resolver thread(s)
 */
void *resolver_thread(void *arg) {
    int fd = (int) (intptr_t) arg;
    int rc;
    uint16_t msglen;
    char msgbuf[NI_MAXHOST];

    pthread_cleanup_push(&resolver_cleanup, arg);

    while(1) {
        DPRINT("worker %d: reading\n", fd);
        rc = read(fd, &msglen, 2);
        if(rc != 2) break;
        DPRINT("worker %d: reading %d bytes\n", fd, msglen);
        assert(msglen < sizeof(msgbuf));    // leave room for the terminator
        rc = read(fd, msgbuf, msglen);
        if(rc != msglen) break;
        DPRINT("worker %d: read '%.*s'\n", fd, msglen, msgbuf);
        msgbuf[msglen] = '\0';

        rc = do_resolve(fd, msgbuf);    // FIXME: ignore RC?
        DPRINT("worker %d: resolved %d addresses\n", fd, rc);

        rc = writesv(fd, 0, "");    // indicate completion
        if(rc < 0) break;
    }
    pthread_cleanup_pop(1);
    // the thread is detached, so nobody collects an exit status
    return NULL;
}

void resolver_cleanup(void *arg) {
    int fd = (int) (intptr_t) arg;
    close(fd);
}

/*
 * Resolve given hostname, emitting results (size16, value)
 * on given fd.
 * Result is num results, <0 on error.
 */
int do_resolve(int fd, const char *hostname) {
    struct addrinfo hints;
    struct addrinfo *ai;
    int rc;
    int i;
    struct addrinfo *rp;
    char ipstr[NI_MAXHOST];     // too big!

    // initialise hints
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = 0;
    hints.ai_protocol = IPPROTO_TCP;    // so we don't get multiple instances of each addr
    hints.ai_flags = AI_ADDRCONFIG;     // AI_IDN ?
    // can use AI_V4MAPPED on linux+glibc, but not on FreeBSD
    // which has it defined, but doesn't support it.  Nor is it in man.  wtf.

    // resolve
    rc = getaddrinfo(hostname, NULL, &hints, &ai);
    if(rc) {
        DPRINT("do_resolve: gai(%s)\n", gai_strerror(rc));
        return rc < 0 ? rc : -rc;
    }

    i = 0;
    for(rp = ai; rp != NULL; rp = rp->ai_next) {
        rc = getnameinfo(rp->ai_addr, rp->ai_addrlen, ipstr, sizeof(ipstr), NULL, 0, NI_NUMERICHOST);
        if(rc) {
            DPRINT("error in getnameinfo for '%s' (name %d): %d\n", hostname, i, rc);
            continue;
        }
        (void) writesv(fd, strlen(ipstr), ipstr);
        ++i;
    }
    freeaddrinfo(ai);
    return i;
}

/*
 * Main interface
 */
int ares_init(const int max_workers) {
    int rc;
    int fds[2];
    rc = socketpair(AF_LOCAL, SOCK_STREAM, 0, fds);
    if(rc < 0) {
        return rc;
    }
    // pass the caller's limit through rather than a hard-coded 4
    rc = sockdrawer_init(fds[0], max_workers, &dispatcher_thread);
    if(rc < 0) {
        close(fds[0]);
        close(fds[1]);
        return rc;
    }
    return fds[1];
}

void ares_request(const int fd, const char *hostname, const size_t hlen) {
    uint16_t len;
    len = hlen;
    (void) write(fd, &len, sizeof(uint16_t));
    (void) write(fd, hostname, len);
}

/*
 * hostname and ipstr should be at least NI_MAXHOST bytes.  Otherwise the
 * results may be truncated.  Results are not null-terminated.
 */
void ares_getresponse(int fd, char *hostname, size_t *hostlen, char *ipstr, size_t *iplen) {
    char msgbuf[NI_MAXHOST];
    uint16_t msglen;
    DPRINT("resolving into buffers of size %d, %d\n", (int) *hostlen, (int) *iplen);
    read(fd, &msglen, sizeof(msglen));
    read(fd, msgbuf, msglen);
    *hostlen = min(*hostlen, msglen);
    memcpy(hostname, msgbuf, *hostlen);
    read(fd, &msglen, sizeof(msglen));
    read(fd, msgbuf, msglen);
    *iplen = min(*iplen, msglen);
    memcpy(ipstr, msgbuf, *iplen);
}

void ares_prunethreads(const int fd) {
    (void) writesv(fd, 0, "");
}

void ares_shutdown(const int fd) {
    close(fd);
}

/*
 * Test program
 */
void *stdin_thread(void *arg) {
    int fd = (int) (intptr_t) arg;
    char buf[NI_MAXHOST];
    int len;
    while(fgets(buf, sizeof(buf), stdin)) {
        len = strlen(buf);
        if(len > 0 && buf[len-1] == '\n') {
            buf[--len] = '\0';
        }
        writesv(fd, len, buf);
    }
    close(fd);
    return arg;
}

int stdin_fd(void) {
    int fds[2];
    pthread_t thread;
    pthread_attr_t attr;

    (void) socketpair(AF_LOCAL, SOCK_STREAM, 0, fds);
    (void) pthread_attr_init(&attr);
    (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // don't have to clean up
    (void) pthread_create(&thread, &attr, &stdin_thread, (void*) (intptr_t) fds[1]);
    pthread_attr_destroy(&attr);
    return fds[0];
}

int main(int argc, char **argv) {
    fd_set rfds;
    int infd;
    int maxfd;
    int nfds;
    int i;
    int rc;
    char msgbuf[NI_MAXHOST];
    uint16_t msglen;
    char hostbuf[NI_MAXHOST];
    size_t hostlen;
    char ipbuf[NI_MAXHOST];
    size_t iplen;

    int resfd = ares_init(4);
    if(resfd < 0) {
        perror("ares_init");
        return -1;
    }

    for(i = 1; i < argc; ++i) {
        printf("request '%s'\n", argv[i]);
        ares_request(resfd, argv[i], strlen(argv[i]));
    }

    /*
     * select on stdin, resfd
     */
    infd = stdin_fd();
    FD_ZERO(&rfds);
    maxfd = max(resfd, infd);
    while(1) {
        DPRINT("select ...\n");
        FD_SET(infd, &rfds);
        FD_SET(resfd, &rfds);
        nfds = select(maxfd+1, &rfds, NULL, NULL, NULL);
        if(nfds < 0) {
            perror("select");
            return -1;
        }
        if(FD_ISSET(infd, &rfds)) {
            // FIXME: catch EOF
            rc = read(infd, &msglen, sizeof(msglen));
            // compare as int: a -1 return must not be promoted to size_t
            if(rc != (int) sizeof(msglen)) {
                break;  // probably EOF
            }
            rc = read(infd, msgbuf, msglen);
            assert( rc == msglen );
            if(msglen < 1) {
                ares_prunethreads(resfd);
                printf("pruning threads ..\n");
                continue;
            }
            printf("request '%.*s'\n", msglen, msgbuf);
            ares_request(resfd, msgbuf, msglen);
        }
        if(FD_ISSET(resfd, &rfds)) {
            hostlen = sizeof(hostbuf);
            iplen = sizeof(ipbuf);
            ares_getresponse(resfd, hostbuf, &hostlen, ipbuf, &iplen);
            printf("result '%.*s' = '%.*s'\n", (int) hostlen, hostbuf, (int) iplen, ipbuf);
        }
    }
    ares_shutdown(resfd);
    printf("ares has been shut down\n");
    return 0;
}
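The pruning branch in dispatcher_thread() calls memmove() once for every idle worker it removes. An alternative is to compact the array in a single pass. The sketch below is an illustration rather than the check-in's code: `struct worker_slot` is a cut-down stand-in for `struct worker_state`, `prune_idle` is a hypothetical name, and the idle descriptors are handed back to the caller to close so that the function stays free of side effects.

```c
#include <assert.h>
#include <stddef.h>

/* Cut-down stand-in for worker_state: only the fields pruning looks at. */
struct worker_slot {
    int fd;
    char *hostname;     /* non-NULL while the worker is busy */
};

/*
 * Hypothetical single-pass prune: busy entries are moved to the front of
 * w[] in their original order, and the fds of idle entries are copied to
 * idle_fds[] (capacity at least n) for the caller to close.
 * Returns the number of workers kept; *n_idle receives the number pruned.
 */
size_t prune_idle(struct worker_slot *w, size_t n, int *idle_fds, size_t *n_idle)
{
    size_t keep = 0;
    size_t idle = 0;
    size_t i;

    assert(w != NULL && idle_fds != NULL && n_idle != NULL);
    for (i = 0; i < n; ++i) {
        if (w[i].hostname == NULL) {
            idle_fds[idle++] = w[i].fd;     /* idle: schedule for close */
        } else {
            w[keep++] = w[i];               /* busy: slide towards the front */
        }
    }
    *n_idle = idle;
    return keep;
}
```

The dispatcher would then close each returned descriptor and store the return value in n_workers. Each entry is visited once, regardless of how many workers are idle.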