ferron/
server.rs

1use std::error::Error;
2use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3use std::sync::Arc;
4use std::time::SystemTime;
5use std::{env, thread};
6
7use crate::ferron_common::{LogMessage, ServerModule, ServerModuleHandlers};
8use crate::ferron_request_handler::request_handler;
9use crate::ferron_util::env_config;
10use crate::ferron_util::load_tls::{load_certs, load_private_key};
11use crate::ferron_util::sni::CustomSniResolver;
12use crate::ferron_util::validate_config::{prepare_config_for_validation, validate_config};
13use async_channel::Sender;
14use chrono::prelude::*;
15use futures_util::StreamExt;
16use h3_quinn::quinn;
17use h3_quinn::quinn::crypto::rustls::QuicServerConfig;
18use http::Response;
19use http_body_util::{BodyExt, StreamBody};
20use hyper::body::{Buf, Bytes, Frame, Incoming};
21use hyper::service::service_fn;
22use hyper::Request;
23use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
24use ocsp_stapler::Stapler;
25use rustls::crypto::ring::cipher_suite::*;
26use rustls::crypto::ring::default_provider;
27use rustls::crypto::ring::kx_group::*;
28use rustls::server::{Acceptor, WebPkiClientVerifier};
29use rustls::sign::CertifiedKey;
30use rustls::version::{TLS12, TLS13};
31use rustls::{RootCertStore, ServerConfig};
32use rustls_native_certs::load_native_certs;
33use tokio::fs;
34use tokio::io::{AsyncWriteExt, BufWriter};
35use tokio::net::{TcpListener, TcpStream};
36use tokio::runtime::Handle;
37use tokio::signal;
38use tokio::sync::Mutex;
39use tokio::time;
40use tokio_rustls::server::TlsStream;
41use tokio_rustls::LazyConfigAcceptor;
42use tokio_rustls_acme2::acme::ACME_TLS_ALPN_NAME;
43use tokio_rustls_acme2::caches::DirCache;
44use tokio_rustls_acme2::{is_tls_alpn_challenge, AcmeConfig, ResolvesServerCertAcme, UseChallenge};
45use tokio_util::sync::CancellationToken;
46use yaml_rust2::Yaml;
47
48// Enum for maybe TLS stream
49#[allow(clippy::large_enum_variant)]
50enum MaybeTlsStream {
51  Tls(TlsStream<TcpStream>),
52  Plain(TcpStream),
53}
54
55// Function to accept and handle incoming QUIC connections
56#[allow(clippy::too_many_arguments)]
57async fn accept_quic_connection(
58  connection_attempt: quinn::Incoming,
59  local_address: SocketAddr,
60  config: Arc<Yaml>,
61  logger: Sender<LogMessage>,
62  modules: Arc<Vec<Box<dyn ServerModule + std::marker::Send + Sync>>>,
63) {
64  let remote_address = connection_attempt.remote_address();
65
66  let logger_clone = logger.clone();
67
68  tokio::task::spawn(async move {
69    match connection_attempt.await {
70      Ok(connection) => {
71        let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
72          match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
73            Ok(h3_conn) => h3_conn,
74            Err(err) => {
75              logger_clone
76                .send(LogMessage::new(
77                  format!("Error serving HTTP/3 connection: {err}"),
78                  true,
79                ))
80                .await
81                .unwrap_or_default();
82              return;
83            }
84          };
85
86        loop {
87          match h3_conn.accept().await {
88            Ok(Some(resolver)) => {
89              let config = config.clone();
90              let remote_address = remote_address;
91
92              let logger_clone = logger_clone.clone();
93              let modules = modules.clone();
94              tokio::spawn(async move {
95                let handlers_vec = modules
96                  .iter()
97                  .map(|module| module.get_handlers(Handle::current()));
98
99                let (request, stream) = match resolver.resolve_request().await {
100                  Ok(resolved) => resolved,
101                  Err(err) => {
102                    logger_clone
103                      .send(LogMessage::new(
104                        format!("Error serving HTTP/3 connection: {err}"),
105                        true,
106                      ))
107                      .await
108                      .unwrap_or_default();
109                    return;
110                  }
111                };
112                let (mut send, receive) = stream.split();
113                let request_body_stream = futures_util::stream::unfold(
114                  (receive, false),
115                  async move |(mut receive, mut is_body_finished)| loop {
116                    if !is_body_finished {
117                      match receive.recv_data().await {
118                        Ok(Some(mut data)) => {
119                          return Some((
120                            Ok(Frame::data(data.copy_to_bytes(data.remaining()))),
121                            (receive, false),
122                          ))
123                        }
124                        Ok(None) => is_body_finished = true,
125                        Err(err) => {
126                          return Some((
127                            Err(std::io::Error::other(err.to_string())),
128                            (receive, false),
129                          ))
130                        }
131                      }
132                    } else {
133                      match receive.recv_trailers().await {
134                        Ok(Some(trailers)) => {
135                          return Some((Ok(Frame::trailers(trailers)), (receive, true)))
136                        }
137                        Ok(None) => {
138                          return None;
139                        }
140                        Err(err) => {
141                          return Some((
142                            Err(std::io::Error::other(err.to_string())),
143                            (receive, true),
144                          ))
145                        }
146                      }
147                    }
148                  },
149                );
150                let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
151                let (request_parts, _) = request.into_parts();
152                let request = Request::from_parts(request_parts, request_body);
153                let handlers_vec_clone = handlers_vec
154                  .clone()
155                  .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
156                let mut response = match request_handler(
157                  request,
158                  remote_address,
159                  local_address,
160                  true,
161                  config,
162                  logger_clone.clone(),
163                  handlers_vec_clone,
164                  None,
165                  None,
166                )
167                .await
168                {
169                  Ok(response) => response,
170                  Err(err) => {
171                    logger_clone
172                      .send(LogMessage::new(
173                        format!("Error serving HTTP/3 connection: {err}"),
174                        true,
175                      ))
176                      .await
177                      .unwrap_or_default();
178                    return;
179                  }
180                };
181                if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
182                  response
183                    .headers_mut()
184                    .entry(http::header::DATE)
185                    .or_insert(http_date);
186                }
187                let (response_parts, mut response_body) = response.into_parts();
188                if let Err(err) = send
189                  .send_response(Response::from_parts(response_parts, ()))
190                  .await
191                {
192                  logger_clone
193                    .send(LogMessage::new(
194                      format!("Error serving HTTP/3 connection: {err}"),
195                      true,
196                    ))
197                    .await
198                    .unwrap_or_default();
199                  return;
200                }
201                let mut had_trailers = false;
202                while let Some(chunk) = response_body.frame().await {
203                  match chunk {
204                    Ok(frame) => {
205                      if frame.is_data() {
206                        match frame.into_data() {
207                          Ok(data) => {
208                            if let Err(err) = send.send_data(data).await {
209                              logger_clone
210                                .send(LogMessage::new(
211                                  format!("Error serving HTTP/3 connection: {err}"),
212                                  true,
213                                ))
214                                .await
215                                .unwrap_or_default();
216                              return;
217                            }
218                          }
219                          Err(_) => {
220                            logger_clone
221                            .send(LogMessage::new(
222                              "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
223                              true,
224                            ))
225                            .await
226                            .unwrap_or_default();
227                            return;
228                          }
229                        }
230                      } else if frame.is_trailers() {
231                        match frame.into_trailers() {
232                          Ok(trailers) => {
233                            had_trailers = true;
234                            if let Err(err) = send.send_trailers(trailers).await {
235                              logger_clone
236                                .send(LogMessage::new(
237                                  format!("Error serving HTTP/3 connection: {err}"),
238                                  true,
239                                ))
240                                .await
241                                .unwrap_or_default();
242                              return;
243                            }
244                          }
245                          Err(_) => {
246                            logger_clone
247                            .send(LogMessage::new(
248                              "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
249                              true,
250                            ))
251                            .await
252                            .unwrap_or_default();
253                            return;
254                          }
255                        }
256                      }
257                    }
258                    Err(err) => {
259                      logger_clone
260                        .send(LogMessage::new(
261                          format!("Error serving HTTP/3 connection: {err}"),
262                          true,
263                        ))
264                        .await
265                        .unwrap_or_default();
266                      return;
267                    }
268                  }
269                }
270                if !had_trailers {
271                  if let Err(err) = send.finish().await {
272                    logger_clone
273                      .send(LogMessage::new(
274                        format!("Error serving HTTP/3 connection: {err}"),
275                        true,
276                      ))
277                      .await
278                      .unwrap_or_default();
279                  }
280                }
281              });
282            }
283            Ok(None) => break,
284            Err(err) => {
285              logger_clone
286                .send(LogMessage::new(
287                  format!("Error serving HTTP/3 connection: {err}"),
288                  true,
289                ))
290                .await
291                .unwrap_or_default();
292              return;
293            }
294          }
295        }
296      }
297      Err(err) => {
298        logger_clone
299          .send(LogMessage::new(
300            format!("Cannot accept a connection: {err}"),
301            true,
302          ))
303          .await
304          .unwrap_or_default();
305      }
306    }
307  });
308}
309
310// Function to accept and handle incoming connections
311#[allow(clippy::too_many_arguments)]
312async fn accept_connection(
313  stream: TcpStream,
314  remote_address: SocketAddr,
315  tls_config_option: Option<(Arc<ServerConfig>, Option<Arc<ServerConfig>>)>,
316  acme_http01_resolver_option: Option<Arc<ResolvesServerCertAcme>>,
317  config: Arc<Yaml>,
318  logger: Sender<LogMessage>,
319  modules: Arc<Vec<Box<dyn ServerModule + std::marker::Send + Sync>>>,
320  http3_enabled: Option<u16>,
321) {
322  // Disable Nagle algorithm to improve performance
323  if let Err(err) = stream.set_nodelay(true) {
324    logger
325      .send(LogMessage::new(
326        format!("Cannot disable Nagle algorithm: {err}"),
327        true,
328      ))
329      .await
330      .unwrap_or_default();
331    return;
332  };
333
334  let config = config.clone();
335  let local_address = match stream.local_addr() {
336    Ok(local_address) => local_address,
337    Err(err) => {
338      logger
339        .send(LogMessage::new(
340          format!("Cannot obtain local address of the connection: {err}"),
341          true,
342        ))
343        .await
344        .unwrap_or_default();
345      return;
346    }
347  };
348
349  let logger_clone = logger.clone();
350
351  tokio::task::spawn(async move {
352    let maybe_tls_stream = if let Some((tls_config, acme_config_option)) = tls_config_option {
353      let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), stream).await {
354        Ok(start_handshake) => start_handshake,
355        Err(err) => {
356          logger
357            .send(LogMessage::new(
358              format!("Error during TLS handshake: {err}"),
359              true,
360            ))
361            .await
362            .unwrap_or_default();
363          return;
364        }
365      };
366
367      if let Some(acme_config) = acme_config_option {
368        if is_tls_alpn_challenge(&start_handshake.client_hello()) {
369          match start_handshake.into_stream(acme_config).await {
370            Ok(_) => (),
371            Err(err) => {
372              logger
373                .send(LogMessage::new(
374                  format!("Error during TLS handshake: {err}"),
375                  true,
376                ))
377                .await
378                .unwrap_or_default();
379              return;
380            }
381          };
382          return;
383        }
384      }
385
386      let tls_stream = match start_handshake.into_stream(tls_config).await {
387        Ok(tls_stream) => tls_stream,
388        Err(err) => {
389          logger
390            .send(LogMessage::new(
391              format!("Error during TLS handshake: {err}"),
392              true,
393            ))
394            .await
395            .unwrap_or_default();
396          return;
397        }
398      };
399
400      MaybeTlsStream::Tls(tls_stream)
401    } else {
402      MaybeTlsStream::Plain(stream)
403    };
404
405    if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
406      let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
407      let is_http2;
408
409      if config["global"]["enableHTTP2"].as_bool().unwrap_or(true) {
410        if alpn_protocol == Some("h2".as_bytes()) {
411          is_http2 = true;
412        } else {
413          // Don't allow HTTP/2 if "h2" ALPN offering was't present
414          is_http2 = false;
415        }
416      } else {
417        is_http2 = false;
418      }
419
420      let io = TokioIo::new(tls_stream);
421      let handlers_vec = modules
422        .iter()
423        .map(|module| module.get_handlers(Handle::current()));
424
425      if is_http2 {
426        let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioExecutor::new());
427        http2_builder.timer(TokioTimer::new());
428        if let Some(initial_window_size) =
429          config["global"]["http2Settings"]["initialWindowSize"].as_i64()
430        {
431          http2_builder.initial_stream_window_size(initial_window_size as u32);
432        }
433        if let Some(max_frame_size) = config["global"]["http2Settings"]["maxFrameSize"].as_i64() {
434          http2_builder.max_frame_size(max_frame_size as u32);
435        }
436        if let Some(max_concurrent_streams) =
437          config["global"]["http2Settings"]["maxConcurrentStreams"].as_i64()
438        {
439          http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
440        }
441        if let Some(max_header_list_size) =
442          config["global"]["http2Settings"]["maxHeaderListSize"].as_i64()
443        {
444          http2_builder.max_header_list_size(max_header_list_size as u32);
445        }
446        if let Some(enable_connect_protocol) =
447          config["global"]["http2Settings"]["enableConnectProtocol"].as_bool()
448        {
449          if enable_connect_protocol {
450            http2_builder.enable_connect_protocol();
451          }
452        }
453
454        if let Err(err) = http2_builder
455          .serve_connection(
456            io,
457            service_fn(move |request: Request<Incoming>| {
458              let config = config.clone();
459              let logger = logger_clone.clone();
460              let handlers_vec_clone = handlers_vec
461                .clone()
462                .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
463              let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
464              let (request_parts, request_body) = request.into_parts();
465              let request = Request::from_parts(
466                request_parts,
467                request_body
468                  .map_err(|e| std::io::Error::other(e.to_string()))
469                  .boxed(),
470              );
471              request_handler(
472                request,
473                remote_address,
474                local_address,
475                true,
476                config,
477                logger,
478                handlers_vec_clone,
479                acme_http01_resolver_option_clone,
480                http3_enabled,
481              )
482            }),
483          )
484          .await
485        {
486          logger
487            .send(LogMessage::new(
488              format!("Error serving HTTPS connection: {err}"),
489              true,
490            ))
491            .await
492            .unwrap_or_default();
493        }
494      } else {
495        let mut http1_builder = hyper::server::conn::http1::Builder::new();
496
497        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
498        http1_builder.timer(TokioTimer::new());
499
500        if let Err(err) = http1_builder
501          .serve_connection(
502            io,
503            service_fn(move |request: Request<Incoming>| {
504              let config = config.clone();
505              let logger = logger_clone.clone();
506              let handlers_vec_clone = handlers_vec
507                .clone()
508                .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
509              let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
510              let (request_parts, request_body) = request.into_parts();
511              let request = Request::from_parts(
512                request_parts,
513                request_body
514                  .map_err(|e| std::io::Error::other(e.to_string()))
515                  .boxed(),
516              );
517              request_handler(
518                request,
519                remote_address,
520                local_address,
521                true,
522                config,
523                logger,
524                handlers_vec_clone,
525                acme_http01_resolver_option_clone,
526                http3_enabled,
527              )
528            }),
529          )
530          .with_upgrades()
531          .await
532        {
533          logger
534            .send(LogMessage::new(
535              format!("Error serving HTTPS connection: {err}"),
536              true,
537            ))
538            .await
539            .unwrap_or_default();
540        }
541      }
542    } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
543      let io = TokioIo::new(stream);
544      let handlers_vec = modules
545        .iter()
546        .map(|module| module.get_handlers(Handle::current()));
547
548      let mut http1_builder = hyper::server::conn::http1::Builder::new();
549
550      // The timer is neccessary for the header timeout to work to mitigate Slowloris.
551      http1_builder.timer(TokioTimer::new());
552
553      if let Err(err) = http1_builder
554        .serve_connection(
555          io,
556          service_fn(move |request: Request<Incoming>| {
557            let config = config.clone();
558            let logger = logger_clone.clone();
559            let handlers_vec_clone = handlers_vec
560              .clone()
561              .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
562            let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
563            let (request_parts, request_body) = request.into_parts();
564            let request = Request::from_parts(
565              request_parts,
566              request_body
567                .map_err(|e| std::io::Error::other(e.to_string()))
568                .boxed(),
569            );
570            request_handler(
571              request,
572              remote_address,
573              local_address,
574              false,
575              config,
576              logger,
577              handlers_vec_clone,
578              acme_http01_resolver_option_clone,
579              http3_enabled,
580            )
581          }),
582        )
583        .with_upgrades()
584        .await
585      {
586        logger
587          .send(LogMessage::new(
588            format!("Error serving HTTP connection: {err}"),
589            true,
590          ))
591          .await
592          .unwrap_or_default();
593      }
594    }
595  });
596}
597
598// Main server event loop
599#[allow(clippy::type_complexity)]
600async fn server_event_loop(
601  yaml_config: Arc<Yaml>,
602  logger: Sender<LogMessage>,
603  modules: Vec<Box<dyn ServerModule + Send + Sync>>,
604  module_error: Option<anyhow::Error>,
605  modules_optional_builtin: Vec<String>,
606  first_startup: bool,
607) -> Result<(), Box<dyn Error + Send + Sync>> {
608  if let Some(module_error) = module_error {
609    logger
610      .send(LogMessage::new(module_error.to_string(), true))
611      .await
612      .unwrap_or_default();
613    Err(module_error)?
614  }
615
616  let prepared_config = match prepare_config_for_validation(&yaml_config) {
617    Ok(prepared_config) => prepared_config,
618    Err(err) => {
619      logger
620        .send(LogMessage::new(
621          format!("Server configuration validation failed: {err}"),
622          true,
623        ))
624        .await
625        .unwrap_or_default();
626      Err(anyhow::anyhow!(
627        "Server configuration validation failed: {}",
628        err
629      ))?
630    }
631  };
632
633  for (config_to_validate, is_global, is_location, is_error_config) in prepared_config {
634    match validate_config(
635      config_to_validate,
636      is_global,
637      is_location,
638      is_error_config,
639      &modules_optional_builtin,
640    ) {
641      Ok(unused_properties) => {
642        for unused_property in unused_properties {
643          logger
644            .send(LogMessage::new(
645              format!(
646                "Unused configuration property detected: \"{unused_property}\". You might load an appropriate module to use this configuration property"
647              ),
648              true,
649            ))
650            .await
651            .unwrap_or_default();
652        }
653      }
654      Err(err) => {
655        logger
656          .send(LogMessage::new(
657            format!("Server configuration validation failed: {err}"),
658            true,
659          ))
660          .await
661          .unwrap_or_default();
662        Err(anyhow::anyhow!(
663          "Server configuration validation failed: {}",
664          err
665        ))?
666      }
667    };
668  }
669
670  let mut crypto_provider = default_provider();
671
672  if let Some(cipher_suite) = yaml_config["global"]["cipherSuite"].as_vec() {
673    let mut cipher_suites = Vec::new();
674    let cipher_suite_iter = cipher_suite.iter();
675    for cipher_suite_yaml in cipher_suite_iter {
676      if let Some(cipher_suite) = cipher_suite_yaml.as_str() {
677        let cipher_suite_to_add = match cipher_suite {
678          "TLS_AES_128_GCM_SHA256" => TLS13_AES_128_GCM_SHA256,
679          "TLS_AES_256_GCM_SHA384" => TLS13_AES_256_GCM_SHA384,
680          "TLS_CHACHA20_POLY1305_SHA256" => TLS13_CHACHA20_POLY1305_SHA256,
681          "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256" => TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
682          "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384" => TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
683          "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256" => {
684            TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256
685          }
686          "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256" => TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
687          "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" => TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
688          "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256" => {
689            TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256
690          }
691          _ => {
692            logger
693              .send(LogMessage::new(
694                format!("The \"{cipher_suite}\" cipher suite is not supported"),
695                true,
696              ))
697              .await
698              .unwrap_or_default();
699            Err(anyhow::anyhow!(
700              "The \"{}\" cipher suite is not supported",
701              cipher_suite
702            ))?
703          }
704        };
705        cipher_suites.push(cipher_suite_to_add);
706      }
707    }
708    crypto_provider.cipher_suites = cipher_suites;
709  }
710
711  if let Some(ecdh_curves) = yaml_config["global"]["ecdhCurve"].as_vec() {
712    let mut kx_groups = Vec::new();
713    let ecdh_curves_iter = ecdh_curves.iter();
714    for ecdh_curve_yaml in ecdh_curves_iter {
715      if let Some(ecdh_curve) = ecdh_curve_yaml.as_str() {
716        let kx_group_to_add = match ecdh_curve {
717          "secp256r1" => SECP256R1,
718          "secp384r1" => SECP384R1,
719          "x25519" => X25519,
720          _ => {
721            logger
722              .send(LogMessage::new(
723                format!("The \"{ecdh_curve}\" ECDH curve is not supported"),
724                true,
725              ))
726              .await
727              .unwrap_or_default();
728            Err(anyhow::anyhow!(
729              "The \"{}\" ECDH curve is not supported",
730              ecdh_curve
731            ))?
732          }
733        };
734        kx_groups.push(kx_group_to_add);
735      }
736    }
737    crypto_provider.kx_groups = kx_groups;
738  }
739
740  let crypto_provider_cloned = crypto_provider.clone();
741  let mut sni_resolver = CustomSniResolver::new();
742  let mut certified_keys = Vec::new();
743
744  let mut automatic_tls_enabled = false;
745  let mut acme_letsencrypt_production = true;
746  let acme_use_http_challenge = yaml_config["global"]["useAutomaticTLSHTTPChallenge"]
747    .as_bool()
748    .unwrap_or(false);
749  let acme_challenge_type = if acme_use_http_challenge {
750    UseChallenge::Http01
751  } else {
752    UseChallenge::TlsAlpn01
753  };
754
755  // Read automatic TLS configuration
756  if let Some(read_automatic_tls_enabled) = yaml_config["global"]["enableAutomaticTLS"].as_bool() {
757    automatic_tls_enabled = read_automatic_tls_enabled;
758  }
759
760  let acme_contact = yaml_config["global"]["automaticTLSContactEmail"].as_str();
761  let acme_cache = yaml_config["global"]["automaticTLSContactCacheDirectory"]
762    .as_str()
763    .map(|s| s.to_string())
764    .map(DirCache::new);
765
766  if let Some(read_acme_letsencrypt_production) =
767    yaml_config["global"]["automaticTLSLetsEncryptProduction"].as_bool()
768  {
769    acme_letsencrypt_production = read_acme_letsencrypt_production;
770  }
771
772  if !automatic_tls_enabled {
773    // Load public certificate and private key
774    if let Some(cert_path) = yaml_config["global"]["cert"].as_str() {
775      if let Some(key_path) = yaml_config["global"]["key"].as_str() {
776        let certs = match load_certs(cert_path) {
777          Ok(certs) => certs,
778          Err(err) => {
779            logger
780              .send(LogMessage::new(
781                format!("Cannot load the \"{cert_path}\" TLS certificate: {err}"),
782                true,
783              ))
784              .await
785              .unwrap_or_default();
786            Err(anyhow::anyhow!(
787              "Cannot load the \"{}\" TLS certificate: {}",
788              cert_path,
789              err
790            ))?
791          }
792        };
793        let key = match load_private_key(key_path) {
794          Ok(key) => key,
795          Err(err) => {
796            logger
797              .send(LogMessage::new(
798                format!("Cannot load the \"{cert_path}\" private key: {err}"),
799                true,
800              ))
801              .await
802              .unwrap_or_default();
803            Err(anyhow::anyhow!(
804              "Cannot load the \"{}\" private key: {}",
805              cert_path,
806              err
807            ))?
808          }
809        };
810        let signing_key = match crypto_provider_cloned.key_provider.load_private_key(key) {
811          Ok(key) => key,
812          Err(err) => {
813            logger
814              .send(LogMessage::new(
815                format!("Cannot load the \"{cert_path}\" private key: {err}"),
816                true,
817              ))
818              .await
819              .unwrap_or_default();
820            Err(anyhow::anyhow!(
821              "Cannot load the \"{}\" private key: {}",
822              cert_path,
823              err
824            ))?
825          }
826        };
827        let certified_key = CertifiedKey::new(certs, signing_key);
828        sni_resolver.load_fallback_cert_key(Arc::new(certified_key));
829      }
830    }
831
832    if let Some(sni) = yaml_config["global"]["sni"].as_hash() {
833      let sni_hostnames = sni.keys();
834      for sni_hostname_unknown in sni_hostnames {
835        if let Some(sni_hostname) = sni_hostname_unknown.as_str() {
836          if let Some(cert_path) = sni[sni_hostname_unknown]["cert"].as_str() {
837            if let Some(key_path) = sni[sni_hostname_unknown]["key"].as_str() {
838              let certs = match load_certs(cert_path) {
839                Ok(certs) => certs,
840                Err(err) => {
841                  logger
842                    .send(LogMessage::new(
843                      format!("Cannot load the \"{cert_path}\" TLS certificate: {err}"),
844                      true,
845                    ))
846                    .await
847                    .unwrap_or_default();
848                  Err(anyhow::anyhow!(
849                    "Cannot load the \"{}\" TLS certificate: {}",
850                    cert_path,
851                    err
852                  ))?
853                }
854              };
855              let key = match load_private_key(key_path) {
856                Ok(key) => key,
857                Err(err) => {
858                  logger
859                    .send(LogMessage::new(
860                      format!("Cannot load the \"{cert_path}\" private key: {err}"),
861                      true,
862                    ))
863                    .await
864                    .unwrap_or_default();
865                  Err(anyhow::anyhow!(
866                    "Cannot load the \"{}\" private key: {}",
867                    cert_path,
868                    err
869                  ))?
870                }
871              };
872              let signing_key = match crypto_provider_cloned.key_provider.load_private_key(key) {
873                Ok(key) => key,
874                Err(err) => {
875                  logger
876                    .send(LogMessage::new(
877                      format!("Cannot load the \"{cert_path}\" private key: {err}"),
878                      true,
879                    ))
880                    .await
881                    .unwrap_or_default();
882                  Err(anyhow::anyhow!(
883                    "Cannot load the \"{}\" private key: {}",
884                    cert_path,
885                    err
886                  ))?
887                }
888              };
889              let certified_key_arc = Arc::new(CertifiedKey::new(certs, signing_key));
890              sni_resolver.load_host_cert_key(sni_hostname, certified_key_arc.clone());
891              certified_keys.push(certified_key_arc);
892            }
893          }
894        }
895      }
896    }
897  }
898
899  // Build TLS configuration
900  let tls_config_builder_wants_versions =
901    ServerConfig::builder_with_provider(Arc::new(crypto_provider_cloned));
902
903  let min_tls_version_option = yaml_config["global"]["tlsMinVersion"].as_str();
904  let max_tls_version_option = yaml_config["global"]["tlsMaxVersion"].as_str();
905  let tls_config_builder_wants_verifier = if min_tls_version_option.is_none()
906    && max_tls_version_option.is_none()
907  {
908    tls_config_builder_wants_versions.with_safe_default_protocol_versions()?
909  } else {
910    let tls_versions = [("TLSv1.2", &TLS12), ("TLSv1.3", &TLS13)];
911    let min_tls_version_index =
912      min_tls_version_option.map_or(Some(0), |v| tls_versions.iter().position(|p| p.0 == v));
913    let max_tls_version_index = max_tls_version_option.map_or(Some(tls_versions.len() - 1), |v| {
914      tls_versions.iter().position(|p| p.0 == v)
915    });
916    if let Some(min_tls_version_index) = min_tls_version_index {
917      if let Some(max_tls_version_index) = max_tls_version_index {
918        if max_tls_version_index < min_tls_version_index {
919          logger
920            .send(LogMessage::new(
921              String::from("Maximum TLS version is older than minimum TLS version"),
922              true,
923            ))
924            .await
925            .unwrap_or_default();
926          Err(anyhow::anyhow!(
927            "Maximum TLS version is older than minimum TLS version"
928          ))?
929        }
930        match tls_config_builder_wants_versions.with_protocol_versions(
931          &tls_versions[min_tls_version_index..=max_tls_version_index]
932            .iter()
933            .map(|p| p.1)
934            .collect::<Vec<_>>(),
935        ) {
936          Ok(builder) => builder,
937          Err(err) => {
938            logger
939              .send(LogMessage::new(
940                format!("Couldn't create the TLS server configuration: {err}"),
941                true,
942              ))
943              .await
944              .unwrap_or_default();
945            Err(anyhow::anyhow!(
946              "Couldn't create the TLS server configuration: {}",
947              err
948            ))?
949          }
950        }
951      } else {
952        logger
953          .send(LogMessage::new(
954            String::from("Invalid maximum TLS version"),
955            true,
956          ))
957          .await
958          .unwrap_or_default();
959        Err(anyhow::anyhow!("Invalid maximum TLS version"))?
960      }
961    } else {
962      logger
963        .send(LogMessage::new(
964          String::from("Invalid minimum TLS version"),
965          true,
966        ))
967        .await
968        .unwrap_or_default();
969      Err(anyhow::anyhow!("Invalid minimum TLS version"))?
970    }
971  };
972
973  let tls_config_builder_wants_server_cert =
974    match yaml_config["global"]["useClientCertificate"].as_bool() {
975      Some(true) => {
976        let mut roots = RootCertStore::empty();
977        let certs_result = load_native_certs();
978        if !certs_result.errors.is_empty() {
979          logger
980            .send(LogMessage::new(
981              format!(
982                "Couldn't load the native certificate store: {}",
983                certs_result.errors[0]
984              ),
985              true,
986            ))
987            .await
988            .unwrap_or_default();
989          Err(anyhow::anyhow!(
990            "Couldn't load the native certificate store: {}",
991            certs_result.errors[0]
992          ))?
993        }
994        let certs = certs_result.certs;
995
996        for cert in certs {
997          match roots.add(cert) {
998            Ok(_) => (),
999            Err(err) => {
1000              logger
1001                .send(LogMessage::new(
1002                  format!("Couldn't add a certificate to the certificate store: {err}"),
1003                  true,
1004                ))
1005                .await
1006                .unwrap_or_default();
1007              Err(anyhow::anyhow!(
1008                "Couldn't add a certificate to the certificate store: {}",
1009                err
1010              ))?
1011            }
1012          }
1013        }
1014        tls_config_builder_wants_verifier
1015          .with_client_cert_verifier(WebPkiClientVerifier::builder(Arc::new(roots)).build()?)
1016      }
1017      _ => tls_config_builder_wants_verifier.with_no_client_auth(),
1018    };
1019
1020  let mut tls_config;
1021
1022  let mut addr = SocketAddr::from((IpAddr::V6(Ipv6Addr::UNSPECIFIED), 80));
1023  let mut addr_tls = SocketAddr::from((IpAddr::V6(Ipv6Addr::UNSPECIFIED), 443));
1024  let mut tls_enabled = false;
1025  let mut non_tls_disabled = false;
1026
1027  // Install a process-wide cryptography provider. If it fails, then warn about it.
1028  if crypto_provider.install_default().is_err() && first_startup {
1029    logger
1030      .send(LogMessage::new(
1031        "Cannot install a process-wide cryptography provider".to_string(),
1032        true,
1033      ))
1034      .await
1035      .unwrap_or_default();
1036    Err(anyhow::anyhow!(
1037      "Cannot install a process-wide cryptography provider"
1038    ))?;
1039  }
1040
1041  // Read port configurations from YAML
1042  if let Some(read_port) = yaml_config["global"]["port"].as_i64() {
1043    addr = SocketAddr::from((
1044      IpAddr::V6(Ipv6Addr::UNSPECIFIED),
1045      match read_port.try_into() {
1046        Ok(port) => port,
1047        Err(_) => {
1048          logger
1049            .send(LogMessage::new(String::from("Invalid HTTP port"), true))
1050            .await
1051            .unwrap_or_default();
1052          Err(anyhow::anyhow!("Invalid HTTP port"))?
1053        }
1054      },
1055    ));
1056  } else if let Some(read_port) = yaml_config["global"]["port"].as_str() {
1057    addr = match read_port.parse() {
1058      Ok(addr) => addr,
1059      Err(_) => {
1060        logger
1061          .send(LogMessage::new(String::from("Invalid HTTP port"), true))
1062          .await
1063          .unwrap_or_default();
1064        Err(anyhow::anyhow!("Invalid HTTP port"))?
1065      }
1066    };
1067  }
1068
1069  if let Some(read_tls_enabled) = yaml_config["global"]["secure"].as_bool() {
1070    tls_enabled = read_tls_enabled;
1071    if let Some(read_non_tls_disabled) =
1072      yaml_config["global"]["disableNonEncryptedServer"].as_bool()
1073    {
1074      non_tls_disabled = read_non_tls_disabled;
1075    }
1076  }
1077
1078  if let Some(read_port) = yaml_config["global"]["sport"].as_i64() {
1079    addr_tls = SocketAddr::from((
1080      IpAddr::V6(Ipv6Addr::UNSPECIFIED),
1081      match read_port.try_into() {
1082        Ok(port) => port,
1083        Err(_) => {
1084          logger
1085            .send(LogMessage::new(String::from("Invalid HTTPS port"), true))
1086            .await
1087            .unwrap_or_default();
1088          Err(anyhow::anyhow!("Invalid HTTPS port"))?
1089        }
1090      },
1091    ));
1092  } else if let Some(read_port) = yaml_config["global"]["sport"].as_str() {
1093    addr_tls = match read_port.parse() {
1094      Ok(addr) => addr,
1095      Err(_) => {
1096        logger
1097          .send(LogMessage::new(String::from("Invalid HTTPS port"), true))
1098          .await
1099          .unwrap_or_default();
1100        Err(anyhow::anyhow!("Invalid HTTPS port"))?
1101      }
1102    };
1103  }
1104
1105  // Get domains for ACME configuration
1106  let mut acme_domains = Vec::new();
1107  if let Some(hosts_config) = yaml_config["hosts"].as_vec() {
1108    for host_yaml in hosts_config.iter() {
1109      if let Some(host) = host_yaml.as_hash() {
1110        if let Some(domain_yaml) = host.get(&Yaml::from_str("domain")) {
1111          if let Some(domain) = domain_yaml.as_str() {
1112            if !domain.contains("*") {
1113              acme_domains.push(domain);
1114            }
1115          }
1116        }
1117      }
1118    }
1119  }
1120
1121  // Create ACME configuration
1122  let mut acme_config = AcmeConfig::new(acme_domains).challenge_type(acme_challenge_type);
1123  if let Some(acme_contact_unwrapped) = acme_contact {
1124    acme_config = acme_config.contact_push(format!("mailto:{acme_contact_unwrapped}"));
1125  }
1126  let mut acme_config_with_cache = acme_config.cache_option(acme_cache);
1127  acme_config_with_cache =
1128    acme_config_with_cache.directory_lets_encrypt(acme_letsencrypt_production);
1129
1130  let (acme_config, acme_http01_resolver) = if tls_enabled && automatic_tls_enabled {
1131    let mut acme_state = acme_config_with_cache.state();
1132
1133    let acme_resolver = acme_state.resolver();
1134
1135    // Create TLS configuration
1136    tls_config = if yaml_config["global"]["enableOCSPStapling"]
1137      .as_bool()
1138      .unwrap_or(true)
1139    {
1140      tls_config_builder_wants_server_cert
1141        .with_cert_resolver(Arc::new(Stapler::new(acme_resolver.clone())))
1142    } else {
1143      tls_config_builder_wants_server_cert.with_cert_resolver(acme_resolver.clone())
1144    };
1145
1146    let acme_logger = logger.clone();
1147    tokio::spawn(async move {
1148      while let Some(acme_result) = acme_state.next().await {
1149        if let Err(acme_error) = acme_result {
1150          acme_logger
1151            .send(LogMessage::new(
1152              format!("Error while obtaining a TLS certificate: {acme_error}"),
1153              true,
1154            ))
1155            .await
1156            .unwrap_or_default();
1157        }
1158      }
1159    });
1160
1161    if acme_use_http_challenge {
1162      (None, Some(acme_resolver))
1163    } else {
1164      let mut acme_config = tls_config.clone();
1165      acme_config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
1166
1167      (Some(acme_config), None)
1168    }
1169  } else {
1170    // Create TLS configuration
1171    tls_config = if yaml_config["global"]["enableOCSPStapling"]
1172      .as_bool()
1173      .unwrap_or(true)
1174    {
1175      let ocsp_stapler_arc = Arc::new(Stapler::new(Arc::new(sni_resolver)));
1176      for certified_key in certified_keys.iter() {
1177        ocsp_stapler_arc.preload(certified_key.clone());
1178      }
1179      tls_config_builder_wants_server_cert.with_cert_resolver(ocsp_stapler_arc.clone())
1180    } else {
1181      tls_config_builder_wants_server_cert.with_cert_resolver(Arc::new(sni_resolver))
1182    };
1183
1184    // Drop the ACME configuration
1185    drop(acme_config_with_cache);
1186    (None, None)
1187  };
1188
1189  let quic_config = if tls_enabled
1190    && yaml_config["global"]["enableHTTP3"]
1191      .as_bool()
1192      .unwrap_or(false)
1193  {
1194    let mut quic_tls_config = tls_config.clone();
1195    quic_tls_config.max_early_data_size = u32::MAX;
1196    quic_tls_config.alpn_protocols = vec![b"h3".to_vec(), b"h3-29".to_vec()];
1197    let quic_config = quinn::ServerConfig::with_crypto(Arc::new(match QuicServerConfig::try_from(
1198      quic_tls_config,
1199    ) {
1200      Ok(quinn_config) => quinn_config,
1201      Err(err) => {
1202        logger
1203          .send(LogMessage::new(
1204            format!("There was a problem when starting HTTP/3 server: {err}"),
1205            true,
1206          ))
1207          .await
1208          .unwrap_or_default();
1209        Err(anyhow::anyhow!(
1210          "There was a problem when starting HTTP/3 server: {}",
1211          err
1212        ))?
1213      }
1214    }));
1215    Some(quic_config)
1216  } else {
1217    None
1218  };
1219
1220  // Configure ALPN protocols
1221  let mut alpn_protocols = vec![b"http/1.1".to_vec(), b"http/1.0".to_vec()];
1222  if yaml_config["global"]["enableHTTP2"]
1223    .as_bool()
1224    .unwrap_or(true)
1225  {
1226    alpn_protocols.insert(0, b"h2".to_vec());
1227  }
1228  tls_config.alpn_protocols = alpn_protocols;
1229  let tls_config_arc = Arc::new(tls_config);
1230  let acme_config_arc = acme_config.map(Arc::new);
1231
1232  let mut listener = None;
1233  let mut listener_tls = None;
1234  let mut listener_quic = None;
1235
1236  // Bind to the specified ports
1237  if !non_tls_disabled {
1238    println!("HTTP server is listening at {addr}");
1239    listener = Some(match TcpListener::bind(addr).await {
1240      Ok(listener) => listener,
1241      Err(err) => {
1242        logger
1243          .send(LogMessage::new(
1244            format!("Cannot listen to HTTP port: {err}"),
1245            true,
1246          ))
1247          .await
1248          .unwrap_or_default();
1249        Err(anyhow::anyhow!("Cannot listen to HTTP port: {}", err))?
1250      }
1251    });
1252  }
1253
1254  if tls_enabled {
1255    println!("HTTPS server is listening at {addr_tls}");
1256    listener_tls = Some(match TcpListener::bind(addr_tls).await {
1257      Ok(listener) => listener,
1258      Err(err) => {
1259        logger
1260          .send(LogMessage::new(
1261            format!("Cannot listen to HTTPS port: {err}"),
1262            true,
1263          ))
1264          .await
1265          .unwrap_or_default();
1266        Err(anyhow::anyhow!("Cannot listen to HTTPS port: {}", err))?
1267      }
1268    });
1269
1270    if let Some(quic_config) = quic_config {
1271      println!("HTTP/3 server is listening at {addr_tls}");
1272      listener_quic = Some(match quinn::Endpoint::server(quic_config, addr_tls) {
1273        Ok(listener) => listener,
1274        Err(err) => {
1275          logger
1276            .send(LogMessage::new(
1277              format!("Cannot listen to HTTP/3 port: {err}"),
1278              true,
1279            ))
1280            .await
1281            .unwrap_or_default();
1282          Err(anyhow::anyhow!("Cannot listen to HTTP/3 port: {}", err))?
1283        }
1284      });
1285    }
1286  }
1287
1288  // Wrap the modules vector in an Arc
1289  let modules_arc = Arc::new(modules);
1290
1291  let http3_enabled = if listener_quic.is_some() {
1292    Some(addr_tls.port())
1293  } else {
1294    None
1295  };
1296
1297  // Main loop to accept incoming connections
1298  loop {
1299    let listener_borrowed = &listener;
1300    let listener_accept = async move {
1301      if let Some(listener) = listener_borrowed {
1302        listener.accept().await
1303      } else {
1304        futures_util::future::pending().await
1305      }
1306    };
1307
1308    let listener_tls_borrowed = &listener_tls;
1309    let listener_tls_accept = async move {
1310      if let Some(listener_tls) = listener_tls_borrowed {
1311        listener_tls.accept().await
1312      } else {
1313        futures_util::future::pending().await
1314      }
1315    };
1316
1317    let listener_quic_borrowed = &listener_quic;
1318    let listener_quic_accept = async move {
1319      if let Some(listener_quic) = listener_quic_borrowed {
1320        listener_quic.accept().await
1321      } else {
1322        futures_util::future::pending().await
1323      }
1324    };
1325
1326    if listener_borrowed.is_none()
1327      && listener_tls_borrowed.is_none()
1328      && listener_quic_borrowed.is_none()
1329    {
1330      logger
1331        .send(LogMessage::new(
1332          String::from("No server is listening"),
1333          true,
1334        ))
1335        .await
1336        .unwrap_or_default();
1337      Err(anyhow::anyhow!("No server is listening"))?;
1338    }
1339
1340    tokio::select! {
1341      status = listener_accept => {
1342        match status {
1343          Ok((stream, remote_address)) => {
1344            accept_connection(
1345              stream,
1346              remote_address,
1347              None,
1348              acme_http01_resolver.clone(),
1349              yaml_config.clone(),
1350              logger.clone(),
1351              modules_arc.clone(),
1352              None
1353            )
1354            .await;
1355          }
1356          Err(err) => {
1357            logger
1358              .send(LogMessage::new(
1359                format!("Cannot accept a connection: {err}"),
1360                true,
1361              ))
1362              .await
1363              .unwrap_or_default();
1364          }
1365        }
1366      },
1367      status = listener_tls_accept => {
1368        match status {
1369          Ok((stream, remote_address)) => {
1370            accept_connection(
1371              stream,
1372              remote_address,
1373              Some((tls_config_arc.clone(), acme_config_arc.clone())),
1374              None,
1375              yaml_config.clone(),
1376              logger.clone(),
1377              modules_arc.clone(),
1378              http3_enabled
1379            )
1380            .await;
1381          }
1382          Err(err) => {
1383            logger
1384              .send(LogMessage::new(
1385                format!("Cannot accept a connection: {err}"),
1386                true,
1387              ))
1388              .await
1389              .unwrap_or_default();
1390          }
1391        }
1392      },
1393      status = listener_quic_accept => {
1394        match status {
1395          Some(connection_attempt) => {
1396            let local_ip = SocketAddr::new(connection_attempt.local_ip().unwrap_or(IpAddr::V6(Ipv6Addr::UNSPECIFIED)), addr_tls.port());
1397            accept_quic_connection(
1398              connection_attempt,
1399              local_ip,
1400              yaml_config.clone(),
1401              logger.clone(),
1402              modules_arc.clone()
1403            )
1404            .await;
1405          }
1406          None => {
1407            logger
1408              .send(LogMessage::new(
1409                "HTTP/3 connections can't be accepted anymore".to_string(),
1410                true,
1411              ))
1412              .await
1413              .unwrap_or_default();
1414          }
1415        }
1416      }
1417    };
1418  }
1419}
1420
1421// Start the server
1422#[allow(clippy::type_complexity)]
1423pub fn start_server(
1424  yaml_config: Arc<Yaml>,
1425  modules: Vec<Box<dyn ServerModule + Send + Sync>>,
1426  module_error: Option<anyhow::Error>,
1427  modules_optional_builtin: Vec<String>,
1428  first_startup: bool,
1429) -> Result<bool, Box<dyn Error + Send + Sync>> {
1430  if let Some(environment_variables_hash) = yaml_config["global"]["environmentVariables"].as_hash()
1431  {
1432    let environment_variables_hash_iter = environment_variables_hash.iter();
1433    for (variable_name, variable_value) in environment_variables_hash_iter {
1434      if let Some(variable_name) = variable_name.as_str() {
1435        if let Some(variable_value) = variable_value.as_str() {
1436          if !variable_name.is_empty()
1437            && !variable_name.contains('\0')
1438            && !variable_name.contains('=')
1439            && !variable_value.contains('\0')
1440          {
1441            // Safety: the environment variables are set before threads are spawned
1442            // The `std::env::set_var` function is safe to use in single-threaded environments
1443            // In Rust 2024 edition, the `std::env::set_var` function would be `unsafe`.
1444            env::set_var(variable_name, variable_value);
1445          }
1446        }
1447      }
1448    }
1449  }
1450
1451  let available_parallelism = thread::available_parallelism()?.get();
1452
1453  // Create Tokio runtime for the server
1454  let server_runtime = tokio::runtime::Builder::new_multi_thread()
1455    .worker_threads(available_parallelism)
1456    .max_blocking_threads(1536)
1457    .event_interval(25)
1458    .thread_name("server-pool")
1459    .enable_all()
1460    .build()?;
1461
1462  // Create Tokio runtime for logging
1463  let log_runtime = tokio::runtime::Builder::new_multi_thread()
1464    .worker_threads(match available_parallelism / 2 {
1465      0 => 1,
1466      non_zero => non_zero,
1467    })
1468    .max_blocking_threads(768)
1469    .thread_name("log-pool")
1470    .enable_time()
1471    .build()?;
1472
1473  let (logger, receive_log) = async_channel::bounded::<LogMessage>(10000);
1474
1475  let log_filename = yaml_config["global"]["logFilePath"]
1476    .as_str()
1477    .map(String::from);
1478  let error_log_filename = yaml_config["global"]["errorLogFilePath"]
1479    .as_str()
1480    .map(String::from);
1481
1482  log_runtime.spawn(async move {
1483    let log_file = match log_filename {
1484      Some(log_filename) => Some(
1485        fs::OpenOptions::new()
1486          .append(true)
1487          .create(true)
1488          .open(log_filename)
1489          .await,
1490      ),
1491      None => None,
1492    };
1493
1494    let error_log_file = match error_log_filename {
1495      Some(error_log_filename) => Some(
1496        fs::OpenOptions::new()
1497          .append(true)
1498          .create(true)
1499          .open(error_log_filename)
1500          .await,
1501      ),
1502      None => None,
1503    };
1504
1505    let log_file_wrapped = match log_file {
1506      Some(Ok(file)) => Some(Arc::new(Mutex::new(BufWriter::with_capacity(131072, file)))),
1507      Some(Err(e)) => {
1508        eprintln!("Failed to open log file: {e}");
1509        None
1510      }
1511      None => None,
1512    };
1513
1514    let error_log_file_wrapped = match error_log_file {
1515      Some(Ok(file)) => Some(Arc::new(Mutex::new(BufWriter::with_capacity(131072, file)))),
1516      Some(Err(e)) => {
1517        eprintln!("Failed to open error log file: {e}");
1518        None
1519      }
1520      None => None,
1521    };
1522
1523    // The logs are written when the log message is received by the log event loop, and flushed every 100 ms, improving the server performance.
1524    let log_file_wrapped_cloned_for_sleep = log_file_wrapped.clone();
1525    let error_log_file_wrapped_cloned_for_sleep = error_log_file_wrapped.clone();
1526    tokio::task::spawn(async move {
1527      let mut interval = time::interval(time::Duration::from_millis(100));
1528      loop {
1529        interval.tick().await;
1530        if let Some(log_file_wrapped_cloned) = log_file_wrapped_cloned_for_sleep.clone() {
1531          let mut locked_file = log_file_wrapped_cloned.lock().await;
1532          locked_file.flush().await.unwrap_or_default();
1533        }
1534        if let Some(error_log_file_wrapped_cloned) = error_log_file_wrapped_cloned_for_sleep.clone()
1535        {
1536          let mut locked_file = error_log_file_wrapped_cloned.lock().await;
1537          locked_file.flush().await.unwrap_or_default();
1538        }
1539      }
1540    });
1541
1542    // Logging loop
1543    while let Ok(message) = receive_log.recv().await {
1544      let (mut message, is_error) = message.get_message();
1545      let log_file_wrapped_cloned = if !is_error {
1546        log_file_wrapped.clone()
1547      } else {
1548        error_log_file_wrapped.clone()
1549      };
1550
1551      if let Some(log_file_wrapped_cloned) = log_file_wrapped_cloned {
1552        tokio::task::spawn(async move {
1553          let mut locked_file = log_file_wrapped_cloned.lock().await;
1554          if is_error {
1555            let now: DateTime<Local> = Local::now();
1556            let formatted_time = now.format("%Y-%m-%d %H:%M:%S").to_string();
1557            message = format!("[{formatted_time}]: {message}");
1558          }
1559          message.push('\n');
1560          if let Err(e) = locked_file.write(message.as_bytes()).await {
1561            eprintln!("Failed to write to log file: {e}");
1562          }
1563        });
1564      }
1565    }
1566  });
1567
1568  // Log env overrides once at startup
1569  for msg in env_config::log_env_var_overrides() {
1570    logger
1571      .send_blocking(LogMessage::new(msg, true))
1572      .unwrap_or_default();
1573  }
1574
1575  // Run the server event loop
1576  let result = server_runtime.block_on(async {
1577    let event_loop_future = server_event_loop(
1578      yaml_config,
1579      logger,
1580      modules,
1581      module_error,
1582      modules_optional_builtin,
1583      first_startup,
1584    );
1585
1586    let (continue_tx, continue_rx) = async_channel::unbounded::<bool>();
1587    let cancel_token = CancellationToken::new();
1588
1589    #[cfg(unix)]
1590    {
1591      let cancel_token_clone = cancel_token.clone();
1592      let continue_tx_clone = continue_tx.clone();
1593      tokio::spawn(async move {
1594        if let Ok(mut signal) = signal::unix::signal(signal::unix::SignalKind::hangup()) {
1595          tokio::select! {
1596            _ = signal.recv() => {
1597              continue_tx_clone.send(true).await.unwrap_or_default();
1598            }
1599            _ = cancel_token_clone.cancelled() => {}
1600          }
1601        }
1602      });
1603    }
1604
1605    let cancel_token_clone = cancel_token.clone();
1606    tokio::spawn(async move {
1607      tokio::select! {
1608        result = signal::ctrl_c() => {
1609          if result.is_ok() {
1610            continue_tx.send(false).await.unwrap_or_default();
1611          }
1612        }
1613        _ = cancel_token_clone.cancelled() => {}
1614      }
1615    });
1616
1617    let result = tokio::select! {
1618      result = event_loop_future => {
1619        // Sleep the Tokio runtime to ensure error logs are saved
1620        time::sleep(tokio::time::Duration::from_millis(100)).await;
1621
1622        result.map(|_| false)
1623      },
1624      continue_running = continue_rx.recv() => Ok(continue_running?)
1625    };
1626
1627    cancel_token.cancel();
1628
1629    result
1630  });
1631
1632  // Wait 10 seconds or until all tasks are complete
1633  server_runtime.shutdown_timeout(time::Duration::from_secs(10));
1634
1635  result
1636}