]> rtime.felk.cvut.cz Git - linux-imx.git/blob - net/sunrpc/clnt.c
Merge tag 'sound-3.11' of git://git.kernel.org/pub/scm/linux/kernel/git/tiwai/sound
[linux-imx.git] / net / sunrpc / clnt.c
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may have to be moved to the VFS layer.
15  *
16  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
17  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
18  */
19
20
21 #include <linux/module.h>
22 #include <linux/types.h>
23 #include <linux/kallsyms.h>
24 #include <linux/mm.h>
25 #include <linux/namei.h>
26 #include <linux/mount.h>
27 #include <linux/slab.h>
28 #include <linux/utsname.h>
29 #include <linux/workqueue.h>
30 #include <linux/in.h>
31 #include <linux/in6.h>
32 #include <linux/un.h>
33 #include <linux/rcupdate.h>
34
35 #include <linux/sunrpc/clnt.h>
36 #include <linux/sunrpc/addr.h>
37 #include <linux/sunrpc/rpc_pipe_fs.h>
38 #include <linux/sunrpc/metrics.h>
39 #include <linux/sunrpc/bc_xprt.h>
40 #include <trace/events/sunrpc.h>
41
42 #include "sunrpc.h"
43 #include "netns.h"
44
45 #ifdef RPC_DEBUG
46 # define RPCDBG_FACILITY        RPCDBG_CALL
47 #endif
48
/*
 * Debug helper: log the pid and current status of RPC task @t together with
 * the name of the calling function.  @t is parenthesized in the expansion so
 * that any pointer-valued expression can be passed safely.
 */
#define dprint_status(t)					\
	dprintk("RPC: %5u %s (status %d)\n", (t)->tk_pid,	\
			__func__, (t)->tk_status)
