Formation Data Import from Geological Models
Learn how to import formation pressure and temperature data from geological modeling software into Oliasoft. By automating this process, you can ensure your well designs always use the most current subsurface data, improving safety margins and operational efficiency.
Supported Data Sources
- Petrel (Schlumberger) - Via RESQML or CSV export
- GeoX (Landmark) - Via OpenWorks database
- JewelSuite (Baker Hughes) - Via JewelSuite API
- SKUA-GOCAD (Emerson) - Via ASCII export
- Custom geological databases - Via SQL or REST APIs
Data Flow Architecture
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│   Geological    │────▶│       Data       │────▶│  Validation &   │
│      Model      │     │    Extraction    │     │ Transformation  │
└─────────────────┘     └──────────────────┘     └────────┬────────┘
                                                          │
                                                          ▼
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│    Oliasoft     │◀────│   Bulk Import    │◀────│  Data Staging   │
│   Archives/WD   │     │     Process      │     │      Area       │
└─────────────────┘     └──────────────────┘     └─────────────────┘
Step 1: Extract Formation Data
From Petrel (RESQML Export)
import pandas as pd
import numpy as np
from lxml import etree
import h5py
class PetrelDataExtractor:
    """Read formation grids from a Petrel RESQML export and sample them at a well.

    The RESQML XML file is searched for horizon interpretations; for each
    horizon the depth, pressure and temperature grids are read from the
    companion HDF5 file under ``/RESQML/<horizon title>/``.
    """

    # Namespace map used for every lookup in the RESQML document.
    _NAMESPACES = {'resqml': 'http://www.energistics.org/energyml/data/resqmlv2'}

    def __init__(self, resqml_file_path, h5_file_path):
        """Store the paths of the RESQML XML file and its HDF5 companion.

        Neither file is opened here; both are read by
        ``extract_formation_data``.
        """
        self.resqml_path = resqml_file_path
        self.h5_path = h5_file_path

    def extract_formation_data(self):
        """
        Extract formation tops, pressures, and temperatures from RESQML

        Returns:
            A list with one dict per horizon interpretation, holding the keys
            ``name``, ``depth_grid``, ``pressure_grid`` and
            ``temperature_grid``. The list is empty when the document has no
            horizon interpretations.

        Raises:
            ValueError: if a horizon interpretation has no (or an empty)
                ``Title`` element, so its HDF5 group cannot be located.
            KeyError: (from h5py) if an expected dataset is missing.
        """
        # Parse RESQML XML
        tree = etree.parse(self.resqml_path)
        ns = self._NAMESPACES

        # Extract horizon interpretations
        horizons = tree.xpath('//resqml:HorizonInterpretation', namespaces=ns)
        if not horizons:
            # Nothing to read, so do not touch the HDF5 file at all.
            return []

        formations = []
        # Open the HDF5 file once for all horizons instead of once per horizon.
        with h5py.File(self.h5_path, 'r') as hdf:
            for horizon in horizons:
                title = horizon.find('.//resqml:Title', namespaces=ns)
                if title is None or not title.text:
                    raise ValueError(
                        "HorizonInterpretation without a Title element in "
                        f"{self.resqml_path}"
                    )
                formation_name = title.text
                group = f'/RESQML/{formation_name}'

                # ``[:]`` copies each dataset into memory so the arrays stay
                # usable after the file is closed.
                formations.append({
                    'name': formation_name,
                    'depth_grid': hdf[f'{group}/Depth'][:],
                    'pressure_grid': hdf[f'{group}/Pressure'][:],
                    'temperature_grid': hdf[f'{group}/Temperature'][:]
                })
        return formations

    def interpolate_at_well_location(self, formations, well_x, well_y):
        """
        Interpolate formation data at specific well location

        Args:
            formations: dicts as produced by ``extract_formation_data``.
            well_x: well X coordinate, passed through to ``_interpolate_2d``.
            well_y: well Y coordinate, passed through to ``_interpolate_2d``.

        Returns:
            A list of dicts with keys ``name``, ``tvd``, ``pressure`` and
            ``temperature``, sorted by increasing ``tvd``.
        """
        # NOTE(review): ``_interpolate_2d(grid, x, y)`` is not defined in this
        # class as shown; it has to be supplied (e.g. by a subclass) before
        # this method can run. The coordinate convention it expects is
        # therefore not documented here.
        well_formations = [
            {
                'name': formation['name'],
                'tvd': self._interpolate_2d(
                    formation['depth_grid'], well_x, well_y
                ),
                'pressure': self._interpolate_2d(
                    formation['pressure_grid'], well_x, well_y
                ),
                'temperature': self._interpolate_2d(
                    formation['temperature_grid'], well_x, well_y
                )
            }
            for formation in formations
        ]
        return sorted(well_formations, key=lambda x: x['tvd'])
