1use std::env;
4use std::error::Error;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::ferron_common::{
10 ErrorLogger, HyperRequest, HyperResponse, RequestData, ResponseData, ServerConfig, ServerModule,
11 ServerModuleHandlers, SocketData,
12};
13use crate::ferron_common::{HyperUpgraded, WithRuntime};
14use async_trait::async_trait;
15use futures_util::future::Either;
16use futures_util::TryStreamExt;
17use hashlink::LinkedHashMap;
18use http_body_util::{BodyExt, StreamBody};
19use httparse::EMPTY_HEADER;
20use hyper::body::{Bytes, Frame};
21use hyper::{header, Response, StatusCode};
22use hyper_tungstenite::HyperWebsocket;
23use tokio::fs;
24use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
25use tokio::net::TcpStream;
26use tokio::runtime::Handle;
27use tokio::sync::RwLock;
28use tokio_util::codec::{FramedRead, FramedWrite};
29use tokio_util::io::{ReaderStream, SinkWriter, StreamReader};
30
31use crate::ferron_res::server_software::SERVER_SOFTWARE;
32use crate::ferron_util::cgi_response::CgiResponse;
33use crate::ferron_util::fcgi_decoder::{FcgiDecodedData, FcgiDecoder};
34use crate::ferron_util::fcgi_encoder::FcgiEncoder;
35use crate::ferron_util::fcgi_name_value_pair::construct_fastcgi_name_value_pair;
36use crate::ferron_util::fcgi_record::construct_fastcgi_record;
37use crate::ferron_util::split_stream_by_map::SplitStreamByMapExt;
38use crate::ferron_util::ttl_cache::TtlCache;
39
40pub fn server_module_init(
41 _config: &ServerConfig,
42) -> Result<Box<dyn ServerModule + Send + Sync>, Box<dyn Error + Send + Sync>> {
43 let cache = Arc::new(RwLock::new(TtlCache::new(Duration::from_millis(100))));
44 Ok(Box::new(FcgiModule::new(cache)))
45}
46
47#[allow(clippy::type_complexity)]
48struct FcgiModule {
49 path_cache: Arc<RwLock<TtlCache<String, (Option<PathBuf>, Option<String>)>>>,
50}
51
52impl FcgiModule {
53 #[allow(clippy::type_complexity)]
54 fn new(path_cache: Arc<RwLock<TtlCache<String, (Option<PathBuf>, Option<String>)>>>) -> Self {
55 Self { path_cache }
56 }
57}
58
59impl ServerModule for FcgiModule {
60 fn get_handlers(&self, handle: Handle) -> Box<dyn ServerModuleHandlers + Send> {
61 Box::new(FcgiModuleHandlers {
62 path_cache: self.path_cache.clone(),
63 handle,
64 })
65 }
66}
67
68#[allow(clippy::type_complexity)]
69struct FcgiModuleHandlers {
70 handle: Handle,
71 path_cache: Arc<RwLock<TtlCache<String, (Option<PathBuf>, Option<String>)>>>,
72}
73
74#[async_trait]
75impl ServerModuleHandlers for FcgiModuleHandlers {
76 async fn request_handler(
77 &mut self,
78 request: RequestData,
79 config: &ServerConfig,
80 socket_data: &SocketData,
81 error_logger: &ErrorLogger,
82 ) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
83 WithRuntime::new(self.handle.clone(), async move {
84 let mut fastcgi_script_exts = Vec::new();
85
86 let fastcgi_script_exts_yaml = &config["fcgiScriptExtensions"];
87 if let Some(fastcgi_script_exts_obtained) = fastcgi_script_exts_yaml.as_vec() {
88 for fastcgi_script_ext_yaml in fastcgi_script_exts_obtained.iter() {
89 if let Some(fastcgi_script_ext) = fastcgi_script_ext_yaml.as_str() {
90 fastcgi_script_exts.push(fastcgi_script_ext);
91 }
92 }
93 }
94
95 let mut fastcgi_to = "tcp://localhost:4000/";
96 let fastcgi_to_yaml = &config["fcgiTo"];
97 if let Some(fastcgi_to_obtained) = fastcgi_to_yaml.as_str() {
98 fastcgi_to = fastcgi_to_obtained;
99 }
100
101 let mut fastcgi_path = None;
102 if let Some(fastcgi_path_obtained) = config["fcgiPath"].as_str() {
103 fastcgi_path = Some(fastcgi_path_obtained.to_string());
104 }
105
106 let hyper_request = request.get_hyper_request();
107
108 let request_path = hyper_request.uri().path();
109 let mut request_path_bytes = request_path.bytes();
110 if request_path_bytes.len() < 1 || request_path_bytes.nth(0) != Some(b'/') {
111 return Ok(
112 ResponseData::builder(request)
113 .status(StatusCode::BAD_REQUEST)
114 .build(),
115 );
116 }
117
118 let mut execute_pathbuf = None;
119 let mut execute_path_info = None;
120 let mut wwwroot_detected = None;
121
122 if let Some(fastcgi_path) = fastcgi_path {
123 let mut canonical_fastcgi_path: &str = &fastcgi_path;
124 if canonical_fastcgi_path.bytes().last() == Some(b'/') {
125 canonical_fastcgi_path = &canonical_fastcgi_path[..(canonical_fastcgi_path.len() - 1)];
126 }
127
128 let request_path_with_slashes = match request_path == canonical_fastcgi_path {
129 true => format!("{request_path}/"),
130 false => request_path.to_string(),
131 };
132 if let Some(stripped_request_path) =
133 request_path_with_slashes.strip_prefix(canonical_fastcgi_path)
134 {
135 let wwwroot_yaml = &config["wwwroot"];
136 let wwwroot = wwwroot_yaml.as_str().unwrap_or("/nonexistent");
137
138 let wwwroot_unknown = PathBuf::from(wwwroot);
139 let wwwroot_pathbuf = match wwwroot_unknown.as_path().is_absolute() {
140 true => wwwroot_unknown,
141 false => match fs::canonicalize(&wwwroot_unknown).await {
142 Ok(pathbuf) => pathbuf,
143 Err(_) => wwwroot_unknown,
144 },
145 };
146 wwwroot_detected = Some(wwwroot_pathbuf.clone());
147 let wwwroot = wwwroot_pathbuf.as_path();
148
149 let mut relative_path = &request_path[1..];
150 while relative_path.as_bytes().first().copied() == Some(b'/') {
151 relative_path = &relative_path[1..];
152 }
153
154 let decoded_relative_path = match urlencoding::decode(relative_path) {
155 Ok(path) => path.to_string(),
156 Err(_) => {
157 return Ok(
158 ResponseData::builder(request)
159 .status(StatusCode::BAD_REQUEST)
160 .build(),
161 );
162 }
163 };
164
165 let joined_pathbuf = wwwroot.join(decoded_relative_path);
166 execute_pathbuf = Some(joined_pathbuf);
167 execute_path_info = stripped_request_path
168 .strip_prefix("/")
169 .map(|s| s.to_string());
170 }
171 }
172
173 if execute_pathbuf.is_none() {
174 if let Some(wwwroot) = config["wwwroot"].as_str() {
175 let cache_key = format!(
176 "{}{}{}",
177 match config["ip"].as_str() {
178 Some(ip) => format!("{ip}-"),
179 None => String::from(""),
180 },
181 match config["domain"].as_str() {
182 Some(domain) => format!("{domain}-"),
183 None => String::from(""),
184 },
185 request_path
186 );
187
188 let wwwroot_unknown = PathBuf::from(wwwroot);
189 let wwwroot_pathbuf = match wwwroot_unknown.as_path().is_absolute() {
190 true => wwwroot_unknown,
191 false => match fs::canonicalize(&wwwroot_unknown).await {
192 Ok(pathbuf) => pathbuf,
193 Err(_) => wwwroot_unknown,
194 },
195 };
196 wwwroot_detected = Some(wwwroot_pathbuf.clone());
197 let wwwroot = wwwroot_pathbuf.as_path();
198
199 let read_rwlock = self.path_cache.read().await;
200 let (execute_pathbuf_got, execute_path_info_got) = match read_rwlock.get(&cache_key) {
201 Some(data) => {
202 drop(read_rwlock);
203 data
204 }
205 None => {
206 drop(read_rwlock);
207 let mut relative_path = &request_path[1..];
208 while relative_path.as_bytes().first().copied() == Some(b'/') {
209 relative_path = &relative_path[1..];
210 }
211
212 let decoded_relative_path = match urlencoding::decode(relative_path) {
213 Ok(path) => path.to_string(),
214 Err(_) => {
215 return Ok(
216 ResponseData::builder(request)
217 .status(StatusCode::BAD_REQUEST)
218 .build(),
219 );
220 }
221 };
222
223 let joined_pathbuf = wwwroot.join(decoded_relative_path);
224 let mut execute_pathbuf: Option<PathBuf> = None;
225 let mut execute_path_info: Option<String> = None;
226
227 match fs::metadata(&joined_pathbuf).await {
228 Ok(metadata) => {
229 if metadata.is_file() {
230 let contained_extension = joined_pathbuf
231 .extension()
232 .map(|a| format!(".{}", a.to_string_lossy()));
233 if let Some(contained_extension) = contained_extension {
234 if fastcgi_script_exts.contains(&(&contained_extension as &str)) {
235 execute_pathbuf = Some(joined_pathbuf);
236 }
237 }
238 } else if metadata.is_dir() {
239 let indexes = vec!["index.php", "index.cgi"];
240 for index in indexes {
241 let temp_joined_pathbuf = joined_pathbuf.join(index);
242 match fs::metadata(&temp_joined_pathbuf).await {
243 Ok(temp_metadata) => {
244 if temp_metadata.is_file() {
245 let contained_extension = temp_joined_pathbuf
246 .extension()
247 .map(|a| format!(".{}", a.to_string_lossy()));
248 if let Some(contained_extension) = contained_extension {
249 if fastcgi_script_exts.contains(&(&contained_extension as &str)) {
250 execute_pathbuf = Some(temp_joined_pathbuf);
251 break;
252 }
253 }
254 }
255 }
256 Err(_) => continue,
257 };
258 }
259 }
260 }
261 Err(err) => {
262 if err.kind() == tokio::io::ErrorKind::NotADirectory {
263 let mut temp_pathbuf = joined_pathbuf.clone();
264 loop {
265 if !temp_pathbuf.pop() {
266 break;
267 }
268 match fs::metadata(&temp_pathbuf).await {
269 Ok(metadata) => {
270 if metadata.is_file() {
271 let temp_path = temp_pathbuf.as_path();
272 if !temp_path.starts_with(wwwroot) {
273 break;
275 }
276 let path_info = match joined_pathbuf.as_path().strip_prefix(temp_path) {
277 Ok(path) => {
278 let path = path.to_string_lossy().to_string();
279 Some(match cfg!(windows) {
280 true => path.replace("\\", "/"),
281 false => path,
282 })
283 }
284 Err(_) => None,
285 };
286 let mut request_path_normalized = match cfg!(windows) {
287 true => request_path.to_lowercase(),
288 false => request_path.to_string(),
289 };
290 while request_path_normalized.contains("//") {
291 request_path_normalized = request_path_normalized.replace("//", "/");
292 }
293 if request_path_normalized == "/cgi-bin"
294 || request_path_normalized.starts_with("/cgi-bin/")
295 {
296 execute_pathbuf = Some(temp_pathbuf);
297 execute_path_info = path_info;
298 break;
299 } else {
300 let contained_extension = temp_pathbuf
301 .extension()
302 .map(|a| format!(".{}", a.to_string_lossy()));
303 if let Some(contained_extension) = contained_extension {
304 if fastcgi_script_exts.contains(&(&contained_extension as &str)) {
305 execute_pathbuf = Some(temp_pathbuf);
306 execute_path_info = path_info;
307 break;
308 }
309 }
310 }
311 } else {
312 break;
313 }
314 }
315 Err(err) => match err.kind() {
316 tokio::io::ErrorKind::NotADirectory => (),
317 _ => break,
318 },
319 };
320 }
321 }
322 }
323 };
324 let data = (execute_pathbuf, execute_path_info);
325
326 let mut write_rwlock = self.path_cache.write().await;
327 write_rwlock.cleanup();
328 write_rwlock.insert(cache_key, data.clone());
329 drop(write_rwlock);
330 data
331 }
332 };
333
334 execute_pathbuf = execute_pathbuf_got;
335 execute_path_info = execute_path_info_got;
336 }
337 }
338
339 if let Some(execute_pathbuf) = execute_pathbuf {
340 if let Some(wwwroot_detected) = wwwroot_detected {
341 return execute_fastcgi_with_environment_variables(
342 request,
343 socket_data,
344 error_logger,
345 wwwroot_detected.as_path(),
346 execute_pathbuf,
347 execute_path_info,
348 config["serverAdministratorEmail"].as_str(),
349 fastcgi_to,
350 )
351 .await;
352 }
353 }
354
355 Ok(ResponseData::builder(request).build())
356 })
357 .await
358 }
359
360 async fn proxy_request_handler(
361 &mut self,
362 request: RequestData,
363 _config: &ServerConfig,
364 _socket_data: &SocketData,
365 _error_logger: &ErrorLogger,
366 ) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
367 Ok(ResponseData::builder(request).build())
368 }
369
370 async fn response_modifying_handler(
371 &mut self,
372 response: HyperResponse,
373 ) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
374 Ok(response)
375 }
376
377 async fn proxy_response_modifying_handler(
378 &mut self,
379 response: HyperResponse,
380 ) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
381 Ok(response)
382 }
383
384 async fn connect_proxy_request_handler(
385 &mut self,
386 _upgraded_request: HyperUpgraded,
387 _connect_address: &str,
388 _config: &ServerConfig,
389 _socket_data: &SocketData,
390 _error_logger: &ErrorLogger,
391 ) -> Result<(), Box<dyn Error + Send + Sync>> {
392 Ok(())
393 }
394
395 fn does_connect_proxy_requests(&mut self) -> bool {
396 false
397 }
398
399 async fn websocket_request_handler(
400 &mut self,
401 _websocket: HyperWebsocket,
402 _uri: &hyper::Uri,
403 _headers: &hyper::HeaderMap,
404 _config: &ServerConfig,
405 _socket_data: &SocketData,
406 _error_logger: &ErrorLogger,
407 ) -> Result<(), Box<dyn Error + Send + Sync>> {
408 Ok(())
409 }
410
411 fn does_websocket_requests(&mut self, _config: &ServerConfig, _socket_data: &SocketData) -> bool {
412 false
413 }
414}
415
416#[allow(clippy::too_many_arguments)]
417async fn execute_fastcgi_with_environment_variables(
418 request: RequestData,
419 socket_data: &SocketData,
420 error_logger: &ErrorLogger,
421 wwwroot: &Path,
422 execute_pathbuf: PathBuf,
423 path_info: Option<String>,
424 server_administrator_email: Option<&str>,
425 fastcgi_to: &str,
426) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
427 let mut environment_variables: LinkedHashMap<String, String> = LinkedHashMap::new();
428
429 let hyper_request = request.get_hyper_request();
430 let original_request_uri = request.get_original_url().unwrap_or(hyper_request.uri());
431
432 if let Some(auth_user) = request.get_auth_user() {
433 if let Some(authorization) = hyper_request.headers().get(header::AUTHORIZATION) {
434 let authorization_value = String::from_utf8_lossy(authorization.as_bytes()).to_string();
435 let mut authorization_value_split = authorization_value.split(" ");
436 if let Some(authorization_type) = authorization_value_split.next() {
437 environment_variables.insert("AUTH_TYPE".to_string(), authorization_type.to_string());
438 }
439 }
440 environment_variables.insert("REMOTE_USER".to_string(), auth_user.to_string());
441 }
442
443 environment_variables.insert(
444 "QUERY_STRING".to_string(),
445 match hyper_request.uri().query() {
446 Some(query) => query.to_string(),
447 None => "".to_string(),
448 },
449 );
450
451 environment_variables.insert("SERVER_SOFTWARE".to_string(), SERVER_SOFTWARE.to_string());
452 environment_variables.insert(
453 "SERVER_PROTOCOL".to_string(),
454 match hyper_request.version() {
455 hyper::Version::HTTP_09 => "HTTP/0.9".to_string(),
456 hyper::Version::HTTP_10 => "HTTP/1.0".to_string(),
457 hyper::Version::HTTP_11 => "HTTP/1.1".to_string(),
458 hyper::Version::HTTP_2 => "HTTP/2.0".to_string(),
459 hyper::Version::HTTP_3 => "HTTP/3.0".to_string(),
460 _ => "HTTP/Unknown".to_string(),
461 },
462 );
463 environment_variables.insert(
464 "SERVER_PORT".to_string(),
465 socket_data.local_addr.port().to_string(),
466 );
467 environment_variables.insert(
468 "SERVER_ADDR".to_string(),
469 socket_data.local_addr.ip().to_canonical().to_string(),
470 );
471 if let Some(server_administrator_email) = server_administrator_email {
472 environment_variables.insert(
473 "SERVER_ADMIN".to_string(),
474 server_administrator_email.to_string(),
475 );
476 }
477 if let Some(host) = hyper_request.headers().get(header::HOST) {
478 environment_variables.insert(
479 "SERVER_NAME".to_string(),
480 String::from_utf8_lossy(host.as_bytes()).to_string(),
481 );
482 }
483
484 environment_variables.insert(
485 "DOCUMENT_ROOT".to_string(),
486 wwwroot.to_string_lossy().to_string(),
487 );
488 environment_variables.insert(
489 "PATH_INFO".to_string(),
490 match &path_info {
491 Some(path_info) => format!("/{path_info}"),
492 None => "".to_string(),
493 },
494 );
495 environment_variables.insert(
496 "PATH_TRANSLATED".to_string(),
497 match &path_info {
498 Some(path_info) => {
499 let mut path_translated = execute_pathbuf.clone();
500 path_translated.push(path_info);
501 path_translated.to_string_lossy().to_string()
502 }
503 None => "".to_string(),
504 },
505 );
506 environment_variables.insert(
507 "REQUEST_METHOD".to_string(),
508 hyper_request.method().to_string(),
509 );
510 environment_variables.insert("GATEWAY_INTERFACE".to_string(), "CGI/1.1".to_string());
511 environment_variables.insert(
512 "REQUEST_URI".to_string(),
513 format!(
514 "{}{}",
515 original_request_uri.path(),
516 match original_request_uri.query() {
517 Some(query) => format!("?{query}"),
518 None => String::from(""),
519 }
520 ),
521 );
522
523 environment_variables.insert(
524 "REMOTE_PORT".to_string(),
525 socket_data.remote_addr.port().to_string(),
526 );
527 environment_variables.insert(
528 "REMOTE_ADDR".to_string(),
529 socket_data.remote_addr.ip().to_canonical().to_string(),
530 );
531
532 environment_variables.insert(
533 "SCRIPT_FILENAME".to_string(),
534 execute_pathbuf.to_string_lossy().to_string(),
535 );
536 if let Ok(script_path) = execute_pathbuf.as_path().strip_prefix(wwwroot) {
537 environment_variables.insert(
538 "SCRIPT_NAME".to_string(),
539 format!(
540 "/{}",
541 match cfg!(windows) {
542 true => script_path.to_string_lossy().to_string().replace("\\", "/"),
543 false => script_path.to_string_lossy().to_string(),
544 }
545 ),
546 );
547 }
548
549 if socket_data.encrypted {
550 environment_variables.insert("HTTPS".to_string(), "on".to_string());
551 }
552
553 let mut content_length_set = false;
554 for (header_name, header_value) in hyper_request.headers().iter() {
555 let env_header_name = match *header_name {
556 header::CONTENT_LENGTH => {
557 content_length_set = true;
558 "CONTENT_LENGTH".to_string()
559 }
560 header::CONTENT_TYPE => "CONTENT_TYPE".to_string(),
561 _ => {
562 let mut result = String::new();
563
564 result.push_str("HTTP_");
565
566 for c in header_name.as_str().to_uppercase().chars() {
567 if c.is_alphanumeric() {
568 result.push(c);
569 } else {
570 result.push('_');
571 }
572 }
573
574 result
575 }
576 };
577 if environment_variables.contains_key(&env_header_name) {
578 let value = environment_variables.get_mut(&env_header_name);
579 if let Some(value) = value {
580 if env_header_name == "HTTP_COOKIE" {
581 value.push_str("; ");
582 } else {
583 value.push_str(", ");
585 }
586 value.push_str(String::from_utf8_lossy(header_value.as_bytes()).as_ref());
587 } else {
588 environment_variables.insert(
589 env_header_name,
590 String::from_utf8_lossy(header_value.as_bytes()).to_string(),
591 );
592 }
593 } else {
594 environment_variables.insert(
595 env_header_name,
596 String::from_utf8_lossy(header_value.as_bytes()).to_string(),
597 );
598 }
599 }
600
601 if !content_length_set {
602 environment_variables.insert("CONTENT_LENGTH".to_string(), "0".to_string());
603 }
604
605 let (hyper_request, _, _, _) = request.into_parts();
606
607 execute_fastcgi(
608 hyper_request,
609 error_logger,
610 fastcgi_to,
611 environment_variables,
612 )
613 .await
614}
615
616async fn execute_fastcgi(
617 hyper_request: HyperRequest,
618 error_logger: &ErrorLogger,
619 fastcgi_to: &str,
620 mut environment_variables: LinkedHashMap<String, String>,
621) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
622 let (_, body) = hyper_request.into_parts();
623
624 for (key, value) in env::vars_os() {
626 let key_string = key.to_string_lossy().to_string();
627 let value_string = value.to_string_lossy().to_string();
628 environment_variables
629 .entry(key_string)
630 .or_insert(value_string);
631 }
632
633 let fastcgi_to_fixed = if let Some(stripped) = fastcgi_to.strip_prefix("unix:///") {
634 &format!("unix://ignore/{stripped}")
636 } else {
637 fastcgi_to
638 };
639
640 let fastcgi_to_url = fastcgi_to_fixed.parse::<hyper::Uri>()?;
641 let scheme_str = fastcgi_to_url.scheme_str();
642
643 let (socket_reader, mut socket_writer) = match scheme_str {
644 Some("tcp") => {
645 let host = match fastcgi_to_url.host() {
646 Some(host) => host,
647 None => Err(anyhow::anyhow!("The FastCGI URL doesn't include the host"))?,
648 };
649
650 let port = match fastcgi_to_url.port_u16() {
651 Some(port) => port,
652 None => Err(anyhow::anyhow!("The FastCGI URL doesn't include the port"))?,
653 };
654
655 let addr = format!("{host}:{port}");
656
657 match connect_tcp(&addr).await {
658 Ok(data) => data,
659 Err(err) => match err.kind() {
660 tokio::io::ErrorKind::ConnectionRefused
661 | tokio::io::ErrorKind::NotFound
662 | tokio::io::ErrorKind::HostUnreachable => {
663 error_logger
664 .log(&format!("Service unavailable: {err}"))
665 .await;
666 return Ok(
667 ResponseData::builder_without_request()
668 .status(StatusCode::SERVICE_UNAVAILABLE)
669 .build(),
670 );
671 }
672 _ => Err(err)?,
673 },
674 }
675 }
676 Some("unix") => {
677 let path = fastcgi_to_url.path();
678 match connect_unix(path).await {
679 Ok(data) => data,
680 Err(err) => match err.kind() {
681 tokio::io::ErrorKind::ConnectionRefused
682 | tokio::io::ErrorKind::NotFound
683 | tokio::io::ErrorKind::HostUnreachable => {
684 error_logger
685 .log(&format!("Service unavailable: {err}"))
686 .await;
687 return Ok(
688 ResponseData::builder_without_request()
689 .status(StatusCode::SERVICE_UNAVAILABLE)
690 .build(),
691 );
692 }
693 _ => Err(err)?,
694 },
695 }
696 }
697 _ => Err(anyhow::anyhow!(
698 "Only HTTP and HTTPS reverse proxy URLs are supported."
699 ))?,
700 };
701
702 let begin_request_packet = construct_fastcgi_record(1, 1, &[0, 1, 0, 0, 0, 0, 0, 0]);
705 socket_writer.write_all(&begin_request_packet).await?;
706
707 let mut environment_variables_to_wrap = Vec::new();
709 for (key, value) in environment_variables.iter() {
710 let mut environment_variable =
711 construct_fastcgi_name_value_pair(key.as_bytes(), value.as_bytes());
712 environment_variables_to_wrap.append(&mut environment_variable);
713 }
714 if !environment_variables_to_wrap.is_empty() {
715 let mut offset = 0;
716 while offset < environment_variables_to_wrap.len() {
717 let chunk_size = std::cmp::min(65536, environment_variables_to_wrap.len() - offset);
718 let chunk = &environment_variables_to_wrap[offset..offset + chunk_size];
719
720 let params_packet = construct_fastcgi_record(4, 1, chunk);
722 socket_writer.write_all(¶ms_packet).await?;
723
724 offset += chunk_size;
725 }
726 }
727
728 let params_packet_terminating = construct_fastcgi_record(4, 1, &[]);
729 socket_writer.write_all(¶ms_packet_terminating).await?;
730
731 let cgi_stdin_reader = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
732
733 type EitherStream = Either<Result<Bytes, std::io::Error>, Result<Bytes, std::io::Error>>;
735 let stdin = SinkWriter::new(FramedWrite::new(socket_writer, FcgiEncoder::new()));
736 let stdout_and_stderr = FramedRead::new(socket_reader, FcgiDecoder::new());
737 let (stdout_stream, stderr_stream) = stdout_and_stderr.split_by_map(|item| match item {
738 Ok(FcgiDecodedData::Stdout(bytes)) => EitherStream::Left(Ok(bytes)),
739 Ok(FcgiDecodedData::Stderr(bytes)) => EitherStream::Right(Ok(bytes)),
740 Err(err) => EitherStream::Left(Err(err)),
741 });
742 let stdout = StreamReader::new(stdout_stream);
743 let stderr = StreamReader::new(stderr_stream);
744
745 let mut cgi_response = CgiResponse::new(stdout);
746
747 let stdin_copy_future = async move {
748 let (mut cgi_stdin_reader, mut stdin) = (cgi_stdin_reader, stdin);
749 let result1 = tokio::io::copy(&mut cgi_stdin_reader, &mut stdin)
750 .await
751 .map(|_| ());
752
753 let result2 = stdin.write(&[]).await.map(|_| ());
755 let result3 = stdin.flush().await.map(|_| ());
756
757 result1.and(result2).and(result3)
758 };
759 let mut stdin_copy_future_pinned = Box::pin(stdin_copy_future);
760
761 let stderr_read_future = async move {
762 let mut stderr = stderr;
763 let mut buf = Vec::new();
764 stderr.read_to_end(&mut buf).await.map(|_| buf)
765 };
766 let mut stderr_read_future_pinned = Box::pin(stderr_read_future);
767
768 let mut headers = [EMPTY_HEADER; 128];
769
770 let mut early_stdin_copied = false;
771
772 {
774 let mut head_obtained = false;
775 let stdout_parse_future = cgi_response.get_head();
776 tokio::pin!(stdout_parse_future);
777
778 tokio::select! {
780 biased;
781
782 result = &mut stdin_copy_future_pinned => {
783 early_stdin_copied = true;
784 result?;
785 },
786 obtained_head = &mut stdout_parse_future => {
787 let obtained_head = obtained_head?;
788 if !obtained_head.is_empty() {
789 httparse::parse_headers(obtained_head, &mut headers)?;
790 }
791 head_obtained = true;
792 },
793 result = &mut stderr_read_future_pinned => {
794 let stderr_vec = result?;
795 let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
796 let stderr_string_trimmed = stderr_string.trim();
797 if !stderr_string_trimmed.is_empty() {
798 error_logger
799 .log(&format!("There were FastCGI errors: {stderr_string_trimmed}"))
800 .await;
801 }
802 return Ok(
803 ResponseData::builder_without_request()
804 .status(StatusCode::INTERNAL_SERVER_ERROR)
805 .build(),
806 );
807 },
808 }
809
810 if !head_obtained {
811 tokio::select! {
813 biased;
814
815 result = &mut stderr_read_future_pinned => {
816 let stderr_vec = result?;
817 let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
818 let stderr_string_trimmed = stderr_string.trim();
819 if !stderr_string_trimmed.is_empty() {
820 error_logger
821 .log(&format!("There were FastCGI errors: {stderr_string_trimmed}"))
822 .await;
823 }
824 return Ok(
825 ResponseData::builder_without_request()
826 .status(StatusCode::INTERNAL_SERVER_ERROR)
827 .build(),
828 );
829 },
830 obtained_head = &mut stdout_parse_future => {
831 let obtained_head = obtained_head?;
832 if !obtained_head.is_empty() {
833 httparse::parse_headers(obtained_head, &mut headers)?;
834 }
835 }
836 }
837 }
838 }
839
840 let mut response_builder = Response::builder();
841 let mut status_code = 200;
842 for header in headers {
843 if header == EMPTY_HEADER {
844 break;
845 }
846 let mut is_status_header = false;
847 match &header.name.to_lowercase() as &str {
848 "location" => {
849 if !(300..=399).contains(&status_code) {
850 status_code = 302;
851 }
852 }
853 "status" => {
854 is_status_header = true;
855 let header_value_cow = String::from_utf8_lossy(header.value);
856 let mut split_status = header_value_cow.split(" ");
857 let first_part = split_status.next();
858 if let Some(first_part) = first_part {
859 if first_part.starts_with("HTTP/") {
860 let second_part = split_status.next();
861 if let Some(second_part) = second_part {
862 if let Ok(parsed_status_code) = second_part.parse::<u16>() {
863 status_code = parsed_status_code;
864 }
865 }
866 } else if let Ok(parsed_status_code) = first_part.parse::<u16>() {
867 status_code = parsed_status_code;
868 }
869 }
870 }
871 _ => (),
872 }
873 if !is_status_header {
874 response_builder = response_builder.header(header.name, header.value);
875 }
876 }
877
878 response_builder = response_builder.status(status_code);
879
880 let reader_stream = ReaderStream::new(cgi_response);
881 let stream_body = StreamBody::new(reader_stream.map_ok(Frame::data));
882 let boxed_body = stream_body.boxed();
883
884 let response = response_builder.body(boxed_body)?;
885
886 let error_logger = error_logger.clone();
887
888 Ok(
889 ResponseData::builder_without_request()
890 .response(response)
891 .parallel_fn(async move {
892 let mut stdin_copied = early_stdin_copied;
893
894 if !stdin_copied {
895 tokio::select! {
896 biased;
897
898 _ = &mut stdin_copy_future_pinned => {
899 stdin_copied = true;
900 },
901 result = &mut stderr_read_future_pinned => {
902 let stderr_vec = result.unwrap_or(vec![]);
903 let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
904 if !stderr_string.is_empty() {
905 error_logger
906 .log(&format!("There were FastCGI errors: {stderr_string}"))
907 .await;
908 }
909 },
910 }
911 }
912
913 if stdin_copied {
914 let stderr_vec = stderr_read_future_pinned.await.unwrap_or(vec![]);
915 let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
916 if !stderr_string.is_empty() {
917 error_logger
918 .log(&format!("There were FastCGI errors: {stderr_string}"))
919 .await;
920 }
921 } else {
922 stdin_copy_future_pinned.await.unwrap_or_default();
923 }
924 })
925 .build(),
926 )
927}
928
929async fn connect_tcp(
930 addr: &str,
931) -> Result<
932 (
933 Box<dyn AsyncRead + Send + Sync + Unpin>,
934 Box<dyn AsyncWrite + Send + Sync + Unpin>,
935 ),
936 tokio::io::Error,
937> {
938 let socket = TcpStream::connect(addr).await?;
939 socket.set_nodelay(true)?;
940
941 let (socket_reader_set, socket_writer_set) = tokio::io::split(socket);
942 Ok((Box::new(socket_reader_set), Box::new(socket_writer_set)))
943}
944
945#[allow(dead_code)]
946#[cfg(unix)]
947async fn connect_unix(
948 path: &str,
949) -> Result<
950 (
951 Box<dyn AsyncRead + Send + Sync + Unpin>,
952 Box<dyn AsyncWrite + Send + Sync + Unpin>,
953 ),
954 tokio::io::Error,
955> {
956 use tokio::net::UnixStream;
957
958 let socket = UnixStream::connect(path).await?;
959
960 let (socket_reader_set, socket_writer_set) = tokio::io::split(socket);
961 Ok((Box::new(socket_reader_set), Box::new(socket_writer_set)))
962}
963
964#[allow(dead_code)]
965#[cfg(not(unix))]
966async fn connect_unix(
967 _path: &str,
968) -> Result<
969 (
970 Box<dyn AsyncRead + Send + Sync + Unpin>,
971 Box<dyn AsyncWrite + Send + Sync + Unpin>,
972 ),
973 tokio::io::Error,
974> {
975 Err(tokio::io::Error::new(
976 tokio::io::ErrorKind::Unsupported,
977 "Unix sockets are not supports on non-Unix platforms.",
978 ))
979}