1use std::error::Error;
2use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3use std::sync::Arc;
4use std::time::SystemTime;
5use std::{env, thread};
6
7use crate::ferron_common::{LogMessage, ServerModule, ServerModuleHandlers};
8use crate::ferron_request_handler::request_handler;
9use crate::ferron_util::env_config;
10use crate::ferron_util::load_tls::{load_certs, load_private_key};
11use crate::ferron_util::sni::CustomSniResolver;
12use crate::ferron_util::validate_config::{prepare_config_for_validation, validate_config};
13use async_channel::Sender;
14use chrono::prelude::*;
15use futures_util::StreamExt;
16use h3_quinn::quinn;
17use h3_quinn::quinn::crypto::rustls::QuicServerConfig;
18use http::Response;
19use http_body_util::{BodyExt, StreamBody};
20use hyper::body::{Buf, Bytes, Frame, Incoming};
21use hyper::service::service_fn;
22use hyper::Request;
23use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
24use ocsp_stapler::Stapler;
25use rustls::crypto::ring::cipher_suite::*;
26use rustls::crypto::ring::default_provider;
27use rustls::crypto::ring::kx_group::*;
28use rustls::server::{Acceptor, WebPkiClientVerifier};
29use rustls::sign::CertifiedKey;
30use rustls::version::{TLS12, TLS13};
31use rustls::{RootCertStore, ServerConfig};
32use rustls_native_certs::load_native_certs;
33use tokio::fs;
34use tokio::io::{AsyncWriteExt, BufWriter};
35use tokio::net::{TcpListener, TcpStream};
36use tokio::runtime::Handle;
37use tokio::signal;
38use tokio::sync::Mutex;
39use tokio::time;
40use tokio_rustls::server::TlsStream;
41use tokio_rustls::LazyConfigAcceptor;
42use tokio_rustls_acme2::acme::ACME_TLS_ALPN_NAME;
43use tokio_rustls_acme2::caches::DirCache;
44use tokio_rustls_acme2::{is_tls_alpn_challenge, AcmeConfig, ResolvesServerCertAcme, UseChallenge};
45use tokio_util::sync::CancellationToken;
46use yaml_rust2::Yaml;
47
48#[allow(clippy::large_enum_variant)]
50enum MaybeTlsStream {
51 Tls(TlsStream<TcpStream>),
52 Plain(TcpStream),
53}
54
55#[allow(clippy::too_many_arguments)]
57async fn accept_quic_connection(
58 connection_attempt: quinn::Incoming,
59 local_address: SocketAddr,
60 config: Arc<Yaml>,
61 logger: Sender<LogMessage>,
62 modules: Arc<Vec<Box<dyn ServerModule + std::marker::Send + Sync>>>,
63) {
64 let remote_address = connection_attempt.remote_address();
65
66 let logger_clone = logger.clone();
67
68 tokio::task::spawn(async move {
69 match connection_attempt.await {
70 Ok(connection) => {
71 let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
72 match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
73 Ok(h3_conn) => h3_conn,
74 Err(err) => {
75 logger_clone
76 .send(LogMessage::new(
77 format!("Error serving HTTP/3 connection: {err}"),
78 true,
79 ))
80 .await
81 .unwrap_or_default();
82 return;
83 }
84 };
85
86 loop {
87 match h3_conn.accept().await {
88 Ok(Some(resolver)) => {
89 let config = config.clone();
90 let remote_address = remote_address;
91
92 let logger_clone = logger_clone.clone();
93 let modules = modules.clone();
94 tokio::spawn(async move {
95 let handlers_vec = modules
96 .iter()
97 .map(|module| module.get_handlers(Handle::current()));
98
99 let (request, stream) = match resolver.resolve_request().await {
100 Ok(resolved) => resolved,
101 Err(err) => {
102 logger_clone
103 .send(LogMessage::new(
104 format!("Error serving HTTP/3 connection: {err}"),
105 true,
106 ))
107 .await
108 .unwrap_or_default();
109 return;
110 }
111 };
112 let (mut send, receive) = stream.split();
113 let request_body_stream = futures_util::stream::unfold(
114 (receive, false),
115 async move |(mut receive, mut is_body_finished)| loop {
116 if !is_body_finished {
117 match receive.recv_data().await {
118 Ok(Some(mut data)) => {
119 return Some((
120 Ok(Frame::data(data.copy_to_bytes(data.remaining()))),
121 (receive, false),
122 ))
123 }
124 Ok(None) => is_body_finished = true,
125 Err(err) => {
126 return Some((
127 Err(std::io::Error::other(err.to_string())),
128 (receive, false),
129 ))
130 }
131 }
132 } else {
133 match receive.recv_trailers().await {
134 Ok(Some(trailers)) => {
135 return Some((Ok(Frame::trailers(trailers)), (receive, true)))
136 }
137 Ok(None) => {
138 return None;
139 }
140 Err(err) => {
141 return Some((
142 Err(std::io::Error::other(err.to_string())),
143 (receive, true),
144 ))
145 }
146 }
147 }
148 },
149 );
150 let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
151 let (request_parts, _) = request.into_parts();
152 let request = Request::from_parts(request_parts, request_body);
153 let handlers_vec_clone = handlers_vec
154 .clone()
155 .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
156 let mut response = match request_handler(
157 request,
158 remote_address,
159 local_address,
160 true,
161 config,
162 logger_clone.clone(),
163 handlers_vec_clone,
164 None,
165 None,
166 )
167 .await
168 {
169 Ok(response) => response,
170 Err(err) => {
171 logger_clone
172 .send(LogMessage::new(
173 format!("Error serving HTTP/3 connection: {err}"),
174 true,
175 ))
176 .await
177 .unwrap_or_default();
178 return;
179 }
180 };
181 if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
182 response
183 .headers_mut()
184 .entry(http::header::DATE)
185 .or_insert(http_date);
186 }
187 let (response_parts, mut response_body) = response.into_parts();
188 if let Err(err) = send
189 .send_response(Response::from_parts(response_parts, ()))
190 .await
191 {
192 logger_clone
193 .send(LogMessage::new(
194 format!("Error serving HTTP/3 connection: {err}"),
195 true,
196 ))
197 .await
198 .unwrap_or_default();
199 return;
200 }
201 let mut had_trailers = false;
202 while let Some(chunk) = response_body.frame().await {
203 match chunk {
204 Ok(frame) => {
205 if frame.is_data() {
206 match frame.into_data() {
207 Ok(data) => {
208 if let Err(err) = send.send_data(data).await {
209 logger_clone
210 .send(LogMessage::new(
211 format!("Error serving HTTP/3 connection: {err}"),
212 true,
213 ))
214 .await
215 .unwrap_or_default();
216 return;
217 }
218 }
219 Err(_) => {
220 logger_clone
221 .send(LogMessage::new(
222 "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
223 true,
224 ))
225 .await
226 .unwrap_or_default();
227 return;
228 }
229 }
230 } else if frame.is_trailers() {
231 match frame.into_trailers() {
232 Ok(trailers) => {
233 had_trailers = true;
234 if let Err(err) = send.send_trailers(trailers).await {
235 logger_clone
236 .send(LogMessage::new(
237 format!("Error serving HTTP/3 connection: {err}"),
238 true,
239 ))
240 .await
241 .unwrap_or_default();
242 return;
243 }
244 }
245 Err(_) => {
246 logger_clone
247 .send(LogMessage::new(
248 "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
249 true,
250 ))
251 .await
252 .unwrap_or_default();
253 return;
254 }
255 }
256 }
257 }
258 Err(err) => {
259 logger_clone
260 .send(LogMessage::new(
261 format!("Error serving HTTP/3 connection: {err}"),
262 true,
263 ))
264 .await
265 .unwrap_or_default();
266 return;
267 }
268 }
269 }
270 if !had_trailers {
271 if let Err(err) = send.finish().await {
272 logger_clone
273 .send(LogMessage::new(
274 format!("Error serving HTTP/3 connection: {err}"),
275 true,
276 ))
277 .await
278 .unwrap_or_default();
279 }
280 }
281 });
282 }
283 Ok(None) => break,
284 Err(err) => {
285 logger_clone
286 .send(LogMessage::new(
287 format!("Error serving HTTP/3 connection: {err}"),
288 true,
289 ))
290 .await
291 .unwrap_or_default();
292 return;
293 }
294 }
295 }
296 }
297 Err(err) => {
298 logger_clone
299 .send(LogMessage::new(
300 format!("Cannot accept a connection: {err}"),
301 true,
302 ))
303 .await
304 .unwrap_or_default();
305 }
306 }
307 });
308}
309
310#[allow(clippy::too_many_arguments)]
312async fn accept_connection(
313 stream: TcpStream,
314 remote_address: SocketAddr,
315 tls_config_option: Option<(Arc<ServerConfig>, Option<Arc<ServerConfig>>)>,
316 acme_http01_resolver_option: Option<Arc<ResolvesServerCertAcme>>,
317 config: Arc<Yaml>,
318 logger: Sender<LogMessage>,
319 modules: Arc<Vec<Box<dyn ServerModule + std::marker::Send + Sync>>>,
320 http3_enabled: Option<u16>,
321) {
322 if let Err(err) = stream.set_nodelay(true) {
324 logger
325 .send(LogMessage::new(
326 format!("Cannot disable Nagle algorithm: {err}"),
327 true,
328 ))
329 .await
330 .unwrap_or_default();
331 return;
332 };
333
334 let config = config.clone();
335 let local_address = match stream.local_addr() {
336 Ok(local_address) => local_address,
337 Err(err) => {
338 logger
339 .send(LogMessage::new(
340 format!("Cannot obtain local address of the connection: {err}"),
341 true,
342 ))
343 .await
344 .unwrap_or_default();
345 return;
346 }
347 };
348
349 let logger_clone = logger.clone();
350
351 tokio::task::spawn(async move {
352 let maybe_tls_stream = if let Some((tls_config, acme_config_option)) = tls_config_option {
353 let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), stream).await {
354 Ok(start_handshake) => start_handshake,
355 Err(err) => {
356 logger
357 .send(LogMessage::new(
358 format!("Error during TLS handshake: {err}"),
359 true,
360 ))
361 .await
362 .unwrap_or_default();
363 return;
364 }
365 };
366
367 if let Some(acme_config) = acme_config_option {
368 if is_tls_alpn_challenge(&start_handshake.client_hello()) {
369 match start_handshake.into_stream(acme_config).await {
370 Ok(_) => (),
371 Err(err) => {
372 logger
373 .send(LogMessage::new(
374 format!("Error during TLS handshake: {err}"),
375 true,
376 ))
377 .await
378 .unwrap_or_default();
379 return;
380 }
381 };
382 return;
383 }
384 }
385
386 let tls_stream = match start_handshake.into_stream(tls_config).await {
387 Ok(tls_stream) => tls_stream,
388 Err(err) => {
389 logger
390 .send(LogMessage::new(
391 format!("Error during TLS handshake: {err}"),
392 true,
393 ))
394 .await
395 .unwrap_or_default();
396 return;
397 }
398 };
399
400 MaybeTlsStream::Tls(tls_stream)
401 } else {
402 MaybeTlsStream::Plain(stream)
403 };
404
405 if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
406 let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
407 let is_http2;
408
409 if config["global"]["enableHTTP2"].as_bool().unwrap_or(true) {
410 if alpn_protocol == Some("h2".as_bytes()) {
411 is_http2 = true;
412 } else {
413 is_http2 = false;
415 }
416 } else {
417 is_http2 = false;
418 }
419
420 let io = TokioIo::new(tls_stream);
421 let handlers_vec = modules
422 .iter()
423 .map(|module| module.get_handlers(Handle::current()));
424
425 if is_http2 {
426 let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioExecutor::new());
427 http2_builder.timer(TokioTimer::new());
428 if let Some(initial_window_size) =
429 config["global"]["http2Settings"]["initialWindowSize"].as_i64()
430 {
431 http2_builder.initial_stream_window_size(initial_window_size as u32);
432 }
433 if let Some(max_frame_size) = config["global"]["http2Settings"]["maxFrameSize"].as_i64() {
434 http2_builder.max_frame_size(max_frame_size as u32);
435 }
436 if let Some(max_concurrent_streams) =
437 config["global"]["http2Settings"]["maxConcurrentStreams"].as_i64()
438 {
439 http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
440 }
441 if let Some(max_header_list_size) =
442 config["global"]["http2Settings"]["maxHeaderListSize"].as_i64()
443 {
444 http2_builder.max_header_list_size(max_header_list_size as u32);
445 }
446 if let Some(enable_connect_protocol) =
447 config["global"]["http2Settings"]["enableConnectProtocol"].as_bool()
448 {
449 if enable_connect_protocol {
450 http2_builder.enable_connect_protocol();
451 }
452 }
453
454 if let Err(err) = http2_builder
455 .serve_connection(
456 io,
457 service_fn(move |request: Request<Incoming>| {
458 let config = config.clone();
459 let logger = logger_clone.clone();
460 let handlers_vec_clone = handlers_vec
461 .clone()
462 .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
463 let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
464 let (request_parts, request_body) = request.into_parts();
465 let request = Request::from_parts(
466 request_parts,
467 request_body
468 .map_err(|e| std::io::Error::other(e.to_string()))
469 .boxed(),
470 );
471 request_handler(
472 request,
473 remote_address,
474 local_address,
475 true,
476 config,
477 logger,
478 handlers_vec_clone,
479 acme_http01_resolver_option_clone,
480 http3_enabled,
481 )
482 }),
483 )
484 .await
485 {
486 logger
487 .send(LogMessage::new(
488 format!("Error serving HTTPS connection: {err}"),
489 true,
490 ))
491 .await
492 .unwrap_or_default();
493 }
494 } else {
495 let mut http1_builder = hyper::server::conn::http1::Builder::new();
496
497 http1_builder.timer(TokioTimer::new());
499
500 if let Err(err) = http1_builder
501 .serve_connection(
502 io,
503 service_fn(move |request: Request<Incoming>| {
504 let config = config.clone();
505 let logger = logger_clone.clone();
506 let handlers_vec_clone = handlers_vec
507 .clone()
508 .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
509 let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
510 let (request_parts, request_body) = request.into_parts();
511 let request = Request::from_parts(
512 request_parts,
513 request_body
514 .map_err(|e| std::io::Error::other(e.to_string()))
515 .boxed(),
516 );
517 request_handler(
518 request,
519 remote_address,
520 local_address,
521 true,
522 config,
523 logger,
524 handlers_vec_clone,
525 acme_http01_resolver_option_clone,
526 http3_enabled,
527 )
528 }),
529 )
530 .with_upgrades()
531 .await
532 {
533 logger
534 .send(LogMessage::new(
535 format!("Error serving HTTPS connection: {err}"),
536 true,
537 ))
538 .await
539 .unwrap_or_default();
540 }
541 }
542 } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
543 let io = TokioIo::new(stream);
544 let handlers_vec = modules
545 .iter()
546 .map(|module| module.get_handlers(Handle::current()));
547
548 let mut http1_builder = hyper::server::conn::http1::Builder::new();
549
550 http1_builder.timer(TokioTimer::new());
552
553 if let Err(err) = http1_builder
554 .serve_connection(
555 io,
556 service_fn(move |request: Request<Incoming>| {
557 let config = config.clone();
558 let logger = logger_clone.clone();
559 let handlers_vec_clone = handlers_vec
560 .clone()
561 .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
562 let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
563 let (request_parts, request_body) = request.into_parts();
564 let request = Request::from_parts(
565 request_parts,
566 request_body
567 .map_err(|e| std::io::Error::other(e.to_string()))
568 .boxed(),
569 );
570 request_handler(
571 request,
572 remote_address,
573 local_address,
574 false,
575 config,
576 logger,
577 handlers_vec_clone,
578 acme_http01_resolver_option_clone,
579 http3_enabled,
580 )
581 }),
582 )
583 .with_upgrades()
584 .await
585 {
586 logger
587 .send(LogMessage::new(
588 format!("Error serving HTTP connection: {err}"),
589 true,
590 ))
591 .await
592 .unwrap_or_default();
593 }
594 }
595 });
596}
597
598#[allow(clippy::type_complexity)]
600async fn server_event_loop(
601 yaml_config: Arc<Yaml>,
602 logger: Sender<LogMessage>,
603 modules: Vec<Box<dyn ServerModule + Send + Sync>>,
604 module_error: Option<anyhow::Error>,
605 modules_optional_builtin: Vec<String>,
606 first_startup: bool,
607) -> Result<(), Box<dyn Error + Send + Sync>> {
608 if let Some(module_error) = module_error {
609 logger
610 .send(LogMessage::new(module_error.to_string(), true))
611 .await
612 .unwrap_or_default();
613 Err(module_error)?
614 }
615
616 let prepared_config = match prepare_config_for_validation(&yaml_config) {
617 Ok(prepared_config) => prepared_config,
618 Err(err) => {
619 logger
620 .send(LogMessage::new(
621 format!("Server configuration validation failed: {err}"),
622 true,
623 ))
624 .await
625 .unwrap_or_default();
626 Err(anyhow::anyhow!(
627 "Server configuration validation failed: {}",
628 err
629 ))?
630 }
631 };
632
633 for (config_to_validate, is_global, is_location, is_error_config) in prepared_config {
634 match validate_config(
635 config_to_validate,
636 is_global,
637 is_location,
638 is_error_config,
639 &modules_optional_builtin,
640 ) {
641 Ok(unused_properties) => {
642 for unused_property in unused_properties {
643 logger
644 .send(LogMessage::new(
645 format!(
646 "Unused configuration property detected: \"{unused_property}\". You might load an appropriate module to use this configuration property"
647 ),
648 true,
649 ))
650 .await
651 .unwrap_or_default();
652 }
653 }
654 Err(err) => {
655 logger
656 .send(LogMessage::new(
657 format!("Server configuration validation failed: {err}"),
658 true,
659 ))
660 .await
661 .unwrap_or_default();
662 Err(anyhow::anyhow!(
663 "Server configuration validation failed: {}",
664 err
665 ))?
666 }
667 };
668 }
669
670 let mut crypto_provider = default_provider();
671
672 if let Some(cipher_suite) = yaml_config["global"]["cipherSuite"].as_vec() {
673 let mut cipher_suites = Vec::new();
674 let cipher_suite_iter = cipher_suite.iter();
675 for cipher_suite_yaml in cipher_suite_iter {
676 if let Some(cipher_suite) = cipher_suite_yaml.as_str() {
677 let cipher_suite_to_add = match cipher_suite {
678 "TLS_AES_128_GCM_SHA256" => TLS13_AES_128_GCM_SHA256,
679 "TLS_AES_256_GCM_SHA384" => TLS13_AES_256_GCM_SHA384,
680 "TLS_CHACHA20_POLY1305_SHA256" => TLS13_CHACHA20_POLY1305_SHA256,
681 "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256" => TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
682 "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384" => TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
683 "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256" => {
684 TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256
685 }
686 "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256" => TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
687 "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" => TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
688 "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256" => {
689 TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256
690 }
691 _ => {
692 logger
693 .send(LogMessage::new(
694 format!("The \"{cipher_suite}\" cipher suite is not supported"),
695 true,
696 ))
697 .await
698 .unwrap_or_default();
699 Err(anyhow::anyhow!(
700 "The \"{}\" cipher suite is not supported",
701 cipher_suite
702 ))?
703 }
704 };
705 cipher_suites.push(cipher_suite_to_add);
706 }
707 }
708 crypto_provider.cipher_suites = cipher_suites;
709 }
710
711 if let Some(ecdh_curves) = yaml_config["global"]["ecdhCurve"].as_vec() {
712 let mut kx_groups = Vec::new();
713 let ecdh_curves_iter = ecdh_curves.iter();
714 for ecdh_curve_yaml in ecdh_curves_iter {
715 if let Some(ecdh_curve) = ecdh_curve_yaml.as_str() {
716 let kx_group_to_add = match ecdh_curve {
717 "secp256r1" => SECP256R1,
718 "secp384r1" => SECP384R1,
719 "x25519" => X25519,
720 _ => {
721 logger
722 .send(LogMessage::new(
723 format!("The \"{ecdh_curve}\" ECDH curve is not supported"),
724 true,
725 ))
726 .await
727 .unwrap_or_default();
728 Err(anyhow::anyhow!(
729 "The \"{}\" ECDH curve is not supported",
730 ecdh_curve
731 ))?
732 }
733 };
734 kx_groups.push(kx_group_to_add);
735 }
736 }
737 crypto_provider.kx_groups = kx_groups;
738 }
739
740 let crypto_provider_cloned = crypto_provider.clone();
741 let mut sni_resolver = CustomSniResolver::new();
742 let mut certified_keys = Vec::new();
743
744 let mut automatic_tls_enabled = false;
745 let mut acme_letsencrypt_production = true;
746 let acme_use_http_challenge = yaml_config["global"]["useAutomaticTLSHTTPChallenge"]
747 .as_bool()
748 .unwrap_or(false);
749 let acme_challenge_type = if acme_use_http_challenge {
750 UseChallenge::Http01
751 } else {
752 UseChallenge::TlsAlpn01
753 };
754
755 if let Some(read_automatic_tls_enabled) = yaml_config["global"]["enableAutomaticTLS"].as_bool() {
757 automatic_tls_enabled = read_automatic_tls_enabled;
758 }
759
760 let acme_contact = yaml_config["global"]["automaticTLSContactEmail"].as_str();
761 let acme_cache = yaml_config["global"]["automaticTLSContactCacheDirectory"]
762 .as_str()
763 .map(|s| s.to_string())
764 .map(DirCache::new);
765
766 if let Some(read_acme_letsencrypt_production) =
767 yaml_config["global"]["automaticTLSLetsEncryptProduction"].as_bool()
768 {
769 acme_letsencrypt_production = read_acme_letsencrypt_production;
770 }
771
772 if !automatic_tls_enabled {
773 if let Some(cert_path) = yaml_config["global"]["cert"].as_str() {
775 if let Some(key_path) = yaml_config["global"]["key"].as_str() {
776 let certs = match load_certs(cert_path) {
777 Ok(certs) => certs,
778 Err(err) => {
779 logger
780 .send(LogMessage::new(
781 format!("Cannot load the \"{cert_path}\" TLS certificate: {err}"),
782 true,
783 ))
784 .await
785 .unwrap_or_default();
786 Err(anyhow::anyhow!(
787 "Cannot load the \"{}\" TLS certificate: {}",
788 cert_path,
789 err
790 ))?
791 }
792 };
793 let key = match load_private_key(key_path) {
794 Ok(key) => key,
795 Err(err) => {
796 logger
797 .send(LogMessage::new(
798 format!("Cannot load the \"{cert_path}\" private key: {err}"),
799 true,
800 ))
801 .await
802 .unwrap_or_default();
803 Err(anyhow::anyhow!(
804 "Cannot load the \"{}\" private key: {}",
805 cert_path,
806 err
807 ))?
808 }
809 };
810 let signing_key = match crypto_provider_cloned.key_provider.load_private_key(key) {
811 Ok(key) => key,
812 Err(err) => {
813 logger
814 .send(LogMessage::new(
815 format!("Cannot load the \"{cert_path}\" private key: {err}"),
816 true,
817 ))
818 .await
819 .unwrap_or_default();
820 Err(anyhow::anyhow!(
821 "Cannot load the \"{}\" private key: {}",
822 cert_path,
823 err
824 ))?
825 }
826 };
827 let certified_key = CertifiedKey::new(certs, signing_key);
828 sni_resolver.load_fallback_cert_key(Arc::new(certified_key));
829 }
830 }
831
832 if let Some(sni) = yaml_config["global"]["sni"].as_hash() {
833 let sni_hostnames = sni.keys();
834 for sni_hostname_unknown in sni_hostnames {
835 if let Some(sni_hostname) = sni_hostname_unknown.as_str() {
836 if let Some(cert_path) = sni[sni_hostname_unknown]["cert"].as_str() {
837 if let Some(key_path) = sni[sni_hostname_unknown]["key"].as_str() {
838 let certs = match load_certs(cert_path) {
839 Ok(certs) => certs,
840 Err(err) => {
841 logger
842 .send(LogMessage::new(
843 format!("Cannot load the \"{cert_path}\" TLS certificate: {err}"),
844 true,
845 ))
846 .await
847 .unwrap_or_default();
848 Err(anyhow::anyhow!(
849 "Cannot load the \"{}\" TLS certificate: {}",
850 cert_path,
851 err
852 ))?
853 }
854 };
855 let key = match load_private_key(key_path) {
856 Ok(key) => key,
857 Err(err) => {
858 logger
859 .send(LogMessage::new(
860 format!("Cannot load the \"{cert_path}\" private key: {err}"),
861 true,
862 ))
863 .await
864 .unwrap_or_default();
865 Err(anyhow::anyhow!(
866 "Cannot load the \"{}\" private key: {}",
867 cert_path,
868 err
869 ))?
870 }
871 };
872 let signing_key = match crypto_provider_cloned.key_provider.load_private_key(key) {
873 Ok(key) => key,
874 Err(err) => {
875 logger
876 .send(LogMessage::new(
877 format!("Cannot load the \"{cert_path}\" private key: {err}"),
878 true,
879 ))
880 .await
881 .unwrap_or_default();
882 Err(anyhow::anyhow!(
883 "Cannot load the \"{}\" private key: {}",
884 cert_path,
885 err
886 ))?
887 }
888 };
889 let certified_key_arc = Arc::new(CertifiedKey::new(certs, signing_key));
890 sni_resolver.load_host_cert_key(sni_hostname, certified_key_arc.clone());
891 certified_keys.push(certified_key_arc);
892 }
893 }
894 }
895 }
896 }
897 }
898
899 let tls_config_builder_wants_versions =
901 ServerConfig::builder_with_provider(Arc::new(crypto_provider_cloned));
902
903 let min_tls_version_option = yaml_config["global"]["tlsMinVersion"].as_str();
904 let max_tls_version_option = yaml_config["global"]["tlsMaxVersion"].as_str();
905 let tls_config_builder_wants_verifier = if min_tls_version_option.is_none()
906 && max_tls_version_option.is_none()
907 {
908 tls_config_builder_wants_versions.with_safe_default_protocol_versions()?
909 } else {
910 let tls_versions = [("TLSv1.2", &TLS12), ("TLSv1.3", &TLS13)];
911 let min_tls_version_index =
912 min_tls_version_option.map_or(Some(0), |v| tls_versions.iter().position(|p| p.0 == v));
913 let max_tls_version_index = max_tls_version_option.map_or(Some(tls_versions.len() - 1), |v| {
914 tls_versions.iter().position(|p| p.0 == v)
915 });
916 if let Some(min_tls_version_index) = min_tls_version_index {
917 if let Some(max_tls_version_index) = max_tls_version_index {
918 if max_tls_version_index < min_tls_version_index {
919 logger
920 .send(LogMessage::new(
921 String::from("Maximum TLS version is older than minimum TLS version"),
922 true,
923 ))
924 .await
925 .unwrap_or_default();
926 Err(anyhow::anyhow!(
927 "Maximum TLS version is older than minimum TLS version"
928 ))?
929 }
930 match tls_config_builder_wants_versions.with_protocol_versions(
931 &tls_versions[min_tls_version_index..=max_tls_version_index]
932 .iter()
933 .map(|p| p.1)
934 .collect::<Vec<_>>(),
935 ) {
936 Ok(builder) => builder,
937 Err(err) => {
938 logger
939 .send(LogMessage::new(
940 format!("Couldn't create the TLS server configuration: {err}"),
941 true,
942 ))
943 .await
944 .unwrap_or_default();
945 Err(anyhow::anyhow!(
946 "Couldn't create the TLS server configuration: {}",
947 err
948 ))?
949 }
950 }
951 } else {
952 logger
953 .send(LogMessage::new(
954 String::from("Invalid maximum TLS version"),
955 true,
956 ))
957 .await
958 .unwrap_or_default();
959 Err(anyhow::anyhow!("Invalid maximum TLS version"))?
960 }
961 } else {
962 logger
963 .send(LogMessage::new(
964 String::from("Invalid minimum TLS version"),
965 true,
966 ))
967 .await
968 .unwrap_or_default();
969 Err(anyhow::anyhow!("Invalid minimum TLS version"))?
970 }
971 };
972
973 let tls_config_builder_wants_server_cert =
974 match yaml_config["global"]["useClientCertificate"].as_bool() {
975 Some(true) => {
976 let mut roots = RootCertStore::empty();
977 let certs_result = load_native_certs();
978 if !certs_result.errors.is_empty() {
979 logger
980 .send(LogMessage::new(
981 format!(
982 "Couldn't load the native certificate store: {}",
983 certs_result.errors[0]
984 ),
985 true,
986 ))
987 .await
988 .unwrap_or_default();
989 Err(anyhow::anyhow!(
990 "Couldn't load the native certificate store: {}",
991 certs_result.errors[0]
992 ))?
993 }
994 let certs = certs_result.certs;
995
996 for cert in certs {
997 match roots.add(cert) {
998 Ok(_) => (),
999 Err(err) => {
1000 logger
1001 .send(LogMessage::new(
1002 format!("Couldn't add a certificate to the certificate store: {err}"),
1003 true,
1004 ))
1005 .await
1006 .unwrap_or_default();
1007 Err(anyhow::anyhow!(
1008 "Couldn't add a certificate to the certificate store: {}",
1009 err
1010 ))?
1011 }
1012 }
1013 }
1014 tls_config_builder_wants_verifier
1015 .with_client_cert_verifier(WebPkiClientVerifier::builder(Arc::new(roots)).build()?)
1016 }
1017 _ => tls_config_builder_wants_verifier.with_no_client_auth(),
1018 };
1019
1020 let mut tls_config;
1021
1022 let mut addr = SocketAddr::from((IpAddr::V6(Ipv6Addr::UNSPECIFIED), 80));
1023 let mut addr_tls = SocketAddr::from((IpAddr::V6(Ipv6Addr::UNSPECIFIED), 443));
1024 let mut tls_enabled = false;
1025 let mut non_tls_disabled = false;
1026
1027 if crypto_provider.install_default().is_err() && first_startup {
1029 logger
1030 .send(LogMessage::new(
1031 "Cannot install a process-wide cryptography provider".to_string(),
1032 true,
1033 ))
1034 .await
1035 .unwrap_or_default();
1036 Err(anyhow::anyhow!(
1037 "Cannot install a process-wide cryptography provider"
1038 ))?;
1039 }
1040
1041 if let Some(read_port) = yaml_config["global"]["port"].as_i64() {
1043 addr = SocketAddr::from((
1044 IpAddr::V6(Ipv6Addr::UNSPECIFIED),
1045 match read_port.try_into() {
1046 Ok(port) => port,
1047 Err(_) => {
1048 logger
1049 .send(LogMessage::new(String::from("Invalid HTTP port"), true))
1050 .await
1051 .unwrap_or_default();
1052 Err(anyhow::anyhow!("Invalid HTTP port"))?
1053 }
1054 },
1055 ));
1056 } else if let Some(read_port) = yaml_config["global"]["port"].as_str() {
1057 addr = match read_port.parse() {
1058 Ok(addr) => addr,
1059 Err(_) => {
1060 logger
1061 .send(LogMessage::new(String::from("Invalid HTTP port"), true))
1062 .await
1063 .unwrap_or_default();
1064 Err(anyhow::anyhow!("Invalid HTTP port"))?
1065 }
1066 };
1067 }
1068
1069 if let Some(read_tls_enabled) = yaml_config["global"]["secure"].as_bool() {
1070 tls_enabled = read_tls_enabled;
1071 if let Some(read_non_tls_disabled) =
1072 yaml_config["global"]["disableNonEncryptedServer"].as_bool()
1073 {
1074 non_tls_disabled = read_non_tls_disabled;
1075 }
1076 }
1077
1078 if let Some(read_port) = yaml_config["global"]["sport"].as_i64() {
1079 addr_tls = SocketAddr::from((
1080 IpAddr::V6(Ipv6Addr::UNSPECIFIED),
1081 match read_port.try_into() {
1082 Ok(port) => port,
1083 Err(_) => {
1084 logger
1085 .send(LogMessage::new(String::from("Invalid HTTPS port"), true))
1086 .await
1087 .unwrap_or_default();
1088 Err(anyhow::anyhow!("Invalid HTTPS port"))?
1089 }
1090 },
1091 ));
1092 } else if let Some(read_port) = yaml_config["global"]["sport"].as_str() {
1093 addr_tls = match read_port.parse() {
1094 Ok(addr) => addr,
1095 Err(_) => {
1096 logger
1097 .send(LogMessage::new(String::from("Invalid HTTPS port"), true))
1098 .await
1099 .unwrap_or_default();
1100 Err(anyhow::anyhow!("Invalid HTTPS port"))?
1101 }
1102 };
1103 }
1104
1105 let mut acme_domains = Vec::new();
1107 if let Some(hosts_config) = yaml_config["hosts"].as_vec() {
1108 for host_yaml in hosts_config.iter() {
1109 if let Some(host) = host_yaml.as_hash() {
1110 if let Some(domain_yaml) = host.get(&Yaml::from_str("domain")) {
1111 if let Some(domain) = domain_yaml.as_str() {
1112 if !domain.contains("*") {
1113 acme_domains.push(domain);
1114 }
1115 }
1116 }
1117 }
1118 }
1119 }
1120
1121 let mut acme_config = AcmeConfig::new(acme_domains).challenge_type(acme_challenge_type);
1123 if let Some(acme_contact_unwrapped) = acme_contact {
1124 acme_config = acme_config.contact_push(format!("mailto:{acme_contact_unwrapped}"));
1125 }
1126 let mut acme_config_with_cache = acme_config.cache_option(acme_cache);
1127 acme_config_with_cache =
1128 acme_config_with_cache.directory_lets_encrypt(acme_letsencrypt_production);
1129
1130 let (acme_config, acme_http01_resolver) = if tls_enabled && automatic_tls_enabled {
1131 let mut acme_state = acme_config_with_cache.state();
1132
1133 let acme_resolver = acme_state.resolver();
1134
1135 tls_config = if yaml_config["global"]["enableOCSPStapling"]
1137 .as_bool()
1138 .unwrap_or(true)
1139 {
1140 tls_config_builder_wants_server_cert
1141 .with_cert_resolver(Arc::new(Stapler::new(acme_resolver.clone())))
1142 } else {
1143 tls_config_builder_wants_server_cert.with_cert_resolver(acme_resolver.clone())
1144 };
1145
1146 let acme_logger = logger.clone();
1147 tokio::spawn(async move {
1148 while let Some(acme_result) = acme_state.next().await {
1149 if let Err(acme_error) = acme_result {
1150 acme_logger
1151 .send(LogMessage::new(
1152 format!("Error while obtaining a TLS certificate: {acme_error}"),
1153 true,
1154 ))
1155 .await
1156 .unwrap_or_default();
1157 }
1158 }
1159 });
1160
1161 if acme_use_http_challenge {
1162 (None, Some(acme_resolver))
1163 } else {
1164 let mut acme_config = tls_config.clone();
1165 acme_config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
1166
1167 (Some(acme_config), None)
1168 }
1169 } else {
1170 tls_config = if yaml_config["global"]["enableOCSPStapling"]
1172 .as_bool()
1173 .unwrap_or(true)
1174 {
1175 let ocsp_stapler_arc = Arc::new(Stapler::new(Arc::new(sni_resolver)));
1176 for certified_key in certified_keys.iter() {
1177 ocsp_stapler_arc.preload(certified_key.clone());
1178 }
1179 tls_config_builder_wants_server_cert.with_cert_resolver(ocsp_stapler_arc.clone())
1180 } else {
1181 tls_config_builder_wants_server_cert.with_cert_resolver(Arc::new(sni_resolver))
1182 };
1183
1184 drop(acme_config_with_cache);
1186 (None, None)
1187 };
1188
1189 let quic_config = if tls_enabled
1190 && yaml_config["global"]["enableHTTP3"]
1191 .as_bool()
1192 .unwrap_or(false)
1193 {
1194 let mut quic_tls_config = tls_config.clone();
1195 quic_tls_config.max_early_data_size = u32::MAX;
1196 quic_tls_config.alpn_protocols = vec![b"h3".to_vec(), b"h3-29".to_vec()];
1197 let quic_config = quinn::ServerConfig::with_crypto(Arc::new(match QuicServerConfig::try_from(
1198 quic_tls_config,
1199 ) {
1200 Ok(quinn_config) => quinn_config,
1201 Err(err) => {
1202 logger
1203 .send(LogMessage::new(
1204 format!("There was a problem when starting HTTP/3 server: {err}"),
1205 true,
1206 ))
1207 .await
1208 .unwrap_or_default();
1209 Err(anyhow::anyhow!(
1210 "There was a problem when starting HTTP/3 server: {}",
1211 err
1212 ))?
1213 }
1214 }));
1215 Some(quic_config)
1216 } else {
1217 None
1218 };
1219
1220 let mut alpn_protocols = vec![b"http/1.1".to_vec(), b"http/1.0".to_vec()];
1222 if yaml_config["global"]["enableHTTP2"]
1223 .as_bool()
1224 .unwrap_or(true)
1225 {
1226 alpn_protocols.insert(0, b"h2".to_vec());
1227 }
1228 tls_config.alpn_protocols = alpn_protocols;
1229 let tls_config_arc = Arc::new(tls_config);
1230 let acme_config_arc = acme_config.map(Arc::new);
1231
1232 let mut listener = None;
1233 let mut listener_tls = None;
1234 let mut listener_quic = None;
1235
1236 if !non_tls_disabled {
1238 println!("HTTP server is listening at {addr}");
1239 listener = Some(match TcpListener::bind(addr).await {
1240 Ok(listener) => listener,
1241 Err(err) => {
1242 logger
1243 .send(LogMessage::new(
1244 format!("Cannot listen to HTTP port: {err}"),
1245 true,
1246 ))
1247 .await
1248 .unwrap_or_default();
1249 Err(anyhow::anyhow!("Cannot listen to HTTP port: {}", err))?
1250 }
1251 });
1252 }
1253
1254 if tls_enabled {
1255 println!("HTTPS server is listening at {addr_tls}");
1256 listener_tls = Some(match TcpListener::bind(addr_tls).await {
1257 Ok(listener) => listener,
1258 Err(err) => {
1259 logger
1260 .send(LogMessage::new(
1261 format!("Cannot listen to HTTPS port: {err}"),
1262 true,
1263 ))
1264 .await
1265 .unwrap_or_default();
1266 Err(anyhow::anyhow!("Cannot listen to HTTPS port: {}", err))?
1267 }
1268 });
1269
1270 if let Some(quic_config) = quic_config {
1271 println!("HTTP/3 server is listening at {addr_tls}");
1272 listener_quic = Some(match quinn::Endpoint::server(quic_config, addr_tls) {
1273 Ok(listener) => listener,
1274 Err(err) => {
1275 logger
1276 .send(LogMessage::new(
1277 format!("Cannot listen to HTTP/3 port: {err}"),
1278 true,
1279 ))
1280 .await
1281 .unwrap_or_default();
1282 Err(anyhow::anyhow!("Cannot listen to HTTP/3 port: {}", err))?
1283 }
1284 });
1285 }
1286 }
1287
1288 let modules_arc = Arc::new(modules);
1290
1291 let http3_enabled = if listener_quic.is_some() {
1292 Some(addr_tls.port())
1293 } else {
1294 None
1295 };
1296
1297 loop {
1299 let listener_borrowed = &listener;
1300 let listener_accept = async move {
1301 if let Some(listener) = listener_borrowed {
1302 listener.accept().await
1303 } else {
1304 futures_util::future::pending().await
1305 }
1306 };
1307
1308 let listener_tls_borrowed = &listener_tls;
1309 let listener_tls_accept = async move {
1310 if let Some(listener_tls) = listener_tls_borrowed {
1311 listener_tls.accept().await
1312 } else {
1313 futures_util::future::pending().await
1314 }
1315 };
1316
1317 let listener_quic_borrowed = &listener_quic;
1318 let listener_quic_accept = async move {
1319 if let Some(listener_quic) = listener_quic_borrowed {
1320 listener_quic.accept().await
1321 } else {
1322 futures_util::future::pending().await
1323 }
1324 };
1325
1326 if listener_borrowed.is_none()
1327 && listener_tls_borrowed.is_none()
1328 && listener_quic_borrowed.is_none()
1329 {
1330 logger
1331 .send(LogMessage::new(
1332 String::from("No server is listening"),
1333 true,
1334 ))
1335 .await
1336 .unwrap_or_default();
1337 Err(anyhow::anyhow!("No server is listening"))?;
1338 }
1339
1340 tokio::select! {
1341 status = listener_accept => {
1342 match status {
1343 Ok((stream, remote_address)) => {
1344 accept_connection(
1345 stream,
1346 remote_address,
1347 None,
1348 acme_http01_resolver.clone(),
1349 yaml_config.clone(),
1350 logger.clone(),
1351 modules_arc.clone(),
1352 None
1353 )
1354 .await;
1355 }
1356 Err(err) => {
1357 logger
1358 .send(LogMessage::new(
1359 format!("Cannot accept a connection: {err}"),
1360 true,
1361 ))
1362 .await
1363 .unwrap_or_default();
1364 }
1365 }
1366 },
1367 status = listener_tls_accept => {
1368 match status {
1369 Ok((stream, remote_address)) => {
1370 accept_connection(
1371 stream,
1372 remote_address,
1373 Some((tls_config_arc.clone(), acme_config_arc.clone())),
1374 None,
1375 yaml_config.clone(),
1376 logger.clone(),
1377 modules_arc.clone(),
1378 http3_enabled
1379 )
1380 .await;
1381 }
1382 Err(err) => {
1383 logger
1384 .send(LogMessage::new(
1385 format!("Cannot accept a connection: {err}"),
1386 true,
1387 ))
1388 .await
1389 .unwrap_or_default();
1390 }
1391 }
1392 },
1393 status = listener_quic_accept => {
1394 match status {
1395 Some(connection_attempt) => {
1396 let local_ip = SocketAddr::new(connection_attempt.local_ip().unwrap_or(IpAddr::V6(Ipv6Addr::UNSPECIFIED)), addr_tls.port());
1397 accept_quic_connection(
1398 connection_attempt,
1399 local_ip,
1400 yaml_config.clone(),
1401 logger.clone(),
1402 modules_arc.clone()
1403 )
1404 .await;
1405 }
1406 None => {
1407 logger
1408 .send(LogMessage::new(
1409 "HTTP/3 connections can't be accepted anymore".to_string(),
1410 true,
1411 ))
1412 .await
1413 .unwrap_or_default();
1414 }
1415 }
1416 }
1417 };
1418 }
1419}
1420
1421#[allow(clippy::type_complexity)]
1423pub fn start_server(
1424 yaml_config: Arc<Yaml>,
1425 modules: Vec<Box<dyn ServerModule + Send + Sync>>,
1426 module_error: Option<anyhow::Error>,
1427 modules_optional_builtin: Vec<String>,
1428 first_startup: bool,
1429) -> Result<bool, Box<dyn Error + Send + Sync>> {
1430 if let Some(environment_variables_hash) = yaml_config["global"]["environmentVariables"].as_hash()
1431 {
1432 let environment_variables_hash_iter = environment_variables_hash.iter();
1433 for (variable_name, variable_value) in environment_variables_hash_iter {
1434 if let Some(variable_name) = variable_name.as_str() {
1435 if let Some(variable_value) = variable_value.as_str() {
1436 if !variable_name.is_empty()
1437 && !variable_name.contains('\0')
1438 && !variable_name.contains('=')
1439 && !variable_value.contains('\0')
1440 {
1441 env::set_var(variable_name, variable_value);
1445 }
1446 }
1447 }
1448 }
1449 }
1450
1451 let available_parallelism = thread::available_parallelism()?.get();
1452
1453 let server_runtime = tokio::runtime::Builder::new_multi_thread()
1455 .worker_threads(available_parallelism)
1456 .max_blocking_threads(1536)
1457 .event_interval(25)
1458 .thread_name("server-pool")
1459 .enable_all()
1460 .build()?;
1461
1462 let log_runtime = tokio::runtime::Builder::new_multi_thread()
1464 .worker_threads(match available_parallelism / 2 {
1465 0 => 1,
1466 non_zero => non_zero,
1467 })
1468 .max_blocking_threads(768)
1469 .thread_name("log-pool")
1470 .enable_time()
1471 .build()?;
1472
1473 let (logger, receive_log) = async_channel::bounded::<LogMessage>(10000);
1474
1475 let log_filename = yaml_config["global"]["logFilePath"]
1476 .as_str()
1477 .map(String::from);
1478 let error_log_filename = yaml_config["global"]["errorLogFilePath"]
1479 .as_str()
1480 .map(String::from);
1481
1482 log_runtime.spawn(async move {
1483 let log_file = match log_filename {
1484 Some(log_filename) => Some(
1485 fs::OpenOptions::new()
1486 .append(true)
1487 .create(true)
1488 .open(log_filename)
1489 .await,
1490 ),
1491 None => None,
1492 };
1493
1494 let error_log_file = match error_log_filename {
1495 Some(error_log_filename) => Some(
1496 fs::OpenOptions::new()
1497 .append(true)
1498 .create(true)
1499 .open(error_log_filename)
1500 .await,
1501 ),
1502 None => None,
1503 };
1504
1505 let log_file_wrapped = match log_file {
1506 Some(Ok(file)) => Some(Arc::new(Mutex::new(BufWriter::with_capacity(131072, file)))),
1507 Some(Err(e)) => {
1508 eprintln!("Failed to open log file: {e}");
1509 None
1510 }
1511 None => None,
1512 };
1513
1514 let error_log_file_wrapped = match error_log_file {
1515 Some(Ok(file)) => Some(Arc::new(Mutex::new(BufWriter::with_capacity(131072, file)))),
1516 Some(Err(e)) => {
1517 eprintln!("Failed to open error log file: {e}");
1518 None
1519 }
1520 None => None,
1521 };
1522
1523 let log_file_wrapped_cloned_for_sleep = log_file_wrapped.clone();
1525 let error_log_file_wrapped_cloned_for_sleep = error_log_file_wrapped.clone();
1526 tokio::task::spawn(async move {
1527 let mut interval = time::interval(time::Duration::from_millis(100));
1528 loop {
1529 interval.tick().await;
1530 if let Some(log_file_wrapped_cloned) = log_file_wrapped_cloned_for_sleep.clone() {
1531 let mut locked_file = log_file_wrapped_cloned.lock().await;
1532 locked_file.flush().await.unwrap_or_default();
1533 }
1534 if let Some(error_log_file_wrapped_cloned) = error_log_file_wrapped_cloned_for_sleep.clone()
1535 {
1536 let mut locked_file = error_log_file_wrapped_cloned.lock().await;
1537 locked_file.flush().await.unwrap_or_default();
1538 }
1539 }
1540 });
1541
1542 while let Ok(message) = receive_log.recv().await {
1544 let (mut message, is_error) = message.get_message();
1545 let log_file_wrapped_cloned = if !is_error {
1546 log_file_wrapped.clone()
1547 } else {
1548 error_log_file_wrapped.clone()
1549 };
1550
1551 if let Some(log_file_wrapped_cloned) = log_file_wrapped_cloned {
1552 tokio::task::spawn(async move {
1553 let mut locked_file = log_file_wrapped_cloned.lock().await;
1554 if is_error {
1555 let now: DateTime<Local> = Local::now();
1556 let formatted_time = now.format("%Y-%m-%d %H:%M:%S").to_string();
1557 message = format!("[{formatted_time}]: {message}");
1558 }
1559 message.push('\n');
1560 if let Err(e) = locked_file.write(message.as_bytes()).await {
1561 eprintln!("Failed to write to log file: {e}");
1562 }
1563 });
1564 }
1565 }
1566 });
1567
1568 for msg in env_config::log_env_var_overrides() {
1570 logger
1571 .send_blocking(LogMessage::new(msg, true))
1572 .unwrap_or_default();
1573 }
1574
1575 let result = server_runtime.block_on(async {
1577 let event_loop_future = server_event_loop(
1578 yaml_config,
1579 logger,
1580 modules,
1581 module_error,
1582 modules_optional_builtin,
1583 first_startup,
1584 );
1585
1586 let (continue_tx, continue_rx) = async_channel::unbounded::<bool>();
1587 let cancel_token = CancellationToken::new();
1588
1589 #[cfg(unix)]
1590 {
1591 let cancel_token_clone = cancel_token.clone();
1592 let continue_tx_clone = continue_tx.clone();
1593 tokio::spawn(async move {
1594 if let Ok(mut signal) = signal::unix::signal(signal::unix::SignalKind::hangup()) {
1595 tokio::select! {
1596 _ = signal.recv() => {
1597 continue_tx_clone.send(true).await.unwrap_or_default();
1598 }
1599 _ = cancel_token_clone.cancelled() => {}
1600 }
1601 }
1602 });
1603 }
1604
1605 let cancel_token_clone = cancel_token.clone();
1606 tokio::spawn(async move {
1607 tokio::select! {
1608 result = signal::ctrl_c() => {
1609 if result.is_ok() {
1610 continue_tx.send(false).await.unwrap_or_default();
1611 }
1612 }
1613 _ = cancel_token_clone.cancelled() => {}
1614 }
1615 });
1616
1617 let result = tokio::select! {
1618 result = event_loop_future => {
1619 time::sleep(tokio::time::Duration::from_millis(100)).await;
1621
1622 result.map(|_| false)
1623 },
1624 continue_running = continue_rx.recv() => Ok(continue_running?)
1625 };
1626
1627 cancel_token.cancel();
1628
1629 result
1630 });
1631
1632 server_runtime.shutdown_timeout(time::Duration::from_secs(10));
1634
1635 result
1636}