diff options
-rw-r--r-- | ChangeLog | 8 | ||||
-rw-r--r-- | sshfs.c | 375 |
2 files changed, 269 insertions, 114 deletions
@@ -1,3 +1,11 @@ +2011-11-16 Miklos Szeredi <miklos@szeredi.hu> + + * Submit max 32k reads and writes to the sftp server. Also don't + limit the kernel to 64k reads and writes, rather split into 32k + sized chunks and send them to the server all at once. This is + more efficient and less demanding from the server. Reported by + Ludovic Courtès. Fix suggested by Niels Möller + 2011-11-14 Miklos Szeredi <miklos@szeredi.hu> * Fix double free if reconnection races with request sending. @@ -169,14 +169,27 @@ struct request { struct list_head list; }; +struct sshfs_io { + int num_reqs; + pthread_cond_t finished; + int error; +}; + +struct read_req { + struct sshfs_io *sio; + struct list_head list; + struct buffer data; + size_t size; + ssize_t res; +}; + struct read_chunk { - sem_t ready; off_t offset; size_t size; - struct buffer data; int refs; - int res; long modifver; + struct list_head reqs; + struct sshfs_io sio; }; struct sshfs_file { @@ -1263,8 +1276,14 @@ static void request_free(struct request *req) static void chunk_free(struct read_chunk *chunk) { - buf_free(&chunk->data); - sem_destroy(&chunk->ready); + while (!list_empty(&chunk->reqs)) { + struct read_req *rreq; + + rreq = list_entry(chunk->reqs.prev, struct read_req, list); + list_del(&rreq->list); + buf_free(&rreq->data); + g_free(rreq); + } g_free(chunk); } @@ -2463,131 +2482,178 @@ static int sshfs_release(const char *path, struct fuse_file_info *fi) return 0; } -static int sshfs_sync_read(struct sshfs_file *sf, char *rbuf, size_t size, - off_t offset) -{ - int err; - struct buffer buf; - struct buffer data; - struct buffer *handle = &sf->handle; - buf_init(&buf, 0); - buf_add_buf(&buf, handle); - buf_add_uint64(&buf, offset); - buf_add_uint32(&buf, size); - err = sftp_request(SSH_FXP_READ, &buf, SSH_FXP_DATA, &data); - if (!err) { - uint32_t retsize; - err = -EIO; - if (buf_get_uint32(&data, &retsize) != -1) { - if (retsize > size) - fprintf(stderr, "long read\n"); - else { - buf_get_mem(&data, rbuf, retsize); - err = retsize; - } - } - buf_free(&data); - } else if (err == MY_EOF) - err = 0; - buf_free(&buf); - return err; -} - static void sshfs_read_end(struct request *req) { - struct read_chunk *chunk = (struct read_chunk *) req->data; + struct read_req *rreq = (struct read_req *) req->data; if (req->error) - chunk->res = req->error; + rreq->res = req->error; else if (req->replied) { - chunk->res = -EIO; + rreq->res = -EIO; if (req->reply_type == SSH_FXP_STATUS) { uint32_t serr; if (buf_get_uint32(&req->reply, &serr) != -1) { if (serr == SSH_FX_EOF) - chunk->res = 0; + rreq->res = 0; + } else { + rreq->res = -sftp_error_to_errno(serr); } } else if (req->reply_type == SSH_FXP_DATA) { uint32_t retsize; if (buf_get_uint32(&req->reply, &retsize) != -1) { - if (retsize > chunk->size) + if (retsize > rreq->size) { fprintf(stderr, "long read\n"); - else { - chunk->res = retsize; - chunk->data = req->reply; + } else if (buf_check_get(&req->reply, retsize) != -1) { + rreq->res = retsize; + rreq->data = req->reply; buf_init(&req->reply, 0); } } - } else + } else { fprintf(stderr, "protocol error\n"); - } else - chunk->res = -EIO; + } + } else { + rreq->res = -EIO; + } - sem_post(&chunk->ready); - chunk_put(chunk); + rreq->sio->num_reqs--; + if (!rreq->sio->num_reqs) + pthread_cond_broadcast(&rreq->sio->finished); } static void sshfs_read_begin(struct request *req) { - struct read_chunk *chunk = (struct read_chunk *) req->data; - chunk->refs++; + struct read_req *rreq = (struct read_req *) req->data; + rreq->sio->num_reqs++; } -static void sshfs_send_async_read(struct sshfs_file *sf, - struct read_chunk *chunk) +static struct read_chunk *sshfs_send_read(struct sshfs_file *sf, size_t size, + off_t offset) { - struct buffer buf; + struct read_chunk *chunk = g_new0(struct read_chunk, 1); struct buffer *handle = &sf->handle; - struct iovec iov; - buf_init(&buf, 0); - buf_add_buf(&buf, handle); - buf_add_uint64(&buf, chunk->offset); - buf_add_uint32(&buf, chunk->size); - buf_to_iov(&buf, &iov); - sftp_request_send(SSH_FXP_READ, &iov, 1, sshfs_read_begin, - sshfs_read_end, 0, chunk, NULL); - buf_free(&buf); + pthread_cond_init(&chunk->sio.finished, NULL); + list_init(&chunk->reqs); + chunk->size = size; + chunk->offset = offset; + chunk->refs = 1; + + while (size) { + int err; + struct buffer buf; + struct iovec iov[1]; + struct read_req *rreq; + size_t bsize = size < sshfs.max_read ? size : sshfs.max_read; + + rreq = g_new0(struct read_req, 1); + rreq->sio = &chunk->sio; + rreq->size = bsize; + buf_init(&rreq->data, 0); + list_add(&rreq->list, &chunk->reqs); + + buf_init(&buf, 0); + buf_add_buf(&buf, handle); + buf_add_uint64(&buf, offset); + buf_add_uint32(&buf, bsize); + buf_to_iov(&buf, &iov[0]); + err = sftp_request_send(SSH_FXP_READ, iov, 1, + sshfs_read_begin, + sshfs_read_end, + 0, rreq, NULL); + + buf_free(&buf); + if (err) + break; + + size -= bsize; + offset += bsize; + } + + return chunk; } -static void submit_read(struct sshfs_file *sf, size_t size, off_t offset, - struct read_chunk **chunkp) +static int wait_chunk(struct read_chunk *chunk, char *buf, size_t size) { - struct read_chunk *chunk = g_new0(struct read_chunk, 1); + int res = 0; + struct read_req *rreq; - sem_init(&chunk->ready, 0, 0); - buf_init(&chunk->data, 0); - chunk->offset = offset; - chunk->size = size; - chunk->refs = 1; - pthread_mutex_lock(&sshfs.lock); - chunk->modifver = sshfs.modifver; - pthread_mutex_unlock(&sshfs.lock); - sshfs_send_async_read(sf, chunk); pthread_mutex_lock(&sshfs.lock); - chunk_put(*chunkp); - *chunkp = chunk; + while (chunk->sio.num_reqs) + pthread_cond_wait(&chunk->sio.finished, &sshfs.lock); pthread_mutex_unlock(&sshfs.lock); -} -static int wait_chunk(struct read_chunk *chunk, char *buf, size_t size) -{ - int res; - while (sem_wait(&chunk->ready)); - res = chunk->res; + + if (chunk->sio.error) { + if (chunk->sio.error != MY_EOF) + res = chunk->sio.error; + + goto out; + } + + while (!list_empty(&chunk->reqs) && size) { + rreq = list_entry(chunk->reqs.prev, struct read_req, list); + + if (rreq->res < 0) { + chunk->sio.error = rreq->res; + break; + } if (rreq->res == 0) { + chunk->sio.error = MY_EOF; + break; + } else if (size < (size_t) rreq->res) { + buf_get_mem(&rreq->data, buf, size); + rreq->res -= size; + rreq->size -= size; + res += size; + break; + } else { + buf_get_mem(&rreq->data, buf, rreq->res); + res += rreq->res; + if ((size_t) rreq->res < rreq->size) { + chunk->sio.error = MY_EOF; + break; + } + buf += rreq->res; + size -= rreq->res; + list_del(&rreq->list); + buf_free(&rreq->data); + g_free(rreq); + } + } + if (res > 0) { - if ((size_t) res > size) - res = size; - buf_get_mem(&chunk->data, buf, res); chunk->offset += res; chunk->size -= res; - chunk->res -= res; } - sem_post(&chunk->ready); + +out: chunk_put_locked(chunk); return res; } +static int sshfs_sync_read(struct sshfs_file *sf, char *buf, size_t size, + off_t offset) +{ + struct read_chunk *chunk; + + chunk = sshfs_send_read(sf, size, offset); + return wait_chunk(chunk, buf, size); +} + +static void submit_read(struct sshfs_file *sf, size_t size, off_t offset, + struct read_chunk **chunkp) +{ + struct read_chunk *chunk; + + chunk = sshfs_send_read(sf, size, offset); + pthread_mutex_lock(&sshfs.lock); + chunk->modifver = sshfs.modifver; + chunk_put(*chunkp); + *chunkp = chunk; + chunk->refs++; + pthread_mutex_unlock(&sshfs.lock); +} + static struct read_chunk *search_read_chunk(struct sshfs_file *sf, off_t offset) { struct read_chunk *ch = sf->readahead; @@ -2691,14 +2757,110 @@ static void sshfs_write_end(struct request *req) sshfs_file_put(sf); } +static int sshfs_async_write(struct sshfs_file *sf, const char *wbuf, + size_t size, off_t offset) +{ + int err = 0; + struct buffer *handle = &sf->handle; + + while (!err && size) { + struct buffer buf; + struct iovec iov[2]; + size_t bsize = size < sshfs.max_write ? size : sshfs.max_write; + + buf_init(&buf, 0); + buf_add_buf(&buf, handle); + buf_add_uint64(&buf, offset); + buf_add_uint32(&buf, bsize); + buf_to_iov(&buf, &iov[0]); + iov[1].iov_base = (void *) wbuf; + iov[1].iov_len = bsize; + err = sftp_request_send(SSH_FXP_WRITE, iov, 2, + sshfs_write_begin, sshfs_write_end, + 0, sf, NULL); + buf_free(&buf); + size -= bsize; + wbuf += bsize; + offset += bsize; + } + + return err; +} + +static void sshfs_sync_write_begin(struct request *req) +{ + struct sshfs_io *sio = (struct sshfs_io *) req->data; + sio->num_reqs++; +} + +static void sshfs_sync_write_end(struct request *req) +{ + uint32_t serr; + struct sshfs_io *sio = (struct sshfs_io *) req->data; + + if (req->error) { + sio->error = req->error; + } else if (req->replied) { + if (req->reply_type != SSH_FXP_STATUS) { + fprintf(stderr, "protocol error\n"); + } else if (buf_get_uint32(&req->reply, &serr) != -1 && + serr != SSH_FX_OK) { + sio->error = -EIO; + } + } + sio->num_reqs--; + if (!sio->num_reqs) + pthread_cond_broadcast(&sio->finished); +} + + +static int sshfs_sync_write(struct sshfs_file *sf, const char *wbuf, + size_t size, off_t offset) +{ + int err = 0; + struct buffer *handle = &sf->handle; + struct sshfs_io sio = { .error = 0, .num_reqs = 0 }; + + pthread_cond_init(&sio.finished, NULL); + + while (!err && size) { + struct buffer buf; + struct iovec iov[2]; + size_t bsize = size < sshfs.max_write ? size : sshfs.max_write; + + buf_init(&buf, 0); + buf_add_buf(&buf, handle); + buf_add_uint64(&buf, offset); + buf_add_uint32(&buf, bsize); + buf_to_iov(&buf, &iov[0]); + iov[1].iov_base = (void *) wbuf; + iov[1].iov_len = bsize; + err = sftp_request_send(SSH_FXP_WRITE, iov, 2, + sshfs_sync_write_begin, + sshfs_sync_write_end, + 0, &sio, NULL); + buf_free(&buf); + size -= bsize; + wbuf += bsize; + offset += bsize; + } + + pthread_mutex_lock(&sshfs.lock); + while (sio.num_reqs) + pthread_cond_wait(&sio.finished, &sshfs.lock); + pthread_mutex_unlock(&sshfs.lock); + + if (!err) + err = sio.error; + + return err; +} + static int sshfs_write(const char *path, const char *wbuf, size_t size, off_t offset, struct fuse_file_info *fi) { int err; - struct buffer buf; struct sshfs_file *sf = get_sshfs_file(fi); - struct buffer *handle = &sf->handle; - struct iovec iov[2]; (void) path; @@ -2706,22 +2868,12 @@ static int sshfs_write(const char *path, const char *wbuf, size_t size, return -EIO; sshfs_inc_modifver(); - buf_init(&buf, 0); - buf_add_buf(&buf, handle); - buf_add_uint64(&buf, offset); - buf_add_uint32(&buf, size); - buf_to_iov(&buf, &iov[0]); - iov[1].iov_base = (void *) wbuf; - iov[1].iov_len = size; - if (!sshfs.sync_write && !sf->write_error) { - err = sftp_request_send(SSH_FXP_WRITE, iov, 2, - sshfs_write_begin, sshfs_write_end, - 0, sf, NULL); - } else { - err = sftp_request_iov(SSH_FXP_WRITE, iov, 2, SSH_FXP_STATUS, - NULL); - } - buf_free(&buf); + + if (!sshfs.sync_write && !sf->write_error) + err = sshfs_async_write(sf, wbuf, size, offset); + else + err = sshfs_sync_write(sf, wbuf, size, offset); + return err ? err : (int) size; } @@ -3413,8 +3565,9 @@ int main(int argc, char *argv[]) g_thread_init(NULL); sshfs.blksize = 4096; - sshfs.max_read = 65536; - sshfs.max_write = 65536; + /* SFTP spec says all servers should allow at least 32k I/O */ + sshfs.max_read = 32768; + sshfs.max_write = 32768; sshfs.nodelay_workaround = 1; sshfs.nodelaysrv_workaround = 0; #if __APPLE__ @@ -3501,12 +3654,6 @@ int main(int argc, char *argv[]) if (fuse_is_lib_option("ac_attr_timeout=")) fuse_opt_insert_arg(&args, 1, "-oauto_cache,ac_attr_timeout=0"); - tmp = g_strdup_printf("-omax_read=%u", sshfs.max_read); - fuse_opt_insert_arg(&args, 1, tmp); - g_free(tmp); - tmp = g_strdup_printf("-omax_write=%u", sshfs.max_write); - fuse_opt_insert_arg(&args, 1, tmp); - g_free(tmp); #if FUSE_VERSION >= 27 libver = fuse_version(); assert(libver >= 27); |