52
53 /*
54  * Wait queue that rpc_shutdown_client() sleeps on until a client's task list is empty
55  */
56
57 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
58
59
60 static void     call_start(struct rpc_task *task);
61 static void     call_reserve(struct rpc_task *task);
62 static void     call_reserveresult(struct rpc_task *task);
63 static void     call_allocate(struct rpc_task *task);
64 static void     call_decode(struct rpc_task *task);
65 static void     call_bind(struct rpc_task *task);
66 static void     call_bind_status(struct rpc_task *task);
67 static void     call_transmit(struct rpc_task *task);
68 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
69 static void     call_bc_transmit(struct rpc_task *task);
70 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
71 static void     call_status(struct rpc_task *task);
72 static void     call_transmit_status(struct rpc_task *task);
73 static void     call_refresh(struct rpc_task *task);
74 static void     call_refreshresult(struct rpc_task *task);
75 static void     call_timeout(struct rpc_task *task);
76 static void     call_connect(struct rpc_task *task);
77 static void     call_connect_status(struct rpc_task *task);
78
79 static __be32   *rpc_encode_header(struct rpc_task *task);
80 static __be32   *rpc_verify_header(struct rpc_task *task);
81 static int      rpc_ping(struct rpc_clnt *clnt);
82
83 static void rpc_register_client(struct rpc_clnt *clnt)
84 {
85         struct net *net = rpc_net_ns(clnt);
86         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
87
88         spin_lock(&sn->rpc_client_lock);
89         list_add(&clnt->cl_clients, &sn->all_clients);
90         spin_unlock(&sn->rpc_client_lock);
91 }
92
93 static void rpc_unregister_client(struct rpc_clnt *clnt)
94 {
95         struct net *net = rpc_net_ns(clnt);
96         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
97
98         spin_lock(&sn->rpc_client_lock);
99         list_del(&clnt->cl_clients);
100         spin_unlock(&sn->rpc_client_lock);
101 }
102
103 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
104 {
105         if (clnt->cl_dentry) {
106                 if (clnt->cl_auth && clnt->cl_auth->au_ops->pipes_destroy)
107                         clnt->cl_auth->au_ops->pipes_destroy(clnt->cl_auth);
108                 rpc_remove_client_dir(clnt->cl_dentry);
109         }
110         clnt->cl_dentry = NULL;
111 }
112
113 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
114 {
115         struct net *net = rpc_net_ns(clnt);
116         struct super_block *pipefs_sb;
117
118         pipefs_sb = rpc_get_sb_net(net);
119         if (pipefs_sb) {
120                 __rpc_clnt_remove_pipedir(clnt);
121                 rpc_put_sb_net(net);
122         }
123 }
124
125 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
126                                     struct rpc_clnt *clnt,
127                                     const char *dir_name)
128 {
129         static uint32_t clntid;
130         char name[15];
131         struct qstr q = { .name = name };
132         struct dentry *dir, *dentry;
133         int error;
134
135         dir = rpc_d_lookup_sb(sb, dir_name);
136         if (dir == NULL) {
137                 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
138                 return dir;
139         }
140         for (;;) {
141                 q.len = snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
142                 name[sizeof(name) - 1] = '\0';
143                 q.hash = full_name_hash(q.name, q.len);
144                 dentry = rpc_create_client_dir(dir, &q, clnt);
145                 if (!IS_ERR(dentry))
146                         break;
147                 error = PTR_ERR(dentry);
148                 if (error != -EEXIST) {
149                         printk(KERN_INFO "RPC: Couldn't create pipefs entry"
150                                         " %s/%s, error %d\n",
151                                         dir_name, name, error);
152                         break;
153                 }
154         }
155         dput(dir);
156         return dentry;
157 }
158
159 static int
160 rpc_setup_pipedir(struct rpc_clnt *clnt, const char *dir_name,
161                   struct super_block *pipefs_sb)
162 {
163         struct dentry *dentry;
164
165         clnt->cl_dentry = NULL;
166         if (dir_name == NULL)
167                 return 0;
168         dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt, dir_name);
169         if (IS_ERR(dentry))
170                 return PTR_ERR(dentry);
171         clnt->cl_dentry = dentry;
172         return 0;
173 }
174
175 static inline int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
176 {
177         if (((event == RPC_PIPEFS_MOUNT) && clnt->cl_dentry) ||
178             ((event == RPC_PIPEFS_UMOUNT) && !clnt->cl_dentry))
179                 return 1;
180         if ((event == RPC_PIPEFS_MOUNT) && atomic_read(&clnt->cl_count) == 0)
181                 return 1;
182         return 0;
183 }
184
185 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
186                                    struct super_block *sb)
187 {
188         struct dentry *dentry;
189         int err = 0;
190
191         switch (event) {
192         case RPC_PIPEFS_MOUNT:
193                 dentry = rpc_setup_pipedir_sb(sb, clnt,
194                                               clnt->cl_program->pipe_dir_name);
195                 if (!dentry)
196                         return -ENOENT;
197                 if (IS_ERR(dentry))
198                         return PTR_ERR(dentry);
199                 clnt->cl_dentry = dentry;
200                 if (clnt->cl_auth->au_ops->pipes_create) {
201                         err = clnt->cl_auth->au_ops->pipes_create(clnt->cl_auth);
202                         if (err)
203                                 __rpc_clnt_remove_pipedir(clnt);
204                 }
205                 break;
206         case RPC_PIPEFS_UMOUNT:
207                 __rpc_clnt_remove_pipedir(clnt);
208                 break;
209         default:
210                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
211                 return -ENOTSUPP;
212         }
213         return err;
214 }
215
216 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
217                                 struct super_block *sb)
218 {
219         int error = 0;
220
221         for (;; clnt = clnt->cl_parent) {
222                 if (!rpc_clnt_skip_event(clnt, event))
223                         error = __rpc_clnt_handle_event(clnt, event, sb);
224                 if (error || clnt == clnt->cl_parent)
225                         break;
226         }
227         return error;
228 }
229
230 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
231 {
232         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
233         struct rpc_clnt *clnt;
234
235         spin_lock(&sn->rpc_client_lock);
236         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
237                 if (clnt->cl_program->pipe_dir_name == NULL)
238                         continue;
239                 if (rpc_clnt_skip_event(clnt, event))
240                         continue;
241                 spin_unlock(&sn->rpc_client_lock);
242                 return clnt;
243         }
244         spin_unlock(&sn->rpc_client_lock);
245         return NULL;
246 }
247
248 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
249                             void *ptr)
250 {
251         struct super_block *sb = ptr;
252         struct rpc_clnt *clnt;
253         int error = 0;
254
255         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
256                 error = __rpc_pipefs_event(clnt, event, sb);
257                 if (error)
258                         break;
259         }
260         return error;
261 }
262
263 static struct notifier_block rpc_clients_block = {
264         .notifier_call  = rpc_pipefs_event,
265         .priority       = SUNRPC_PIPEFS_RPC_PRIO,
266 };
267
268 int rpc_clients_notifier_register(void)
269 {
270         return rpc_pipefs_notifier_register(&rpc_clients_block);
271 }
272
273 void rpc_clients_notifier_unregister(void)
274 {
275         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
276 }
277
278 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
279 {
280         clnt->cl_nodelen = strlen(nodename);
281         if (clnt->cl_nodelen > UNX_MAXNODENAME)
282                 clnt->cl_nodelen = UNX_MAXNODENAME;
283         memcpy(clnt->cl_nodename, nodename, clnt->cl_nodelen);
284 }
285
286 static int rpc_client_register(const struct rpc_create_args *args,
287                                struct rpc_clnt *clnt)
288 {
289         const struct rpc_program *program = args->program;
290         struct rpc_auth *auth;
291         struct net *net = rpc_net_ns(clnt);
292         struct super_block *pipefs_sb;
293         int err;
294
295         pipefs_sb = rpc_get_sb_net(net);
296         if (pipefs_sb) {
297                 err = rpc_setup_pipedir(clnt, program->pipe_dir_name, pipefs_sb);
298                 if (err)
299                         goto out;
300         }
301
302         rpc_register_client(clnt);
303         if (pipefs_sb)
304                 rpc_put_sb_net(net);
305
306         auth = rpcauth_create(args->authflavor, clnt);
307         if (IS_ERR(auth)) {
308                 dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
309                                 args->authflavor);
310                 err = PTR_ERR(auth);
311                 goto err_auth;
312         }
313         return 0;
314 err_auth:
315         pipefs_sb = rpc_get_sb_net(net);
316         __rpc_clnt_remove_pipedir(clnt);
317 out:
318         if (pipefs_sb)
319                 rpc_put_sb_net(net);
320         return err;
321 }
322
323 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
324 {
325         const struct rpc_program *program = args->program;
326         const struct rpc_version *version;
327         struct rpc_clnt         *clnt = NULL;
328         int err;
329
330         /* sanity check the name before trying to print it */
331         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
332                         program->name, args->servername, xprt);
333
334         err = rpciod_up();
335         if (err)
336                 goto out_no_rpciod;
337
338         err = -EINVAL;
339         if (args->version >= program->nrvers)
340                 goto out_err;
341         version = program->version[args->version];
342         if (version == NULL)
343                 goto out_err;
344
345         err = -ENOMEM;
346         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
347         if (!clnt)
348                 goto out_err;
349         clnt->cl_parent = clnt;
350
351         rcu_assign_pointer(clnt->cl_xprt, xprt);
352         clnt->cl_procinfo = version->procs;
353         clnt->cl_maxproc  = version->nrprocs;
354         clnt->cl_protname = program->name;
355         clnt->cl_prog     = args->prognumber ? : program->number;
356         clnt->cl_vers     = version->number;
357         clnt->cl_stats    = program->stats;
358         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
359         err = -ENOMEM;
360         if (clnt->cl_metrics == NULL)
361                 goto out_no_stats;
362         clnt->cl_program  = program;
363         INIT_LIST_HEAD(&clnt->cl_tasks);
364         spin_lock_init(&clnt->cl_lock);
365
366         if (!xprt_bound(xprt))
367                 clnt->cl_autobind = 1;
368
369         clnt->cl_timeout = xprt->timeout;
370         if (args->timeout != NULL) {
371                 memcpy(&clnt->cl_timeout_default, args->timeout,
372                                 sizeof(clnt->cl_timeout_default));
373                 clnt->cl_timeout = &clnt->cl_timeout_default;
374         }
375
376         clnt->cl_rtt = &clnt->cl_rtt_default;
377         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
378         clnt->cl_principal = NULL;
379         if (args->client_name) {
380                 clnt->cl_principal = kstrdup(args->client_name, GFP_KERNEL);
381                 if (!clnt->cl_principal)
382                         goto out_no_principal;
383         }
384
385         atomic_set(&clnt->cl_count, 1);
386
387         /* save the nodename */
388         rpc_clnt_set_nodename(clnt, utsname()->nodename);
389
390         err = rpc_client_register(args, clnt);
391         if (err)
392                 goto out_no_path;
393         return clnt;
394
395 out_no_path:
396         kfree(clnt->cl_principal);
397 out_no_principal:
398         rpc_free_iostats(clnt->cl_metrics);
399 out_no_stats:
400         kfree(clnt);
401 out_err:
402         rpciod_down();
403 out_no_rpciod:
404         xprt_put(xprt);
405         return ERR_PTR(err);
406 }
407
408 /**
409  * rpc_create - create an RPC client and transport with one call
410  * @args: rpc_clnt create argument structure
411  *
412  * Creates and initializes an RPC transport and an RPC client.
413  *
414  * It can ping the server in order to determine if it is up, and to see if
415  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
416  * this behavior so asynchronous tasks can also use rpc_create.
417  */
418 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
419 {
420         struct rpc_xprt *xprt;
421         struct rpc_clnt *clnt;
422         struct xprt_create xprtargs = {
423                 .net = args->net,
424                 .ident = args->protocol,
425                 .srcaddr = args->saddress,
426                 .dstaddr = args->address,
427                 .addrlen = args->addrsize,
428                 .servername = args->servername,
429                 .bc_xprt = args->bc_xprt,
430         };
431         char servername[48];
432
433         if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
434                 xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
435         if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
436                 xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
437         /*
438          * If the caller chooses not to specify a hostname, whip
439          * up a string representation of the passed-in address.
440          */
441         if (xprtargs.servername == NULL) {
442                 struct sockaddr_un *sun =
443                                 (struct sockaddr_un *)args->address;
444                 struct sockaddr_in *sin =
445                                 (struct sockaddr_in *)args->address;
446                 struct sockaddr_in6 *sin6 =
447                                 (struct sockaddr_in6 *)args->address;
448
449                 servername[0] = '\0';
450                 switch (args->address->sa_family) {
451                 case AF_LOCAL:
452                         snprintf(servername, sizeof(servername), "%s",
453                                  sun->sun_path);
454                         break;
455                 case AF_INET:
456                         snprintf(servername, sizeof(servername), "%pI4",
457                                  &sin->sin_addr.s_addr);
458                         break;
459                 case AF_INET6:
460                         snprintf(servername, sizeof(servername), "%pI6",
461                                  &sin6->sin6_addr);
462                         break;
463                 default:
464                         /* caller wants default server name, but
465                          * address family isn't recognized. */
466                         return ERR_PTR(-EINVAL);
467                 }
468                 xprtargs.servername = servername;
469         }
470
471         xprt = xprt_create_transport(&xprtargs);
472         if (IS_ERR(xprt))
473                 return (struct rpc_clnt *)xprt;
474
475         /*
476          * By default, kernel RPC client connects from a reserved port.
477          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
478          * but it is always enabled for rpciod, which handles the connect
479          * operation.
480          */
481         xprt->resvport = 1;
482         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
483                 xprt->resvport = 0;
484
485         clnt = rpc_new_client(args, xprt);
486         if (IS_ERR(clnt))
487                 return clnt;
488
489         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
490                 int err = rpc_ping(clnt);
491                 if (err != 0) {
492                         rpc_shutdown_client(clnt);
493                         return ERR_PTR(err);
494                 }
495         }
496
497         clnt->cl_softrtry = 1;
498         if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
499                 clnt->cl_softrtry = 0;
500
501         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
502                 clnt->cl_autobind = 1;
503         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
504                 clnt->cl_discrtry = 1;
505         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
506                 clnt->cl_chatty = 1;
507
508         return clnt;
509 }
510 EXPORT_SYMBOL_GPL(rpc_create);
511
512 /*
513  * This function clones the RPC client structure. It allows us to share the
514  * same transport while varying parameters such as the authentication
515  * flavour.
516  */
517 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
518                                            struct rpc_clnt *clnt)
519 {
520         struct rpc_xprt *xprt;
521         struct rpc_clnt *new;
522         int err;
523
524         err = -ENOMEM;
525         rcu_read_lock();
526         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
527         rcu_read_unlock();
528         if (xprt == NULL)
529                 goto out_err;
530         args->servername = xprt->servername;
531
532         new = rpc_new_client(args, xprt);
533         if (IS_ERR(new)) {
534                 err = PTR_ERR(new);
535                 goto out_err;
536         }
537
538         atomic_inc(&clnt->cl_count);
539         new->cl_parent = clnt;
540
541         /* Turn off autobind on clones */
542         new->cl_autobind = 0;
543         new->cl_softrtry = clnt->cl_softrtry;
544         new->cl_discrtry = clnt->cl_discrtry;
545         new->cl_chatty = clnt->cl_chatty;
546         return new;
547
548 out_err:
549         dprintk("RPC:       %s: returned error %d\n", __func__, err);
550         return ERR_PTR(err);
551 }
552
553 /**
554  * rpc_clone_client - Clone an RPC client structure
555  *
556  * @clnt: RPC client whose parameters are copied
557  *
558  * Returns a fresh RPC client or an ERR_PTR.
559  */
560 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
561 {
562         struct rpc_create_args args = {
563                 .program        = clnt->cl_program,
564                 .prognumber     = clnt->cl_prog,
565                 .version        = clnt->cl_vers,
566                 .authflavor     = clnt->cl_auth->au_flavor,
567                 .client_name    = clnt->cl_principal,
568         };
569         return __rpc_clone_client(&args, clnt);
570 }
571 EXPORT_SYMBOL_GPL(rpc_clone_client);
572
573 /**
574  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
575  *
576  * @clnt: RPC client whose parameters are copied
577  * @flavor: security flavor for new client
578  *
579  * Returns a fresh RPC client or an ERR_PTR.
580  */
581 struct rpc_clnt *
582 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
583 {
584         struct rpc_create_args args = {
585                 .program        = clnt->cl_program,
586                 .prognumber     = clnt->cl_prog,
587                 .version        = clnt->cl_vers,
588                 .authflavor     = flavor,
589                 .client_name    = clnt->cl_principal,
590         };
591         return __rpc_clone_client(&args, clnt);
592 }
593 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
594
595 /*
596  * Kill all tasks for the given client.
597  * XXX: kill their descendants as well?
598  */
599 void rpc_killall_tasks(struct rpc_clnt *clnt)
600 {
601         struct rpc_task *rovr;
602
603
604         if (list_empty(&clnt->cl_tasks))
605                 return;
606         dprintk("RPC:       killing all tasks for client %p\n", clnt);
607         /*
608          * Spin lock all_tasks to prevent changes...
609          */
610         spin_lock(&clnt->cl_lock);
611         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
612                 if (!RPC_IS_ACTIVATED(rovr))
613                         continue;
614                 if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
615                         rovr->tk_flags |= RPC_TASK_KILLED;
616                         rpc_exit(rovr, -EIO);
617                         if (RPC_IS_QUEUED(rovr))
618                                 rpc_wake_up_queued_task(rovr->tk_waitqueue,
619                                                         rovr);
620                 }
621         }
622         spin_unlock(&clnt->cl_lock);
623 }
624 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
625
626 /*
627  * Properly shut down an RPC client, terminating all outstanding
628  * requests.
629  */
630 void rpc_shutdown_client(struct rpc_clnt *clnt)
631 {
632         might_sleep();
633
634         dprintk_rcu("RPC:       shutting down %s client for %s\n",
635                         clnt->cl_protname,
636                         rcu_dereference(clnt->cl_xprt)->servername);
637
638         while (!list_empty(&clnt->cl_tasks)) {
639                 rpc_killall_tasks(clnt);
640                 wait_event_timeout(destroy_wait,
641                         list_empty(&clnt->cl_tasks), 1*HZ);
642         }
643
644         rpc_release_client(clnt);
645 }
646 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
647
648 /*
649  * Free an RPC client
650  */
651 static void
652 rpc_free_client(struct rpc_clnt *clnt)
653 {
654         dprintk_rcu("RPC:       destroying %s client for %s\n",
655                         clnt->cl_protname,
656                         rcu_dereference(clnt->cl_xprt)->servername);
657         if (clnt->cl_parent != clnt)
658                 rpc_release_client(clnt->cl_parent);
659         rpc_clnt_remove_pipedir(clnt);
660         rpc_unregister_client(clnt);
661         rpc_free_iostats(clnt->cl_metrics);
662         kfree(clnt->cl_principal);
663         clnt->cl_metrics = NULL;
664         xprt_put(rcu_dereference_raw(clnt->cl_xprt));
665         rpciod_down();
666         kfree(clnt);
667 }
668
669 /*
670  * Free an RPC client
671  */
672 static void
673 rpc_free_auth(struct rpc_clnt *clnt)
674 {
675         if (clnt->cl_auth == NULL) {
676                 rpc_free_client(clnt);
677                 return;
678         }
679
680         /*
681          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
682          *       release remaining GSS contexts. This mechanism ensures
683          *       that it can do so safely.
684          */
685         atomic_inc(&clnt->cl_count);
686         rpcauth_release(clnt->cl_auth);
687         clnt->cl_auth = NULL;
688         if (atomic_dec_and_test(&clnt->cl_count))
689                 rpc_free_client(clnt);
690 }
691
692 /*
693  * Release reference to the RPC client
694  */
695 void
696 rpc_release_client(struct rpc_clnt *clnt)
697 {
698         dprintk("RPC:       rpc_release_client(%p)\n", clnt);
699
700         if (list_empty(&clnt->cl_tasks))
701                 wake_up(&destroy_wait);
702         if (atomic_dec_and_test(&clnt->cl_count))
703                 rpc_free_auth(clnt);
704 }
705 EXPORT_SYMBOL_GPL(rpc_release_client);
706
707 /**
708  * rpc_bind_new_program - bind a new RPC program to an existing client
709  * @old: old rpc_client
710  * @program: rpc program to set
711  * @vers: rpc program version
712  *
713  * Clones the rpc client and sets up a new RPC program. This is mainly
714  * of use for enabling different RPC programs to share the same transport.
715  * The Sun NFSv2/v3 ACL protocol can do this.
716  */
717 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
718                                       const struct rpc_program *program,
719                                       u32 vers)
720 {
721         struct rpc_create_args args = {
722                 .program        = program,
723                 .prognumber     = program->number,
724                 .version        = vers,
725                 .authflavor     = old->cl_auth->au_flavor,
726                 .client_name    = old->cl_principal,
727         };
728         struct rpc_clnt *clnt;
729         int err;
730
731         clnt = __rpc_clone_client(&args, old);
732         if (IS_ERR(clnt))
733                 goto out;
734         err = rpc_ping(clnt);
735         if (err != 0) {
736                 rpc_shutdown_client(clnt);
737                 clnt = ERR_PTR(err);
738         }
739 out:
740         return clnt;
741 }
742 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
743
744 void rpc_task_release_client(struct rpc_task *task)
745 {
746         struct rpc_clnt *clnt = task->tk_client;
747
748         if (clnt != NULL) {
749                 /* Remove from client task list */
750                 spin_lock(&clnt->cl_lock);
751                 list_del(&task->tk_task);
752                 spin_unlock(&clnt->cl_lock);
753                 task->tk_client = NULL;
754
755                 rpc_release_client(clnt);
756         }
757 }
758
759 static
760 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
761 {
762         if (clnt != NULL) {
763                 rpc_task_release_client(task);
764                 task->tk_client = clnt;
765                 atomic_inc(&clnt->cl_count);
766                 if (clnt->cl_softrtry)
767                         task->tk_flags |= RPC_TASK_SOFT;
768                 if (sk_memalloc_socks()) {
769                         struct rpc_xprt *xprt;
770
771                         rcu_read_lock();
772                         xprt = rcu_dereference(clnt->cl_xprt);
773                         if (xprt->swapper)
774                                 task->tk_flags |= RPC_TASK_SWAPPER;
775                         rcu_read_unlock();
776                 }
777                 /* Add to the client's list of all tasks */
778                 spin_lock(&clnt->cl_lock);
779                 list_add_tail(&task->tk_task, &clnt->cl_tasks);
780                 spin_unlock(&clnt->cl_lock);
781         }
782 }
783
/*
 * Move @task over to @clnt.  The task is always detached from its current
 * client; it is attached to @clnt only when @clnt is not NULL.
 */