From CSV Export (Generic)
def import_formation_csv(csv_path):
    """
    Import formation data from standard CSV format
    Expected columns: Formation, TVD_m, Pressure_bar, Temperature_C, Gradient_bar_m

    Only ``Formation``, ``TVD_m`` and ``Pressure_bar`` are mandatory. Rows
    with a missing mandatory value or a non-positive ``TVD_m`` are dropped,
    and the remaining rows are sorted by increasing ``TVD_m``. When the file
    has no ``Gradient_bar_m`` column it is derived as
    ``Pressure_bar / TVD_m`` (bar per metre).

    Args:
        csv_path: path (or any source accepted by ``pandas.read_csv``).

    Returns:
        A list of dicts, one per retained row, keyed by the CSV column names.

    Raises:
        ValueError: if a mandatory column is absent.
    """
    df = pd.read_csv(csv_path)

    # Validate required columns
    required_columns = ['Formation', 'TVD_m', 'Pressure_bar']
    missing_columns = sorted(set(required_columns) - set(df.columns))
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}")

    # Clean and validate data
    df = df.dropna(subset=required_columns)
    df = df[df['TVD_m'] > 0]  # Remove invalid depths
    df = df.sort_values('TVD_m')

    # Calculate gradients if not provided. The column is expressed in bar/m,
    # so it is simply pressure over depth (no further scaling).
    if 'Gradient_bar_m' not in df.columns:
        df = df.assign(Gradient_bar_m=df['Pressure_bar'] / df['TVD_m'])

    return df.to_dict('records')
