Replace fellow_cache_seg_ref_in with a simpler implementation for auxattr
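The auxiliary-attribute getattr path brings in a single segment and always reads it synchronously, so it does not need the generic n-segment, sync/async machinery of fellow_cache_seg_ref_in; a dedicated single-segment helper takes its place. Sketch of the call-site change inside the getattr helper macro (condensed from the last hunk of this diff):

-        fellow_cache_seg_ref_in(fc, FCIO_SYNC, &fcs, 1);        /* generic: n segments, sync or async */
+        fellow_cache_seg_auxattr_ref_in(fc, fcs);               /* dedicated: one segment, always synchronous */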

parent f8e2f6da
@@ -3663,177 +3663,6 @@ fellow_cache_seg_check(struct fellow_cache_seg *fcs)
        return (err);
}

enum fellow_cache_io_e {
        FCIO_SYNC,
        FCIO_ASYNC
};

/* for potentially multiple segments from the same fco, grab references and read
 * them
 */
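/* All segments passed in must belong to the same fco (asserted below).
 * With FCIO_SYNC, segments currently being read by another thread are
 * waited for and reads for on-disk segments are carried out synchronously;
 * with FCIO_ASYNC, reads are only issued.
 */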
static void
fellow_cache_seg_ref_in(struct fellow_cache *fc, enum fellow_cache_io_e type,
    struct fellow_cache_seg * const *segs, const unsigned n)
{
        struct fellow_cache_seg *fcs, *iosegs[n], *racesegs[n];
        struct fellow_cache_obj *fco;
        unsigned u, ion = 0, racen = 0;
        unsigned req2seg[n];
        uint8_t r;
        struct buddy_ptr_extent mem[n];
        struct buddy_reqs *reqs;
        struct buddy_returns *rets;

        CHECK_OBJ_NOTNULL(*segs, FELLOW_CACHE_SEG_MAGIC);
        AN(n);
        fco = (*segs)->fco;
        CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);

        struct fellow_lru_chgbatch lcb[1] =
            FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 64);

        assert(n <= BUDDY_REQS_MAX);
        reqs = BUDDY_REQS_STK(fc->membuddy, BUDDY_REQS_MAX);
        rets = BUDDY_RETURNS_STK(fc->membuddy, BUDDY_RETURNS_MAX);

        memset(iosegs, 0, sizeof iosegs);
        memset(racesegs, 0, sizeof racesegs);
        memset(req2seg, 0, sizeof req2seg);
        memset(mem, 0, sizeof mem);
        /* opportunistically alloc memory and conduct checks on the way */
        for (r = 0, u = 0; u < n; u++) {
                struct fellow_disk_seg *fds;

                BUDDY_REQS_PRI(reqs, FEP_ITER);

                fcs = segs[u];
                CHECK_OBJ_NOTNULL(fcs, FELLOW_CACHE_SEG_MAGIC);
                fds = fcs->disk_seg;
                CHECK_OBJ_NOTNULL(fds, FELLOW_DISK_SEG_MAGIC);
                assert(fcs->fco == fco);

                switch (fcs->state) {
                case FCS_DISK:
                        req2seg[r++] = u;
                        AN(buddy_req_extent(reqs,
                            fellow_rndup(fc->ffd, fds->seg.size), 0));
                        break;
                default:
                        break;
                }

                if (r != BUDDY_REQS_MAX)
                        continue;

                r = buddy_alloc_wait(reqs);
                assert(r == BUDDY_REQS_MAX);
                while (r--) {
                        AZ(mem[req2seg[r]].ptr);
                        mem[req2seg[r]] = buddy_get_ptr_extent(reqs, r);
                }
                buddy_alloc_wait_done(reqs);
                r = 0;
        }
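        /* flush the allocation requests collected in the last, possibly
         * partial, batch before grabbing references under the fco mutex
         */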
        r = buddy_alloc_wait(reqs);
        while (r--) {
                AZ(mem[req2seg[r]].ptr);
                mem[req2seg[r]] = buddy_get_ptr_extent(reqs, r);
        }
        buddy_alloc_wait_done(reqs);
        r = 0;

        /* grab references, remember segments which need io and those for which
         * we raced memory alloc
         */
        AZ(pthread_mutex_lock(&fco->mtx));
        for (u = 0; u < n; u++) {
                fcs = segs[u];
                (void) fellow_cache_seg_ref_locked(lcb, fcs);

                while (type == FCIO_SYNC &&
                    (fcs->state == FCS_BUSY || fcs->state == FCS_READING)) {
                        fellow_cache_lru_chgbatch_apply(lcb);
                        fellow_cache_seg_wait_locked(fcs);
                }

                switch (fcs->state) {
                case FCS_BUSY:
                case FCS_READING:
                        assert(type == FCIO_ASYNC);
                        break;
                case FCS_READFAIL:
                        INCOMPL();
                        break;
                case FCS_WRITING:
                case FCS_CHECK:
                case FCS_INCORE:
                        break;
                case FCS_DISK:
                        if (mem[u].ptr == NULL) {
                                /*
                                 * race exception (should be rare). we do not
                                 * want to malloc under the fco mtx, so we
                                 * restart later
                                 */
                                (void) fellow_cache_seg_deref_locked(NULL, fcs);
                                racesegs[racen++] = fcs;
                                break;
                        }
                        // reference for io
                        (void) fellow_cache_seg_ref_locked(NULL, fcs);
                        fcs->alloc = mem[u];
                        mem[u] = buddy_ptr_extent_nil;
                        fellow_cache_seg_transition_locked_notincore(fcs,
                            FCS_READING);
                        iosegs[ion++] = fcs;
                        break;
                default:
                        WRONG("cache_seg_in state");
                }
        }
        /*
         * when we get here, the fco must already be ref'd;
         * each io also gets a reference on the fco
         */
        AN(FCO_REFCNT(fco));
        FCO_REFCNT(fco) += ion;

        fellow_cache_lru_chgbatch_apply(lcb);
        AZ(pthread_mutex_unlock(&fco->mtx));

        /* free unused allocations */
        for (u = 0; u < n; u++) {
                if (mem[u].ptr == NULL)
                        continue;
                AN(buddy_return_ptr_extent(rets, &mem[u]));
        }
        buddy_return(rets);

        /* issue ios */
        switch (type) {
        case FCIO_SYNC:
                fellow_cache_seg_sync_read(fc, iosegs, ion);
                break;
        case FCIO_ASYNC:
                fellow_cache_seg_async_read(fc, iosegs, ion);
                break;
        default:
                WRONG("io type");
        }

        /* retry any raced */
        if (racen > 0)
                fellow_cache_seg_ref_in(fc, type, racesegs, racen);
}

#define NREQS (newreqs ? 1 : 0)
#define OREQS (newreqs ? 0 : 1)
@@ -4234,6 +4063,79 @@ fellow_cache_obj_iter(struct fellow_cache *fc, struct fellow_cache_obj *fco,
        return (fcr);
}

/* Auxiliary attributes
 *
 * ref is never returned and just ignored by
 * fellow_cache_seg_auxattr_free()
 */
static void
fellow_cache_seg_auxattr_ref_in(
    struct fellow_cache *fc,
    struct fellow_cache_seg *fcs)
{
        struct fellow_cache_obj *fco;
        struct buddy_ptr_extent mem;
        unsigned io = 0;

        CHECK_OBJ_NOTNULL(fcs, FELLOW_CACHE_SEG_MAGIC);
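        /* fast path: the segment is already resident with sufficient
         * references, so neither an allocation nor the fco mutex is needed
         */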
        if (fcs->state == FCS_INCORE && fcs->refcnt >= 3)
                return;

        fco = fcs->fco;
        CHECK_OBJ_NOTNULL(fco, FELLOW_CACHE_OBJ_MAGIC);

        struct fellow_lru_chgbatch lcb[1] =
            FELLOW_LRU_CHGBATCH_INIT(lcb, fco, 1);

        /* racy memory allocation */
        mem = buddy_alloc1_ptr_extent_wait(fc->membuddy, FEP_META,
            fellow_rndup(fc->ffd, fcs->disk_seg->seg.size), 0);
        AN(mem.ptr);
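        /* the allocation happens before fco->mtx is taken, so another thread
         * may bring the segment in concurrently; an unused allocation is
         * returned once the mutex has been dropped (below)
         */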
        AZ(pthread_mutex_lock(&fco->mtx));
        (void) fellow_cache_seg_ref_locked(lcb, fcs);

        while (fcs->state == FCS_BUSY || fcs->state == FCS_READING) {
                fellow_cache_lru_chgbatch_apply(lcb);
                fellow_cache_seg_wait_locked(fcs);
        }
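        /* after the wait loop above, BUSY and READING are impossible,
         * which the switch below asserts
         */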
        switch (fcs->state) {
        case FCS_BUSY:
        case FCS_READING:
                WRONG("auxattr ref_in: state can't be BUSY or READING");
                break;
        case FCS_READFAIL:
                INCOMPL();
                break;
        case FCS_WRITING:
        case FCS_CHECK:
        case FCS_INCORE:
                break;
        case FCS_DISK:
                // reference for io
                (void) fellow_cache_seg_ref_locked(lcb, fcs);
                fcs->alloc = mem;
                mem = buddy_ptr_extent_nil;
                fellow_cache_seg_transition_locked_notincore(fcs, FCS_READING);
                io = 1;
                break;
        default:
                WRONG("cache_seg_in state");
        }

        AN(FCO_REFCNT(fco));
        FCO_REFCNT(fco) += io;

        fellow_cache_lru_chgbatch_apply(lcb);
        AZ(pthread_mutex_unlock(&fco->mtx));

        if (mem.ptr)
                buddy_return1_ptr_extent(fc->membuddy, &mem);

        if (io)
                fellow_cache_seg_sync_read(fc, &fcs, 1);
}

static struct fellow_cache_res
fellow_busy_body_seg_next(struct fellow_busy *fbo)
{
@@ -5333,7 +5235,7 @@ fellow_cache_obj_getattr(struct fellow_cache *fc,
        struct fellow_cache_seg *fcs = &fco->aa_##l##_seg; \
        if (fcs->disk_seg->seg.size == 0) \
                return (NULL); \
-       fellow_cache_seg_ref_in(fc, FCIO_SYNC, &fcs, 1); \
+       fellow_cache_seg_auxattr_ref_in(fc, fcs); \
        XXXAZ(fellow_cache_seg_check(fcs)); \
        *len = fcs->len; \
        return (fcs->alloc.ptr); \