void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
	/* Detach first so that a NULL @clnt still drops the old client. */
	rpc_task_release_client(task);

	rpc_task_set_client(task, clnt);
}
EXPORT_SYMBOL_GPL(rpc_task_reset_client);
790
791
792 static void
793 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
794 {
795         if (msg != NULL) {
796                 task->tk_msg.rpc_proc = msg->rpc_proc;
797                 task->tk_msg.rpc_argp = msg->rpc_argp;
798                 task->tk_msg.rpc_resp = msg->rpc_resp;
799                 if (msg->rpc_cred != NULL)
800                         task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
801         }
802 }
803
804 /*
805  * Default callback for async RPC calls
806  */
807 static void
808 rpc_default_callback(struct rpc_task *task, void *data)
809 {
810 }
811
812 static const struct rpc_call_ops rpc_default_ops = {
813         .rpc_call_done = rpc_default_callback,
814 };
815
816 /**
817  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
818  * @task_setup_data: pointer to task initialisation data
819  */
820 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
821 {
822         struct rpc_task *task;
823
824         task = rpc_new_task(task_setup_data);
825         if (IS_ERR(task))
826                 goto out;
827
828         rpc_task_set_client(task, task_setup_data->rpc_client);
829         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
830
831         if (task->tk_action == NULL)
832                 rpc_call_start(task);
833
834         atomic_inc(&task->tk_count);
835         rpc_execute(task);
836 out:
837         return task;
838 }
839 EXPORT_SYMBOL_GPL(rpc_run_task);
840
841 /**
842  * rpc_call_sync - Perform a synchronous RPC call
843  * @clnt: pointer to RPC client
844  * @msg: RPC call parameters
845  * @flags: RPC call flags
846  */
847 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
848 {
849         struct rpc_task *task;
850         struct rpc_task_setup task_setup_data = {
851                 .rpc_client = clnt,
852                 .rpc_message = msg,
853                 .callback_ops = &rpc_default_ops,
854                 .flags = flags,
855         };
856         int status;
857
858         WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
859         if (flags & RPC_TASK_ASYNC) {
860                 rpc_release_calldata(task_setup_data.callback_ops,
861                         task_setup_data.callback_data);
862                 return -EINVAL;
863         }
864
865         task = rpc_run_task(&task_setup_data);
866         if (IS_ERR(task))
867                 return PTR_ERR(task);
868         status = task->tk_status;
869         rpc_put_task(task);
870         return status;
871 }
872 EXPORT_SYMBOL_GPL(rpc_call_sync);
873
874 /**
875  * rpc_call_async - Perform an asynchronous RPC call
876  * @clnt: pointer to RPC client
877  * @msg: RPC call parameters
878  * @flags: RPC call flags
879  * @tk_ops: RPC call ops
880  * @data: user call data
881  */
882 int
883 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
884                const struct rpc_call_ops *tk_ops, void *data)
885 {
886         struct rpc_task *task;
887         struct rpc_task_setup task_setup_data = {
888                 .rpc_client = clnt,
889                 .rpc_message = msg,
890                 .callback_ops = tk_ops,
891                 .callback_data = data,
892                 .flags = flags|RPC_TASK_ASYNC,
893         };
894
895         task = rpc_run_task(&task_setup_data);
896         if (IS_ERR(task))
897                 return PTR_ERR(task);
898         rpc_put_task(task);
899         return 0;
900 }
901 EXPORT_SYMBOL_GPL(rpc_call_async);
902
903 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
904 /**
905  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
906  * rpc_execute against it
907  * @req: RPC request
908  * @tk_ops: RPC call ops
909  */
910 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
911                                 const struct rpc_call_ops *tk_ops)
912 {
913         struct rpc_task *task;
914         struct xdr_buf *xbufp = &req->rq_snd_buf;
915         struct rpc_task_setup task_setup_data = {
916                 .callback_ops = tk_ops,
917         };
918
919         dprintk("RPC: rpc_run_bc_task req= %p\n", req);
920         /*
921          * Create an rpc_task to send the data
922          */
923         task = rpc_new_task(&task_setup_data);
924         if (IS_ERR(task)) {
925                 xprt_free_bc_request(req);
926                 goto out;
927         }
928         task->tk_rqstp = req;
929
930         /*
931          * Set up the xdr_buf length.
932          * This also indicates that the buffer is XDR encoded already.
933          */
934         xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
935                         xbufp->tail[0].iov_len;
936
937         task->tk_action = call_bc_transmit;
938         atomic_inc(&task->tk_count);
939         WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
940         rpc_execute(task);
941
942 out:
943         dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
944         return task;
945 }
946 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
947
948 void
949 rpc_call_start(struct rpc_task *task)
950 {
951         task->tk_action = call_start;
952 }
953 EXPORT_SYMBOL_GPL(rpc_call_start);
954
955 /**
956  * rpc_peeraddr - extract remote peer address from clnt's xprt
957  * @clnt: RPC client structure
958  * @buf: target buffer
959  * @bufsize: length of target buffer
960  *
961  * Returns the number of bytes that are actually in the stored address.
962  */
963 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
964 {
965         size_t bytes;
966         struct rpc_xprt *xprt;
967
968         rcu_read_lock();
969         xprt = rcu_dereference(clnt->cl_xprt);
970
971         bytes = xprt->addrlen;
972         if (bytes > bufsize)
973                 bytes = bufsize;
974         memcpy(buf, &xprt->addr, bytes);
975         rcu_read_unlock();
976
977         return bytes;
978 }
979 EXPORT_SYMBOL_GPL(rpc_peeraddr);
980
981 /**
982  * rpc_peeraddr2str - return remote peer address in printable format
983  * @clnt: RPC client structure
984  * @format: address format
985  *
986  * NB: the lifetime of the memory referenced by the returned pointer is
987  * the same as the rpc_xprt itself.  As long as the caller uses this
988  * pointer, it must hold the RCU read lock.
989  */
990 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
991                              enum rpc_display_format_t format)
992 {
993         struct rpc_xprt *xprt;
994
995         xprt = rcu_dereference(clnt->cl_xprt);
996
997         if (xprt->address_strings[format] != NULL)
998                 return xprt->address_strings[format];
999         else
1000                 return "unprintable";
1001 }
1002 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1003
1004 static const struct sockaddr_in rpc_inaddr_loopback = {
1005         .sin_family             = AF_INET,
1006         .sin_addr.s_addr        = htonl(INADDR_ANY),
1007 };
1008
1009 static const struct sockaddr_in6 rpc_in6addr_loopback = {
1010         .sin6_family            = AF_INET6,
1011         .sin6_addr              = IN6ADDR_ANY_INIT,
1012 };
1013
1014 /*
1015  * Try a getsockname() on a connected datagram socket.  Using a
1016  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1017  * This conserves the ephemeral port number space.
1018  *
1019  * Returns zero and fills in "buf" if successful; otherwise, a
1020  * negative errno is returned.
1021  */
1022 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1023                         struct sockaddr *buf, int buflen)
1024 {
1025         struct socket *sock;
1026         int err;
1027
1028         err = __sock_create(net, sap->sa_family,
1029                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1030         if (err < 0) {
1031                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1032                 goto out;
1033         }
1034
1035         switch (sap->sa_family) {
1036         case AF_INET:
1037                 err = kernel_bind(sock,
1038                                 (struct sockaddr *)&rpc_inaddr_loopback,
1039                                 sizeof(rpc_inaddr_loopback));
1040                 break;
1041         case AF_INET6:
1042                 err = kernel_bind(sock,
1043                                 (struct sockaddr *)&rpc_in6addr_loopback,
1044                                 sizeof(rpc_in6addr_loopback));
1045                 break;
1046         default:
1047                 err = -EAFNOSUPPORT;
1048                 goto out;
1049         }
1050         if (err < 0) {
1051                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1052                 goto out_release;
1053         }
1054
1055         err = kernel_connect(sock, sap, salen, 0);
1056         if (err < 0) {
1057                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1058                 goto out_release;
1059         }
1060
1061         err = kernel_getsockname(sock, buf, &buflen);
1062         if (err < 0) {
1063                 dprintk("RPC:       getsockname failed (%d)\n", err);
1064                 goto out_release;
1065         }
1066
1067         err = 0;
1068         if (buf->sa_family == AF_INET6) {
1069                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1070                 sin6->sin6_scope_id = 0;
1071         }
1072         dprintk("RPC:       %s succeeded\n", __func__);
1073
1074 out_release:
1075         sock_release(sock);
1076 out:
1077         return err;
1078 }
1079
1080 /*
1081  * Scraping a connected socket failed, so we don't have a useable
1082  * local address.  Fallback: generate an address that will prevent
1083  * the server from calling us back.
1084  *
1085  * Returns zero and fills in "buf" if successful; otherwise, a
1086  * negative errno is returned.
1087  */
1088 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1089 {
1090         switch (family) {
1091         case AF_INET:
1092                 if (buflen < sizeof(rpc_inaddr_loopback))
1093                         return -EINVAL;
1094                 memcpy(buf, &rpc_inaddr_loopback,
1095                                 sizeof(rpc_inaddr_loopback));
1096                 break;
1097         case AF_INET6:
1098                 if (buflen < sizeof(rpc_in6addr_loopback))
1099                         return -EINVAL;
1100                 memcpy(buf, &rpc_in6addr_loopback,
1101                                 sizeof(rpc_in6addr_loopback));
1102         default:
1103                 dprintk("RPC:       %s: address family not supported\n",
1104                         __func__);
1105                 return -EAFNOSUPPORT;
1106         }
1107         dprintk("RPC:       %s: succeeded\n", __func__);
1108         return 0;
1109 }
1110
1111 /**
1112  * rpc_localaddr - discover local endpoint address for an RPC client
1113  * @clnt: RPC client structure
1114  * @buf: target buffer
1115  * @buflen: size of target buffer, in bytes
1116  *
1117  * Returns zero and fills in "buf" and "buflen" if successful;
1118  * otherwise, a negative errno is returned.
1119  *
1120  * This works even if the underlying transport is not currently connected,
1121  * or if the upper layer never previously provided a source address.
1122  *
1123  * The result of this function call is transient: multiple calls in
1124  * succession may give different results, depending on how local
1125  * networking configuration changes over time.
1126  */
1127 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1128 {
1129         struct sockaddr_storage address;
1130         struct sockaddr *sap = (struct sockaddr *)&address;
1131         struct rpc_xprt *xprt;
1132         struct net *net;
1133         size_t salen;
1134         int err;
1135
1136         rcu_read_lock();
1137         xprt = rcu_dereference(clnt->cl_xprt);
1138         salen = xprt->addrlen;
1139         memcpy(sap, &xprt->addr, salen);
1140         net = get_net(xprt->xprt_net);
1141         rcu_read_unlock();
1142
1143         rpc_set_port(sap, 0);
1144         err = rpc_sockname(net, sap, salen, buf, buflen);
1145         put_net(net);
1146         if (err != 0)
1147                 /* Couldn't discover local address, return ANYADDR */
1148                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1149         return 0;
1150 }
1151 EXPORT_SYMBOL_GPL(rpc_localaddr);
1152
1153 void
1154 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1155 {
1156         struct rpc_xprt *xprt;
1157
1158         rcu_read_lock();
1159         xprt = rcu_dereference(clnt->cl_xprt);
1160         if (xprt->ops->set_buffer_size)
1161                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1162         rcu_read_unlock();
1163 }
1164 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1165
1166 /**
1167  * rpc_protocol - Get transport protocol number for an RPC client
1168  * @clnt: RPC client to query
1169  *
1170  */
1171 int rpc_protocol(struct rpc_clnt *clnt)
1172 {
1173         int protocol;
1174
1175         rcu_read_lock();
1176         protocol = rcu_dereference(clnt->cl_xprt)->prot;
1177         rcu_read_unlock();
1178         return protocol;
1179 }
1180 EXPORT_SYMBOL_GPL(rpc_protocol);
1181
1182 /**
1183  * rpc_net_ns - Get the network namespace for this RPC client
1184  * @clnt: RPC client to query
1185  *
1186  */
1187 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1188 {
1189         struct net *ret;
1190
1191         rcu_read_lock();
1192         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1193         rcu_read_unlock();
1194         return ret;
1195 }
1196 EXPORT_SYMBOL_GPL(rpc_net_ns);
1197
1198 /**
1199  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1200  * @clnt: RPC client to query
1201  *
1202  * For stream transports, this is one RPC record fragment (see RFC
1203  * 1831), as we don't support multi-record requests yet.  For datagram
1204  * transports, this is the size of an IP packet minus the IP, UDP, and
1205  * RPC header sizes.
1206  */
1207 size_t rpc_max_payload(struct rpc_clnt *clnt)
1208 {
1209         size_t ret;
1210
1211         rcu_read_lock();
1212         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1213         rcu_read_unlock();
1214         return ret;
1215 }
1216 EXPORT_SYMBOL_GPL(rpc_max_payload);
1217
1218 /**
1219  * rpc_get_timeout - Get timeout for transport in units of HZ
1220  * @clnt: RPC client to query
1221  */
1222 unsigned long rpc_get_timeout(struct rpc_clnt *clnt)
1223 {
1224         unsigned long ret;
1225
1226         rcu_read_lock();
1227         ret = rcu_dereference(clnt->cl_xprt)->timeout->to_initval;
1228         rcu_read_unlock();
1229         return ret;
1230 }
1231 EXPORT_SYMBOL_GPL(rpc_get_timeout);
1232
1233 /**
1234  * rpc_force_rebind - force transport to check that remote port is unchanged
1235  * @clnt: client to rebind
1236  *
1237  */
1238 void rpc_force_rebind(struct rpc_clnt *clnt)
1239 {
1240         if (clnt->cl_autobind) {
1241                 rcu_read_lock();
1242                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1243                 rcu_read_unlock();
1244         }
1245 }
1246 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1247
1248 /*
1249  * Restart an (async) RPC call from the call_prepare state.
1250  * Usually called from within the exit handler.
1251  */
1252 int
1253 rpc_restart_call_prepare(struct rpc_task *task)
1254 {
1255         if (RPC_ASSASSINATED(task))
1256                 return 0;
1257         task->tk_action = call_start;
1258         if (task->tk_ops->rpc_call_prepare != NULL)
1259                 task->tk_action = rpc_prepare_task;
1260         return 1;
1261 }
1262 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1263
1264 /*
1265  * Restart an (async) RPC call. Usually called from within the
1266  * exit handler.
1267  */
1268 int
1269 rpc_restart_call(struct rpc_task *task)
1270 {
1271         if (RPC_ASSASSINATED(task))
1272                 return 0;
1273         task->tk_action = call_start;
1274         return 1;
1275 }
1276 EXPORT_SYMBOL_GPL(rpc_restart_call);
1277
1278 #ifdef RPC_DEBUG
1279 static const char *rpc_proc_name(const struct rpc_task *task)
1280 {
1281         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1282
1283         if (proc) {
1284                 if (proc->p_name)
1285                         return proc->p_name;
1286                 else
1287                         return "NULL";
1288         } else
1289                 return "no proc";
1290 }
1291 #endif
1292
1293 /*
1294  * 0.  Initial state
1295  *
1296  *     Other FSM states can be visited zero or more times, but
1297  *     this state is visited exactly once for each RPC.
1298  */
1299 static void
1300 call_start(struct rpc_task *task)
1301 {
1302         struct rpc_clnt *clnt = task->tk_client;
1303
1304         dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1305                         clnt->cl_protname, clnt->cl_vers,
1306                         rpc_proc_name(task),
1307                         (RPC_IS_ASYNC(task) ? "async" : "sync"));
1308
1309         /* Increment call count */
1310         task->tk_msg.rpc_proc->p_count++;
1311         clnt->cl_stats->rpccnt++;
1312         task->tk_action = call_reserve;
1313 }
1314
1315 /*
1316  * 1.   Reserve an RPC call slot
1317  */
1318 static void
1319 call_reserve(struct rpc_task *task)
1320 {
1321         dprint_status(task);
1322
1323         task->tk_status  = 0;
1324         task->tk_action  = call_reserveresult;
1325         xprt_reserve(task);
1326 }
1327
1328 static void call_retry_reserve(struct rpc_task *task);
1329
1330 /*
1331  * 1b.  Grok the result of xprt_reserve()
1332  */
1333 static void
1334 call_reserveresult(struct rpc_task *task)
1335 {
1336         int status = task->tk_status;
1337
1338         dprint_status(task);
1339
1340         /*
1341          * After a call to xprt_reserve(), we must have either
1342          * a request slot or else an error status.
1343          */
1344         task->tk_status = 0;
1345         if (status >= 0) {
1346                 if (task->tk_rqstp) {
1347                         task->tk_action = call_refresh;
1348                         return;
1349                 }
1350
1351                 printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1352                                 __func__, status);
1353                 rpc_exit(task, -EIO);
1354                 return;
1355         }
1356
1357         /*
1358          * Even though there was an error, we may have acquired
1359          * a request slot somehow.  Make sure not to leak it.
1360          */
1361         if (task->tk_rqstp) {
1362                 printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1363                                 __func__, status);
1364                 xprt_release(task);
1365         }
1366
1367         switch (status) {
1368         case -ENOMEM:
1369                 rpc_delay(task, HZ >> 2);
1370         case -EAGAIN:   /* woken up; retry */
1371                 task->tk_action = call_retry_reserve;
1372                 return;
1373         case -EIO:      /* probably a shutdown */
1374                 break;
1375         default:
1376                 printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1377                                 __func__, status);
1378                 break;
1379         }
1380         rpc_exit(task, status);
1381 }
1382
1383 /*
1384  * 1c.  Retry reserving an RPC call slot
1385  */
1386 static void
1387 call_retry_reserve(struct rpc_task *task)
1388 {
1389         dprint_status(task);
1390
1391         task->tk_status  = 0;
1392         task->tk_action  = call_reserveresult;
1393         xprt_retry_reserve(task);
1394 }
1395
1396 /*
1397  * 2.   Bind and/or refresh the credentials
1398  */
1399 static void
1400 call_refresh(struct rpc_task *task)
1401 {
1402         dprint_status(task);
1403
1404         task->tk_action = call_refreshresult;
1405         task->tk_status = 0;
1406         task->tk_client->cl_stats->rpcauthrefresh++;
1407         rpcauth_refreshcred(task);
1408 }
1409
1410 /*
1411  * 2a.  Process the results of a credential refresh
1412  */
1413 static void
1414 call_refreshresult(struct rpc_task *task)
1415 {
1416         int status = task->tk_status;
1417
1418         dprint_status(task);
1419
1420         task->tk_status = 0;
1421         task->tk_action = call_refresh;
1422         switch (status) {
1423         case 0:
1424                 if (rpcauth_uptodatecred(task))
1425                         task->tk_action = call_allocate;
1426                 return;
1427         case -ETIMEDOUT:
1428                 rpc_delay(task, 3*HZ);
1429         case -EKEYEXPIRED:
1430         case -EAGAIN:
1431                 status = -EACCES;
1432                 if (!task->tk_cred_retry)
1433                         break;
1434                 task->tk_cred_retry--;
1435                 dprintk("RPC: %5u %s: retry refresh creds\n",
1436                                 task->tk_pid, __func__);
1437                 return;
1438         }
1439         dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1440                                 task->tk_pid, __func__, status);
1441         rpc_exit(task, status);
1442 }
1443
1444 /*
1445  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1446  *      (Note: buffer memory is freed in xprt_release).
1447  */
1448 static void
1449 call_allocate(struct rpc_task *task)
1450 {
1451         unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1452         struct rpc_rqst *req = task->tk_rqstp;
1453         struct rpc_xprt *xprt = req->rq_xprt;
1454         struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1455
1456         dprint_status(task);
1457
1458         task->tk_status = 0;
1459         task->tk_action = call_bind;
1460
1461         if (req->rq_buffer)
1462                 return;
1463
1464         if (proc->p_proc != 0) {
1465                 BUG_ON(proc->p_arglen == 0);
1466                 if (proc->p_decode != NULL)
1467                         BUG_ON(proc->p_replen == 0);
1468         }
1469
1470         /*
1471          * Calculate the size (in quads) of the RPC call
1472          * and reply headers, and convert both values
1473          * to byte sizes.
1474          */
1475         req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1476         req->rq_callsize <<= 2;
1477         req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1478         req->rq_rcvsize <<= 2;
1479
1480         req->rq_buffer = xprt->ops->buf_alloc(task,
1481                                         req->rq_callsize + req->rq_rcvsize);
1482         if (req->rq_buffer != NULL)
1483                 return;
1484
1485         dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1486
1487         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1488                 task->tk_action = call_allocate;
1489                 rpc_delay(task, HZ>>4);
1490                 return;
1491         }
1492
1493         rpc_exit(task, -ERESTARTSYS);
1494 }
1495
1496 static inline int
1497 rpc_task_need_encode(struct rpc_task *task)
1498 {
1499         return task->tk_rqstp->rq_snd_buf.len == 0;
1500 }
1501
1502 static inline void
1503 rpc_task_force_reencode(struct rpc_task *task)
1504 {
1505         task->tk_rqstp->rq_snd_buf.len = 0;
1506         task->tk_rqstp->rq_bytes_sent = 0;
1507 }
1508
1509 static inline void
1510 rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1511 {
1512         buf->head[0].iov_base = start;
1513         buf->head[0].iov_len = len;
1514         buf->tail[0].iov_len = 0;
1515         buf->page_len = 0;
1516         buf->flags = 0;
1517         buf->len = 0;
1518         buf->buflen = len;
1519 }
1520
1521 /*
1522  * 3.   Encode arguments of an RPC call
1523  */
1524 static void
1525 rpc_xdr_encode(struct rpc_task *task)
1526 {
1527         struct rpc_rqst *req = task->tk_rqstp;
1528         kxdreproc_t     encode;
1529         __be32          *p;
1530
1531         dprint_status(task);
1532
1533         rpc_xdr_buf_init(&req->rq_snd_buf,
1534                          req->rq_buffer,
1535                          req->rq_callsize);
1536         rpc_xdr_buf_init(&req->rq_rcv_buf,
1537                          (char *)req->rq_buffer + req->rq_callsize,
1538                          req->rq_rcvsize);
1539
1540         p = rpc_encode_header(task);
1541         if (p == NULL) {
1542                 printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1543                 rpc_exit(task, -EIO);
1544                 return;
1545         }
1546
1547         encode = task->tk_msg.rpc_proc->p_encode;
1548         if (encode == NULL)
1549                 return;
1550
1551         task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1552                         task->tk_msg.rpc_argp);
1553 }
1554
1555 /*
1556  * 4.   Get the server port number if not yet set
1557  */
1558 static void
1559 call_bind(struct rpc_task *task)
1560 {
1561         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1562
1563         dprint_status(task);
1564
1565         task->tk_action = call_connect;
1566         if (!xprt_bound(xprt)) {
1567                 task->tk_action = call_bind_status;
1568                 task->tk_timeout = xprt->bind_timeout;
1569                 xprt->ops->rpcbind(task);
1570         }
1571 }
1572
/*
 * 4a.	Sort out bind result
 *
 *	On success, continue with call_connect().  On failure, either
 *	go through call_timeout() (possibly after a delay), restart the
 *	bind (-EPROTONOSUPPORT), or end the task.  The exit status is
 *	-EIO unless a case below picks a more specific one.
 */
