tokio_rustls_acme2/
state.rs

1use crate::acceptor::AcmeAcceptor;
2use crate::acme::{Account, AcmeError, Auth, AuthStatus, Directory, Identifier, Order, OrderStatus, ACME_TLS_ALPN_NAME};
3use crate::{any_ecdsa_type, crypto_provider, AcmeConfig, Incoming, ResolvesServerCertAcme, UseChallenge};
4use chrono::{DateTime, TimeZone, Utc};
5use core::fmt;
6use futures_util::{ready, FutureExt, Stream};
7use rcgen::{CertificateParams, DistinguishedName, KeyPair, PKCS_ECDSA_P256_SHA256};
8use rustls_pki_types::{CertificateDer as RustlsCertificate, PrivateKeyDer, PrivatePkcs8KeyDer};
9use std::convert::Infallible;
10use std::fmt::Debug;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use std::time::Duration;
16use thiserror::Error;
17use tokio::io::{AsyncRead, AsyncWrite};
18use tokio_rustls::rustls::crypto::CryptoProvider;
19use tokio_rustls::rustls::sign::CertifiedKey;
20use tokio_rustls::rustls::ServerConfig;
21use x509_parser::parse_x509_certificate;
22
23#[allow(clippy::type_complexity)]
24pub struct AcmeState<EC: Debug = Infallible, EA: Debug = EC> {
25    config: Arc<AcmeConfig<EC, EA>>,
26    resolver: Arc<ResolvesServerCertAcme>,
27    account_key: Option<Vec<u8>>,
28
29    early_action: Option<Pin<Box<dyn Future<Output = Event<EC, EA>> + Send>>>,
30    load_cert: Option<Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EC>> + Send>>>,
31    load_account: Option<Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EA>> + Send>>>,
32    order: Option<Pin<Box<dyn Future<Output = Result<Vec<u8>, OrderError>> + Send>>>,
33    backoff_cnt: usize,
34    wait: Option<Pin<Box<tokio::time::Sleep>>>,
35}
36
37impl<EC: 'static + Debug, EA: 'static + Debug> fmt::Debug for AcmeState<EC, EA> {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        f.debug_struct("AcmeState").field("config", &self.config).finish_non_exhaustive()
40    }
41}
42
43pub type Event<EC, EA> = Result<EventOk, EventError<EC, EA>>;
44
45#[derive(Debug)]
46pub enum EventOk {
47    DeployedCachedCert,
48    DeployedNewCert,
49    CertCacheStore,
50    AccountCacheStore,
51}
52
53#[derive(Error, Debug)]
54pub enum EventError<EC: Debug, EA: Debug> {
55    #[error("cert cache load: {0}")]
56    CertCacheLoad(EC),
57    #[error("account cache load: {0}")]
58    AccountCacheLoad(EA),
59    #[error("cert cache store: {0}")]
60    CertCacheStore(EC),
61    #[error("account cache store: {0}")]
62    AccountCacheStore(EA),
63    #[error("cached cert parse: {0}")]
64    CachedCertParse(CertParseError),
65    #[error("order: {0}")]
66    Order(OrderError),
67    #[error("new cert parse: {0}")]
68    NewCertParse(CertParseError),
69}
70
71#[derive(Error, Debug)]
72pub enum OrderError {
73    #[error("acme error: {0}")]
74    Acme(#[from] AcmeError),
75    #[error("certificate generation error: {0}")]
76    Rcgen(#[from] rcgen::Error),
77    #[error("bad order object: {0:?}")]
78    BadOrder(Order),
79    #[error("bad auth object: {0:?}")]
80    BadAuth(Auth),
81    #[error("authorization for {0} failed too many times")]
82    TooManyAttemptsAuth(String),
83    #[error("order status stayed on processing too long")]
84    ProcessingTimeout(Order),
85}
86
87#[derive(Error, Debug)]
88pub enum CertParseError {
89    #[error("X509 parsing error: {0}")]
90    X509(#[from] x509_parser::nom::Err<x509_parser::error::X509Error>),
91    #[error("expected 2 or more pem, got: {0}")]
92    Pem(#[from] pem::PemError),
93    #[error("expected 2 or more pem, got: {0}")]
94    TooFewPem(usize),
95    #[error("unsupported private key type")]
96    InvalidPrivateKey,
97}
98
99impl<EC: 'static + Debug, EA: 'static + Debug> AcmeState<EC, EA> {
100    pub fn incoming<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin>(
101        self,
102        tcp_incoming: ITCP,
103        alpn_protocols: Vec<Vec<u8>>,
104    ) -> Incoming<TCP, ETCP, ITCP, EC, EA> {
105        #[allow(deprecated)]
106        let acceptor = self.acceptor();
107        Incoming::new(tcp_incoming, self, acceptor, alpn_protocols)
108    }
109    #[deprecated(note = "please use high-level API via `AcmeState::incoming()` instead or refer to updated low-level API examples")]
110    #[allow(deprecated)]
111    pub fn acceptor(&self) -> AcmeAcceptor {
112        AcmeAcceptor::new(self.resolver())
113    }
114    #[cfg(feature = "axum")]
115    pub fn axum_acceptor(&self, rustls_config: Arc<ServerConfig>) -> crate::axum::AxumAcceptor {
116        #[allow(deprecated)]
117        crate::axum::AxumAcceptor::new(self.acceptor(), rustls_config)
118    }
119
120    #[cfg(feature = "tower")]
121    pub fn http01_challenge_tower_service(&self) -> crate::tower::TowerHttp01ChallengeService {
122        crate::tower::TowerHttp01ChallengeService(self.resolver.clone())
123    }
124
125    pub fn resolver(&self) -> Arc<ResolvesServerCertAcme> {
126        self.resolver.clone()
127    }
128    /// Creates a [rustls::ServerConfig] for TLS-ALPN-01 challenge connections. Use this if [crate::is_tls_alpn_challenge] returns `true`.
129    #[cfg(any(feature = "ring", feature = "aws-lc-rs"))]
130    pub fn challenge_rustls_config(&self) -> Arc<ServerConfig> {
131        self.challenge_rustls_config_with_provider(crypto_provider().into())
132    }
133    /// Same as [AcmeState::challenge_rustls_config], with a specific [CryptoProvider].
134    pub fn challenge_rustls_config_with_provider(&self, provider: Arc<CryptoProvider>) -> Arc<ServerConfig> {
135        let mut rustls_config = ServerConfig::builder_with_provider(provider)
136            .with_safe_default_protocol_versions()
137            .unwrap()
138            .with_no_client_auth()
139            .with_cert_resolver(self.resolver());
140        rustls_config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
141        Arc::new(rustls_config)
142    }
143    /// Creates a default [rustls::ServerConfig] for accepting regular tls connections. Use this if [crate::is_tls_alpn_challenge] returns `false`.
144    /// If you need a [rustls::ServerConfig], which uses the certificates acquired by this [AcmeState],
145    /// you may build your own using the output of [AcmeState::resolver].
146    #[cfg(any(feature = "ring", feature = "aws-lc-rs"))]
147    pub fn default_rustls_config(&self) -> Arc<ServerConfig> {
148        self.default_rustls_config_with_provider(crypto_provider().into())
149    }
150    /// Same as [AcmeState::default_rustls_config], with a specific [CryptoProvider].
151    pub fn default_rustls_config_with_provider(&self, provider: Arc<CryptoProvider>) -> Arc<ServerConfig> {
152        let rustls_config = ServerConfig::builder_with_provider(provider)
153            .with_safe_default_protocol_versions()
154            .unwrap()
155            .with_no_client_auth()
156            .with_cert_resolver(self.resolver());
157        Arc::new(rustls_config)
158    }
159    pub fn new(config: AcmeConfig<EC, EA>) -> Self {
160        let config = Arc::new(config);
161        Self {
162            config: config.clone(),
163            resolver: ResolvesServerCertAcme::new(),
164            account_key: None,
165            early_action: None,
166            load_cert: Some(Box::pin({
167                let config = config.clone();
168                async move { config.cache.load_cert(&config.domains, &config.directory_url).await }
169            })),
170            load_account: Some(Box::pin({
171                let config = config.clone();
172                async move { config.cache.load_account(&config.contact, &config.directory_url).await }
173            })),
174            order: None,
175            backoff_cnt: 0,
176            wait: None,
177        }
178    }
179    fn parse_cert(pem: &[u8]) -> Result<(CertifiedKey, [DateTime<Utc>; 2]), CertParseError> {
180        let mut pems = pem::parse_many(pem)?;
181        if pems.len() < 2 {
182            return Err(CertParseError::TooFewPem(pems.len()));
183        }
184        let pk = match any_ecdsa_type(&PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(pems.remove(0).contents()))) {
185            Ok(pk) => pk,
186            Err(_) => return Err(CertParseError::InvalidPrivateKey),
187        };
188        let cert_chain: Vec<RustlsCertificate> = pems.into_iter().map(|p| RustlsCertificate::from(p.into_contents())).collect();
189        let validity = match parse_x509_certificate(&cert_chain[0]) {
190            Ok((_, cert)) => {
191                let validity = cert.validity();
192                [validity.not_before, validity.not_after].map(|t| Utc.timestamp_opt(t.timestamp(), 0).earliest().unwrap())
193            }
194            Err(err) => return Err(CertParseError::X509(err)),
195        };
196        let cert = CertifiedKey::new(cert_chain, pk);
197        Ok((cert, validity))
198    }
199
200    #[allow(clippy::result_large_err)]
201    fn process_cert(&mut self, pem: Vec<u8>, cached: bool) -> Event<EC, EA> {
202        let (cert, validity) = match (Self::parse_cert(&pem), cached) {
203            (Ok(r), _) => r,
204            (Err(err), cached) => {
205                return match cached {
206                    true => Err(EventError::CachedCertParse(err)),
207                    false => Err(EventError::NewCertParse(err)),
208                }
209            }
210        };
211        self.resolver.set_cert(Arc::new(cert));
212        let wait_duration = (validity[1] - (validity[1] - validity[0]) / 3 - Utc::now())
213            .max(chrono::Duration::zero())
214            .to_std()
215            .unwrap_or_default();
216        self.wait = Some(Box::pin(tokio::time::sleep(wait_duration)));
217        if cached {
218            return Ok(EventOk::DeployedCachedCert);
219        }
220        let config = self.config.clone();
221        self.early_action = Some(Box::pin(async move {
222            match config.cache.store_cert(&config.domains, &config.directory_url, &pem).await {
223                Ok(()) => Ok(EventOk::CertCacheStore),
224                Err(err) => Err(EventError::CertCacheStore(err)),
225            }
226        }));
227        Event::Ok(EventOk::DeployedNewCert)
228    }
229    async fn order(config: Arc<AcmeConfig<EC, EA>>, resolver: Arc<ResolvesServerCertAcme>, key_pair: Vec<u8>) -> Result<Vec<u8>, OrderError> {
230        let directory = Directory::discover(&config.client_config, &config.directory_url).await?;
231        let account = Account::create_with_keypair(&config.client_config, directory, &config.contact, &key_pair).await?;
232
233        let mut params = CertificateParams::new(config.domains.clone())?;
234        params.distinguished_name = DistinguishedName::new();
235        let key_pair = KeyPair::generate_for(&PKCS_ECDSA_P256_SHA256)?;
236        let csr = params.serialize_request(&key_pair)?;
237
238        let (order_url, mut order) = account.new_order(&config.client_config, config.domains.clone()).await?;
239        loop {
240            match order.status {
241                OrderStatus::Pending => {
242                    // Force in order authorizations to allow single global challenge data state
243                    for url in order.authorizations.iter() {
244                        Self::authorize(&config, &resolver, &account, url).await?
245                    }
246                    log::info!("completed all authorizations");
247                    order = account.order(&config.client_config, &order_url).await?;
248                }
249                OrderStatus::Processing => {
250                    for i in 0u64..10 {
251                        log::info!("order processing");
252                        tokio::time::sleep(Duration::from_secs(1u64 << i)).await;
253                        order = account.order(&config.client_config, &order_url).await?;
254                        if order.status != OrderStatus::Processing {
255                            break;
256                        }
257                    }
258                    if order.status == OrderStatus::Processing {
259                        return Err(OrderError::ProcessingTimeout(order));
260                    }
261                }
262                OrderStatus::Ready => {
263                    log::info!("sending csr");
264                    order = account.finalize(&config.client_config, order.finalize, csr.der()).await?
265                }
266                OrderStatus::Valid { certificate } => {
267                    log::info!("download certificate");
268                    let pem = [
269                        &key_pair.serialize_pem(),
270                        "\n",
271                        &account.certificate(&config.client_config, certificate).await?,
272                    ]
273                    .concat();
274                    return Ok(pem.into_bytes());
275                }
276                OrderStatus::Invalid => return Err(OrderError::BadOrder(order)),
277            }
278        }
279    }
280    async fn authorize(config: &AcmeConfig<EC, EA>, resolver: &ResolvesServerCertAcme, account: &Account, url: &String) -> Result<(), OrderError> {
281        let auth = account.auth(&config.client_config, url).await?;
282        let (domain, challenge_url) = match auth.status {
283            AuthStatus::Pending => {
284                let Identifier::Dns(domain) = auth.identifier;
285                log::info!("trigger challenge for {}", &domain);
286                let challenge = match config.challenge_type {
287                    UseChallenge::Http01 => {
288                        let (challenge, key_auth) = account.http_01(&auth.challenges)?;
289                        resolver.set_http_01_challenge_data(challenge.token.clone(), key_auth);
290                        challenge
291                    }
292                    UseChallenge::TlsAlpn01 => {
293                        let (challenge, auth_key) = account.tls_alpn_01(&auth.challenges, domain.clone())?;
294                        resolver.set_tls_alpn_01_challenge_data(domain.clone(), Arc::new(auth_key));
295                        challenge
296                    }
297                };
298                account.challenge(&config.client_config, &challenge.url).await?;
299                (domain, challenge.url.clone())
300            }
301            AuthStatus::Valid => {
302                // clear challenge data when auth validated
303                resolver.clear_challenge_data();
304                return Ok(());
305            }
306            _ => {
307                // clear challenge data when auth invalidated
308                resolver.clear_challenge_data();
309                return Err(OrderError::BadAuth(auth));
310            }
311        };
312        for i in 0u64..5 {
313            tokio::time::sleep(Duration::from_secs(1u64 << i)).await;
314            let auth = account.auth(&config.client_config, url).await?;
315            match auth.status {
316                AuthStatus::Pending => {
317                    log::info!("authorization for {} still pending", &domain);
318                    account.challenge(&config.client_config, &challenge_url).await?
319                }
320                AuthStatus::Valid => {
321                    // clear challenge data when auth validated
322                    resolver.clear_challenge_data();
323                    return Ok(());
324                }
325                _ => {
326                    // clear challenge data when auth invalidated
327                    resolver.clear_challenge_data();
328                    return Err(OrderError::BadAuth(auth));
329                }
330            }
331        }
332        Err(OrderError::TooManyAttemptsAuth(domain))
333    }
334    fn poll_next_infinite(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Event<EC, EA>> {
335        loop {
336            // queued early action
337            if let Some(early_action) = &mut self.early_action {
338                let result = ready!(early_action.poll_unpin(cx));
339                self.early_action.take();
340                return Poll::Ready(result);
341            }
342
343            // sleep
344            if let Some(timer) = &mut self.wait {
345                ready!(timer.poll_unpin(cx));
346                self.wait.take();
347            }
348
349            // load from cert cache
350            if let Some(load_cert) = &mut self.load_cert {
351                let result = ready!(load_cert.poll_unpin(cx));
352                self.load_cert.take();
353                match result {
354                    Ok(Some(pem)) => {
355                        return Poll::Ready(Self::process_cert(self.get_mut(), pem, true));
356                    }
357                    Ok(None) => {}
358                    Err(err) => return Poll::Ready(Err(EventError::CertCacheLoad(err))),
359                }
360            }
361
362            // load from account cache
363            if let Some(load_account) = &mut self.load_account {
364                let result = ready!(load_account.poll_unpin(cx));
365                self.load_account.take();
366                match result {
367                    Ok(Some(key_pair)) => self.account_key = Some(key_pair),
368                    Ok(None) => {}
369                    Err(err) => return Poll::Ready(Err(EventError::AccountCacheLoad(err))),
370                }
371            }
372
373            // execute order
374            if let Some(order) = &mut self.order {
375                let result = ready!(order.poll_unpin(cx));
376                self.order.take();
377                match result {
378                    Ok(pem) => {
379                        self.backoff_cnt = 0;
380                        return Poll::Ready(Self::process_cert(self.get_mut(), pem, false));
381                    }
382                    Err(err) => {
383                        // TODO: replace key on some errors or high backoff_cnt?
384                        self.wait = Some(Box::pin(tokio::time::sleep(Duration::from_secs(1 << self.backoff_cnt))));
385                        self.backoff_cnt = (self.backoff_cnt + 1).min(16);
386                        return Poll::Ready(Err(EventError::Order(err)));
387                    }
388                }
389            }
390
391            // schedule order
392            let account_key = match &self.account_key {
393                None => {
394                    let account_key = Account::generate_key_pair();
395                    self.account_key = Some(account_key.clone());
396                    let config = self.config.clone();
397                    let account_key_clone = account_key.clone();
398                    self.early_action = Some(Box::pin(async move {
399                        match config
400                            .cache
401                            .store_account(&config.contact, &config.directory_url, &account_key_clone)
402                            .await
403                        {
404                            Ok(()) => Ok(EventOk::AccountCacheStore),
405                            Err(err) => Err(EventError::AccountCacheStore(err)),
406                        }
407                    }));
408                    account_key
409                }
410                Some(account_key) => account_key.clone(),
411            };
412            let config = self.config.clone();
413            let resolver = self.resolver.clone();
414            self.order = Some(Box::pin(Self::order(config.clone(), resolver.clone(), account_key)));
415        }
416    }
417}
418
419impl<EC: 'static + Debug, EA: 'static + Debug> Stream for AcmeState<EC, EA> {
420    type Item = Event<EC, EA>;
421
422    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
423        Poll::Ready(Some(ready!(self.poll_next_infinite(cx))))
424    }
425}