Linux cpanel.rrshost.in 5.15.0-25-generic #25-Ubuntu SMP Wed Mar 30 15:54:22 UTC 2022 x86_64
Apache
: 109.123.238.221 | : 172.69.6.169
128 Domain
8.2.28
aev999
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
HASH IDENTIFIER
README
+ Create Folder
+ Create File
/
usr /
lib /
mysqlsh /
lib /
python3.13 /
site-packages /
oci /
[ HOME SHELL ]
Name
Size
Permission
Action
_vendor
[ DIR ]
drwxr-xr-x
access_governance_cp
[ DIR ]
drwxr-xr-x
adm
[ DIR ]
drwxr-xr-x
ai_anomaly_detection
[ DIR ]
drwxr-xr-x
ai_document
[ DIR ]
drwxr-xr-x
ai_language
[ DIR ]
drwxr-xr-x
ai_speech
[ DIR ]
drwxr-xr-x
ai_vision
[ DIR ]
drwxr-xr-x
analytics
[ DIR ]
drwxr-xr-x
announcements_service
[ DIR ]
drwxr-xr-x
apigateway
[ DIR ]
drwxr-xr-x
apm_config
[ DIR ]
drwxr-xr-x
apm_control_plane
[ DIR ]
drwxr-xr-x
apm_synthetics
[ DIR ]
drwxr-xr-x
apm_traces
[ DIR ]
drwxr-xr-x
appmgmt_control
[ DIR ]
drwxr-xr-x
artifacts
[ DIR ]
drwxr-xr-x
audit
[ DIR ]
drwxr-xr-x
auth
[ DIR ]
drwxr-xr-x
autoscaling
[ DIR ]
drwxr-xr-x
bastion
[ DIR ]
drwxr-xr-x
bds
[ DIR ]
drwxr-xr-x
blockchain
[ DIR ]
drwxr-xr-x
budget
[ DIR ]
drwxr-xr-x
capacity_management
[ DIR ]
drwxr-xr-x
certificates
[ DIR ]
drwxr-xr-x
certificates_management
[ DIR ]
drwxr-xr-x
cims
[ DIR ]
drwxr-xr-x
circuit_breaker
[ DIR ]
drwxr-xr-x
cloud_bridge
[ DIR ]
drwxr-xr-x
cloud_guard
[ DIR ]
drwxr-xr-x
cloud_migrations
[ DIR ]
drwxr-xr-x
cluster_placement_groups
[ DIR ]
drwxr-xr-x
compute_cloud_at_customer
[ DIR ]
drwxr-xr-x
compute_instance_agent
[ DIR ]
drwxr-xr-x
container_engine
[ DIR ]
drwxr-xr-x
container_instances
[ DIR ]
drwxr-xr-x
core
[ DIR ]
drwxr-xr-x
dashboard_service
[ DIR ]
drwxr-xr-x
data_catalog
[ DIR ]
drwxr-xr-x
data_flow
[ DIR ]
drwxr-xr-x
data_integration
[ DIR ]
drwxr-xr-x
data_labeling_service
[ DIR ]
drwxr-xr-x
data_labeling_service_dataplan...
[ DIR ]
drwxr-xr-x
data_safe
[ DIR ]
drwxr-xr-x
data_science
[ DIR ]
drwxr-xr-x
database
[ DIR ]
drwxr-xr-x
database_management
[ DIR ]
drwxr-xr-x
database_migration
[ DIR ]
drwxr-xr-x
database_tools
[ DIR ]
drwxr-xr-x
delegate_access_control
[ DIR ]
drwxr-xr-x
demand_signal
[ DIR ]
drwxr-xr-x
desktops
[ DIR ]
drwxr-xr-x
devops
[ DIR ]
drwxr-xr-x
disaster_recovery
[ DIR ]
drwxr-xr-x
dns
[ DIR ]
drwxr-xr-x
dts
[ DIR ]
drwxr-xr-x
em_warehouse
[ DIR ]
drwxr-xr-x
email
[ DIR ]
drwxr-xr-x
email_data_plane
[ DIR ]
drwxr-xr-x
encryption
[ DIR ]
drwxr-xr-x
events
[ DIR ]
drwxr-xr-x
file_storage
[ DIR ]
drwxr-xr-x
fleet_apps_management
[ DIR ]
drwxr-xr-x
fleet_software_update
[ DIR ]
drwxr-xr-x
functions
[ DIR ]
drwxr-xr-x
fusion_apps
[ DIR ]
drwxr-xr-x
generative_ai
[ DIR ]
drwxr-xr-x
generative_ai_agent
[ DIR ]
drwxr-xr-x
generative_ai_agent_runtime
[ DIR ]
drwxr-xr-x
generative_ai_inference
[ DIR ]
drwxr-xr-x
generic_artifacts_content
[ DIR ]
drwxr-xr-x
globally_distributed_database
[ DIR ]
drwxr-xr-x
golden_gate
[ DIR ]
drwxr-xr-x
governance_rules_control_plane
[ DIR ]
drwxr-xr-x
healthchecks
[ DIR ]
drwxr-xr-x
identity
[ DIR ]
drwxr-xr-x
identity_data_plane
[ DIR ]
drwxr-xr-x
identity_domains
[ DIR ]
drwxr-xr-x
integration
[ DIR ]
drwxr-xr-x
jms
[ DIR ]
drwxr-xr-x
jms_java_downloads
[ DIR ]
drwxr-xr-x
key_management
[ DIR ]
drwxr-xr-x
license_manager
[ DIR ]
drwxr-xr-x
limits
[ DIR ]
drwxr-xr-x
load_balancer
[ DIR ]
drwxr-xr-x
lockbox
[ DIR ]
drwxr-xr-x
log_analytics
[ DIR ]
drwxr-xr-x
logging
[ DIR ]
drwxr-xr-x
loggingingestion
[ DIR ]
drwxr-xr-x
loggingsearch
[ DIR ]
drwxr-xr-x
management_agent
[ DIR ]
drwxr-xr-x
management_dashboard
[ DIR ]
drwxr-xr-x
marketplace
[ DIR ]
drwxr-xr-x
marketplace_private_offer
[ DIR ]
drwxr-xr-x
marketplace_publisher
[ DIR ]
drwxr-xr-x
media_services
[ DIR ]
drwxr-xr-x
monitoring
[ DIR ]
drwxr-xr-x
mysql
[ DIR ]
drwxr-xr-x
network_firewall
[ DIR ]
drwxr-xr-x
network_load_balancer
[ DIR ]
drwxr-xr-x
nosql
[ DIR ]
drwxr-xr-x
object_storage
[ DIR ]
drwxr-xr-x
oce
[ DIR ]
drwxr-xr-x
oci_control_center
[ DIR ]
drwxr-xr-x
ocvp
[ DIR ]
drwxr-xr-x
oda
[ DIR ]
drwxr-xr-x
onesubscription
[ DIR ]
drwxr-xr-x
ons
[ DIR ]
drwxr-xr-x
opa
[ DIR ]
drwxr-xr-x
opensearch
[ DIR ]
drwxr-xr-x
operator_access_control
[ DIR ]
drwxr-xr-x
opsi
[ DIR ]
drwxr-xr-x
optimizer
[ DIR ]
drwxr-xr-x
os_management
[ DIR ]
drwxr-xr-x
os_management_hub
[ DIR ]
drwxr-xr-x
osp_gateway
[ DIR ]
drwxr-xr-x
osub_billing_schedule
[ DIR ]
drwxr-xr-x
osub_organization_subscription
[ DIR ]
drwxr-xr-x
osub_subscription
[ DIR ]
drwxr-xr-x
osub_usage
[ DIR ]
drwxr-xr-x
pagination
[ DIR ]
drwxr-xr-x
psql
[ DIR ]
drwxr-xr-x
queue
[ DIR ]
drwxr-xr-x
recovery
[ DIR ]
drwxr-xr-x
redis
[ DIR ]
drwxr-xr-x
resource_manager
[ DIR ]
drwxr-xr-x
resource_scheduler
[ DIR ]
drwxr-xr-x
resource_search
[ DIR ]
drwxr-xr-x
retry
[ DIR ]
drwxr-xr-x
rover
[ DIR ]
drwxr-xr-x
sch
[ DIR ]
drwxr-xr-x
secrets
[ DIR ]
drwxr-xr-x
security_attribute
[ DIR ]
drwxr-xr-x
service_catalog
[ DIR ]
drwxr-xr-x
service_manager_proxy
[ DIR ]
drwxr-xr-x
service_mesh
[ DIR ]
drwxr-xr-x
stack_monitoring
[ DIR ]
drwxr-xr-x
streaming
[ DIR ]
drwxr-xr-x
tenant_manager_control_plane
[ DIR ]
drwxr-xr-x
threat_intelligence
[ DIR ]
drwxr-xr-x
usage
[ DIR ]
drwxr-xr-x
usage_api
[ DIR ]
drwxr-xr-x
vault
[ DIR ]
drwxr-xr-x
vbs_inst
[ DIR ]
drwxr-xr-x
visual_builder
[ DIR ]
drwxr-xr-x
vn_monitoring
[ DIR ]
drwxr-xr-x
vulnerability_scanning
[ DIR ]
drwxr-xr-x
waa
[ DIR ]
drwxr-xr-x
waas
[ DIR ]
drwxr-xr-x
waf
[ DIR ]
drwxr-xr-x
work_requests
[ DIR ]
drwxr-xr-x
zpr
[ DIR ]
drwxr-xr-x
__init__.py
6.14
KB
-rw-r--r--
alloy.py
6.52
KB
-rw-r--r--
base_client.py
50.76
KB
-rw-r--r--
config.py
9.98
KB
-rw-r--r--
constants.py
487
B
-rw-r--r--
decorators.py
1.61
KB
-rw-r--r--
exceptions.py
8.67
KB
-rw-r--r--
fips.py
3.17
KB
-rw-r--r--
regions.py
21.44
KB
-rw-r--r--
regions_definitions.py
6.45
KB
-rw-r--r--
request.py
1.97
KB
-rw-r--r--
response.py
2.19
KB
-rw-r--r--
service_endpoints.py
1.97
KB
-rw-r--r--
signer.py
11.49
KB
-rw-r--r--
util.py
13.09
KB
-rw-r--r--
version.py
367
B
-rw-r--r--
waiter.py
7.68
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : util.py
# coding: utf-8 # Copyright (c) 2016, 2025, Oracle and/or its affiliates. All rights reserved. # This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license. from __future__ import absolute_import import base64 import datetime import json import re import os import os.path import pytz from oci._vendor import six import oci.exceptions try: from urllib.parse import urlparse except ImportError: from urlparse import urlparse from io import SEEK_SET import logging logger = logging.getLogger(__name__) logger = logging.getLogger(__name__) INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE_VALUE_NAME = 'instance_principal' DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE = 'delegation_token_with_instance_principal' RESOURCE_PRINCIPAL_AUTHENTICATION_TYPE = 'resource_principal' DELEGATION_TOKEN_FILE_FIELD_NAME = 'delegation_token_file' AUTHENTICATION_TYPE_FIELD_NAME = 'authentication_type' MEBIBYTE = 1024 * 1024 DEFAULT_BUFFER_SIZE = 100 * MEBIBYTE DEFAULT_PART_SIZE = 128 * MEBIBYTE try: # PY3+ import collections.abc as abc except ImportError: # PY2 import collections as abc missing_attr = object() def to_dict(obj): """Helper to flatten models into dicts for rendering. The following conversions are applied: * datetime.date, datetime.datetime, datetime.time are converted into ISO8601 UTC strings """ # Shortcut strings so they don't count as Iterables if isinstance(obj, six.string_types): return obj elif obj is NONE_SENTINEL: return None elif isinstance(obj, (datetime.datetime, datetime.time)): # always use UTC if not obj.tzinfo: obj = pytz.utc.localize(obj) if isinstance(obj, datetime.datetime): # only datetime.datetime takes a separator return obj.isoformat(sep="T") return obj.isoformat() elif isinstance(obj, datetime.date): # datetime.date doesn't have a timezone return obj.isoformat() elif isinstance(obj, abc.Mapping): return {k: to_dict(v) for k, v in six.iteritems(obj)} elif isinstance(obj, abc.Iterable): return [to_dict(v) for v in obj] # Not a string, datetime, dict, list, or model - return directly elif not hasattr(obj, "swagger_types"): return obj # Collect attrs from obj according to swagger_types into a dict as_dict = {} for key in six.iterkeys(obj.swagger_types): value = getattr(obj, key, missing_attr) if value is not missing_attr: as_dict[key] = to_dict(value) return as_dict def formatted_flat_dict(model): """Returns a string of the model flattened as a dict, sorted""" as_dict = to_dict(model) return json.dumps( as_dict, indent=2, sort_keys=True, ensure_ascii=False ) def value_allowed_none_or_none_sentinel(value_to_test, allowed_values): return value_to_test is None or value_to_test is NONE_SENTINEL or value_to_test in allowed_values def file_content_as_launch_instance_user_data(file_path): """ Takes a file path and returns a Base64-encoded string which can be provided as the value of the ``user_data`` key in the ``metadata`` dictionary when launching an instance(see :py:class:`~oci.core.models.LaunchInstanceDetails` for more information). :param str file_path: The path to the file whose contents will be Base64-encoded :return: The Base64-encoded string :rtype: str """ full_path = os.path.expandvars(os.path.expanduser(file_path)) with open(full_path, 'rb') as f: file_contents = f.read() return base64.b64encode(file_contents).decode('utf-8') class Sentinel(object): """Named singletons for clear docstrings. Also used to differentiate an explicit param of None from a lack of argument. .. code-block:: pycon >>> missing = Sentinel("Missing", False) >>> also_missing = Sentinel("Missing", False) >>> assert missing is also_missing >>> repr(missing) <Missing> >>> assert bool(missing) is False """ _symbols = {} def __new__(cls, name, truthy=True): sentinel = Sentinel._symbols.get(name, None) if sentinel is None: sentinel = Sentinel._symbols[name] = super(Sentinel, cls).__new__(cls) elif sentinel.truthy is not truthy: raise ValueError("Tried to get existing Sentinel {!r} with wrong truthy value".format(sentinel)) return sentinel def __init__(self, name, truthy=True): self.name = name self.truthy = truthy def __repr__(self): # Sentinel("Missing") -> <Missing> return "<{}>".format(self.name) def __bool__(self): return self.truthy # PY2 Compatibility __nonzero__ = __bool__ NONE_SENTINEL = Sentinel(name='None', truthy=False) WAIT_RESOURCE_NOT_FOUND = Sentinel(name='WaitResourceNotFound', truthy=False) def _get_signer_from_delegation_token_instance_principal(config): # Import the signer inside the function to avoid circular imports during initialization from oci.auth.signers import InstancePrincipalsDelegationTokenSigner signer_kwargs = {} delegation_token_file_path = config.get(DELEGATION_TOKEN_FILE_FIELD_NAME) if delegation_token_file_path is None: raise oci.exceptions.InvalidConfig('ERROR: {} was not provided.'.format(DELEGATION_TOKEN_FILE_FIELD_NAME)) expanded_delegation_token_file_path = os.path.expanduser(delegation_token_file_path) if not os.path.isfile(expanded_delegation_token_file_path): raise oci.exceptions.InvalidConfig('ERROR: delegation token file not found at {}'.format(expanded_delegation_token_file_path)) with open(expanded_delegation_token_file_path, mode="r") as f: delegation_token = f.read().strip() if delegation_token is None: raise oci.exceptions.InvalidConfig('ERROR: delegation_token was not provided.') signer_kwargs['delegation_token'] = delegation_token # Return signer with delegation token return InstancePrincipalsDelegationTokenSigner(**signer_kwargs) def _get_signer_from_resource_principal(config): # Import the signer inside the function to avoid circular imports during initialization from oci.auth.signers import get_resource_principal_signer return get_resource_principal_signer() # This map can be easily extended to accommodate support for more auth types through the config file AUTH_TYPE_TO_SIGNER_FUNCTION_MAP = { DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE: _get_signer_from_delegation_token_instance_principal, RESOURCE_PRINCIPAL_AUTHENTICATION_TYPE: _get_signer_from_resource_principal, } def get_signer_from_authentication_type(config): # This is currently made for allowing SDK to run seamlessly on cloud shell auth_type = get_authentication_type_from_config(config) # Get the signer function from map signer_function = AUTH_TYPE_TO_SIGNER_FUNCTION_MAP.get(auth_type) return signer_function(config) def get_authentication_type_from_config(config): auth_type = config.get(AUTHENTICATION_TYPE_FIELD_NAME) if auth_type is None: raise ValueError("{} not provided".format(AUTHENTICATION_TYPE_FIELD_NAME)) # Currently the SDK supports only the cloud shell use case, this can be extended for other auth types if auth_type == INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE_VALUE_NAME: if DELEGATION_TOKEN_FILE_FIELD_NAME in config: return DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE else: raise oci.exceptions.InvalidConfig("The authentication type {} requires config values for the keys {}".format(DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE, DELEGATION_TOKEN_FILE_FIELD_NAME)) else: raise oci.exceptions.InvalidConfig("The authentication type {} is not supported".format(auth_type)) def back_up_body_calculate_stream_content_length(stream, buffer_limit=DEFAULT_BUFFER_SIZE): # Note: Need to read in chunks, older version of Python will fail to read more than 2GB at once. # We pull data in chunks (128MB at a time) from the stream until there is no more logger.warning("Reading the stream to calculate its content-length. Process may freeze for very big streams. Consider passing in content length for big objects") try: keep_reading = True part_size = DEFAULT_PART_SIZE content_length = 0 content = "" byte_content = b'' while keep_reading: if hasattr(stream, 'buffer'): content = stream.buffer.read(part_size) elif hasattr(stream, 'read'): content = stream.read(part_size) else: raise TypeError("Stream object does not contain a 'read' property. Cannot auto calculate content length, please pass in content length") if len(content) == 0: keep_reading = False byte_content += content content_length += len(content) if (buffer_limit and content_length > buffer_limit): raise BufferError("Stream size is greater than the buffer_limit, please pass in a bigger buffer_limit or pass in content length to the request") return {"content_length": content_length, "byte_content": byte_content} except (IOError, OSError): raise TypeError("Stream object's content length cannot be calculated, please pass in content length") # This helper function checks if an object can have it's content length auto-calculated by the Request library def is_content_length_calculable_by_req_util(o): if hasattr(o, '__len__') or hasattr(o, 'len') or hasattr(o, 'fileno') or hasattr(o, 'tell'): return True logger.warning("Request did not contain content-length and stream object does not contain fileno or tell property. Stream object will be read to calculate content-length") return False def extract_service_endpoint(endpoint_with_base_path): """ Takes a Service Endpoint with base-path embedded and extracts the Service Endpoint from it. :param str endpoint_with_base_path: Service Endpoint with base-path embedded :return: The Service Endpoint without base-path. :rtype: str """ parsed_endpoint = urlparse(endpoint_with_base_path) return parsed_endpoint.scheme + r'://' + parsed_endpoint.netloc def should_record_body_position_for_retry(func_ref, **func_kwargs): func_name = func_ref.__name__ # TODO: remove Python 2 requirements, use qualname if func_name == 'call_api': body = func_kwargs.get('body') # A file-like object body should be treated differently for retry if body and hasattr(body, 'read'): return True return False return False def record_body_position_for_rewind(body): is_body_rewindable = True if getattr(body, 'tell', None) is not None: try: # Attempt to record current body position body_position = body.tell() except (IOError, OSError): # If we cannot record the current body position for a file-like body, then we should not retry is_body_rewindable = False body_position = None logger.warning("Unable to record body position for rewinding. This request will not be retried/rewound") else: # If the body does not support tell, then don't retry is_body_rewindable = False body_position = None logger.warning("Unable to record body position for rewinding. This request will not be retried/rewound") return is_body_rewindable, body_position def rewind_body(body, body_position): if getattr(body, 'seek', None) is not None: try: body.seek(body_position, SEEK_SET) except (IOError, OSError): # If we're unable to reset the body position, then we should not retry logger.warning("Unable to reset body position for rewinding. This request will not be retried/rewound") return False return True # if the body does not support seek, then we should not retry logger.warning("Unable to reset body position for rewinding. This request will not be retried/rewound") return False def read_stream_for_signing(signing_algorithm, body): bytes_read = 0 try: while True: chunk = "" if hasattr(body, "read"): chunk = body.read(DEFAULT_PART_SIZE) elif hasattr(body, "buffer"): chunk = body.buffer.read(DEFAULT_PART_SIZE) if len(chunk) == 0: break bytes_read += len(chunk) signing_algorithm.update(chunk) except (IOError, OSError): logger.warning("Unable to read stream body for signing") bytes_read = -1 return bytes_read def camel_to_snake(name): s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name) return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower() def camel_to_snake_keys(dictionary): outdict = {} for k, v in dictionary.items(): outdict[camel_to_snake(k)] = v return outdict
Close