static void
call_bind_status(struct rpc_task *task)
{
	int exit_status = -EIO;

	if (task->tk_status >= 0) {
		dprint_status(task);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	trace_rpc_bind_status(task);
	switch (task->tk_status) {
	case -ENOMEM:
		dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
		rpc_delay(task, HZ >> 2);
		task->tk_action = call_timeout;
		return;
	case -EACCES:
		dprintk("RPC: %5u remote rpcbind: RPC program/version "
				"unavailable\n", task->tk_pid);
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			/* fail immediately if this is an RPC ping */
			exit_status = -EOPNOTSUPP;
		} else if (task->tk_rebind_retry != 0) {
			/* Rebind attempts remain: retry after a delay. */
			task->tk_rebind_retry--;
			rpc_delay(task, 3*HZ);
			task->tk_action = call_timeout;
			return;
		}
		break;
	case -ETIMEDOUT:
		dprintk("RPC: %5u rpcbind request timed out\n",
				task->tk_pid);
		task->tk_action = call_timeout;
		return;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		dprintk("RPC: %5u unrecognized remote rpcbind service\n",
				task->tk_pid);
		break;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
				task->tk_pid);
		task->tk_status = 0;
		task->tk_action = call_bind;
		return;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPIPE:
		dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
				task->tk_pid, task->tk_status);
		if (!RPC_IS_SOFTCONN(task)) {
			rpc_delay(task, 5*HZ);
			task->tk_action = call_timeout;
			return;
		}
		/* Soft-connect tasks report the connection error itself. */
		exit_status = task->tk_status;
		break;
	default:
		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
		break;
	}

	rpc_exit(task, exit_status);
}
1648
1649 /*
1650  * 4b.  Connect to the RPC server
1651  */
1652 static void
1653 call_connect(struct rpc_task *task)
1654 {
1655         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1656
1657         dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1658                         task->tk_pid, xprt,
1659                         (xprt_connected(xprt) ? "is" : "is not"));
1660
1661         task->tk_action = call_transmit;
1662         if (!xprt_connected(xprt)) {
1663                 task->tk_action = call_connect_status;
1664                 if (task->tk_status < 0)
1665                         return;
1666                 xprt_connect(task);
1667         }
1668 }
1669
/*
 * 4c.	Sort out connect result
 *
 *	-ETIMEDOUT goes to call_timeout().  Success and -EAGAIN move on
 *	to call_transmit(), as do -ECONNREFUSED, -ECONNRESET and
 *	-ENETUNREACH unless the task is a soft-connect one.  Everything
 *	else ends the task with the connect status.
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;
	int proceed;

	dprint_status(task);

	trace_rpc_connect_status(task, status);

	/* if soft mounted, test if we've timed out */
	if (status == -ETIMEDOUT) {
		task->tk_action = call_timeout;
		return;
	}

	proceed = (status == 0 || status == -EAGAIN);
	if (status == -ECONNREFUSED || status == -ECONNRESET ||
	    status == -ENETUNREACH)
		/* retry with the existing socket unless soft-connect */
		proceed = !RPC_IS_SOFTCONN(task);

	if (!proceed) {
		rpc_exit(task, status);
		return;
	}

	task->tk_status = 0;
	clnt->cl_stats->netreconn++;
	task->tk_action = call_transmit;
}
1702
1703 /*
1704  * 5.   Transmit the RPC request, and wait for reply
1705  */
1706 static void
1707 call_transmit(struct rpc_task *task)
1708 {
1709         dprint_status(task);
1710
1711         task->tk_action = call_status;
1712         if (task->tk_status < 0)
1713                 return;
1714         task->tk_status = xprt_prepare_transmit(task);
1715         if (task->tk_status != 0)
1716                 return;
1717         task->tk_action = call_transmit_status;
1718         /* Encode here so that rpcsec_gss can use correct sequence number. */
1719         if (rpc_task_need_encode(task)) {
1720                 rpc_xdr_encode(task);
1721                 /* Did the encode result in an error condition? */
1722                 if (task->tk_status != 0) {
1723                         /* Was the error nonfatal? */
1724                         if (task->tk_status == -EAGAIN)
1725                                 rpc_delay(task, HZ >> 4);
1726                         else
1727                                 rpc_exit(task, task->tk_status);
1728                         return;
1729                 }
1730         }
1731         xprt_transmit(task);
1732         if (task->tk_status < 0)
1733                 return;
1734         /*
1735          * On success, ensure that we call xprt_end_transmit() before sleeping
1736          * in order to allow access to the socket to other RPC requests.
1737          */
1738         call_transmit_status(task);
1739         if (rpc_reply_expected(task))
1740                 return;
1741         task->tk_action = rpc_exit_task;
1742         rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
1743 }
1744
/*
 * 5a.	Handle cleanup after a transmission
 *
 *	call_status() always runs next.  What differs per status is
 *	whether xprt_end_transmit() is called and whether the request
 *	is marked for re-encoding.
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (task->tk_status == 0) {
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		return;
	}

	switch (task->tk_status) {
	case -EAGAIN:
		/* Nothing to clean up here. */
		break;
		/*
		 * Special cases: if we've been waiting on the
		 * socket's write_space() callback, or if the
		 * socket just returned a connection error,
		 * then hold onto the transport lock.
		 */
	case -ECONNREFUSED:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		if (RPC_IS_SOFTCONN(task)) {
			/* Soft-connect tasks give up on these errors. */
			xprt_end_transmit(task);
			rpc_exit(task, task->tk_status);
			break;
		}
		/* fall through */
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		rpc_task_force_reencode(task);
		break;
	default:
		dprint_status(task);
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		break;
	}
}
1792
1793 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1794 /*
1795  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
1796  * addition, disconnect on connectivity errors.
1797  */
1798 static void
1799 call_bc_transmit(struct rpc_task *task)
1800 {
1801         struct rpc_rqst *req = task->tk_rqstp;
1802
1803         task->tk_status = xprt_prepare_transmit(task);
1804         if (task->tk_status == -EAGAIN) {
1805                 /*
1806                  * Could not reserve the transport. Try again after the
1807                  * transport is released.
1808                  */
1809                 task->tk_status = 0;
1810                 task->tk_action = call_bc_transmit;
1811                 return;
1812         }
1813
1814         task->tk_action = rpc_exit_task;
1815         if (task->tk_status < 0) {
1816                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1817                         "error: %d\n", task->tk_status);
1818                 return;
1819         }
1820
1821         xprt_transmit(task);
1822         xprt_end_transmit(task);
1823         dprint_status(task);
1824         switch (task->tk_status) {
1825         case 0:
1826                 /* Success */
1827                 break;
1828         case -EHOSTDOWN:
1829         case -EHOSTUNREACH:
1830         case -ENETUNREACH:
1831         case -ETIMEDOUT:
1832                 /*
1833                  * Problem reaching the server.  Disconnect and let the
1834                  * forechannel reestablish the connection.  The server will
1835                  * have to retransmit the backchannel request and we'll
1836                  * reprocess it.  Since these ops are idempotent, there's no
1837                  * need to cache our reply at this time.
1838                  */
1839                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1840                         "error: %d\n", task->tk_status);
1841                 xprt_conditional_disconnect(req->rq_xprt,
1842                         req->rq_connect_cookie);
1843                 break;
1844         default:
1845                 /*
1846                  * We were unable to reply and will have to drop the
1847                  * request.  The server should reconnect and retransmit.
1848                  */
1849                 WARN_ON_ONCE(task->tk_status == -EAGAIN);
1850                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1851                         "error: %d\n", task->tk_status);
1852                 break;
1853         }
1854         rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
1855 }
1856 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1857
/*
 * 6.	Sort out the RPC call status
 *
 *	A non-negative status moves the task on to call_decode.  A negative
 *	status is cleared from the task and mapped to the next state: retry
 *	through call_timeout, call_bind or call_transmit, or terminate the
 *	task with rpc_exit().
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *rqst = task->tk_rqstp;
	int		err;

	/* Reply bytes were received and nothing is left partially sent */
	if (rqst->rq_reply_bytes_recvd > 0 && rqst->rq_bytes_sent == 0)
		task->tk_status = rqst->rq_reply_bytes_recvd;

	dprint_status(task);

	err = task->tk_status;
	if (err >= 0) {
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	task->tk_status = 0;

	switch (err) {
	case -ENETUNREACH:
	case -EHOSTUNREACH:
	case -EHOSTDOWN:
		/*
		 * Hold off any retry for 3 seconds, then treat the error
		 * exactly like a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		if (clnt->cl_discrtry)
			xprt_conditional_disconnect(rqst->rq_xprt,
						    rqst->rq_connect_cookie);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
		rpc_force_rebind(clnt);
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ENOTCONN:
	case -EPIPE:
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, err);
		break;
	default:
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_protname, -err);
		rpc_exit(task, err);
		break;
	}
}
1918
1919 /*
1920  * 6a.  Handle RPC timeout
1921  *      We do not release the request slot, so we keep using the
1922  *      same XID for all retransmits.
1923  */
1924 static void
1925 call_timeout(struct rpc_task *task)
1926 {
1927         struct rpc_clnt *clnt = task->tk_client;
1928
1929         if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1930                 dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1931                 goto retry;
1932         }
1933
1934         dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1935         task->tk_timeouts++;
1936
1937         if (RPC_IS_SOFTCONN(task)) {
1938                 rpc_exit(task, -ETIMEDOUT);
1939                 return;
1940         }
1941         if (RPC_IS_SOFT(task)) {
1942                 if (clnt->cl_chatty) {
1943                         rcu_read_lock();
1944                         printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1945                                 clnt->cl_protname,
1946                                 rcu_dereference(clnt->cl_xprt)->servername);
1947                         rcu_read_unlock();
1948                 }
1949                 if (task->tk_flags & RPC_TASK_TIMEOUT)
1950                         rpc_exit(task, -ETIMEDOUT);
1951                 else
1952                         rpc_exit(task, -EIO);
1953                 return;
1954         }
1955
1956         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1957                 task->tk_flags |= RPC_CALL_MAJORSEEN;
1958                 if (clnt->cl_chatty) {
1959                         rcu_read_lock();
1960                         printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1961                         clnt->cl_protname,
1962                         rcu_dereference(clnt->cl_xprt)->servername);
1963                         rcu_read_unlock();
1964                 }
1965         }
1966         rpc_force_rebind(clnt);
1967         /*
1968          * Did our request time out due to an RPCSEC_GSS out-of-sequence
1969          * event? RFC2203 requires the server to drop all such requests.
1970          */
1971         rpcauth_invalcred(task);
1972
1973 retry:
1974         clnt->cl_stats->rpcretrans++;
1975         task->tk_action = call_bind;
1976         task->tk_status = 0;
1977 }
1978
1979 /*
1980  * 7.   Decode the RPC reply
1981  */
1982 static void
1983 call_decode(struct rpc_task *task)
1984 {
1985         struct rpc_clnt *clnt = task->tk_client;
1986         struct rpc_rqst *req = task->tk_rqstp;
1987         kxdrdproc_t     decode = task->tk_msg.rpc_proc->p_decode;
1988         __be32          *p;
1989
1990         dprint_status(task);
1991
1992         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1993                 if (clnt->cl_chatty) {
1994                         rcu_read_lock();
1995                         printk(KERN_NOTICE "%s: server %s OK\n",
1996                                 clnt->cl_protname,
1997                                 rcu_dereference(clnt->cl_xprt)->servername);
1998                         rcu_read_unlock();
1999                 }
2000                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2001         }
2002
2003         /*
2004          * Ensure that we see all writes made by xprt_complete_rqst()
2005          * before it changed req->rq_reply_bytes_recvd.
2006          */
2007         smp_rmb();
2008         req->rq_rcv_buf.len = req->rq_private_buf.len;
2009
2010         /* Check that the softirq receive buffer is valid */
2011         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2012                                 sizeof(req->rq_rcv_buf)) != 0);
2013
2014         if (req->rq_rcv_buf.len < 12) {
2015                 if (!RPC_IS_SOFT(task)) {
2016                         task->tk_action = call_bind;
2017                         clnt->cl_stats->rpcretrans++;
2018                         goto out_retry;
2019                 }
2020                 dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
2021                                 clnt->cl_protname, task->tk_status);
2022                 task->tk_action = call_timeout;
2023                 goto out_retry;
2024         }
2025
2026         p = rpc_verify_header(task);
2027         if (IS_ERR(p)) {
2028                 if (p == ERR_PTR(-EAGAIN))
2029                         goto out_retry;
2030                 return;
2031         }
2032
2033         task->tk_action = rpc_exit_task;
2034
2035         if (decode) {
2036                 task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
2037                                                       task->tk_msg.rpc_resp);
2038         }
2039         dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
2040                         task->tk_status);
2041         return;
2042 out_retry:
2043         task->tk_status = 0;
2044         /* Note: rpc_verify_header() may have freed the RPC slot */
2045         if (task->tk_rqstp == req) {
2046                 req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
2047                 if (task->tk_client->cl_discrtry)
2048                         xprt_conditional_disconnect(req->rq_xprt,
2049                                         req->rq_connect_cookie);
2050         }
2051 }
2052
2053 static __be32 *
2054 rpc_encode_header(struct rpc_task *task)
2055 {
2056         struct rpc_clnt *clnt = task->tk_client;
2057         struct rpc_rqst *req = task->tk_rqstp;
2058         __be32          *p = req->rq_svec[0].iov_base;
2059
2060         /* FIXME: check buffer size? */
2061
2062         p = xprt_skip_transport_header(req->rq_xprt, p);
2063         *p++ = req->rq_xid;             /* XID */
2064         *p++ = htonl(RPC_CALL);         /* CALL */
2065         *p++ = htonl(RPC_VERSION);      /* RPC version */
2066         *p++ = htonl(clnt->cl_prog);    /* program number */
2067         *p++ = htonl(clnt->cl_vers);    /* program version */
2068         *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
2069         p = rpcauth_marshcred(task, p);
2070         req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
2071         return p;
2072 }
2073
2074 static __be32 *
2075 rpc_verify_header(struct rpc_task *task)
2076 {
2077         struct rpc_clnt *clnt = task->tk_client;
2078         struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
2079         int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
2080         __be32  *p = iov->iov_base;
2081         u32 n;
2082         int error = -EACCES;
2083
2084         if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
2085                 /* RFC-1014 says that the representation of XDR data must be a
2086                  * multiple of four bytes
2087                  * - if it isn't pointer subtraction in the NFS client may give
2088                  *   undefined results
2089                  */
2090                 dprintk("RPC: %5u %s: XDR representation not a multiple of"
2091                        " 4 bytes: 0x%x\n", task->tk_pid, __func__,
2092                        task->tk_rqstp->rq_rcv_buf.len);
2093                 goto out_eio;
2094         }
2095         if ((len -= 3) < 0)
2096                 goto out_overflow;
2097
2098         p += 1; /* skip XID */
2099         if ((n = ntohl(*p++)) != RPC_REPLY) {
2100                 dprintk("RPC: %5u %s: not an RPC reply: %x\n",
2101                         task->tk_pid, __func__, n);
2102                 goto out_garbage;
2103         }
2104
2105         if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
2106                 if (--len < 0)
2107                         goto out_overflow;
2108                 switch ((n = ntohl(*p++))) {
2109                 case RPC_AUTH_ERROR:
2110                         break;
2111                 case RPC_MISMATCH:
2112                         dprintk("RPC: %5u %s: RPC call version mismatch!\n",
2113                                 task->tk_pid, __func__);
2114                         error = -EPROTONOSUPPORT;
2115                         goto out_err;
2116                 default:
2117                         dprintk("RPC: %5u %s: RPC call rejected, "
2118                                 "unknown error: %x\n",
2119                                 task->tk_pid, __func__, n);
2120                         goto out_eio;
2121                 }
2122                 if (--len < 0)
2123                         goto out_overflow;
2124                 switch ((n = ntohl(*p++))) {
2125                 case RPC_AUTH_REJECTEDCRED:
2126                 case RPC_AUTH_REJECTEDVERF:
2127                 case RPCSEC_GSS_CREDPROBLEM:
2128                 case RPCSEC_GSS_CTXPROBLEM:
2129                         if (!task->tk_cred_retry)
2130                                 break;
2131                         task->tk_cred_retry--;
2132                         dprintk("RPC: %5u %s: retry stale creds\n",
2133                                         task->tk_pid, __func__);
2134                         rpcauth_invalcred(task);
2135                         /* Ensure we obtain a new XID! */
2136                         xprt_release(task);
2137                         task->tk_action = call_reserve;
2138                         goto out_retry;
2139                 case RPC_AUTH_BADCRED:
2140                 case RPC_AUTH_BADVERF:
2141                         /* possibly garbled cred/verf? */
2142                         if (!task->tk_garb_retry)
2143                                 break;
2144                         task->tk_garb_retry--;
2145                         dprintk("RPC: %5u %s: retry garbled creds\n",
2146                                         task->tk_pid, __func__);
2147                         task->tk_action = call_bind;
2148                         goto out_retry;
2149                 case RPC_AUTH_TOOWEAK:
2150                         rcu_read_lock();
2151                         printk(KERN_NOTICE "RPC: server %s requires stronger "
2152                                "authentication.\n",
2153                                rcu_dereference(clnt->cl_xprt)->servername);
2154                         rcu_read_unlock();
2155                         break;
2156                 default:
2157                         dprintk("RPC: %5u %s: unknown auth error: %x\n",
2158                                         task->tk_pid, __func__, n);
2159                         error = -EIO;
2160                 }
2161                 dprintk("RPC: %5u %s: call rejected %d\n",
2162                                 task->tk_pid, __func__, n);
2163                 goto out_err;
2164         }
2165         if (!(p = rpcauth_checkverf(task, p))) {
2166                 dprintk("RPC: %5u %s: auth check failed\n",
2167                                 task->tk_pid, __func__);
2168                 goto out_garbage;               /* bad verifier, retry */
2169         }
2170         len = p - (__be32 *)iov->iov_base - 1;
2171         if (len < 0)
2172                 goto out_overflow;
2173         switch ((n = ntohl(*p++))) {
2174         case RPC_SUCCESS:
2175                 return p;
2176         case RPC_PROG_UNAVAIL:
2177                 dprintk_rcu("RPC: %5u %s: program %u is unsupported "
2178                                 "by server %s\n", task->tk_pid, __func__,
2179                                 (unsigned int)clnt->cl_prog,
2180                                 rcu_dereference(clnt->cl_xprt)->servername);
2181                 error = -EPFNOSUPPORT;
2182                 goto out_err;
2183         case RPC_PROG_MISMATCH:
2184                 dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
2185                                 "by server %s\n", task->tk_pid, __func__,
2186                                 (unsigned int)clnt->cl_prog,
2187                                 (unsigned int)clnt->cl_vers,
2188                                 rcu_dereference(clnt->cl_xprt)->servername);
2189                 error = -EPROTONOSUPPORT;
2190                 goto out_err;
2191         case RPC_PROC_UNAVAIL:
2192                 dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
2193                                 "version %u on server %s\n",
2194                                 task->tk_pid, __func__,
2195                                 rpc_proc_name(task),
2196                                 clnt->cl_prog, clnt->cl_vers,
2197                                 rcu_dereference(clnt->cl_xprt)->servername);
2198                 error = -EOPNOTSUPP;
2199                 goto out_err;
2200         case RPC_GARBAGE_ARGS:
2201                 dprintk("RPC: %5u %s: server saw garbage\n",
2202                                 task->tk_pid, __func__);
2203                 break;                  /* retry */
2204         default:
2205                 dprintk("RPC: %5u %s: server accept status: %x\n",
2206                                 task->tk_pid, __func__, n);
2207                 /* Also retry */
2208         }
2209
2210 out_garbage:
2211         clnt->cl_stats->rpcgarbage++;
2212         if (task->tk_garb_retry) {
2213                 task->tk_garb_retry--;
2214                 dprintk("RPC: %5u %s: retrying\n",
2215                                 task->tk_pid, __func__);
2216                 task->tk_action = call_bind;
2217 out_retry:
2218                 return ERR_PTR(-EAGAIN);
2219         }
2220 out_eio:
2221         error = -EIO;
2222 out_err:
2223         rpc_exit(task, error);
2224         dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
2225                         __func__, error);
2226         return ERR_PTR(error);
2227 out_overflow:
2228         dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
2229                         __func__);
2230         goto out_garbage;
2231 }
2232
/*
 * Argument encoder for the NULL procedure: there are no arguments, so
 * nothing is written to the stream.
 */
