aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog8
-rw-r--r--sshfs.c375
2 files changed, 269 insertions, 114 deletions
diff --git a/ChangeLog b/ChangeLog
index 4c6ba2d..8f11917 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,11 @@
+2011-11-16 Miklos Szeredi <miklos@szeredi.hu>
+
+ * Submit max 32k reads and writes to the sftp server. Also don't
+ limit the kernel to 64k reads and writes, rather split into 32k
+ sized chunks and send them to the server all at once. This is
+ more efficient and less demanding from the server. Reported by
+ Ludovic Courtès. Fix suggested by Niels Möller
+
2011-11-14 Miklos Szeredi <miklos@szeredi.hu>
* Fix double free if reconnection races with request sending.
diff --git a/sshfs.c b/sshfs.c
index 632bb1c..e14aa71 100644
--- a/sshfs.c
+++ b/sshfs.c
@@ -146,14 +146,27 @@ struct request {
struct list_head list;
};
+struct sshfs_io {
+ int num_reqs;
+ pthread_cond_t finished;
+ int error;
+};
+
+struct read_req {
+ struct sshfs_io *sio;
+ struct list_head list;
+ struct buffer data;
+ size_t size;
+ ssize_t res;
+};
+
struct read_chunk {
- sem_t ready;
off_t offset;
size_t size;
- struct buffer data;
int refs;
- int res;
long modifver;
+ struct list_head reqs;
+ struct sshfs_io sio;
};
struct sshfs_file {
@@ -1191,8 +1204,14 @@ static void request_free(struct request *req)
static void chunk_free(struct read_chunk *chunk)
{
- buf_free(&chunk->data);
- sem_destroy(&chunk->ready);
+ while (!list_empty(&chunk->reqs)) {
+ struct read_req *rreq;
+
+ rreq = list_entry(chunk->reqs.prev, struct read_req, list);
+ list_del(&rreq->list);
+ buf_free(&rreq->data);
+ g_free(rreq);
+ }
g_free(chunk);
}
@@ -2348,131 +2367,178 @@ static int sshfs_release(const char *path, struct fuse_file_info *fi)
return 0;
}
-static int sshfs_sync_read(struct sshfs_file *sf, char *rbuf, size_t size,
- off_t offset)
-{
- int err;
- struct buffer buf;
- struct buffer data;
- struct buffer *handle = &sf->handle;
- buf_init(&buf, 0);
- buf_add_buf(&buf, handle);
- buf_add_uint64(&buf, offset);
- buf_add_uint32(&buf, size);
- err = sftp_request(SSH_FXP_READ, &buf, SSH_FXP_DATA, &data);
- if (!err) {
- uint32_t retsize;
- err = -EIO;
- if (buf_get_uint32(&data, &retsize) != -1) {
- if (retsize > size)
- fprintf(stderr, "long read\n");
- else {
- buf_get_mem(&data, rbuf, retsize);
- err = retsize;
- }
- }
- buf_free(&data);
- } else if (err == MY_EOF)
- err = 0;
- buf_free(&buf);
- return err;
-}
-
static void sshfs_read_end(struct request *req)
{
- struct read_chunk *chunk = (struct read_chunk *) req->data;
+ struct read_req *rreq = (struct read_req *) req->data;
if (req->error)
- chunk->res = req->error;
+ rreq->res = req->error;
else if (req->replied) {
- chunk->res = -EIO;
+ rreq->res = -EIO;
if (req->reply_type == SSH_FXP_STATUS) {
uint32_t serr;
if (buf_get_uint32(&req->reply, &serr) != -1) {
if (serr == SSH_FX_EOF)
- chunk->res = 0;
+ rreq->res = 0;
+ } else {
+ rreq->res = -sftp_error_to_errno(serr);
}
} else if (req->reply_type == SSH_FXP_DATA) {
uint32_t retsize;
if (buf_get_uint32(&req->reply, &retsize) != -1) {
- if (retsize > chunk->size)
+ if (retsize > rreq->size) {
fprintf(stderr, "long read\n");
- else {
- chunk->res = retsize;
- chunk->data = req->reply;
+ } else if (buf_check_get(&req->reply, retsize) != -1) {
+ rreq->res = retsize;
+ rreq->data = req->reply;
buf_init(&req->reply, 0);
}
}
- } else
+ } else {
fprintf(stderr, "protocol error\n");
- } else
- chunk->res = -EIO;
+ }
+ } else {
+ rreq->res = -EIO;
+ }
- sem_post(&chunk->ready);
- chunk_put(chunk);
+ rreq->sio->num_reqs--;
+ if (!rreq->sio->num_reqs)
+ pthread_cond_broadcast(&rreq->sio->finished);
}
static void sshfs_read_begin(struct request *req)
{
- struct read_chunk *chunk = (struct read_chunk *) req->data;
- chunk->refs++;
+ struct read_req *rreq = (struct read_req *) req->data;
+ rreq->sio->num_reqs++;
}
-static void sshfs_send_async_read(struct sshfs_file *sf,
- struct read_chunk *chunk)
+static struct read_chunk *sshfs_send_read(struct sshfs_file *sf, size_t size,
+ off_t offset)
{
- struct buffer buf;
+ struct read_chunk *chunk = g_new0(struct read_chunk, 1);
struct buffer *handle = &sf->handle;
- struct iovec iov;
- buf_init(&buf, 0);
- buf_add_buf(&buf, handle);
- buf_add_uint64(&buf, chunk->offset);
- buf_add_uint32(&buf, chunk->size);
- buf_to_iov(&buf, &iov);
- sftp_request_send(SSH_FXP_READ, &iov, 1, sshfs_read_begin,
- sshfs_read_end, 0, chunk, NULL);
- buf_free(&buf);
+ pthread_cond_init(&chunk->sio.finished, NULL);
+ list_init(&chunk->reqs);
+ chunk->size = size;
+ chunk->offset = offset;
+ chunk->refs = 1;
+
+ while (size) {
+ int err;
+ struct buffer buf;
+ struct iovec iov[1];
+ struct read_req *rreq;
+ size_t bsize = size < sshfs.max_read ? size : sshfs.max_read;
+
+ rreq = g_new0(struct read_req, 1);
+ rreq->sio = &chunk->sio;
+ rreq->size = bsize;
+ buf_init(&rreq->data, 0);
+ list_add(&rreq->list, &chunk->reqs);
+
+ buf_init(&buf, 0);
+ buf_add_buf(&buf, handle);
+ buf_add_uint64(&buf, offset);
+ buf_add_uint32(&buf, bsize);
+ buf_to_iov(&buf, &iov[0]);
+ err = sftp_request_send(SSH_FXP_READ, iov, 1,
+ sshfs_read_begin,
+ sshfs_read_end,
+ 0, rreq, NULL);
+
+ buf_free(&buf);
+ if (err)
+ break;
+
+ size -= bsize;
+ offset += bsize;
+ }
+
+ return chunk;
}
-static void submit_read(struct sshfs_file *sf, size_t size, off_t offset,
- struct read_chunk **chunkp)
+static int wait_chunk(struct read_chunk *chunk, char *buf, size_t size)
{
- struct read_chunk *chunk = g_new0(struct read_chunk, 1);
+ int res = 0;
+ struct read_req *rreq;
- sem_init(&chunk->ready, 0, 0);
- buf_init(&chunk->data, 0);
- chunk->offset = offset;
- chunk->size = size;
- chunk->refs = 1;
- pthread_mutex_lock(&sshfs.lock);
- chunk->modifver = sshfs.modifver;
- pthread_mutex_unlock(&sshfs.lock);
- sshfs_send_async_read(sf, chunk);
pthread_mutex_lock(&sshfs.lock);
- chunk_put(*chunkp);
- *chunkp = chunk;
+ while (chunk->sio.num_reqs)
+ pthread_cond_wait(&chunk->sio.finished, &sshfs.lock);
pthread_mutex_unlock(&sshfs.lock);
-}
-static int wait_chunk(struct read_chunk *chunk, char *buf, size_t size)
-{
- int res;
- while (sem_wait(&chunk->ready));
- res = chunk->res;
+
+ if (chunk->sio.error) {
+ if (chunk->sio.error != MY_EOF)
+ res = chunk->sio.error;
+
+ goto out;
+ }
+
+ while (!list_empty(&chunk->reqs) && size) {
+ rreq = list_entry(chunk->reqs.prev, struct read_req, list);
+
+ if (rreq->res < 0) {
+ chunk->sio.error = rreq->res;
+ break;
+ } if (rreq->res == 0) {
+ chunk->sio.error = MY_EOF;
+ break;
+ } else if (size < (size_t) rreq->res) {
+ buf_get_mem(&rreq->data, buf, size);
+ rreq->res -= size;
+ rreq->size -= size;
+ res += size;
+ break;
+ } else {
+ buf_get_mem(&rreq->data, buf, rreq->res);
+ res += rreq->res;
+ if ((size_t) rreq->res < rreq->size) {
+ chunk->sio.error = MY_EOF;
+ break;
+ }
+ buf += rreq->res;
+ size -= rreq->res;
+ list_del(&rreq->list);
+ buf_free(&rreq->data);
+ g_free(rreq);
+ }
+ }
+
if (res > 0) {
- if ((size_t) res > size)
- res = size;
- buf_get_mem(&chunk->data, buf, res);
chunk->offset += res;
chunk->size -= res;
- chunk->res -= res;
}
- sem_post(&chunk->ready);
+
+out:
chunk_put_locked(chunk);
return res;
}
+static int sshfs_sync_read(struct sshfs_file *sf, char *buf, size_t size,
+ off_t offset)
+{
+ struct read_chunk *chunk;
+
+ chunk = sshfs_send_read(sf, size, offset);
+ return wait_chunk(chunk, buf, size);
+}
+
+static void submit_read(struct sshfs_file *sf, size_t size, off_t offset,
+ struct read_chunk **chunkp)
+{
+ struct read_chunk *chunk;
+
+ chunk = sshfs_send_read(sf, size, offset);
+ pthread_mutex_lock(&sshfs.lock);
+ chunk->modifver = sshfs.modifver;
+ chunk_put(*chunkp);
+ *chunkp = chunk;
+ chunk->refs++;
+ pthread_mutex_unlock(&sshfs.lock);
+}
+
static struct read_chunk *search_read_chunk(struct sshfs_file *sf, off_t offset)
{
struct read_chunk *ch = sf->readahead;
@@ -2576,14 +2642,110 @@ static void sshfs_write_end(struct request *req)
sshfs_file_put(sf);
}
+static int sshfs_async_write(struct sshfs_file *sf, const char *wbuf,
+ size_t size, off_t offset)
+{
+ int err = 0;
+ struct buffer *handle = &sf->handle;
+
+ while (!err && size) {
+ struct buffer buf;
+ struct iovec iov[2];
+ size_t bsize = size < sshfs.max_write ? size : sshfs.max_write;
+
+ buf_init(&buf, 0);
+ buf_add_buf(&buf, handle);
+ buf_add_uint64(&buf, offset);
+ buf_add_uint32(&buf, bsize);
+ buf_to_iov(&buf, &iov[0]);
+ iov[1].iov_base = (void *) wbuf;
+ iov[1].iov_len = bsize;
+ err = sftp_request_send(SSH_FXP_WRITE, iov, 2,
+ sshfs_write_begin, sshfs_write_end,
+ 0, sf, NULL);
+ buf_free(&buf);
+ size -= bsize;
+ wbuf += bsize;
+ offset += bsize;
+ }
+
+ return err;
+}
+
+static void sshfs_sync_write_begin(struct request *req)
+{
+ struct sshfs_io *sio = (struct sshfs_io *) req->data;
+ sio->num_reqs++;
+}
+
+static void sshfs_sync_write_end(struct request *req)
+{
+ uint32_t serr;
+ struct sshfs_io *sio = (struct sshfs_io *) req->data;
+
+ if (req->error) {
+ sio->error = req->error;
+ } else if (req->replied) {
+ if (req->reply_type != SSH_FXP_STATUS) {
+ fprintf(stderr, "protocol error\n");
+ } else if (buf_get_uint32(&req->reply, &serr) != -1 &&
+ serr != SSH_FX_OK) {
+ sio->error = -EIO;
+ }
+ }
+ sio->num_reqs--;
+ if (!sio->num_reqs)
+ pthread_cond_broadcast(&sio->finished);
+}
+
+
+static int sshfs_sync_write(struct sshfs_file *sf, const char *wbuf,
+ size_t size, off_t offset)
+{
+ int err = 0;
+ struct buffer *handle = &sf->handle;
+ struct sshfs_io sio = { .error = 0, .num_reqs = 0 };
+
+ pthread_cond_init(&sio.finished, NULL);
+
+ while (!err && size) {
+ struct buffer buf;
+ struct iovec iov[2];
+ size_t bsize = size < sshfs.max_write ? size : sshfs.max_write;
+
+ buf_init(&buf, 0);
+ buf_add_buf(&buf, handle);
+ buf_add_uint64(&buf, offset);
+ buf_add_uint32(&buf, bsize);
+ buf_to_iov(&buf, &iov[0]);
+ iov[1].iov_base = (void *) wbuf;
+ iov[1].iov_len = bsize;
+ err = sftp_request_send(SSH_FXP_WRITE, iov, 2,
+ sshfs_sync_write_begin,
+ sshfs_sync_write_end,
+ 0, &sio, NULL);
+ buf_free(&buf);
+ size -= bsize;
+ wbuf += bsize;
+ offset += bsize;
+ }
+
+ pthread_mutex_lock(&sshfs.lock);
+ while (sio.num_reqs)
+ pthread_cond_wait(&sio.finished, &sshfs.lock);
+ pthread_mutex_unlock(&sshfs.lock);
+
+ if (!err)
+ err = sio.error;
+
+ return err;
+}
+
static int sshfs_write(const char *path, const char *wbuf, size_t size,
off_t offset, struct fuse_file_info *fi)
{
int err;
- struct buffer buf;
struct sshfs_file *sf = get_sshfs_file(fi);
- struct buffer *handle = &sf->handle;
- struct iovec iov[2];
(void) path;
@@ -2591,22 +2753,12 @@ static int sshfs_write(const char *path, const char *wbuf, size_t size,
return -EIO;
sshfs_inc_modifver();
- buf_init(&buf, 0);
- buf_add_buf(&buf, handle);
- buf_add_uint64(&buf, offset);
- buf_add_uint32(&buf, size);
- buf_to_iov(&buf, &iov[0]);
- iov[1].iov_base = (void *) wbuf;
- iov[1].iov_len = size;
- if (!sshfs.sync_write && !sf->write_error) {
- err = sftp_request_send(SSH_FXP_WRITE, iov, 2,
- sshfs_write_begin, sshfs_write_end,
- 0, sf, NULL);
- } else {
- err = sftp_request_iov(SSH_FXP_WRITE, iov, 2, SSH_FXP_STATUS,
- NULL);
- }
- buf_free(&buf);
+
+ if (!sshfs.sync_write && !sf->write_error)
+ err = sshfs_async_write(sf, wbuf, size, offset);
+ else
+ err = sshfs_sync_write(sf, wbuf, size, offset);
+
return err ? err : (int) size;
}
@@ -3272,8 +3424,9 @@ int main(int argc, char *argv[])
g_thread_init(NULL);
sshfs.blksize = 4096;
- sshfs.max_read = 65536;
- sshfs.max_write = 65536;
+ /* SFTP spec says all servers should allow at least 32k I/O */
+ sshfs.max_read = 32768;
+ sshfs.max_write = 32768;
sshfs.nodelay_workaround = 1;
sshfs.nodelaysrv_workaround = 0;
sshfs.rename_workaround = 0;
@@ -3353,12 +3506,6 @@ int main(int argc, char *argv[])
if (fuse_is_lib_option("ac_attr_timeout="))
fuse_opt_insert_arg(&args, 1, "-oauto_cache,ac_attr_timeout=0");
- tmp = g_strdup_printf("-omax_read=%u", sshfs.max_read);
- fuse_opt_insert_arg(&args, 1, tmp);
- g_free(tmp);
- tmp = g_strdup_printf("-omax_write=%u", sshfs.max_write);
- fuse_opt_insert_arg(&args, 1, tmp);
- g_free(tmp);
#if FUSE_VERSION >= 27
libver = fuse_version();
assert(libver >= 27);