Step 2: Transform and Validate Data
class FormationDataValidator:
    """Sanity-check formation tops and build a dense pressure profile.

    Formations are dicts with the keys ``name``, ``tvd`` and ``pressure`` and
    optionally ``temperature``.
    """

    # Surface temperature (°C) subtracted before computing the temperature
    # gradient.
    SURFACE_TEMPERATURE_C = 15

    def __init__(self):
        """Set the default plausibility limits used by the checks."""
        self.validation_rules = {
            'pressure_gradient_min': 0.7,  # bar/10m
            'pressure_gradient_max': 2.5,  # bar/10m
            'temperature_gradient_min': 15,  # °C/km
            'temperature_gradient_max': 50,  # °C/km
            'max_depth': 10000,  # meters
        }

    def validate_formation_data(self, formations):
        """
        Validate formation data for geological consistency

        A formation is *rejected* (left out of the result) when its depth is
        not positive or does not lie strictly below the last accepted
        formation. Depth-limit and gradient violations are reported but the
        formation is still kept, so they act as warnings.

        Args:
            formations: formation dicts, expected in increasing ``tvd`` order.

        Returns:
            ``(validated_formations, errors)`` where ``errors`` is a list of
            human-readable messages.
        """
        rules = self.validation_rules
        validated_formations = []
        errors = []
        last_accepted_tvd = None

        for formation in formations:
            name = formation['name']
            tvd = formation['tvd']

            # A non-positive depth would make the gradient checks divide by
            # zero (or flip sign), so reject it up front.
            if tvd <= 0:
                errors.append(f"Invalid depth {tvd} for {name}")
                continue

            # Check depth ordering against the last *accepted* formation so
            # the validated list is strictly increasing even after a skip.
            if last_accepted_tvd is not None and tvd <= last_accepted_tvd:
                errors.append(f"Invalid depth order at {name}")
                continue

            if tvd > rules['max_depth']:
                errors.append(
                    f"Depth {tvd} m exceeds maximum {rules['max_depth']} m "
                    f"for {name}"
                )

            # Validate pressure gradient
            gradient = formation['pressure'] / tvd * 10
            if not (rules['pressure_gradient_min'] <=
                    gradient <=
                    rules['pressure_gradient_max']):
                errors.append(
                    f"Pressure gradient {gradient:.2f} bar/10m out of range "
                    f"for {name}"
                )

            # Validate temperature if present
            if 'temperature' in formation:
                temp_gradient = (
                    (formation['temperature'] - self.SURFACE_TEMPERATURE_C)
                    / tvd * 1000
                )
                if not (rules['temperature_gradient_min'] <=
                        temp_gradient <=
                        rules['temperature_gradient_max']):
                    errors.append(
                        f"Temperature gradient {temp_gradient:.1f} °C/km out of range "
                        f"for {name}"
                    )

            validated_formations.append(formation)
            last_accepted_tvd = tvd

        return validated_formations, errors

    def interpolate_missing_data(self, formations):
        """
        Interpolate pressures between formation tops

        A ``(0, 0)`` point is prepended to the formation tops, a PCHIP
        interpolant is fitted, and it is sampled every 10 m from 0 down to the
        deepest top. The deepest top is always the last sample, so the
        interpolant is never evaluated outside the supplied data.

        Args:
            formations: formation dicts with strictly increasing, positive
                ``tvd`` (e.g. the output of ``validate_formation_data``).

        Returns:
            ``(detailed_depths, detailed_pressures)`` as NumPy arrays of equal
            length.

        Raises:
            ValueError: if ``formations`` is empty.
        """
        if not formations:
            raise ValueError(
                "At least one formation is required to build a pressure profile"
            )

        # Convert to arrays for interpolation
        depths = [0] + [f['tvd'] for f in formations]
        pressures = [0] + [f['pressure'] for f in formations]

        # Create interpolation function
        from scipy.interpolate import PchipInterpolator
        pressure_interp = PchipInterpolator(depths, pressures)

        # Generate detailed pressure profile
        depth_step = 10  # meters
        max_depth = max(depths)
        detailed_depths = np.arange(0, max_depth, depth_step)
        # ``arange`` excludes the stop value; add the deepest top explicitly
        # rather than overshooting it and extrapolating.
        if detailed_depths.size == 0 or detailed_depths[-1] < max_depth:
            detailed_depths = np.append(detailed_depths, max_depth)
        detailed_pressures = pressure_interp(detailed_depths)

        return detailed_depths, detailed_pressures
