gNOI 证书管理服务
总结 使用 gNOI CertificateManagement 服务管理目标网络元素上的证书。
概述
软件包中的 gnoi.certificate gNOI CertificateManagement 服务可处理目标网络元素上的证书管理。proto 定义文件位于 https://github.com/openconfig/gnoi/blob/master/cert/cert.proto。
公钥基础架构 (PKI) 支持公共加密密钥的分发和识别,使用户既能安全地通过互联网等网络交换数据,又能验证另一方的身份。借助 Junos PKI,您可以管理 Junos 设备上的公钥证书,包括下载、生成和验证证书。gNOI CertificateManagement 服务通过 Junos PKI 定义证书管理操作。两个主要操作是:
-
安装 — 在目标网络设备上使用新证书 ID 安装新证书。如果证书 ID 已存在,则操作将返回错误。
-
轮换 - 替换目标网络设备上已有证书 ID 的现有证书。如果在此过程中流中断或任何步骤失败,则设备将回滚到原始证书。
图 1 概述了 Install() 该工作流程和 Rotate() 操作。对于这两种操作,客户端可以自行生成证书签名请求 (CSR),或者请求目标生成 CSR。无论哪种情况,客户端都会将 CSR 转发给证书颁发机构 (CA),以请求数字证书。然后,客户端会使用用于操作的新证书 ID Install() 或使用现有操作证书 ID Rotate() 加载目标上的证书。对于 Rotate() 操作,客户端还应验证任何更换证书,并根据验证的成功或失败敲定或取消 Rotate() 操作。如果客户端取消操作,则服务器将回滚证书、密钥对和任何 CA 捆绑包(如果请求中存在它)。
从 Junos OS 演化版 23.1R1 开始,在 、 或LoadCertificate()操作期间Install()Rotate(),gNOI 服务器将使用相应的 CA 证书验证新的最终实体证书。因此,gNOI 服务器的 PKI 必须包含验证新证书的根 CA 证书。您可以将必需的 CA 证书作为 gNOI CA 捆绑包的一部分加载,也可以单独加载。如果验证失败,设备不会安装新证书。
gNOI 服务器仅支持一个用于 gNOI 服务的全局 CA 证书包。使用 gNOI CertificateManagement 服务加载 CA 捆绑包时,以下语句适用:
-
gNOI 始终使用保留的
ca-profile-group标识符gnoi-ca-bundle加载 CA 证书捆绑包。 -
如果您使用 gNOI 加载 CA 证书捆绑包,则设备会隐式使用交互身份验证并承担以下配置,即使设备未显式配置。
[edit system services extension-service request-response grpc ssl] mutual-authentication { certificate-authority gnoi-ca-bundle; client-certificate-request require-certificate-and-verify; } -
如果 gNOI 服务器收到加载新 CA 证书包的请求,它将从设备中清除前一个 CA 捆绑包的证书,并加载新 CA 捆绑包的证书。
-
如果您使用 gNOI 加载 CA 证书捆绑包并配置
[edit system services extension-service request-response grpc ssl mutual-authentication]语句层次结构,则配置的语句优先。
因此,您可以在 gNOI 服务器上初始设置仅服务器身份验证, Install() 然后使用 RPC 加载 CA 证书。使用 gNOI 加载初始 CA 证书捆绑包时,设备将执行以下步骤:
- 在 Junos PKI 中添加 CA 证书。
- 使用
ca-profile-group标识符gnoi-ca-bundle在层次结构级别自动配置 gNOI CA 证书包[edit security pki]。 - 从仅服务器身份验证切换到交互身份验证。
[edit]
security {
pki {
ca-profile gnoi-ca-bundle_1 {
ca-identity gnoi-ca-bundle_1;
}
ca-profile gnoi-ca-bundle_2 {
ca-identity gnoi-ca-bundle_2;
}
ca-profile-group gnoi-ca-bundle {
cert-base-count 2;
}
}
}
RPC Rotate() 不支持在轮换操作期间在身份验证模式之间切换。因此, Rotate() 不支持首次在 gNOI 服务器上加载 CA 证书捆绑包,因为它会使设备在操作过程中从仅服务器身份验证切换到相互身份验证。当身份验证模式发生变化时,网络设备必须重新启动 gRPC 堆栈,并且连接将丢失。如果流中断,客户端将无法敲定轮换请求,并且设备将回滚到发起请求之前已到位的 Rotate() 证书。
hot-reloading当身份验证模式在操作期间保持不变时,层级的语句[edit system services extension-service request-response grpc ssl]只会在证书更新期间维护 gRPC 会话。例如,如果身份验证模式从仅服务器身份验证切换到交互身份验证,客户端将断开连接。
支持的 RPC
表 1 概述了 CertificateManagement Junos 设备支持的服务 RPC。
| 版本中引入的 | RPC | 说明 |
|---|---|---|
CanGenerateCSR() |
查询目标设备,以确定设备是否可以生成具有指定密钥类型、密钥大小和证书类型的证书签名请求 (CSR)。支持的值:
|
Junos OS Evolved 23.1R1 |
GenerateCSR() |
生成并返回证书签名请求 (CSR)。 |
Junos OS Evolved 22.2R1 |
GetCertificates() |
返回目标设备上加载的本地证书。 |
Junos OS Evolved 22.2R1 |
Install() |
在目标设备上加载新证书,具体方式是创建 CSR 请求,基于 CSR 生成证书,然后使用新证书 ID 加载证书。 |
Junos OS Evolved 22.2R1 |
LoadCertificate() |
在目标设备上加载由证书颁发机构 (CA) 签署的证书。 |
Junos OS Evolved 22.2R1 |
LoadCertificateAuthorityBundle() |
在目标设备上加载 CA 证书包。 |
Junos OS Evolved 22.2R1 |
RevokeCertificates() |
撤销目标设备上具有指定证书 ID 的证书。 |
Junos OS Evolved 23.1R1 |
Rotate() |
通过创建 CSR 请求,生成基于 CSR 的证书,并使用现有证书 ID 加载证书,来替换目标设备上的现有证书。 |
Junos OS Evolved 22.2R1 |
网络设备配置
要在 Junos 设备上使用 gNOI 证书管理服务,必须为 gRPC 扩展服务配置 use-pki 和 hot-reloading 语句。在大多数情况下,您可以在网络设备上配置 use-pki gRPC 服务时配置语句。更新 hot-reloading 影响会话的证书时,需要该语句来维护 gRPC 会话。
开始之前:
- 按照配置 gRPC 服务中的说明,在网络设备上 配置 gRPC 服务。
- 按照配置 gNOI 服务中的说明,配置网络管理系统以支持 gNOI 操作。
要为服务运维配置网络设备 CertificateManagement :
安装证书
您可以使用服务 Install() RPC 在CertificateManagement目标设备上加载新证书。使用Install()操作安装新证书时,必须指定目标设备上不存在的新证书 ID。您还可以选择在操作中Install()加载 CA 证书捆绑包。
作为操作的 Install() 一部分,设备会验证新证书。因此,Junos PKI 必须具有验证新证书的根 CA 证书。您可以在操作中 Install() 加载所需的 CA 证书,或者在操作之前单独加载(如果它不在 PKI 中)。
如果安装将用于 gRPC 会话身份验证的新本地证书,还必须更新设备上的 gRPC 服务器配置,以使用新的证书 ID。
示例:安装证书
在此示例中,gNOI 服务器最初仅配置了本地证书,尚未配置为使用交互身份验证。gNOI 客户端使用 RPC 在 Install() 设备上加载新的本地证书和 CA 证书包。在 gNOI 服务器上加载 CA 捆绑包后,服务器默认使用交互身份验证。CA 捆绑包包括用于客户端证书的根 CA 证书以及用于新服务器证书的根 CA 证书。
客户端执行 gnoi_cert_install_certificate_csr.py Python 应用程序,该应用程序将执行以下操作:
- 请求目标生成 CSR。
- 获取基于 CSR 的已签名证书。
- 在目标网络设备上加载新服务器证书、服务器的新根 CA 证书和客户端的根 CA 证书。
应用程序使用 InstallCertificateRequest 带有相应参数的消息来定义生成 CSR 和加载证书的请求。对于每个请求,应用程序都使用 Install() RPC 将请求发送到网络设备。
应用程序gnoi_cert_install_certificate_csr.py会导入模块grpc_channel以建立通道。配置 gNOI 服务中介绍了该grpc_channel模块。应用程序参数存储在文件中args_cert_install_csr.txt。此处显示应用程序和参数文件。
gnoi_cert_install_certificate_csr.py
"""gNOI Install Certificate utility."""
from __future__ import print_function
from __future__ import unicode_literals
import argparse
import logging
import re
from getpass import getpass
from subprocess import call
import cert_pb2
import cert_pb2_grpc
from grpc_channel import grpc_authenticate_channel_server_only
def get_args(parser):
parser.add_argument('--server',
dest='server',
type=str,
default='localhost',
help='Server IP or name. Default is localhost.')
parser.add_argument('--port',
dest='port',
nargs='?',
type=int,
default=50051,
help='Server port. Default is 50051')
parser.add_argument('--client_key',
dest='client_key',
type=str,
default='',
help='Full path of the client private key. Default is "".')
parser.add_argument('--client_cert',
dest='client_cert',
type=str,
default='',
help='Full path of the client certificate. Default is "".')
parser.add_argument('--root_ca_cert',
dest='root_ca_cert',
required=True,
type=str,
help='Full path of the Root CA certificate.')
parser.add_argument('--user_id',
dest='user_id',
required=True,
type=str,
help='User ID for RPC call credentials.')
parser.add_argument('--type',
dest='type',
type=int,
default='1',
help='Certificate Type. Default is 1. Valid value is 1 (1 is CT_X509); Invalid value is 0 (0 is CT_UNKNOWN).')
parser.add_argument('--min_key_size',
dest='min_key_size',
type=int,
default='2048',
help='Minimum key size. Default is 2048.')
parser.add_argument('--key_type',
dest='key_type',
type=int,
default='1',
help='Key Type. Default is 1 (KT_RSA); 0 is KT_UNKNOWN.')
parser.add_argument('--common_name',
dest='common_name',
type=str,
default='',
help='CN of the certificate')
parser.add_argument('--country',
dest='country',
type=str,
default='US',
help='Country name')
parser.add_argument('--state',
dest='state',
type=str,
default='CA',
help='State name')
parser.add_argument('--city',
dest='city',
type=str,
default='Sunnyvale',
help='City name')
parser.add_argument('--organization',
dest='organization',
type=str,
default='Acme',
help='Organization name')
parser.add_argument('--organizational_unit',
dest='organizational_unit',
type=str,
default='Test',
help='Organization unit name')
parser.add_argument('--ip_address',
dest='ip_address',
type=str,
default='',
help='IP address on the certificate')
parser.add_argument('--email_id',
dest='email_id',
type=str,
default='',
help='Email id')
parser.add_argument('--certificate_id',
dest='certificate_id',
required=True,
type=str,
help='Certificate id.')
parser.add_argument('--server_cert_private_key',
dest='server_cert_private_key',
type=str,
default='',
help='Server certificate private key')
parser.add_argument('--server_cert_public_key',
dest='server_cert_public_key',
type=str,
default='',
help='Server certificate public key')
parser.add_argument('--server_cert',
dest='server_cert',
type=str,
default='server_cert',
help='Server certificate')
parser.add_argument('--server_root_ca1',
dest='server_root_ca1',
type=str,
default='server_root_ca1',
help='Server Root CA')
parser.add_argument('--server_root_ca2',
dest='server_root_ca2',
type=str,
default='server_root_ca2',
help='Server Root CA')
parser.add_argument('--client_root_ca1',
dest='client_root_ca1',
type=str,
default='client_root_ca1',
help='Client Root CA')
parser.add_argument('--client_root_ca2',
dest='client_root_ca2',
type=str,
default='client_root_ca2',
help='Client Root CA')
parser.add_argument('--client_root_ca3',
dest='client_root_ca3',
type=str,
default='client_root_ca3',
help='Client Root CA')
parser.add_argument('--client_root_ca4',
dest='client_root_ca4',
type=str,
default='client_root_ca4',
help='Client Root CA')
args = parser.parse_args()
return args
def install_cert(channel, metadata, args):
try:
stub = cert_pb2_grpc.CertificateManagementStub(channel)
print("Executing GNOI::CertificateManagement::Install")
# Create request to generate certificate signing request (CSR)
it = []
req = cert_pb2.InstallCertificateRequest()
req.generate_csr.csr_params.type = args.type
req.generate_csr.csr_params.min_key_size = args.min_key_size
req.generate_csr.csr_params.key_type = args.key_type
req.generate_csr.csr_params.common_name = args.common_name
req.generate_csr.csr_params.country = args.country
req.generate_csr.csr_params.state = args.state
req.generate_csr.csr_params.city = args.city
req.generate_csr.csr_params.organization = args.organization
req.generate_csr.csr_params.organizational_unit = args.organizational_unit
req.generate_csr.csr_params.ip_address = args.ip_address
req.generate_csr.csr_params.email_id = args.email_id
req.generate_csr.certificate_id = args.certificate_id
it.append(req)
# Send request to generate CSR
for csr_rsp in stub.Install(iter(it), metadata=metadata, timeout=180):
logging.info(csr_rsp)
# Write CSR to a file
with open('/home/lab/certs/server_temp.csr', "wb") as file:
file.write(csr_rsp.generated_csr.csr.csr)
# If client connects to server IP address
# update openssl.cnf template to include subjectAltName IP extension
with open('/etc/pki/certs/openssl.cnf', 'r') as fd:
data = fd.read()
data1 = re.sub(r'(subjectAltName=IP:).*',
r'\g<1>'+args.ip_address, data)
with open('/home/lab/certs/openssl_temp.cnf', 'w') as fd:
fd.write(data1)
# Generate certificate with v3 extensions
cmd = "openssl x509 -req -days 365 -in /home/lab/certs/server_temp.csr -CA /etc/pki/certs/serverRootCA.crt -CAkey /etc/pki/certs/serverRootCA.key -CAcreateserial -out /home/lab/certs/server_temp.crt -extensions v3_sign -extfile /home/lab/certs/openssl_temp.cnf -sha384"
decrypted = call(cmd, shell=True)
# Create request to install node certificate and CA certificates
print("\nExecuting GNOI::CertificateManagement::Install")
it = []
req = cert_pb2.InstallCertificateRequest()
# Import certificate and add to request
cert_data = bytearray(b'')
with open("/home/lab/certs/server_temp.crt", "rb") as file:
cert_data = file.read()
req.load_certificate.certificate.type = args.type
req.load_certificate.certificate_id = args.certificate_id
req.load_certificate.certificate.certificate = cert_data
# Add client and server CA certificates to request
ca1 = req.load_certificate.ca_certificates.add()
ca1.type = args.type
ca1.certificate = open(args.client_root_ca1, 'rb').read()
ca2 = req.load_certificate.ca_certificates.add()
ca2.type = args.type
ca2.certificate = open(args.server_root_ca1, 'rb').read()
it.append(req)
# Send request to install node certificate and CA bundle
for rsp in stub.Install(iter(it), metadata=metadata, timeout=180):
logging.info("Installing certificates: %s", rsp)
print("Install complete.")
except Exception as e:
logging.error('Certificate install error: %s', e)
print(e)
def main():
parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
args = get_args(parser)
grpc_server_password = getpass("gRPC server password for executing RPCs: ")
metadata = [('username', args.user_id),
('password', grpc_server_password)]
try:
# Establish grpc channel to network device
channel = grpc_authenticate_channel_server_only(
args.server, args.port, args.root_ca_cert)
install_cert(channel, metadata, args)
except Exception as e:
logging.error('Received error: %s', e)
print(e)
if __name__ == '__main__':
logging.basicConfig(filename='gnoi-testing.log',
format='%(asctime)s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
main()
args_cert_install_csr.txt
--server=10.53.52.169 --port=50051 --root_ca_cert=/etc/pki/certs/serverRootCA.crt --user_id=gnoi-user --type=1 --min_key_size=2048 --key_type=1 --common_name=gnoi-server.example.com --country=US --state=CA --city=Sunnyvale --organization=Acme --organizational_unit=testing --ip_address=10.53.52.169 --email_id=test@example.com --certificate_id=gnoi-server1 --client_root_ca1=/etc/pki/certs/clientRootCA.crt --server_root_ca1=/etc/pki/certs/serverRootCA1.crt
执行应用程序
当客户端执行应用程序时,应用程序会请求 CSR,获取已签名的证书,并在目标网络设备上加载新的服务器证书和 CA 证书。
lab@gnoi-client:~/src/gnoi/proto$ python3 gnoi_cert_install_certificate_csr.py @args_cert_install_csr.txt gRPC server password for executing RPCs: Creating channel Executing GNOI::CertificateManagement::Install Signature ok subject=CN = gnoi-server.example.com, C = US, ST = CA, O = Acme, OU = testing Getting CA Private Key Executing GNOI::CertificateManagement::Install Install complete.
安装新服务器证书后,您必须将服务器配置为使用该证书 ID 进行 gRPC 会话身份验证,如下所示。此外,由于 Install() 操作加载了新的 CA 证书,设备会隐式使用交互身份验证。因此,在建立通道时,所有后续 gRPC 会话都必须包含客户端的证书和密钥。
user@gnoi-server> show configuration system services extension-service request-response grpc ssl port 50051; local-certificate gnoi-server1; hot-reloading; use-pki;
如果执行应用程序并提供服务器上已存在的证书 ID,则应用程序将返回错误 ALREADY_EXISTS ,因为 Install() 操作需要新的证书 ID。
lab@gnoi-client:~/src/gnoi/proto$ python3 gnoi_cert_install_certificate_csr.py @args_cert_install_csr.txt
gRPC server password for executing RPCs:
Creating channel
Executing GNOI::CertificateManagement::Install
Signature ok
subject=CN = gnoi-server.example.com, C = US, ST = CA, O = Acme, OU = testing
Getting CA Private Key
Executing GNOI::CertificateManagement::Install
<_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.ALREADY_EXISTS
details = ""
debug_error_string = "{"created":"@1652241881.676147097","description":"Error received from peer ipv4:10.53.52.169:50051","file":"src/core/lib/surface/call.cc","file_line":903,"grpc_message":"","grpc_status":6}"
轮换证书
您可以使用 CertificateManagement 服务 Rotate() RPC 替换目标设备上的现有证书。使用 Rotate() 操作替换现有证书时,必须使用目标设备上已存在的证书 ID 加载证书。您还可以选择替换现有的 gNOI CA 证书包作为操作的 Rotate() 一部分。
操作 Rotate() 与操作类似 Install() ,只是操作 Rotate() 会替换现有证书,而不是安装新证书。此外,客户端必须验证更新的证书是否有效,然后根据证书验证的成功或失败敲定或取消 Rotate() 请求。
作为操作的 Rotate() 一部分,设备会验证新证书。因此,Junos PKI 必须具有验证新证书的根 CA 证书。您可以在操作中 Rotate() 加载所需的 CA 证书,或者在操作之前单独加载(如果它不在 PKI 中)。
示例:轮换证书
在此示例中,客户端会执行 gnoi_cert_rotate_certificate_csr.py Python 应用程序,该应用程序将执行以下操作:
- 请求目标生成 CSR。
- 获取基于 CSR 的签名证书
- 替换目标网络设备上的节点证书和 gNOI CA 捆绑包。
- 验证新证书。
Rotate敲定操作。
应用程序使用 RotateCertificateRequest 带有适当参数的消息来定义生成 CSR 以及加载证书和 CA 捆绑包的请求。对于每个请求,应用程序都使用 Rotate() RPC 将请求发送到网络设备。要使目标设备能够验证新节点证书,应用程序将现有 CA 捆绑包替换为新的 CA 捆绑包。该捆绑包既包括客户端 CA 证书,也包括验证节点证书所需的 CA 证书。
尽管您可以使用任何 RPC 测试会话身份验证,但应用程序会通过使用网络设备创建新的 gRPC 会话并执行简单的 Time() RPC 来验证新证书是否有效。如果会话成功建立,应用程序将敲定轮换请求,并在会话身份验证失败时取消轮换请求。
应用程序gnoi_cert_rotate_certificate_csr.py会导入模块grpc_channel以建立通道。配置 gNOI 服务中介绍了该grpc_channel模块。应用程序参数存储在文件中args_cert_rotate_csr.txt。此处显示应用程序和参数文件。
gnoi_cert_rotate_certificate_csr.py
"""gNOI Rotate Certificate utility."""
from __future__ import print_function
from __future__ import unicode_literals
import argparse
import logging
import time
import re
import grpc
from getpass import getpass
from subprocess import call
import cert_pb2
import cert_pb2_grpc
import system_pb2
import system_pb2_grpc
from grpc_channel import grpc_authenticate_channel_mutual
def get_args(parser):
parser.add_argument('--server',
dest='server',
type=str,
default='localhost',
help='Server IP or name. Default is localhost.')
parser.add_argument('--port',
dest='port',
nargs='?',
type=int,
default=50051,
help='Server port. Default is 50051')
parser.add_argument('--client_key',
dest='client_key',
type=str,
default='',
help='Full path of the client private key. Default is "".')
parser.add_argument('--client_cert',
dest='client_cert',
type=str,
default='',
help='Full path of the client certificate. Default is "".')
parser.add_argument('--root_ca_cert',
dest='root_ca_cert',
required=True,
type=str,
help='Full path of the Root CA certificate.')
parser.add_argument('--user_id',
dest='user_id',
required=True,
type=str,
help='User ID for RPC call credentials.')
parser.add_argument('--type',
dest='type',
type=int,
default='1',
help='Certificate Type. Default is 1. Valid value is 1 (1 is CT_X509); Invalid value is 0 (0 is CT_UNKNOWN).')
parser.add_argument('--min_key_size',
dest='min_key_size',
type=int,
default='2048',
help='Minimum key size. Default is 2048.')
parser.add_argument('--key_type',
dest='key_type',
type=int,
default='1',
help='Key Type. Default is 1 (KT_RSA); 0 is KT_UNKNOWN.')
parser.add_argument('--common_name',
dest='common_name',
type=str,
default='',
help='CN of the certificate')
parser.add_argument('--country',
dest='country',
type=str,
default='US',
help='Country name')
parser.add_argument('--state',
dest='state',
type=str,
default='CA',
help='State name')
parser.add_argument('--city',
dest='city',
type=str,
default='Sunnyvale',
help='City name')
parser.add_argument('--organization',
dest='organization',
type=str,
default='Acme',
help='Organization name')
parser.add_argument('--organizational_unit',
dest='organizational_unit',
type=str,
default='Test',
help='Organization unit name')
parser.add_argument('--ip_address',
dest='ip_address',
type=str,
default='',
help='IP address on the certificate')
parser.add_argument('--email_id',
dest='email_id',
type=str,
default='',
help='Email id')
parser.add_argument('--certificate_id',
dest='certificate_id',
required=True,
type=str,
help='Certificate id.')
parser.add_argument('--server_cert_private_key',
dest='server_cert_private_key',
type=str,
default='',
help='Server certificate private key')
parser.add_argument('--server_cert_public_key',
dest='server_cert_public_key',
type=str,
default='',
help='Server certificate public key')
parser.add_argument('--server_cert',
dest='server_cert',
type=str,
default='server_cert',
help='Server certificate')
parser.add_argument('--server_root_ca1',
dest='server_root_ca1',
type=str,
default='server_root_ca1',
help='Server Root CA')
parser.add_argument('--server_root_ca2',
dest='server_root_ca2',
type=str,
default='server_root_ca2',
help='Server Root CA')
parser.add_argument('--client_root_ca1',
dest='client_root_ca1',
type=str,
default='client_root_ca1',
help='Client Root CA')
parser.add_argument('--client_root_ca2',
dest='client_root_ca2',
type=str,
default='client_root_ca2',
help='Client Root CA')
parser.add_argument('--client_root_ca3',
dest='client_root_ca3',
type=str,
default='client_root_ca3',
help='Client Root CA')
parser.add_argument('--client_root_ca4',
dest='client_root_ca4',
type=str,
default='client_root_ca4',
help='Client Root CA')
parser.add_argument("--client_key_test",
dest='client_key_test',
type=str,
default='',
help='Full path of the test client private key. Default ""')
parser.add_argument("--client_cert_test",
dest='client_cert_test',
type=str,
default='',
help='Full path of the test client certificate. Default ""')
args = parser.parse_args()
return args
def rotate_cert(channel, metadata, args):
try:
result = ''
stub = cert_pb2_grpc.CertificateManagementStub(channel)
print("Executing GNOI::CertificateManagement::Rotate")
# Create request to generate certificate signing request (CSR)
it = []
req = cert_pb2.RotateCertificateRequest()
req.generate_csr.csr_params.type = args.type
req.generate_csr.csr_params.min_key_size = args.min_key_size
req.generate_csr.csr_params.key_type = args.key_type
req.generate_csr.csr_params.common_name = args.common_name
req.generate_csr.csr_params.country = args.country
req.generate_csr.csr_params.state = args.state
req.generate_csr.csr_params.city = args.city
req.generate_csr.csr_params.organization = args.organization
req.generate_csr.csr_params.organizational_unit = args.organizational_unit
req.generate_csr.csr_params.ip_address = args.ip_address
req.generate_csr.csr_params.email_id = args.email_id
req.generate_csr.certificate_id = args.certificate_id
it.append(req)
# Send request to generate CSR
print('Sending request for CSR')
for csr_rsp in stub.Rotate(iter(it), metadata=metadata, timeout=30):
logging.info(csr_rsp)
# Write CSR to a file
with open('/home/lab/certs/server_temp.csr', "wb") as file:
file.write(csr_rsp.generated_csr.csr.csr)
# If client connects to server IP address
# update openssl.cnf template to include subjectAltName IP extension
with open('/etc/pki/certs/openssl.cnf', 'r') as fd:
data = fd.read()
data1 = re.sub(r'(subjectAltName=IP:).*',
r'\g<1>'+args.ip_address, data)
with open('/home/lab/certs/openssl_temp.cnf', 'w') as fd:
fd.write(data1)
# Generate certificate with v3 extensions
cmd = "openssl x509 -req -days 365 -in /home/lab/certs/server_temp.csr -CA /etc/pki/certs/serverRootCA.crt -CAkey /etc/pki/certs/serverRootCA.key -CAcreateserial -out /home/lab/certs/server_temp.crt -extensions v3_sign -extfile /home/lab/certs/openssl_temp.cnf -sha384"
decrypted = call(cmd, shell=True)
# Create request to rotate node certificate and CA certificates
print("\nExecuting GNOI::CertificateManagement::Rotate")
it = []
req = cert_pb2.RotateCertificateRequest()
# Import certificate and add to request
with open("/home/lab/certs/server_temp.crt", "rb") as file:
cert_data = file.read()
req.load_certificate.certificate.type = args.type
req.load_certificate.certificate.certificate = cert_data
req.load_certificate.certificate_id = args.certificate_id
# Add client and server CA certificates to request
ca1 = req.load_certificate.ca_certificates.add()
ca1.type = args.type
ca1.certificate = open(args.client_root_ca1, 'rb').read()
ca2 = req.load_certificate.ca_certificates.add()
ca2.type = args.type
ca2.certificate = open(args.server_root_ca1, 'rb').read()
it.append(req)
# Send request to replace node certificate and CA bundle
for rsp in stub.Rotate(iter(it), metadata=metadata, timeout=60):
logging.info("Rotating certificates. %s", rsp)
# Validate certificates
print("Validating certificates")
time.sleep(5)
validate_rc = True
try:
validate_channel = grpc_authenticate_channel_mutual(
args.server, args.port, args.server_root_ca1, args.client_key_test,
args.client_cert_test)
validate_stub = system_pb2_grpc.SystemStub(validate_channel)
validate_rsp = validate_stub.Time(
request=system_pb2.TimeRequest(), metadata=metadata, timeout=60)
except grpc.RpcError as e:
print("Validation failed with error:", e)
validate_rc = False
pass
if validate_rc:
print("Finalizing certificate rotation.")
it = []
req = cert_pb2.RotateCertificateRequest()
req.finalize_rotation.SetInParent()
it.append(req)
for rsp in stub.Rotate(iter(it), metadata=metadata, timeout=30):
logging.info("Finalizing rotate. %s", rsp)
logging.info(
"Certificate validation succeeded. Certificate rotation finalized.")
result = "Certificate rotation finalized."
else:
print("Rolling back certificates.")
logging.info(
"Certificate validation failed. Rolling back to original certificates.")
rsp.cancel()
except Exception as e:
logging.error('Certificate rotate error: %s', e)
print(e)
else:
return result
def main():
parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
args = get_args(parser)
grpc_server_password = getpass("gRPC server password for executing RPCs: ")
metadata = [('username', args.user_id),
('password', grpc_server_password)]
try:
# Establish grpc channel to network device
channel = grpc_authenticate_channel_mutual(
args.server, args.port, args.root_ca_cert, args.client_key, args.client_cert)
response = rotate_cert(channel, metadata, args)
print(response)
except Exception as e:
logging.error('Error: %s', e)
print(e)
if __name__ == '__main__':
logging.basicConfig(filename='gnoi-testing.log',
format='%(asctime)s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
main()
args_cert_rotate_csr.txt
--server=10.53.52.169 --port=50051 --root_ca_cert=/etc/pki/certs/serverRootCA.crt --client_key=/home/lab/certs/client.key --client_cert=/home/lab/certs/client.crt --user_id=gnoi-user --type=1 --min_key_size=2048 --key_type=1 --common_name=gnoi-server.example.com --country=US --state=CA --city=Sunnyvale --organization=Acme --organizational_unit=testing --ip_address=10.53.52.169 --email_id=test@example.com --certificate_id=gnoi-server --client_root_ca1=/etc/pki/certs/clientRootCA.crt --server_root_ca1=/etc/pki/certs/serverRootCA.crt --client_key_test=/home/lab/certs/client.key --client_cert_test=/home/lab/certs/client.crt
请注意, root_ca_cert 该参数是初始通道证书所需的服务器根 CA 证书。参数 server_root_ca1 是与服务器新证书对应的根 CA 证书。Junos PKI 必须具有新的根 CA 证书,以便在操作过程中 Rotate() 验证新的本地证书。此外,用于验证新证书的 gRPC 会话的通道凭据使用此根 CA 证书。尽管此示例对新旧服务器证书使用相同的根 CA 证书,但对于其他情况,这些证书可能会有所不同。
执行应用程序
当客户端执行应用程序时,应用程序会请求 CSR,获取已签名的证书,并在目标网络设备上加载替换证书和 CA 捆绑包。然后,应用程序使用执行简单 Time() RPC 的新 gRPC 会话验证替换证书。成功验证后,客户端将敲定轮换请求。
lab@gnoi-client:~/src/gnoi/proto$ python3 gnoi_cert_rotate_certificate_csr.py @args_cert_rotate_csr.txt gRPC server password for executing RPCs: Creating channel Executing GNOI::CertificateManagement::Rotate Sending request for CSR Signature ok subject=CN = gnoi-server.example.com, C = US, ST = CA, O = Acme, OU = testing Getting CA Private Key Executing GNOI::CertificateManagement::Rotate Validating certificates Creating channel Finalizing certificate rotation. Certificate rotation finalized.
撤销证书
gNOI 客户端可以使用 RevokeCertificates() RPC 从目标设备中移除一个或多个证书。客户端会包含一 RevokeCertificatesRequest 条消息,其中包含要撤销的证书 ID 列表。
当 gNOI 服务器收到请求时 RevokeCertificates() ,它会按如下方式处理列表中的各个证书 ID:
-
如果证书存在且撤销成功,设备将从文件系统和 Junos PKI 中移除证书,并将证书 ID 添加到成功撤销的证书列表中。
-
如果证书存在且撤销失败,设备将证书 ID 和失败原因包括在证书吊销错误列表中。
-
如果证书不存在,设备将视为撤销操作已成功,并将证书 ID 添加到已成功撤销的证书列表中。
如果请求撤销用于当前会话的证书,则会话不会受到影响。
处理请求后,gNOI 服务器将返回一 RevokeCertificatesResponse 条消息,其中包括:
-
已成功撤销的证书 ID 列表。
-
撤销错误列表,其中包含证书 ID 和故障原因。
示例:撤销证书
在此示例中,客户端执行 gnoi_cert_revoke_certificates.py Python 应用程序,这将撤销服务器上的两个证书。第一个证书 ID 是设备上的有效标识符。第二个证书 ID 是设备上不存在的标识符。
应用程序使用 RevokeCertificatesRequest 带有相应参数的消息来定义请求。应用程序会将 RevokeCertificates() RPC 发送至网络设备以执行操作。
应用程序gnoi_cert_revoke_certificates.py会导入模块grpc_channel以建立通道。配置 gNOI 服务中介绍了该grpc_channel模块。应用程序参数存储在文件中args_cert_revoke_certificates.txt。此处显示应用程序和参数文件。
gnoi_cert_revoke_certificates.py
"""gNOI Revoke Certificates utility."""
from __future__ import print_function
from __future__ import unicode_literals
import argparse
import logging
from getpass import getpass
import cert_pb2
import cert_pb2_grpc
from grpc_channel import grpc_authenticate_channel_mutual
def get_args(parser):
parser.add_argument('--server',
dest='server',
type=str,
default='localhost',
help='Server IP or name. Default is localhost.')
parser.add_argument('--port',
dest='port',
nargs='?',
type=int,
default=50051,
help='Server port. Default is 50051')
parser.add_argument('--client_key',
dest='client_key',
type=str,
default='',
help='Full path of the client private key. Default is "".')
parser.add_argument('--client_cert',
dest='client_cert',
type=str,
default='',
help='Full path of the client certificate. Default is "".')
parser.add_argument('--root_ca_cert',
dest='root_ca_cert',
required=True,
type=str,
help='Full path of the Root CA certificate.')
parser.add_argument('--user_id',
dest='user_id',
required=True,
type=str,
help='User ID for RPC call credentials.')
parser.add_argument('--certificate_id1',
dest='certificate_id1',
required=True,
type=str,
help='Certificate id.')
parser.add_argument('--certificate_id2',
dest='certificate_id2',
type=str,
help='Certificate id.')
args = parser.parse_args()
return args
def revoke_cert(channel, metadata, args):
try:
stub = cert_pb2_grpc.CertificateManagementStub(channel)
print("Executing GNOI::CertificateManagement::RevokeCertificates")
# Create request to revoke certificates
req = cert_pb2.RevokeCertificatesRequest()
req.certificate_id.append(args.certificate_id1)
req.certificate_id.append(args.certificate_id2)
# Send request to revoke certificates
logging.info("Sending RevokeCertificates request.")
rsp = stub.RevokeCertificates(req, metadata=metadata, timeout=60)
logging.info(rsp)
print("rsp:\n%s" %rsp)
except Exception as e:
logging.error('Error: %s', e)
print(e)
def main():
parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
args = get_args(parser)
grpc_server_password = getpass("gRPC server password for executing RPCs: ")
metadata = [('username', args.user_id),
('password', grpc_server_password)]
try:
# Establish grpc channel to network device
channel = grpc_authenticate_channel_mutual(
args.server, args.port, args.root_ca_cert, args.client_key, args.client_cert)
revoke_cert(channel, metadata, args)
except Exception as e:
logging.error('Received error: %s', e)
print(e)
if __name__ == '__main__':
logging.basicConfig(filename='gnoi-testing.log',
format='%(asctime)s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
main()
args_cert_revoke_certificates.txt
--server=10.53.52.169 --port=50051 --root_ca_cert=/etc/pki/certs/serverRootCA.crt --client_key=/home/lab/certs/client.key --client_cert=/home/lab/certs/client.crt --user_id=gnoi-user --certificate_id1=gnoi-server --certificate_id2=id-does-not-exist
执行应用程序
当客户端执行应用程序时,应用程序会指示目标设备撤销指定的证书。设备会返回成功撤销的证书和任何错误列表。对于有效的证书 ID 以及设备上当前不存在的证书 ID,设备都认为操作已成功。
lab@gnoi-client:~/src/gnoi/proto$ python3 gnoi_cert_revoke_certificates.py @args_cert_revoke_certificates.txt gRPC server password for executing RPCs: Creating channel Executing GNOI::CertificateManagement::RevokeCertificates rsp: revoked_certificate_id: "gnoi-server" revoked_certificate_id: "id-does-not-exist"
Install()一部分,、 和
LoadCertificate()操作会验证新
Rotate()证书。