ferron/optional_modules/
fcgi.rs

1// FastCGI handler code inspired by SVR.JS's GreenRhombus mod, translated from JavaScript to Rust.
2// Based on the "cgi" and "scgi" module
3use std::env;
4use std::error::Error;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::ferron_common::{
10  ErrorLogger, HyperRequest, HyperResponse, RequestData, ResponseData, ServerConfig, ServerModule,
11  ServerModuleHandlers, SocketData,
12};
13use crate::ferron_common::{HyperUpgraded, WithRuntime};
14use async_trait::async_trait;
15use futures_util::future::Either;
16use futures_util::TryStreamExt;
17use hashlink::LinkedHashMap;
18use http_body_util::{BodyExt, StreamBody};
19use httparse::EMPTY_HEADER;
20use hyper::body::{Bytes, Frame};
21use hyper::{header, Response, StatusCode};
22use hyper_tungstenite::HyperWebsocket;
23use tokio::fs;
24use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
25use tokio::net::TcpStream;
26use tokio::runtime::Handle;
27use tokio::sync::RwLock;
28use tokio_util::codec::{FramedRead, FramedWrite};
29use tokio_util::io::{ReaderStream, SinkWriter, StreamReader};
30
31use crate::ferron_res::server_software::SERVER_SOFTWARE;
32use crate::ferron_util::cgi_response::CgiResponse;
33use crate::ferron_util::fcgi_decoder::{FcgiDecodedData, FcgiDecoder};
34use crate::ferron_util::fcgi_encoder::FcgiEncoder;
35use crate::ferron_util::fcgi_name_value_pair::construct_fastcgi_name_value_pair;
36use crate::ferron_util::fcgi_record::construct_fastcgi_record;
37use crate::ferron_util::split_stream_by_map::SplitStreamByMapExt;
38use crate::ferron_util::ttl_cache::TtlCache;
39
40pub fn server_module_init(
41  _config: &ServerConfig,
42) -> Result<Box<dyn ServerModule + Send + Sync>, Box<dyn Error + Send + Sync>> {
43  let cache = Arc::new(RwLock::new(TtlCache::new(Duration::from_millis(100))));
44  Ok(Box::new(FcgiModule::new(cache)))
45}
46
47#[allow(clippy::type_complexity)]
48struct FcgiModule {
49  path_cache: Arc<RwLock<TtlCache<String, (Option<PathBuf>, Option<String>)>>>,
50}
51
52impl FcgiModule {
53  #[allow(clippy::type_complexity)]
54  fn new(path_cache: Arc<RwLock<TtlCache<String, (Option<PathBuf>, Option<String>)>>>) -> Self {
55    Self { path_cache }
56  }
57}
58
59impl ServerModule for FcgiModule {
60  fn get_handlers(&self, handle: Handle) -> Box<dyn ServerModuleHandlers + Send> {
61    Box::new(FcgiModuleHandlers {
62      path_cache: self.path_cache.clone(),
63      handle,
64    })
65  }
66}
67
68#[allow(clippy::type_complexity)]
69struct FcgiModuleHandlers {
70  handle: Handle,
71  path_cache: Arc<RwLock<TtlCache<String, (Option<PathBuf>, Option<String>)>>>,
72}
73
74#[async_trait]
75impl ServerModuleHandlers for FcgiModuleHandlers {
76  async fn request_handler(
77    &mut self,
78    request: RequestData,
79    config: &ServerConfig,
80    socket_data: &SocketData,
81    error_logger: &ErrorLogger,
82  ) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
83    WithRuntime::new(self.handle.clone(), async move {
84      let mut fastcgi_script_exts = Vec::new();
85
86      let fastcgi_script_exts_yaml = &config["fcgiScriptExtensions"];
87      if let Some(fastcgi_script_exts_obtained) = fastcgi_script_exts_yaml.as_vec() {
88        for fastcgi_script_ext_yaml in fastcgi_script_exts_obtained.iter() {
89          if let Some(fastcgi_script_ext) = fastcgi_script_ext_yaml.as_str() {
90            fastcgi_script_exts.push(fastcgi_script_ext);
91          }
92        }
93      }
94
95      let mut fastcgi_to = "tcp://localhost:4000/";
96      let fastcgi_to_yaml = &config["fcgiTo"];
97      if let Some(fastcgi_to_obtained) = fastcgi_to_yaml.as_str() {
98        fastcgi_to = fastcgi_to_obtained;
99      }
100
101      let mut fastcgi_path = None;
102      if let Some(fastcgi_path_obtained) = config["fcgiPath"].as_str() {
103        fastcgi_path = Some(fastcgi_path_obtained.to_string());
104      }
105
106      let hyper_request = request.get_hyper_request();
107
108      let request_path = hyper_request.uri().path();
109      let mut request_path_bytes = request_path.bytes();
110      if request_path_bytes.len() < 1 || request_path_bytes.nth(0) != Some(b'/') {
111        return Ok(
112          ResponseData::builder(request)
113            .status(StatusCode::BAD_REQUEST)
114            .build(),
115        );
116      }
117
118      let mut execute_pathbuf = None;
119      let mut execute_path_info = None;
120      let mut wwwroot_detected = None;
121
122      if let Some(fastcgi_path) = fastcgi_path {
123        let mut canonical_fastcgi_path: &str = &fastcgi_path;
124        if canonical_fastcgi_path.bytes().last() == Some(b'/') {
125          canonical_fastcgi_path = &canonical_fastcgi_path[..(canonical_fastcgi_path.len() - 1)];
126        }
127
128        let request_path_with_slashes = match request_path == canonical_fastcgi_path {
129          true => format!("{request_path}/"),
130          false => request_path.to_string(),
131        };
132        if let Some(stripped_request_path) =
133          request_path_with_slashes.strip_prefix(canonical_fastcgi_path)
134        {
135          let wwwroot_yaml = &config["wwwroot"];
136          let wwwroot = wwwroot_yaml.as_str().unwrap_or("/nonexistent");
137
138          let wwwroot_unknown = PathBuf::from(wwwroot);
139          let wwwroot_pathbuf = match wwwroot_unknown.as_path().is_absolute() {
140            true => wwwroot_unknown,
141            false => match fs::canonicalize(&wwwroot_unknown).await {
142              Ok(pathbuf) => pathbuf,
143              Err(_) => wwwroot_unknown,
144            },
145          };
146          wwwroot_detected = Some(wwwroot_pathbuf.clone());
147          let wwwroot = wwwroot_pathbuf.as_path();
148
149          let mut relative_path = &request_path[1..];
150          while relative_path.as_bytes().first().copied() == Some(b'/') {
151            relative_path = &relative_path[1..];
152          }
153
154          let decoded_relative_path = match urlencoding::decode(relative_path) {
155            Ok(path) => path.to_string(),
156            Err(_) => {
157              return Ok(
158                ResponseData::builder(request)
159                  .status(StatusCode::BAD_REQUEST)
160                  .build(),
161              );
162            }
163          };
164
165          let joined_pathbuf = wwwroot.join(decoded_relative_path);
166          execute_pathbuf = Some(joined_pathbuf);
167          execute_path_info = stripped_request_path
168            .strip_prefix("/")
169            .map(|s| s.to_string());
170        }
171      }
172
173      if execute_pathbuf.is_none() {
174        if let Some(wwwroot) = config["wwwroot"].as_str() {
175          let cache_key = format!(
176            "{}{}{}",
177            match config["ip"].as_str() {
178              Some(ip) => format!("{ip}-"),
179              None => String::from(""),
180            },
181            match config["domain"].as_str() {
182              Some(domain) => format!("{domain}-"),
183              None => String::from(""),
184            },
185            request_path
186          );
187
188          let wwwroot_unknown = PathBuf::from(wwwroot);
189          let wwwroot_pathbuf = match wwwroot_unknown.as_path().is_absolute() {
190            true => wwwroot_unknown,
191            false => match fs::canonicalize(&wwwroot_unknown).await {
192              Ok(pathbuf) => pathbuf,
193              Err(_) => wwwroot_unknown,
194            },
195          };
196          wwwroot_detected = Some(wwwroot_pathbuf.clone());
197          let wwwroot = wwwroot_pathbuf.as_path();
198
199          let read_rwlock = self.path_cache.read().await;
200          let (execute_pathbuf_got, execute_path_info_got) = match read_rwlock.get(&cache_key) {
201            Some(data) => {
202              drop(read_rwlock);
203              data
204            }
205            None => {
206              drop(read_rwlock);
207              let mut relative_path = &request_path[1..];
208              while relative_path.as_bytes().first().copied() == Some(b'/') {
209                relative_path = &relative_path[1..];
210              }
211
212              let decoded_relative_path = match urlencoding::decode(relative_path) {
213                Ok(path) => path.to_string(),
214                Err(_) => {
215                  return Ok(
216                    ResponseData::builder(request)
217                      .status(StatusCode::BAD_REQUEST)
218                      .build(),
219                  );
220                }
221              };
222
223              let joined_pathbuf = wwwroot.join(decoded_relative_path);
224              let mut execute_pathbuf: Option<PathBuf> = None;
225              let mut execute_path_info: Option<String> = None;
226
227              match fs::metadata(&joined_pathbuf).await {
228                Ok(metadata) => {
229                  if metadata.is_file() {
230                    let contained_extension = joined_pathbuf
231                      .extension()
232                      .map(|a| format!(".{}", a.to_string_lossy()));
233                    if let Some(contained_extension) = contained_extension {
234                      if fastcgi_script_exts.contains(&(&contained_extension as &str)) {
235                        execute_pathbuf = Some(joined_pathbuf);
236                      }
237                    }
238                  } else if metadata.is_dir() {
239                    let indexes = vec!["index.php", "index.cgi"];
240                    for index in indexes {
241                      let temp_joined_pathbuf = joined_pathbuf.join(index);
242                      match fs::metadata(&temp_joined_pathbuf).await {
243                        Ok(temp_metadata) => {
244                          if temp_metadata.is_file() {
245                            let contained_extension = temp_joined_pathbuf
246                              .extension()
247                              .map(|a| format!(".{}", a.to_string_lossy()));
248                            if let Some(contained_extension) = contained_extension {
249                              if fastcgi_script_exts.contains(&(&contained_extension as &str)) {
250                                execute_pathbuf = Some(temp_joined_pathbuf);
251                                break;
252                              }
253                            }
254                          }
255                        }
256                        Err(_) => continue,
257                      };
258                    }
259                  }
260                }
261                Err(err) => {
262                  if err.kind() == tokio::io::ErrorKind::NotADirectory {
263                    let mut temp_pathbuf = joined_pathbuf.clone();
264                    loop {
265                      if !temp_pathbuf.pop() {
266                        break;
267                      }
268                      match fs::metadata(&temp_pathbuf).await {
269                        Ok(metadata) => {
270                          if metadata.is_file() {
271                            let temp_path = temp_pathbuf.as_path();
272                            if !temp_path.starts_with(wwwroot) {
273                              // Traversed above the webroot, so ignore that.
274                              break;
275                            }
276                            let path_info = match joined_pathbuf.as_path().strip_prefix(temp_path) {
277                              Ok(path) => {
278                                let path = path.to_string_lossy().to_string();
279                                Some(match cfg!(windows) {
280                                  true => path.replace("\\", "/"),
281                                  false => path,
282                                })
283                              }
284                              Err(_) => None,
285                            };
286                            let mut request_path_normalized = match cfg!(windows) {
287                              true => request_path.to_lowercase(),
288                              false => request_path.to_string(),
289                            };
290                            while request_path_normalized.contains("//") {
291                              request_path_normalized = request_path_normalized.replace("//", "/");
292                            }
293                            if request_path_normalized == "/cgi-bin"
294                              || request_path_normalized.starts_with("/cgi-bin/")
295                            {
296                              execute_pathbuf = Some(temp_pathbuf);
297                              execute_path_info = path_info;
298                              break;
299                            } else {
300                              let contained_extension = temp_pathbuf
301                                .extension()
302                                .map(|a| format!(".{}", a.to_string_lossy()));
303                              if let Some(contained_extension) = contained_extension {
304                                if fastcgi_script_exts.contains(&(&contained_extension as &str)) {
305                                  execute_pathbuf = Some(temp_pathbuf);
306                                  execute_path_info = path_info;
307                                  break;
308                                }
309                              }
310                            }
311                          } else {
312                            break;
313                          }
314                        }
315                        Err(err) => match err.kind() {
316                          tokio::io::ErrorKind::NotADirectory => (),
317                          _ => break,
318                        },
319                      };
320                    }
321                  }
322                }
323              };
324              let data = (execute_pathbuf, execute_path_info);
325
326              let mut write_rwlock = self.path_cache.write().await;
327              write_rwlock.cleanup();
328              write_rwlock.insert(cache_key, data.clone());
329              drop(write_rwlock);
330              data
331            }
332          };
333
334          execute_pathbuf = execute_pathbuf_got;
335          execute_path_info = execute_path_info_got;
336        }
337      }
338
339      if let Some(execute_pathbuf) = execute_pathbuf {
340        if let Some(wwwroot_detected) = wwwroot_detected {
341          return execute_fastcgi_with_environment_variables(
342            request,
343            socket_data,
344            error_logger,
345            wwwroot_detected.as_path(),
346            execute_pathbuf,
347            execute_path_info,
348            config["serverAdministratorEmail"].as_str(),
349            fastcgi_to,
350          )
351          .await;
352        }
353      }
354
355      Ok(ResponseData::builder(request).build())
356    })
357    .await
358  }
359
360  async fn proxy_request_handler(
361    &mut self,
362    request: RequestData,
363    _config: &ServerConfig,
364    _socket_data: &SocketData,
365    _error_logger: &ErrorLogger,
366  ) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
367    Ok(ResponseData::builder(request).build())
368  }
369
370  async fn response_modifying_handler(
371    &mut self,
372    response: HyperResponse,
373  ) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
374    Ok(response)
375  }
376
377  async fn proxy_response_modifying_handler(
378    &mut self,
379    response: HyperResponse,
380  ) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
381    Ok(response)
382  }
383
384  async fn connect_proxy_request_handler(
385    &mut self,
386    _upgraded_request: HyperUpgraded,
387    _connect_address: &str,
388    _config: &ServerConfig,
389    _socket_data: &SocketData,
390    _error_logger: &ErrorLogger,
391  ) -> Result<(), Box<dyn Error + Send + Sync>> {
392    Ok(())
393  }
394
395  fn does_connect_proxy_requests(&mut self) -> bool {
396    false
397  }
398
399  async fn websocket_request_handler(
400    &mut self,
401    _websocket: HyperWebsocket,
402    _uri: &hyper::Uri,
403    _headers: &hyper::HeaderMap,
404    _config: &ServerConfig,
405    _socket_data: &SocketData,
406    _error_logger: &ErrorLogger,
407  ) -> Result<(), Box<dyn Error + Send + Sync>> {
408    Ok(())
409  }
410
411  fn does_websocket_requests(&mut self, _config: &ServerConfig, _socket_data: &SocketData) -> bool {
412    false
413  }
414}
415
416#[allow(clippy::too_many_arguments)]
417async fn execute_fastcgi_with_environment_variables(
418  request: RequestData,
419  socket_data: &SocketData,
420  error_logger: &ErrorLogger,
421  wwwroot: &Path,
422  execute_pathbuf: PathBuf,
423  path_info: Option<String>,
424  server_administrator_email: Option<&str>,
425  fastcgi_to: &str,
426) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
427  let mut environment_variables: LinkedHashMap<String, String> = LinkedHashMap::new();
428
429  let hyper_request = request.get_hyper_request();
430  let original_request_uri = request.get_original_url().unwrap_or(hyper_request.uri());
431
432  if let Some(auth_user) = request.get_auth_user() {
433    if let Some(authorization) = hyper_request.headers().get(header::AUTHORIZATION) {
434      let authorization_value = String::from_utf8_lossy(authorization.as_bytes()).to_string();
435      let mut authorization_value_split = authorization_value.split(" ");
436      if let Some(authorization_type) = authorization_value_split.next() {
437        environment_variables.insert("AUTH_TYPE".to_string(), authorization_type.to_string());
438      }
439    }
440    environment_variables.insert("REMOTE_USER".to_string(), auth_user.to_string());
441  }
442
443  environment_variables.insert(
444    "QUERY_STRING".to_string(),
445    match hyper_request.uri().query() {
446      Some(query) => query.to_string(),
447      None => "".to_string(),
448    },
449  );
450
451  environment_variables.insert("SERVER_SOFTWARE".to_string(), SERVER_SOFTWARE.to_string());
452  environment_variables.insert(
453    "SERVER_PROTOCOL".to_string(),
454    match hyper_request.version() {
455      hyper::Version::HTTP_09 => "HTTP/0.9".to_string(),
456      hyper::Version::HTTP_10 => "HTTP/1.0".to_string(),
457      hyper::Version::HTTP_11 => "HTTP/1.1".to_string(),
458      hyper::Version::HTTP_2 => "HTTP/2.0".to_string(),
459      hyper::Version::HTTP_3 => "HTTP/3.0".to_string(),
460      _ => "HTTP/Unknown".to_string(),
461    },
462  );
463  environment_variables.insert(
464    "SERVER_PORT".to_string(),
465    socket_data.local_addr.port().to_string(),
466  );
467  environment_variables.insert(
468    "SERVER_ADDR".to_string(),
469    socket_data.local_addr.ip().to_canonical().to_string(),
470  );
471  if let Some(server_administrator_email) = server_administrator_email {
472    environment_variables.insert(
473      "SERVER_ADMIN".to_string(),
474      server_administrator_email.to_string(),
475    );
476  }
477  if let Some(host) = hyper_request.headers().get(header::HOST) {
478    environment_variables.insert(
479      "SERVER_NAME".to_string(),
480      String::from_utf8_lossy(host.as_bytes()).to_string(),
481    );
482  }
483
484  environment_variables.insert(
485    "DOCUMENT_ROOT".to_string(),
486    wwwroot.to_string_lossy().to_string(),
487  );
488  environment_variables.insert(
489    "PATH_INFO".to_string(),
490    match &path_info {
491      Some(path_info) => format!("/{path_info}"),
492      None => "".to_string(),
493    },
494  );
495  environment_variables.insert(
496    "PATH_TRANSLATED".to_string(),
497    match &path_info {
498      Some(path_info) => {
499        let mut path_translated = execute_pathbuf.clone();
500        path_translated.push(path_info);
501        path_translated.to_string_lossy().to_string()
502      }
503      None => "".to_string(),
504    },
505  );
506  environment_variables.insert(
507    "REQUEST_METHOD".to_string(),
508    hyper_request.method().to_string(),
509  );
510  environment_variables.insert("GATEWAY_INTERFACE".to_string(), "CGI/1.1".to_string());
511  environment_variables.insert(
512    "REQUEST_URI".to_string(),
513    format!(
514      "{}{}",
515      original_request_uri.path(),
516      match original_request_uri.query() {
517        Some(query) => format!("?{query}"),
518        None => String::from(""),
519      }
520    ),
521  );
522
523  environment_variables.insert(
524    "REMOTE_PORT".to_string(),
525    socket_data.remote_addr.port().to_string(),
526  );
527  environment_variables.insert(
528    "REMOTE_ADDR".to_string(),
529    socket_data.remote_addr.ip().to_canonical().to_string(),
530  );
531
532  environment_variables.insert(
533    "SCRIPT_FILENAME".to_string(),
534    execute_pathbuf.to_string_lossy().to_string(),
535  );
536  if let Ok(script_path) = execute_pathbuf.as_path().strip_prefix(wwwroot) {
537    environment_variables.insert(
538      "SCRIPT_NAME".to_string(),
539      format!(
540        "/{}",
541        match cfg!(windows) {
542          true => script_path.to_string_lossy().to_string().replace("\\", "/"),
543          false => script_path.to_string_lossy().to_string(),
544        }
545      ),
546    );
547  }
548
549  if socket_data.encrypted {
550    environment_variables.insert("HTTPS".to_string(), "on".to_string());
551  }
552
553  let mut content_length_set = false;
554  for (header_name, header_value) in hyper_request.headers().iter() {
555    let env_header_name = match *header_name {
556      header::CONTENT_LENGTH => {
557        content_length_set = true;
558        "CONTENT_LENGTH".to_string()
559      }
560      header::CONTENT_TYPE => "CONTENT_TYPE".to_string(),
561      _ => {
562        let mut result = String::new();
563
564        result.push_str("HTTP_");
565
566        for c in header_name.as_str().to_uppercase().chars() {
567          if c.is_alphanumeric() {
568            result.push(c);
569          } else {
570            result.push('_');
571          }
572        }
573
574        result
575      }
576    };
577    if environment_variables.contains_key(&env_header_name) {
578      let value = environment_variables.get_mut(&env_header_name);
579      if let Some(value) = value {
580        if env_header_name == "HTTP_COOKIE" {
581          value.push_str("; ");
582        } else {
583          // See https://stackoverflow.com/a/1801191
584          value.push_str(", ");
585        }
586        value.push_str(String::from_utf8_lossy(header_value.as_bytes()).as_ref());
587      } else {
588        environment_variables.insert(
589          env_header_name,
590          String::from_utf8_lossy(header_value.as_bytes()).to_string(),
591        );
592      }
593    } else {
594      environment_variables.insert(
595        env_header_name,
596        String::from_utf8_lossy(header_value.as_bytes()).to_string(),
597      );
598    }
599  }
600
601  if !content_length_set {
602    environment_variables.insert("CONTENT_LENGTH".to_string(), "0".to_string());
603  }
604
605  let (hyper_request, _, _, _) = request.into_parts();
606
607  execute_fastcgi(
608    hyper_request,
609    error_logger,
610    fastcgi_to,
611    environment_variables,
612  )
613  .await
614}
615
616async fn execute_fastcgi(
617  hyper_request: HyperRequest,
618  error_logger: &ErrorLogger,
619  fastcgi_to: &str,
620  mut environment_variables: LinkedHashMap<String, String>,
621) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
622  let (_, body) = hyper_request.into_parts();
623
624  // Insert other environment variables
625  for (key, value) in env::vars_os() {
626    let key_string = key.to_string_lossy().to_string();
627    let value_string = value.to_string_lossy().to_string();
628    environment_variables
629      .entry(key_string)
630      .or_insert(value_string);
631  }
632
633  let fastcgi_to_fixed = if let Some(stripped) = fastcgi_to.strip_prefix("unix:///") {
634    // hyper::Uri fails to parse a string if there is an empty authority, so add an "ignore" authority to Unix socket URLs
635    &format!("unix://ignore/{stripped}")
636  } else {
637    fastcgi_to
638  };
639
640  let fastcgi_to_url = fastcgi_to_fixed.parse::<hyper::Uri>()?;
641  let scheme_str = fastcgi_to_url.scheme_str();
642
643  let (socket_reader, mut socket_writer) = match scheme_str {
644    Some("tcp") => {
645      let host = match fastcgi_to_url.host() {
646        Some(host) => host,
647        None => Err(anyhow::anyhow!("The FastCGI URL doesn't include the host"))?,
648      };
649
650      let port = match fastcgi_to_url.port_u16() {
651        Some(port) => port,
652        None => Err(anyhow::anyhow!("The FastCGI URL doesn't include the port"))?,
653      };
654
655      let addr = format!("{host}:{port}");
656
657      match connect_tcp(&addr).await {
658        Ok(data) => data,
659        Err(err) => match err.kind() {
660          tokio::io::ErrorKind::ConnectionRefused
661          | tokio::io::ErrorKind::NotFound
662          | tokio::io::ErrorKind::HostUnreachable => {
663            error_logger
664              .log(&format!("Service unavailable: {err}"))
665              .await;
666            return Ok(
667              ResponseData::builder_without_request()
668                .status(StatusCode::SERVICE_UNAVAILABLE)
669                .build(),
670            );
671          }
672          _ => Err(err)?,
673        },
674      }
675    }
676    Some("unix") => {
677      let path = fastcgi_to_url.path();
678      match connect_unix(path).await {
679        Ok(data) => data,
680        Err(err) => match err.kind() {
681          tokio::io::ErrorKind::ConnectionRefused
682          | tokio::io::ErrorKind::NotFound
683          | tokio::io::ErrorKind::HostUnreachable => {
684            error_logger
685              .log(&format!("Service unavailable: {err}"))
686              .await;
687            return Ok(
688              ResponseData::builder_without_request()
689                .status(StatusCode::SERVICE_UNAVAILABLE)
690                .build(),
691            );
692          }
693          _ => Err(err)?,
694        },
695      }
696    }
697    _ => Err(anyhow::anyhow!(
698      "Only HTTP and HTTPS reverse proxy URLs are supported."
699    ))?,
700  };
701
702  // Construct and send BEGIN_REQUEST record
703  // Use the responder role and don't use keep-alive
704  let begin_request_packet = construct_fastcgi_record(1, 1, &[0, 1, 0, 0, 0, 0, 0, 0]);
705  socket_writer.write_all(&begin_request_packet).await?;
706
707  // Construct and send PARAMS records
708  let mut environment_variables_to_wrap = Vec::new();
709  for (key, value) in environment_variables.iter() {
710    let mut environment_variable =
711      construct_fastcgi_name_value_pair(key.as_bytes(), value.as_bytes());
712    environment_variables_to_wrap.append(&mut environment_variable);
713  }
714  if !environment_variables_to_wrap.is_empty() {
715    let mut offset = 0;
716    while offset < environment_variables_to_wrap.len() {
717      let chunk_size = std::cmp::min(65536, environment_variables_to_wrap.len() - offset);
718      let chunk = &environment_variables_to_wrap[offset..offset + chunk_size];
719
720      // Record type 4 means PARAMS
721      let params_packet = construct_fastcgi_record(4, 1, chunk);
722      socket_writer.write_all(&params_packet).await?;
723
724      offset += chunk_size;
725    }
726  }
727
728  let params_packet_terminating = construct_fastcgi_record(4, 1, &[]);
729  socket_writer.write_all(&params_packet_terminating).await?;
730
731  let cgi_stdin_reader = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
732
733  // Emulated standard input, standard output, and standard error
734  type EitherStream = Either<Result<Bytes, std::io::Error>, Result<Bytes, std::io::Error>>;
735  let stdin = SinkWriter::new(FramedWrite::new(socket_writer, FcgiEncoder::new()));
736  let stdout_and_stderr = FramedRead::new(socket_reader, FcgiDecoder::new());
737  let (stdout_stream, stderr_stream) = stdout_and_stderr.split_by_map(|item| match item {
738    Ok(FcgiDecodedData::Stdout(bytes)) => EitherStream::Left(Ok(bytes)),
739    Ok(FcgiDecodedData::Stderr(bytes)) => EitherStream::Right(Ok(bytes)),
740    Err(err) => EitherStream::Left(Err(err)),
741  });
742  let stdout = StreamReader::new(stdout_stream);
743  let stderr = StreamReader::new(stderr_stream);
744
745  let mut cgi_response = CgiResponse::new(stdout);
746
747  let stdin_copy_future = async move {
748    let (mut cgi_stdin_reader, mut stdin) = (cgi_stdin_reader, stdin);
749    let result1 = tokio::io::copy(&mut cgi_stdin_reader, &mut stdin)
750      .await
751      .map(|_| ());
752
753    // Send terminating STDIN packet
754    let result2 = stdin.write(&[]).await.map(|_| ());
755    let result3 = stdin.flush().await.map(|_| ());
756
757    result1.and(result2).and(result3)
758  };
759  let mut stdin_copy_future_pinned = Box::pin(stdin_copy_future);
760
761  let stderr_read_future = async move {
762    let mut stderr = stderr;
763    let mut buf = Vec::new();
764    stderr.read_to_end(&mut buf).await.map(|_| buf)
765  };
766  let mut stderr_read_future_pinned = Box::pin(stderr_read_future);
767
768  let mut headers = [EMPTY_HEADER; 128];
769
770  let mut early_stdin_copied = false;
771
772  // Needed to wrap this in another scope to prevent errors with multiple mutable borrows.
773  {
774    let mut head_obtained = false;
775    let stdout_parse_future = cgi_response.get_head();
776    tokio::pin!(stdout_parse_future);
777
778    // Cannot use a loop with tokio::select, since stdin_copy_future_pinned being constantly ready will make the web server stop responding to HTTP requests
779    tokio::select! {
780      biased;
781
782      result = &mut stdin_copy_future_pinned => {
783        early_stdin_copied = true;
784        result?;
785      },
786      obtained_head = &mut stdout_parse_future => {
787        let obtained_head = obtained_head?;
788        if !obtained_head.is_empty() {
789          httparse::parse_headers(obtained_head, &mut headers)?;
790        }
791        head_obtained = true;
792      },
793      result = &mut stderr_read_future_pinned => {
794        let stderr_vec = result?;
795        let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
796        let stderr_string_trimmed = stderr_string.trim();
797        if !stderr_string_trimmed.is_empty() {
798          error_logger
799            .log(&format!("There were FastCGI errors: {stderr_string_trimmed}"))
800            .await;
801        }
802        return Ok(
803          ResponseData::builder_without_request()
804            .status(StatusCode::INTERNAL_SERVER_ERROR)
805            .build(),
806        );
807      },
808    }
809
810    if !head_obtained {
811      // Kept it same as in the tokio::select macro
812      tokio::select! {
813        biased;
814
815        result = &mut stderr_read_future_pinned => {
816          let stderr_vec = result?;
817          let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
818          let stderr_string_trimmed = stderr_string.trim();
819          if !stderr_string_trimmed.is_empty() {
820            error_logger
821              .log(&format!("There were FastCGI errors: {stderr_string_trimmed}"))
822              .await;
823          }
824          return Ok(
825            ResponseData::builder_without_request()
826              .status(StatusCode::INTERNAL_SERVER_ERROR)
827              .build(),
828          );
829        },
830        obtained_head = &mut stdout_parse_future => {
831          let obtained_head = obtained_head?;
832          if !obtained_head.is_empty() {
833            httparse::parse_headers(obtained_head, &mut headers)?;
834          }
835        }
836      }
837    }
838  }
839
840  let mut response_builder = Response::builder();
841  let mut status_code = 200;
842  for header in headers {
843    if header == EMPTY_HEADER {
844      break;
845    }
846    let mut is_status_header = false;
847    match &header.name.to_lowercase() as &str {
848      "location" => {
849        if !(300..=399).contains(&status_code) {
850          status_code = 302;
851        }
852      }
853      "status" => {
854        is_status_header = true;
855        let header_value_cow = String::from_utf8_lossy(header.value);
856        let mut split_status = header_value_cow.split(" ");
857        let first_part = split_status.next();
858        if let Some(first_part) = first_part {
859          if first_part.starts_with("HTTP/") {
860            let second_part = split_status.next();
861            if let Some(second_part) = second_part {
862              if let Ok(parsed_status_code) = second_part.parse::<u16>() {
863                status_code = parsed_status_code;
864              }
865            }
866          } else if let Ok(parsed_status_code) = first_part.parse::<u16>() {
867            status_code = parsed_status_code;
868          }
869        }
870      }
871      _ => (),
872    }
873    if !is_status_header {
874      response_builder = response_builder.header(header.name, header.value);
875    }
876  }
877
878  response_builder = response_builder.status(status_code);
879
880  let reader_stream = ReaderStream::new(cgi_response);
881  let stream_body = StreamBody::new(reader_stream.map_ok(Frame::data));
882  let boxed_body = stream_body.boxed();
883
884  let response = response_builder.body(boxed_body)?;
885
886  let error_logger = error_logger.clone();
887
888  Ok(
889    ResponseData::builder_without_request()
890      .response(response)
891      .parallel_fn(async move {
892        let mut stdin_copied = early_stdin_copied;
893
894        if !stdin_copied {
895          tokio::select! {
896            biased;
897
898            _ = &mut stdin_copy_future_pinned => {
899              stdin_copied = true;
900            },
901            result = &mut stderr_read_future_pinned => {
902              let stderr_vec = result.unwrap_or(vec![]);
903              let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
904              if !stderr_string.is_empty() {
905                error_logger
906                  .log(&format!("There were FastCGI errors: {stderr_string}"))
907                  .await;
908              }
909            },
910          }
911        }
912
913        if stdin_copied {
914          let stderr_vec = stderr_read_future_pinned.await.unwrap_or(vec![]);
915          let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
916          if !stderr_string.is_empty() {
917            error_logger
918              .log(&format!("There were FastCGI errors: {stderr_string}"))
919              .await;
920          }
921        } else {
922          stdin_copy_future_pinned.await.unwrap_or_default();
923        }
924      })
925      .build(),
926  )
927}
928
929async fn connect_tcp(
930  addr: &str,
931) -> Result<
932  (
933    Box<dyn AsyncRead + Send + Sync + Unpin>,
934    Box<dyn AsyncWrite + Send + Sync + Unpin>,
935  ),
936  tokio::io::Error,
937> {
938  let socket = TcpStream::connect(addr).await?;
939  socket.set_nodelay(true)?;
940
941  let (socket_reader_set, socket_writer_set) = tokio::io::split(socket);
942  Ok((Box::new(socket_reader_set), Box::new(socket_writer_set)))
943}
944
945#[allow(dead_code)]
946#[cfg(unix)]
947async fn connect_unix(
948  path: &str,
949) -> Result<
950  (
951    Box<dyn AsyncRead + Send + Sync + Unpin>,
952    Box<dyn AsyncWrite + Send + Sync + Unpin>,
953  ),
954  tokio::io::Error,
955> {
956  use tokio::net::UnixStream;
957
958  let socket = UnixStream::connect(path).await?;
959
960  let (socket_reader_set, socket_writer_set) = tokio::io::split(socket);
961  Ok((Box::new(socket_reader_set), Box::new(socket_writer_set)))
962}
963
964#[allow(dead_code)]
965#[cfg(not(unix))]
966async fn connect_unix(
967  _path: &str,
968) -> Result<
969  (
970    Box<dyn AsyncRead + Send + Sync + Unpin>,
971    Box<dyn AsyncWrite + Send + Sync + Unpin>,
972  ),
973  tokio::io::Error,
974> {
975  Err(tokio::io::Error::new(
976    tokio::io::ErrorKind::Unsupported,
977    "Unix sockets are not supports on non-Unix platforms.",
978  ))
979}