static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
	return;
}
2236
/*
 * Result decoder for the NULL procedure: there is no result to read,
 * so decoding always succeeds with 0.
 */
static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
	return 0;
}
2241
2242 static struct rpc_procinfo rpcproc_null = {
2243         .p_encode = rpcproc_encode_null,
2244         .p_decode = rpcproc_decode_null,
2245 };
2246
2247 static int rpc_ping(struct rpc_clnt *clnt)
2248 {
2249         struct rpc_message msg = {
2250                 .rpc_proc = &rpcproc_null,
2251         };
2252         int err;
2253         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2254         err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2255         put_rpccred(msg.rpc_cred);
2256         return err;
2257 }
2258
2259 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2260 {
2261         struct rpc_message msg = {
2262                 .rpc_proc = &rpcproc_null,
2263                 .rpc_cred = cred,
2264         };
2265         struct rpc_task_setup task_setup_data = {
2266                 .rpc_client = clnt,
2267                 .rpc_message = &msg,
2268                 .callback_ops = &rpc_default_ops,
2269                 .flags = flags,
2270         };
2271         return rpc_run_task(&task_setup_data);
2272 }
2273 EXPORT_SYMBOL_GPL(rpc_call_null);
2274
2275 #ifdef RPC_DEBUG
2276 static void rpc_show_header(void)
2277 {
2278         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2279                 "-timeout ---ops--\n");
2280 }
2281
2282 static void rpc_show_task(const struct rpc_clnt *clnt,
2283                           const struct rpc_task *task)
2284 {
2285         const char *rpc_waitq = "none";
2286
2287         if (RPC_IS_QUEUED(task))
2288                 rpc_waitq = rpc_qname(task->tk_waitqueue);
2289
2290         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2291                 task->tk_pid, task->tk_flags, task->tk_status,
2292                 clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2293                 clnt->cl_protname, clnt->cl_vers, rpc_proc_name(task),
2294                 task->tk_action, rpc_waitq);
2295 }
2296
2297 void rpc_show_tasks(struct net *net)
2298 {
2299         struct rpc_clnt *clnt;
2300         struct rpc_task *task;
2301         int header = 0;
2302         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2303
2304         spin_lock(&sn->rpc_client_lock);
2305         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2306                 spin_lock(&clnt->cl_lock);
2307                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2308                         if (!header) {
2309                                 rpc_show_header();
2310                                 header++;
2311                         }
2312                         rpc_show_task(clnt, task);
2313                 }
2314                 spin_unlock(&clnt->cl_lock);
2315         }
2316         spin_unlock(&sn->rpc_client_lock);
2317 }
2318 #endif