Step 3: Import to Oliasoft
Option A: Direct API Import to WellDesign
class OliasoftFormationImporter:
    """Push formation tops and a pore-pressure profile to one well design."""

    def __init__(self, api_key, base_url="https://welldesign.oliasoft.com/api/v1",
                 timeout=30):
        """Prepare the request headers.

        Args:
            api_key: bearer token sent in the ``Authorization`` header.
            base_url: root URL of the WellDesign API.
            timeout: seconds to wait for each HTTP request before giving up,
                so a stalled connection cannot hang the import indefinitely.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.headers = {
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        }

    def import_to_design(self, design_id, formations, pressure_profile=None):
        """
        Import formation data directly to a well design

        Args:
            design_id: identifier of the target design.
            formations: formation dicts with ``name`` and ``tvd``; ``md`` is
                optional and defaults to the TVD value.
            pressure_profile: optional ``(depths, pressures)`` pair of
                equal-length sequences; sent as pore-pressure points.

        Returns:
            The decoded JSON body of the last request made (the pressure
            update when a profile is given, otherwise the formation import).

        Raises:
            RuntimeError: if either request returns an error status.
        """
        import requests

        # Update formation tops
        formation_payload = {
            'formations': [
                {
                    'name': f['name'],
                    'top_tvd': f['tvd'],
                    'top_md': f.get('md', f['tvd']),  # Use TVD if MD not available
                }
                for f in formations
            ]
        }
        response = requests.post(
            f"{self.base_url}/designs/{design_id}/formations",
            json=formation_payload,
            headers=self.headers,
            timeout=self.timeout
        )
        # ``ok`` accepts any non-error status, so a 201 Created is not
        # reported as a failure.
        if not response.ok:
            raise RuntimeError(f"Failed to import formations: {response.text}")

        # Update pressure data. ``is not None`` plus ``len`` avoids taking the
        # truth value of an array-like profile.
        if pressure_profile is not None and len(pressure_profile) > 0:
            pressure_payload = {
                'pressure_points': [
                    {
                        # float() turns NumPy scalars (e.g. int64 from
                        # ``np.arange``) into JSON-serialisable numbers.
                        'tvd': float(depth),
                        'pressure': float(pressure),
                        'pressure_type': 'pore'
                    }
                    for depth, pressure in zip(*pressure_profile)
                ]
            }
            response = requests.put(
                f"{self.base_url}/designs/{design_id}/formations/pressure",
                json=pressure_payload,
                headers=self.headers,
                timeout=self.timeout
            )
            if not response.ok:
                raise RuntimeError(f"Failed to import pressures: {response.text}")

        return response.json()
Option B: Bulk Import via Archives
class ArchivesBulkImporter:
    """Upload formation records to Archives through an import job."""

    def __init__(self, api_key, base_url="https://archives.oliasoft.com/api/v1"):
        """Create an authenticated HTTP session.

        Args:
            api_key: bearer token sent in the ``Authorization`` header.
            base_url: root URL of the Archives API.
        """
        # Imported here so the class does not depend on a module-level
        # ``requests`` import.
        import requests

        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        })

    def create_import_job(self, field_name, formations_data):
        """
        Create bulk import job for formation data

        Args:
            field_name: field the job is registered under.
            formations_data: sized collection; only its length is sent, as
                ``record_count``.

        Returns:
            The ``job_id`` value from the response body.

        Raises:
            An HTTP error from ``raise_for_status`` if the request fails.
        """
        from datetime import datetime, timezone

        job_payload = {
            'job_type': 'formation_import',
            'field': field_name,
            'data_source': 'geological_model',
            # Timezone-aware UTC timestamp (``utcnow`` is naive and deprecated).
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'record_count': len(formations_data)
        }
        response = self.session.post(
            f"{self.base_url}/import/jobs",
            json=job_payload
        )
        # Fail here rather than on a confusing KeyError for 'job_id'.
        response.raise_for_status()
        return response.json()['job_id']

    def upload_formation_batch(self, job_id, formations_batch, batch_size=1000):
        """
        Upload formation data in batches

        All wells are flattened into individual records first, so
        ``batch_size`` limits the number of *records* per request regardless
        of how many formations each well has. The job is finalized only after
        every batch has been accepted.

        Args:
            job_id: identifier returned by ``create_import_job``.
            formations_batch: iterable of ``(well_name, formations)`` pairs.
            batch_size: maximum records per upload request.

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
            An HTTP error from ``raise_for_status`` if any request fails.
        """
        from datetime import datetime, timezone

        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        import_date = datetime.now(timezone.utc).isoformat()
        records = self._build_records(formations_batch, import_date)
        total_batches = (len(records) + batch_size - 1) // batch_size

        starts = range(0, len(records), batch_size)
        for batch_number, start in enumerate(starts, start=1):
            # Upload batch
            response = self.session.post(
                f"{self.base_url}/import/jobs/{job_id}/data",
                json={'records': records[start:start + batch_size]}
            )
            # Stop at the first failed batch so an incomplete job is never
            # finalized.
            response.raise_for_status()
            print(f"Uploaded batch {batch_number}/{total_batches}")

        # Finalize import
        response = self.session.post(
            f"{self.base_url}/import/jobs/{job_id}/finalize"
        )
        response.raise_for_status()

    @staticmethod
    def _build_records(formations_batch, import_date):
        """Flatten ``(well_name, formations)`` pairs into Archives records."""
        return [
            {
                'well_name': well_name,
                'formation_name': formation['name'],
                'tvd': formation['tvd'],
                'pressure': formation['pressure'],
                'temperature': formation.get('temperature'),
                'gradient': formation.get('gradient'),
                'metadata': {
                    'source': 'geological_model',
                    'import_date': import_date,
                    'confidence': formation.get('confidence', 'high')
                }
            }
            for well_name, formations in formations_batch
            for formation in formations
        ]
Step 4: Complete Integration Workflow
def _normalize_formation_record(record):
    """Return a copy of *record* that also carries the canonical formation keys.

    CSV imports are keyed by column name (``Formation``, ``TVD_m``, ...),
    whereas the validator and importers read ``name``, ``tvd``, ``pressure``,
    ``temperature`` and ``gradient``. Missing (NaN) optional values are left
    out so that the ``'temperature' in formation`` check skips them.
    """
    import math

    aliases = {
        'name': 'Formation',
        'tvd': 'TVD_m',
        'pressure': 'Pressure_bar',
        'temperature': 'Temperature_C',
        'gradient': 'Gradient_bar_m',
    }
    normalized = dict(record)
    for canonical, csv_column in aliases.items():
        if canonical in normalized or csv_column not in record:
            continue
        value = record[csv_column]
        if isinstance(value, float) and math.isnan(value):
            continue
        normalized[canonical] = value
    return normalized


def complete_formation_import_workflow(config):
    """
    End-to-end workflow for formation data import

    Args:
        config: dict of settings. Always read: ``source_type``,
            ``import_mode`` and ``api_key``. For ``source_type == 'petrel'``:
            ``resqml_path``, ``h5_path``, ``well_x``, ``well_y``; otherwise
            ``csv_path``. For ``import_mode == 'direct'``: ``design_id``;
            otherwise ``field_name`` and ``well_name``.

    Returns:
        The report dict produced by ``generate_import_report``.

    Raises:
        ValueError: if no formation survives validation.
    """
    # Step 1: Extract data from geological model
    print("Extracting formation data from geological model...")
    if config['source_type'] == 'petrel':
        extractor = PetrelDataExtractor(
            config['resqml_path'],
            config['h5_path']
        )
        raw_formations = extractor.extract_formation_data()
        formations = extractor.interpolate_at_well_location(
            raw_formations,
            config['well_x'],
            config['well_y']
        )
    else:
        # CSV rows use column names as keys; map them onto the keys the
        # validator and importers expect.
        formations = [
            _normalize_formation_record(record)
            for record in import_formation_csv(config['csv_path'])
        ]

    # Step 2: Validate and clean data
    print("Validating formation data...")
    validator = FormationDataValidator()
    validated_formations, errors = validator.validate_formation_data(formations)
    if errors:
        print(f"Validation warnings: {errors}")
    if not validated_formations:
        # Nothing to interpolate or upload; stop with the collected reasons.
        raise ValueError(f"No valid formations to import: {errors}")

    # Step 3: Generate detailed pressure profile
    depths, pressures = validator.interpolate_missing_data(validated_formations)

    # Step 4: Import to Oliasoft
    print("Importing to Oliasoft...")
    if config['import_mode'] == 'direct':
        importer = OliasoftFormationImporter(config['api_key'])
        importer.import_to_design(
            config['design_id'],
            validated_formations,
            (depths, pressures)
        )
    else:
        importer = ArchivesBulkImporter(config['api_key'])
        job_id = importer.create_import_job(
            config['field_name'],
            validated_formations
        )
        importer.upload_formation_batch(
            job_id,
            [(config['well_name'], validated_formations)]
        )
    print("Import completed successfully!")

    # Step 5: Generate report
    return generate_import_report(
        validated_formations,
        depths,
        pressures,
        errors
    )
def generate_import_report(formations, depths, pressures, errors,
                           plot_path='formation_import_report.png'):
    """
    Generate detailed import report

    Draws pressure against depth (depth axis inverted), marks every formation
    top with a dashed line and its name, saves the figure to ``plot_path`` and
    returns summary statistics.

    Args:
        formations: formation dicts with ``name`` and ``tvd``.
        depths: non-empty sequence of profile depths.
        pressures: non-empty sequence of profile pressures, same length as
            ``depths``.
        errors: validation messages; only their count is reported.
        plot_path: where the PNG is written.

    Returns:
        A dict with ``import_date``, ``formations_imported``, ``depth_range``,
        ``pressure_range``, ``validation_errors`` and ``plot_path``.
    """
    from datetime import datetime

    import matplotlib.pyplot as plt

    # Create pressure vs depth plot
    fig, ax = plt.subplots(figsize=(8, 10))
    try:
        ax.plot(pressures, depths, 'b-', linewidth=2, label='Pore Pressure')

        # Add formation tops
        label_x = max(pressures) * 0.9
        for f in formations:
            ax.axhline(y=f['tvd'], color='r', linestyle='--', alpha=0.5)
            ax.text(
                label_x,
                f['tvd'],
                f['name'],
                fontsize=8,
                va='bottom'
            )

        ax.invert_yaxis()
        ax.set_xlabel('Pressure (bar)')
        ax.set_ylabel('TVD (m)')
        ax.set_title('Imported Formation Pressure Profile')
        ax.grid(True, alpha=0.3)
        ax.legend()

        # Save plot
        fig.savefig(plot_path, dpi=150, bbox_inches='tight')
    finally:
        # Release the figure; otherwise each scheduled run keeps one more
        # figure alive in the long-running process.
        plt.close(fig)

    # Generate summary statistics
    return {
        'import_date': datetime.now().isoformat(),
        'formations_imported': len(formations),
        'depth_range': f"{min(depths)}-{max(depths)} m",
        'pressure_range': f"{min(pressures):.1f}-{max(pressures):.1f} bar",
        'validation_errors': len(errors),
        'plot_path': plot_path
    }
Automation and Scheduling
Set up automated daily imports
from apscheduler.schedulers.blocking import BlockingScheduler
import logging
def _missing_import_settings(config):
    """Return, sorted, the settings the import workflow reads that *config* lacks."""
    if config.get('source_type') == 'petrel':
        required = {'resqml_path', 'h5_path', 'well_x', 'well_y'}
    else:
        required = {'csv_path'}
    if config.get('import_mode') == 'direct':
        required.add('design_id')
    else:
        required.update({'field_name', 'well_name'})
    return sorted(required - set(config))


def setup_automated_import(config_overrides=None):
    """
    Configure automated formation data import

    Registers a cron job that runs the import workflow every day at 02:00 and
    then starts the scheduler, which blocks the calling thread.

    Args:
        config_overrides: optional dict merged over the built-in settings
            (Petrel source, bulk import, field ``North Sea Field A``). The
            built-in settings do not name a well, so for the default
            Petrel/bulk mode this must provide ``well_x``, ``well_y`` and
            ``well_name``. ``api_key`` may be given here; otherwise it is read
            from the ``OLIASOFT_API_KEY`` environment variable at run time.

    Raises:
        ValueError: if settings required by the workflow are missing. This is
            checked before scheduling so a misconfiguration surfaces
            immediately instead of as a failed job every night.
    """
    import os

    base_config = {
        'source_type': 'petrel',
        'resqml_path': '/shared/geological_models/current.resqml',
        'h5_path': '/shared/geological_models/current.h5',
        'import_mode': 'bulk',
        'field_name': 'North Sea Field A'
    }
    base_config.update(config_overrides or {})

    missing = _missing_import_settings(base_config)
    if missing:
        raise ValueError(
            f"Missing import settings: {missing}; pass them via config_overrides"
        )

    scheduler = BlockingScheduler()

    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    @scheduler.scheduled_job('cron', hour=2)  # Run at 2 AM daily
    def scheduled_import():
        try:
            logging.info("Starting scheduled formation import")
            config = dict(base_config)
            if 'api_key' not in config:
                config['api_key'] = os.environ['OLIASOFT_API_KEY']
            report = complete_formation_import_workflow(config)
            # Send notification
            # NOTE(review): send_import_notification / send_error_notification
            # are not defined in the code shown here; they must be provided
            # elsewhere.
            send_import_notification(report)
        except Exception as e:
            # Top-level job boundary: log with traceback and notify.
            logging.exception("Import failed: %s", e)
            send_error_notification(str(e))

    scheduler.start()
Best Practices
- Data Validation
  - Always validate geological consistency
  - Check for unrealistic gradients
  - Ensure proper depth ordering
- Version Control
  - Track changes in formation data
  - Maintain audit trail of imports
  - Archive source files
- Error Handling
  - Implement retry logic for API calls
  - Log all validation warnings
  - Provide rollback capability
- Performance Optimization
  - Use batch imports for large datasets
  - Implement parallel processing where possible
  - Cache frequently accessed data
Troubleshooting
| Issue | Solution |
|---|---|
| Import fails with "Invalid formation order" | Sort formations by TVD before import |
| Pressure gradients rejected | Check unit conversions (bar vs psi, m vs ft) |
| Memory errors with large models | Process data in chunks (read only the required slices of each HDF5 grid), or run on a machine with more memory |
| API timeout errors | Reduce batch size, implement retry logic |
Advanced Features
Multi-well Formation Correlation
def correlate_formations_across_wells(wells_data, eps=50, min_samples=3):
    """
    Correlate formations across multiple wells using machine learning

    Every formation of every well becomes one feature row
    ``[tvd, pressure, gamma_ray, resistivity]`` (the last two default to 0
    when absent) and the rows are clustered with DBSCAN. Each well is then
    listed under every cluster label that at least one of its formations
    received.

    Args:
        wells_data: iterable of well dicts, each with a ``formations`` list of
            dicts holding ``tvd`` and ``pressure``.
        eps: DBSCAN neighbourhood radius, in the unscaled feature units.
        min_samples: DBSCAN minimum neighbourhood size.

    Returns:
        A dict mapping cluster label (int) to the list of well dicts with a
        formation in that cluster, each well at most once per label. DBSCAN
        reports noise samples under label ``-1``. Empty when there are no
        formations.
    """
    # Extract features for correlation, remembering which well owns each row
    # so labels (one per row) can be mapped back to wells.
    features = []
    owners = []
    for well in wells_data:
        for formation in well['formations']:
            features.append([
                formation['tvd'],
                formation['pressure'],
                formation.get('gamma_ray', 0),
                formation.get('resistivity', 0)
            ])
            owners.append(well)

    if not features:
        # DBSCAN cannot be fitted on an empty sample set.
        return {}

    # Imported only once there is something to cluster.
    from sklearn.cluster import DBSCAN

    # Cluster similar formations
    clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(features)

    # Group correlated formations
    correlated_formations = {}
    for well, label in zip(owners, clustering.labels_):
        group = correlated_formations.setdefault(int(label), [])
        # Rows of one well are contiguous, so comparing with the last entry
        # is enough to keep each well once per cluster.
        if not group or group[-1] is not well:
            group.append(well)

    return correlated_formations
For additional support with formation data import, contact Oliasoft support or refer to the API documentation.