use our own output buffer

parent a131ea98
...@@ -584,6 +584,7 @@ read_default_config(void) { ...@@ -584,6 +584,7 @@ read_default_config(void) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
printf("Reading config from %s\n", DEFAULT_CONFIG); printf("Reading config from %s\n", DEFAULT_CONFIG);
(void) fflush(stdout);
if (CONF_ReadFile(DEFAULT_CONFIG) != 0) if (CONF_ReadFile(DEFAULT_CONFIG) != 0)
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
...@@ -700,6 +701,7 @@ main(int argc, char *argv[]) ...@@ -700,6 +701,7 @@ main(int argc, char *argv[])
if (! EMPTY(cli_config_filename)) { if (! EMPTY(cli_config_filename)) {
printf("Reading config from %s\n", cli_config_filename); printf("Reading config from %s\n", cli_config_filename);
(void) fflush(stdout);
if (CONF_ReadFile(cli_config_filename) != 0) { if (CONF_ReadFile(cli_config_filename) != 0) {
fprintf(stderr, "Error reading config from %s\n", fprintf(stderr, "Error reading config from %s\n",
cli_config_filename); cli_config_filename);
......
...@@ -89,10 +89,7 @@ chunkhead_t wrt_freechunks; ...@@ -89,10 +89,7 @@ chunkhead_t wrt_freechunks;
static unsigned wrt_nfree_tx, wrt_nfree_recs, wrt_nfree_chunks; static unsigned wrt_nfree_tx, wrt_nfree_recs, wrt_nfree_chunks;
static FILE *fo; static int timeout = -1;
static struct pollfd fds[1];
static int blocking = 0, timeout = -1;
static char *obuf = NULL;
static pthread_mutex_t reopen_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t reopen_lock = PTHREAD_MUTEX_INITIALIZER;
/* stats */ /* stats */
...@@ -122,6 +119,8 @@ struct outb { ...@@ -122,6 +119,8 @@ struct outb {
int fd; int fd;
}; };
struct outb *fo;
static inline size_t static inline size_t
outb_space(const struct outb *b) outb_space(const struct outb *b)
{ {
...@@ -348,34 +347,18 @@ outb_close(struct outb **bp) ...@@ -348,34 +347,18 @@ outb_close(struct outb **bp)
static int static int
open_log(void) open_log(void)
{ {
struct stat st;
if (EMPTY(config.output_file) || strcmp(config.output_file, "-") == 0) if (EMPTY(config.output_file) || strcmp(config.output_file, "-") == 0)
fo = stdout; fo = outb_dup(1, config.output_bufsiz);
else { else
if ((fo = fopen(config.output_file, config.append ? "a" : "w")) fo = outb_open(config.output_file,
== NULL) O_CREAT | O_WRONLY |
return errno; (config.append ? O_APPEND : O_TRUNC),
if (stat(config.output_file, &st) < 0) 0666,
return errno; config.output_bufsiz);
if (S_ISDIR(st.st_mode))
return EISDIR; if (fo == NULL)
blocking = !S_ISREG(st.st_mode);
if (blocking) {
fds[0].fd = fileno(fo);
fds[0].events = POLLOUT;
}
}
if (obuf != NULL)
free(obuf);
obuf = (char *) malloc(config.output_bufsiz);
if (obuf == NULL)
return errno; return errno;
if (setvbuf(fo, obuf, _IOFBF, config.output_bufsiz) != 0)
return errno;
return 0; return 0;
} }
...@@ -422,13 +405,12 @@ void ...@@ -422,13 +405,12 @@ void
wrt_write(tx_t *tx) wrt_write(tx_t *tx)
{ {
char *os; char *os;
int ready = 1;
size_t len; size_t len;
CHECK_OBJ_NOTNULL(tx, TX_MAGIC); CHECK_OBJ_NOTNULL(tx, TX_MAGIC);
assert(tx->state == TX_SUBMITTED); assert(tx->state == TX_SUBMITTED);
while (reopen && fo != stdout) { while (reopen) {
int errnum; int errnum;
AZ(pthread_mutex_lock(&reopen_lock)); AZ(pthread_mutex_lock(&reopen_lock));
...@@ -437,10 +419,10 @@ wrt_write(tx_t *tx) ...@@ -437,10 +419,10 @@ wrt_write(tx_t *tx)
break; break;
} }
wrt_return_freelist(); wrt_return_freelist();
if (fflush(fo) != 0) if (outb_flush(fo) < 0)
LOG_Log(LOG_ERR, "Cannot flush to %s, DATA DISCARDED: %s", LOG_Log(LOG_ERR, "Cannot flush to %s, DATA DISCARDED: %s",
config.output_file, strerror(errno)); config.output_file, strerror(errno));
if (fclose(fo) != 0) { if (outb_close(&fo) < 0) {
LOG_Log(LOG_ALERT, "Cannot close %s, exiting: %s", LOG_Log(LOG_ALERT, "Cannot close %s, exiting: %s",
config.output_file, strerror(errno)); config.output_file, strerror(errno));
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
...@@ -459,53 +441,7 @@ wrt_write(tx_t *tx) ...@@ -459,53 +441,7 @@ wrt_write(tx_t *tx)
os = FMT_Format(tx, &len); os = FMT_Format(tx, &len);
assert(tx->state == TX_WRITTEN); assert(tx->state == TX_WRITTEN);
if (blocking) { (void) outb_write(fo, os, len);
int nfds;
ready = 0;
do {
double start = VTIM_mono();
nfds = poll(fds, 1, timeout);
pollt += VTIM_mono() - start;
if (nfds < 0)
assert(errno == EAGAIN || errno == EINTR);
} while (nfds < 0);
AZ(fds[0].revents & POLLNVAL);
if (fds[0].revents & POLLERR) {
LOG_Log(LOG_ERR,
"Error waiting for ready output %d (%s), "
"DATA DISCARDED: %s", errno, strerror(errno), os);
errors++;
}
else if (nfds == 0) {
wrt_return_freelist();
LOG_Log(LOG_ERR,
"Timeout waiting for ready output, DATA DISCARDED: %s", os);
timeouts++;
}
else if (nfds != 1)
WRONG("More than one ready file descriptor for output");
else {
AN(fds[0].revents & POLLOUT);
ready = 1;
}
}
if (ready) {
errno = 0;
double start = VTIM_mono();
int items = fwrite(os, 1, len, fo);
writet += VTIM_mono() - start;
if (ferror(fo) || items < len) {
LOG_Log(LOG_ERR, "Output error %d (%s), DATA DISCARDED: %s",
errno, strerror(errno), os);
errors++;
clearerr(fo);
}
else {
writes++;
bytes += len;
}
}
/* clean up */ /* clean up */
DATA_Clear_Tx(tx, &wrt_freetx, &wrt_freerecs, &wrt_freechunks, DATA_Clear_Tx(tx, &wrt_freetx, &wrt_freerecs, &wrt_freechunks,
...@@ -548,11 +484,7 @@ static void ...@@ -548,11 +484,7 @@ static void
* wait until data are available, or quit is signaled. * wait until data are available, or quit is signaled.
* flush ouput and return space before sleeping * flush ouput and return space before sleeping
*/ */
if (fflush(fo) != 0) { (void) outb_flush(fo);
LOG_Log(LOG_ERR, "Output flush failed, error %d (%s)",
errno, strerror(errno));
errors++;
}
wrt_return_freelist(); wrt_return_freelist();
wrt->state = WRT_WAITING; wrt->state = WRT_WAITING;
...@@ -576,11 +508,7 @@ static void ...@@ -576,11 +508,7 @@ static void
CHECK_OBJ(tx, TX_MAGIC); CHECK_OBJ(tx, TX_MAGIC);
wrt_write(tx); wrt_write(tx);
} }
if (fflush(fo) != 0) { (void) outb_flush(fo);
LOG_Log(LOG_ERR, "Output flush failed, error %d (%s)",
errno, strerror(errno));
errors++;
}
wrt->status = EXIT_SUCCESS; wrt->status = EXIT_SUCCESS;
LOG_Log0(LOG_NOTICE, "Writer thread exiting"); LOG_Log0(LOG_NOTICE, "Writer thread exiting");
...@@ -670,7 +598,6 @@ WRT_Fini(void) ...@@ -670,7 +598,6 @@ WRT_Fini(void)
{ {
/* WRT_Halt() must always be called first */ /* WRT_Halt() must always be called first */
AZ(run); AZ(run);
fclose(fo); outb_close(&fo);
free(obuf);
AZ(pthread_mutex_destroy(&reopen_lock)); AZ(pthread_mutex_destroy(&reopen_lock));
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment